[mvapich-discuss] mvapich jobs cleanup

Mark Potts potts at hpcapplications.com
Sun Jul 15 22:52:03 EDT 2007


Sayantan,
    Of necessity I am keeping a separate copy of MVAPICH with your patch
    and had a stale reference to the old, "official" shared libs in my
    builds/runs.

    When I straightened that out, everything ran fine.  I've tried several ways
    of causing jobs to abort and your fix caught them all and killed the
    remote processes.  Looks good.

    Thanks.
          regards,


Sayantan Sur wrote:
> Hi Mark,
> 
> Thanks for trying out the patch. I'm wondering if you plugged in the 
> patch and recompiled everything (MPI libraries, and benchmarks)?
> 
> Thanks,
> Sayantan.
> 
> Mark Potts wrote:
>> Sayantan,
>>    No luck yet with that mpirun_rsh patch.
>>    I'm getting:
>>    "mpirun: executable version 5 does not match our version 6"
>>    "Killing remote processes...DONE"
>>    as soon as I attempt to start up the new mpirun_rsh.
>>
>>    I can see that the patch changes the version number but can't yet
>>    understand to whom "executable version" and "our" refer.
>>    Hints?  Directions?
>>
>>            regards,
>>
>> Sayantan Sur wrote:
>>> Hi Mark,
>>>
>>> We have a patch to solve this stray process issue with MVAPICH-0.9.9. 
>>> I'm attaching the patch with this email. To apply the patch please 
>>> follow these steps:
>>>
>>> $ cd mvapich-0.9.9
>>> $ #save mpirun_rsh_patch to this directory
>>> $ patch -p1 < mpirun_rsh_patch
>>>
>>> Could you please let us know if this patch solves the problem for you?
>>>
>>> Thanks,
>>> Sayantan.
>>>
>>>
>>> Mark Potts wrote:
>>>> Hi,
>>>>    We are observing a number of cases in which MVAPICH-0.9.9
>>>>    jobs launched with mpirun_rsh leave stray processes on some
>>>>    nodes when the job terminates abnormally.  Those stray
>>>>    processes continue to run forever and require recognition
>>>>    and killing.
>>>>
>>>>    Is there a reason this happens with MVAPICH, and is there a
>>>>    way to prevent it?  This doesn't seem to be the behavior
>>>>    that occurs for abnormally terminated Voltaire MPI or Intel
>>>>    MPI jobs.
>>>>          regards,
>>>
>>>
>>>
>>> ------------------------------------------------------------------------
>>>
>>> diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c 
>>> exp1/mpid/ch_gen2/process/mpirun_rsh.c
>>> --- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c    2007-05-29 
>>> 03:47:10.000000000 -0400
>>> +++ exp1/mpid/ch_gen2/process/mpirun_rsh.c    2007-07-09 
>>> 11:56:32.000000000 -0400
>>> @@ -59,6 +59,7 @@
>>>  #define _GNU_SOURCE
>>>  #include <getopt.h>
>>>  #include <stdlib.h>
>>> +#include <stddef.h>
>>>  #include <stdio.h>
>>>  #include <signal.h>
>>>  #include <unistd.h>
>>> @@ -91,20 +92,34 @@
>>>  typedef struct {
>>>      char *hostname;
>>>      char *device;
>>> -    int pid;
>>> +    pid_t pid;
>>> +    pid_t remote_pid;
>>>      int port;
>>>      int control_socket;
>>>      process_state state;
>>>  } process;
>>>  
>>> +typedef struct {
>>> +    const char * hostname;
>>> +    pid_t * pids;
>>> +    size_t npids, npids_allocated;
>>> +} process_group;
>>> +
>>> +typedef struct {
>>> +    process_group * data;
>>> +    process_group ** index;
>>> +    size_t npgs, npgs_allocated;
>>> +} process_groups;
>>> +
>>>  #define RUNNING(i) ((plist[i].state == P_STARTED ||                 \
>>>              plist[i].state == P_CONNECTED ||                        \
>>>              plist[i].state == P_RUNNING) ? 1 : 0)
>>>  
>>>  /* other information: a.out and rank are implicit. */
>>>  
>>> -process *plist;
>>> -int nprocs;
>>> +process_groups * pglist = NULL;
>>> +process * plist = NULL;
>>> +int nprocs = 0;
>>>  int aout_index, port;
>>>  #define MAX_WD_LEN 256
>>>  char wd[MAX_WD_LEN];            /* working directory of current 
>>> process */
>>> @@ -112,11 +127,19 @@
>>>  char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
>>>  /* xxx need to add checking for string overflow, do this more 
>>> carefully ... */
>>>  
>>> +/*
>>> + * Message notifying user of what timed out
>>> + */
>>> +static const char * alarm_msg = NULL;
>>>  
>>>  #define COMMAND_LEN 2000
>>>  #define SEPARATOR ':'
>>>  
>>> -
>>> +void free_memory(void);
>>> +void pglist_print(void);
>>> +void pglist_insert(const char * const, const pid_t const);
>>> +void rkill_fast(void);
>>> +void rkill_linear(void);
>>>  void cleanup_handler(int);
>>>  void nostop_handler(int);
>>>  void alarm_handler(int);
>>> @@ -239,15 +262,19 @@
>>>      int    hostname_len = 0;
>>>      totalview_cmd[199] = 0;
>>>      display[0]='\0';
>>> -    
>>> +    pidglen = sizeof(pid_t);
>>> +
>>>      /* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 
>>> ... hN] a.out [args] */
>>>  
>>> +    atexit(free_memory);
>>> +
>>>      do {
>>>          c = getopt_long_only(argc, argv, "+", option_table, 
>>> &option_index);
>>>          switch (c) {
>>>          case '?':
>>>          case ':':
>>>              usage();
>>> +        exit(EXIT_FAILURE);
>>>              break;
>>>          case EOF:
>>>              break;
>>> @@ -255,8 +282,10 @@
>>>              switch (option_index) {
>>>              case 0:
>>>                  nprocs = atoi(optarg);
>>> -                if (nprocs < 1)
>>> +                if (nprocs < 1) {
>>>                      usage();
>>> +            exit(EXIT_FAILURE);
>>> +        }
>>>                  break;
>>>              case 1:
>>>                  debug_on = 1;
>>> @@ -290,11 +319,11 @@
>>>              case 8:
>>>                  show_version();
>>>                  usage();
>>> -                exit(0);
>>> +                exit(EXIT_SUCCESS);
>>>                  break;
>>>              case 9:
>>>                  show_version();
>>> -                exit(0);
>>> +                exit(EXIT_SUCCESS);
>>>                  break;
>>>           case 10:
>>>           use_totalview = 1;
>>> @@ -311,17 +340,19 @@
>>>            break;
>>>              case 11:
>>>                  usage();
>>> -                exit(0);
>>> +                exit(EXIT_SUCCESS);
>>>                  break;
>>>              default:
>>>                  fprintf(stderr, "Unknown option\n");
>>>                  usage();
>>> +        exit(EXIT_FAILURE);
>>>                  break;
>>>              }
>>>              break;
>>>          default:
>>>              fprintf(stderr, "Unreachable statement!\n");
>>>              usage();
>>> +        exit(EXIT_FAILURE);
>>>              break;
>>>          }
>>>      } while (c != EOF);
>>> @@ -332,7 +363,7 @@
>>>              fprintf(stderr, "Without hostfile option, hostnames must 
>>> be "
>>>                      "specified on command line.\n");
>>>              usage();
>>> -            exit(1);
>>> +            exit(EXIT_FAILURE);
>>>          }
>>>          aout_index = nprocs + optind;
>>>      } else {
>>> @@ -361,13 +392,14 @@
>>>      plist = malloc(nprocs * sizeof(process));
>>>      if (plist == NULL) {
>>>          perror("malloc");
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      for (i = 0; i < nprocs; i++) {
>>>          plist[i].state = P_NOTSTARTED;
>>>          plist[i].device = NULL;
>>>          plist[i].port = -1;
>>> +    plist[i].remote_pid = 0;
>>>      }
>>>  
>>>      /* grab hosts from command line or file */
>>> @@ -376,7 +408,7 @@
>>>          hostname_len = read_hostfile(hostfile);
>>>      } else {
>>>          for (i = 0; i < nprocs; i++) {
>>> -            plist[i].hostname = argv[optind + i];
>>> +            plist[i].hostname = (char *)strndup(argv[optind + i], 100);
>>>              hostname_len = hostname_len > strlen(plist[i].hostname) ?
>>>                  hostname_len : strlen(plist[i].hostname);
>>>          }
>>> @@ -388,7 +420,7 @@
>>>  
>>>          if (!mpirun_processes) {
>>>              perror("malloc");
>>> -            exit(1);
>>> +            exit(EXIT_FAILURE);
>>>          } else {
>>>              memset(mpirun_processes, 0, nprocs * (hostname_len + 4));
>>>          }
>>> @@ -412,18 +444,18 @@
>>>      s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
>>>      if (s < 0) {
>>>          perror("socket");
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>      sockaddr.sin_addr.s_addr = INADDR_ANY;
>>>      sockaddr.sin_port = 0;
>>>      if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
>>>          perror("bind");
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) 
>>> < 0) {
>>>          perror("getsockname");
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      port = (int) ntohs(sockaddr.sin_port);
>>> @@ -431,14 +463,31 @@
>>>  
>>>  
>>>      if (!show_on) {
>>> -    signal(SIGHUP, cleanup_handler);
>>> -    signal(SIGINT, cleanup_handler);
>>> -    signal(SIGTSTP, nostop_handler);
>>> -    signal(SIGCHLD, child_handler);
>>> -    signal(SIGALRM, alarm_handler);
>>> +    struct sigaction signal_handler;
>>> +    signal_handler.sa_handler = cleanup_handler;
>>> +    sigfillset(&signal_handler.sa_mask);
>>> +    signal_handler.sa_flags = 0;
>>> +
>>> +    sigaction(SIGHUP, &signal_handler, NULL);
>>> +    sigaction(SIGINT, &signal_handler, NULL);
>>> +    sigaction(SIGTERM, &signal_handler, NULL);
>>> +
>>> +    signal_handler.sa_handler = nostop_handler;
>>> +
>>> +    sigaction(SIGTSTP, &signal_handler, NULL);
>>> +
>>> +    signal_handler.sa_handler = alarm_handler;
>>> +
>>> +    sigaction(SIGALRM, &signal_handler, NULL);
>>> +
>>> +    signal_handler.sa_handler = child_handler;
>>> +    sigemptyset(&signal_handler.sa_mask);
>>> +
>>> +    sigaction(SIGCHLD, &signal_handler, NULL);
>>>      }
>>>  
>>>      alarm(1000);
>>> +    alarm_msg = "Timeout during client startup.\n";
>>>      /* long timeout for testing, where process may be stopped in 
>>> debugger */
>>>  
>>>  #ifdef USE_DDD
>>> @@ -511,7 +560,7 @@
>>>      }
>>>  
>>>      if (show_on)
>>> -        exit(0);
>>> +        exit(EXIT_SUCCESS);
>>>  
>>>      /*Hostid exchange start */
>>>      /* accept incoming connections, read port numbers */
>>> @@ -522,6 +571,9 @@
>>>  ACCEPT_HID:
>>>          sockaddr_len = sizeof(sockaddr);
>>>          s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
>>> +
>>> +    alarm_msg = "Timeout during hostid exchange.\n";
>>> +
>>>          if (s1 < 0) {
>>>              if (errno == EINTR)
>>>                  goto ACCEPT_HID;
>>> @@ -592,7 +644,7 @@
>>>              hostids = (int *) malloc(hostidlen * nprocs);
>>>              if (hostids == NULL) {
>>>                  perror("malloc");
>>> -                exit(1);
>>> +                exit(EXIT_FAILURE);
>>>              }
>>>          }
>>>  
>>> @@ -626,66 +678,33 @@
>>>          }
>>>      }
>>>  
>>> -    /* close all opend sockets */
>>> -    for (i = 0; i < nprocs; i++) {
>>> -        close(plist[i].control_socket);
>>> -    }
>>> -
>>>      alarm(1000);
>>> -    /* let enbale the timer again*/
>>> +    alarm_msg = "Timeout during address exchange.\n";
>>> +    /* lets enable the timer again*/
>>>  
>>>      /* Lets read all other information, LID QP,etc..*/
>>>  
>>>      /* accept incoming connections, read port numbers */
>>>      for (i = 0; i < nprocs; i++) {
>>> -        int version, rank, nread;
>>> -        char pidstr[12];
>>> -ACCEPT:
>>> -        sockaddr_len = sizeof(sockaddr);
>>> -        s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
>>> -        if (s1 < 0) {
>>> -            if (errno == EINTR)
>>> -                goto ACCEPT;
>>> -            perror("accept");
>>> -            cleanup();
>>> -        }
>>> +        int nread;
>>>  
>>>          /*
>>>           * protocol:
>>> -         *  We don't need version number,
>>> -         *  0. read rank of process
>>> -         *  1. read address length
>>> -         *  2. read address itself
>>> -         *  3. send array of all addresses
>>> +         *  We don't need the version number or the rank,
>>> +         *  0. read address length
>>> +         *  1. read address itself
>>> +         *  2. send array of all addresses
>>>           */
>>>  
>>> -        /* 0. Find out who we're talking to */
>>> -        nread = read(s1, &rank, sizeof(rank));
>>> -        if (nread != sizeof(rank)) {
>>> -            perror("read");
>>> -            cleanup();
>>> -        }
>>> -
>>> -        if (rank < 0 || rank >= nprocs
>>> -                || plist[rank].state != P_STARTED) {
>>> -            fprintf(stderr, "mpirun: invalid rank received. \n");
>>> -            cleanup();
>>> -        }
>>> -
>>> -        plist[rank].control_socket = s1;
>>> -        plist[rank].state = P_CONNECTED;
>>> +        plist[i].state = P_CONNECTED;
>>>  
>>>          /* Let us know connection was established
>>>           * printf("MPIRUN_RSH: Process rank %d connected\n",rank);
>>>           */
>>>  
>>>          /* 1. Find out length of the data */
>>> -        nread = read(s1, &addrlen, sizeof(addrlen));
>>> +        nread = read(plist[i].control_socket, &addrlen, 
>>> sizeof(addrlen));
>>>          if (nread != sizeof(addrlen)) {
>>> -            /* nread == 0 is not actually an error! */
>>> -            if (nread == 0)
>>> -                continue;
>>> -
>>>              perror("read");
>>>              cleanup();
>>>          }
>>> @@ -707,21 +726,20 @@
>>>              alladdrs = (int *) malloc(addrlen * nprocs);
>>>              if (alladdrs == NULL) {
>>>                  perror("malloc");
>>> -                exit(1);
>>> +                exit(EXIT_FAILURE);
>>>              }
>>>          }
>>>  
>>>          /* 2. Read info from each process */
>>>  
>>>          /* for byte location */
>>> -        alladdrs_char = (char *) &alladdrs[rank * addrlen / 
>>> sizeof(int)];
>>> +        alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
>>>  
>>>          tot_nread = 0;
>>>  
>>>          while (tot_nread < addrlen) {
>>> -            nread =
>>> -                read(s1, (void *) (alladdrs_char + tot_nread),
>>> -                        addrlen - tot_nread);
>>> +            nread = read(plist[i].control_socket,
>>> +        (void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
>>>  
>>>              if (nread < 0) {
>>>                  perror("read");
>>> @@ -733,36 +751,32 @@
>>>  
>>>  read_pid:
>>>          /* 3. Find out length of the data */
>>> -        nread = read(s1, &pidlen, sizeof(pidlen));
>>> +        nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
>>>          if (nread != sizeof(pidlen)) {
>>>              perror("read");
>>>              cleanup();
>>>          }
>>>  
>>>          /*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, 
>>> nread);*/
>>> -        if (i == 0) {
>>> -            pidglen = pidlen;
>>> -        } else {
>>> -            if (pidlen != pidglen) {
>>> -                fprintf(stderr, "Pid lengths %d and %d do not match\n",
>>> -                        pidlen, pidglen);
>>> -                cleanup();
>>> -            }
>>> -        }
>>> +    if (pidlen != pidglen) {
>>> +      fprintf(stderr, "Pid lengths %d and %d do not match\n", 
>>> +          pidlen, pidglen);
>>> +      cleanup();
>>> +    }
>>>  
>>>          if (i == 0) {
>>> -            /* allocate as soon as we know the address length */
>>> +            /* allocate as soon as we know the pid length */
>>>              allpids = (char *)malloc(pidlen * nprocs);
>>>              if (allpids == NULL) {
>>>                  perror("malloc");
>>> -                exit(1);
>>> +                exit(EXIT_FAILURE);
>>>              }
>>>          }
>>>  
>>>          tot_nread=0;
>>>          while(tot_nread < pidlen) {
>>> -            nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread), 
>>> -                    pidlen - tot_nread);
>>> +            nread = read(plist[i].control_socket,
>>> +        (void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
>>>              /*fprintf(stderr, "read length %d \n", nread);*/
>>>              if(nread < 0) {
>>>                  perror("read");
>>> @@ -770,6 +784,9 @@
>>>              }
>>>              tot_nread += nread;
>>>          }
>>> +
>>> +    plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
>>> +    pglist_insert(plist[i].hostname, plist[i].remote_pid);
>>>      }
>>>  
>>>  
>>> @@ -795,7 +812,7 @@
>>>      out_addrs = (int *) malloc(out_addrs_len);
>>>      if (out_addrs == NULL) {
>>>          perror("malloc");
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      for (i = 0; i < nprocs; i++) {
>>> @@ -876,8 +893,7 @@
>>>          sleep(100);
>>>      }
>>>      close(s);
>>> -    exit(0);
>>> -
>>> +    exit(EXIT_SUCCESS);
>>>  }
>>>  
>>>  int start_process(int i, char *command_name, char *env)
>>> @@ -925,12 +941,12 @@
>>>      if ((remote_command = malloc(str_len)) == NULL) {
>>>          fprintf(stderr, "Failed to malloc %d bytes for 
>>> remote_command\n",
>>>                  str_len);
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>      if ((xterm_command = malloc(str_len)) == NULL) {
>>>          fprintf(stderr, "Failed to malloc %d bytes for 
>>> xterm_command\n",
>>>                  str_len);
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>  
>>> @@ -1010,7 +1026,7 @@
>>>          if (!show_on) {
>>>              perror("RSH/SSH command failed!");
>>>          }
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      free(remote_command);
>>> @@ -1189,8 +1205,6 @@
>>>      fprintf(stderr, "\ta.out      => " "name of MPI binary\n");
>>>      fprintf(stderr, "\targs       => " "arguments for MPI binary\n");
>>>      fprintf(stderr, "\n");
>>> -
>>> -    exit(1);
>>>  }
>>>  
>>>  /* finds first non-whitespace char in input string */
>>> @@ -1221,7 +1235,7 @@
>>>      if (hf == NULL) {
>>>          fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
>>>          perror("open");
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      for (i = 0; i < nprocs; i++) {
>>> @@ -1287,7 +1301,7 @@
>>>          } else {
>>>              fprintf(stderr, "End of file reached on "
>>>                      "hostfile at %d of %d hostnames\n", i, nprocs);
>>> -            exit(1);
>>> +            exit(EXIT_FAILURE);
>>>          }
>>>      }
>>>      fclose(hf);
>>> @@ -1321,14 +1335,14 @@
>>>      if ((pf = fopen(paramfile, "r")) == NULL) {
>>>          sprintf(errstr, "Cant open paramfile = %s", paramfile);
>>>          perror(errstr);
>>> -        exit(1);
>>> +        exit(EXIT_FAILURE);
>>>      }
>>>  
>>>      if ( strlen(env) == 0 ){
>>>          /* Allocating space for env first time */
>>>          if ((env = malloc(ENV_LEN)) == NULL) {
>>>              fprintf(stderr, "Malloc of env failed in 
>>> read_param_file\n");
>>> -            exit(1);
>>> +            exit(EXIT_FAILURE);
>>>          }
>>>          env_left = ENV_LEN - 1;
>>>      }else{
>>> @@ -1367,7 +1381,7 @@
>>>                  (ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + 
>>> strlen(env);
>>>              if ((env = realloc(env, newlen)) == NULL) {
>>>                  fprintf(stderr, "realloc failed in read_param_file\n");
>>> -                exit(1);
>>> +                exit(EXIT_FAILURE);
>>>              }
>>>              if (param_debug) {
>>>                  printf("realloc to %d\n", newlen);
>>> @@ -1395,15 +1409,213 @@
>>>      }
>>>      cleanup();
>>>  
>>> -    exit(1);
>>> +    exit(EXIT_FAILURE);
>>> +}
>>> +
>>> +void pglist_print(void) {
>>> +    if(pglist) {
>>> +    int i, j;
>>> +    size_t npids = 0, npids_allocated = 0;
>>> +
>>> +    fprintf(stderr, "\n--pglist--\ndata:\n");
>>> +    for(i = 0; i < pglist->npgs; i++) {
>>> +        fprintf(stderr, "%p - %s:", &pglist->data[i],
>>> +            pglist->data[i].hostname);
>>> +
>>> +        for(j = 0; j < pglist->data[i].npids; j++) {
>>> +        fprintf(stderr, " %d", pglist->data[i].pids[j]);
>>> +        }
>>> +
>>> +        fprintf(stderr, "\n");
>>> +        npids        += pglist->data[i].npids;
>>> +        npids_allocated += pglist->data[i].npids_allocated;
>>> +    }
>>> +
>>> +    fprintf(stderr, "\nindex:");
>>> +    for(i = 0; i < pglist->npgs; i++) {
>>> +        fprintf(stderr, " %p", pglist->index[i]);
>>> +    }
>>> +
>>> +    fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
>>> +        pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
>>> +            pglist->npgs / pglist->npgs_allocated : 100.));
>>> +    fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
>>> +        npids_allocated, (int)(npids_allocated ? 100. * npids /
>>> +            npids_allocated : 100.));
>>> +    fprintf(stderr, "--pglist--\n\n");
>>> +    }
>>> +}
>>> +
>>> +void pglist_insert(const char * const hostname, const pid_t const 
>>> pid) {
>>> +    const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
>>> +    size_t index = 0, bottom = 0, top;
>>> +    static size_t alloc_error = 0;
>>> +    int i, strcmp_result;
>>> +    process_group * pg;
>>> +    void * backup_ptr;
>>> +
>>> +    if(alloc_error) return;
>>> +    if(pglist == NULL) goto init_pglist;
>>> +
>>> +    top = pglist->npgs - 1;
>>> +    index = (top + bottom) / 2;
>>> +
>>> +    while(strcmp_result = strcmp(hostname, 
>>> pglist->index[index]->hostname)) {
>>> +    if(bottom >= top) break;
>>> +
>>> +    if(strcmp_result > 0) {
>>> +        bottom = index + 1;
>>> +    }
>>> +
>>> +    else {
>>> +        top = index - 1;
>>> +    }
>>> +
>>> +    index = (top + bottom) / 2;
>>> +    }
>>> +
>>> +    if(!strcmp_result) goto insert_pid;
>>> +    if(strcmp_result > 0) index++;
>>> +
>>> +    goto add_process_group;
>>> +
>>> +init_pglist:
>>> +    pglist = malloc(sizeof(process_groups));
>>> +
>>> +    if(pglist) {
>>> +    pglist->data        = NULL;
>>> +    pglist->index        = NULL;
>>> +    pglist->npgs        = 0;
>>> +    pglist->npgs_allocated    = 0;
>>> +    }
>>> +
>>> +    else {
>>> +    goto register_alloc_error;
>>> +    }
>>> +
>>> +add_process_group:
>>> +    if(pglist->npgs == pglist->npgs_allocated) {
>>> +    process_group * pglist_data_backup    = pglist->data;
>>> +    process_group ** pglist_index_backup    = pglist->index;
>>> +    ptrdiff_t offset;
>>> +
>>> +    pglist->npgs_allocated += increment;
>>> +
>>> +    backup_ptr = pglist->data;
>>> +    pglist->data = realloc(pglist->data, sizeof(process_group) *
>>> +        pglist->npgs_allocated);
>>> +
>>> +    if(pglist->data == NULL) {
>>> +        pglist->data = backup_ptr;
>>> +        goto register_alloc_error;
>>> +    }
>>> +
>>> +    backup_ptr = pglist->index;
>>> +    pglist->index = realloc(pglist->index, sizeof(process_group *) *
>>> +        pglist->npgs_allocated);
>>> +
>>> +    if(pglist->index == NULL) {
>>> +        pglist->index = backup_ptr;
>>> +        goto register_alloc_error;
>>> +    }
>>> +
>>> +    if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) { 
>>> +        for(i = 0; i < pglist->npgs; i++) {
>>> +        pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
>>> +            offset);
>>> +        }
>>> +    }
>>> +    }
>>> +
>>> +    for(i = pglist->npgs; i > index; i--) {
>>> +    pglist->index[i] = pglist->index[i-1];
>>> +    }
>>> +
>>> +    pglist->data[pglist->npgs].hostname        = hostname;
>>> +    pglist->data[pglist->npgs].pids        = NULL;
>>> +    pglist->data[pglist->npgs].npids        = 0;
>>> +    pglist->data[pglist->npgs].npids_allocated    = 0;
>>> +
>>> +    pglist->index[index] = &pglist->data[pglist->npgs++];
>>> +
>>> +insert_pid:
>>> +    pg = pglist->index[index];
>>> +
>>> +    if(pg->npids == pg->npids_allocated) {
>>> +    if(pg->npids_allocated) {
>>> +        pg->npids_allocated <<= 1;
>>> +
>>> +        if(pg->npids_allocated < pg->npids) pg->npids_allocated = 
>>> SIZE_MAX;
>>> +        if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
>>> +    }
>>> +
>>> +    else {
>>> +        pg->npids_allocated = 1;
>>> +    }
>>> +
>>> +    backup_ptr = pg->pids;
>>> +    pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
>>> +
>>> +    if(pg->pids == NULL) {
>>> +        pg->pids = backup_ptr;
>>> +        goto register_alloc_error;
>>> +    }
>>> +    }
>>> +
>>> +    pg->pids[pg->npids++] = pid;
>>> +
>>> +    return;
>>> +
>>> +register_alloc_error:
>>> +    if(pglist) {
>>> +    if(pglist->data) {
>>> +        process_group * pg = pglist->data;
>>> +
>>> +        while(pglist->npgs--) {
>>> +        if(pg->pids) free((pg++)->pids);
>>> +        }
>>> +
>>> +        free(pglist->data);
>>> +    }
>>> +
>>> +    if(pglist->index) free(pglist->index);
>>> +
>>> +    free(pglist);
>>> +    }
>>> +
>>> +    alloc_error = 1;
>>> +}
>>> +
>>> +void free_memory(void) {
>>> +    if(pglist) {
>>> +    if(pglist->data) {
>>> +        process_group * pg = pglist->data;
>>> +
>>> +        while(pglist->npgs--) {
>>> +        if(pg->pids) free((pg++)->pids);
>>> +        }
>>> +
>>> +        free(pglist->data);
>>> +    }
>>> +
>>> +    if(pglist->index) free(pglist->index);
>>> +
>>> +    free(pglist);
>>> +    }
>>> +
>>> +    if(plist) {
>>> +    while(nprocs--) {
>>> +        if(plist[nprocs].device) free(plist[nprocs].device);
>>> +        if(plist[nprocs].hostname) free(plist[nprocs].hostname);
>>> +    }
>>> +
>>> +    free(plist);
>>> +    }
>>>  }
>>>  
>>>  void cleanup(void)
>>>  {
>>>      int i;
>>> -    /* could walk through list of processes, but it looks
>>> -       like we can just send the signal to the process group
>>> -     */
>>>  
>>>      if (use_totalview) {
>>>      fprintf(stderr, "Cleaning up all processes ...");
>>> @@ -1417,36 +1629,180 @@
>>>      }
>>>  
>>>      for (i = 0; i < nprocs; i++) {
>>> -        if (RUNNING(i)) {
>>> -            /* send terminal interrupt, which will hopefully 
>>> -               propagate to the other side. (not sure what xterm will
>>> -               do here.
>>> -             */
>>> -            kill(plist[i].pid, SIGINT);
>>> -        }
>>> +    if (RUNNING(i)) {
>>> +        /* send terminal interrupt, which will hopefully
>>> +           propagate to the other side. (not sure what xterm will
>>> +           do here.
>>> +         */
>>> +        kill(plist[i].pid, SIGINT);
>>> +    }
>>>      }
>>> +
>>>      sleep(1);
>>>  
>>>      for (i = 0; i < nprocs; i++) {
>>> -        if (plist[i].state != P_NOTSTARTED) {
>>> -            /* send regular interrupt to rsh */
>>> -            kill(plist[i].pid, SIGTERM);
>>> -        }
>>> +    if (plist[i].state != P_NOTSTARTED) {
>>> +        /* send regular interrupt to rsh */
>>> +        kill(plist[i].pid, SIGTERM);
>>> +    }
>>>      }
>>>  
>>>      sleep(1);
>>>  
>>>      for (i = 0; i < nprocs; i++) {
>>> -        if (plist[i].state != P_NOTSTARTED) {
>>> -            /* Kill the processes */
>>> -            kill(plist[i].pid, SIGKILL);
>>> -        }
>>> +    if (plist[i].state != P_NOTSTARTED) {
>>> +        /* Kill the processes */
>>> +        kill(plist[i].pid, SIGKILL);
>>> +    }
>>> +    }
>>> +
>>> +    if(pglist) {
>>> +    rkill_fast();
>>> +    }
>>> +
>>> +    else {
>>> +    rkill_linear();
>>> +    }
>>> +
>>> +    exit(EXIT_FAILURE);
>>> +}
>>> +
>>> +void rkill_fast(void) {
>>> +    int i, j, tryagain, spawned_pid[pglist->npgs];
>>> +
>>> +    fprintf(stderr, "Killing remote processes...");
>>> +
>>> +    for(i = 0; i < pglist->npgs; i++) {
>>> +    if(0 == (spawned_pid[i] = fork())) {
>>> +        if(pglist->index[i]->npids) {
>>> +        const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
>>> +        const process_group * pg = pglist->index[i];
>>> +        char kill_cmd[bufsize], tmp[10];
>>> +
>>> +        kill_cmd[0] = '\0';
>>> +        strcat(kill_cmd, "kill -s SIGKILL");
>>> +
>>> +        for(j = 0; j < pg->npids; j++) {
>>> +            snprintf(tmp, 10, " %d", pg->pids[j]);
>>> +            strcat(kill_cmd, tmp);
>>> +        }
>>> +
>>> +        strcat(kill_cmd, " >&/dev/null");
>>> +
>>> +        if(use_rsh) {
>>> +            execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
>>> +        }
>>> +
>>> +        else {
>>> +            execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
>>> +                kill_cmd, NULL);
>>> +        }
>>> +
>>> +        perror(NULL);
>>> +        exit(EXIT_FAILURE);
>>> +        }
>>> +
>>> +        else {
>>> +        exit(EXIT_SUCCESS);
>>> +        }
>>> +    }
>>> +    }
>>> +
>>> +    while(1) {
>>> +    static int iteration = 0;
>>> +    tryagain = 0;
>>> +
>>> +    sleep(1 << iteration);
>>> +
>>> +    for (i = 0; i < pglist->npgs; i++) {
>>> +        if(spawned_pid[i]) {
>>> +        if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, 
>>> WNOHANG))) {
>>> +            tryagain = 1;
>>> +        }
>>> +        }
>>> +    }
>>> +
>>> +    if(++iteration == 5 || !tryagain) {
>>> +        fprintf(stderr, "DONE\n");
>>> +        break;
>>> +    }
>>> +    }
>>> +
>>> +    if(tryagain) {
>>> +    fprintf(stderr, "The following processes may have not been 
>>> killed:\n");
>>> +    for (i = 0; i < pglist->npgs; i++) {
>>> +        if(spawned_pid[i]) {
>>> +        const process_group * pg = pglist->index[i];
>>> +
>>> +        fprintf(stderr, "%s:", pg->hostname);
>>> +
>>> +        for (j = 0; j < pg->npids; j++) {
>>> +            fprintf(stderr, " %d", pg->pids[j]);
>>> +        }
>>> +
>>> +        fprintf(stderr, "\n");
>>> +        }
>>> +    }
>>> +    }
>>> +}
>>> +
>>> +void rkill_linear(void) {
>>> +    int i, j, tryagain, spawned_pid[nprocs];
>>> +
>>> +    fprintf(stderr, "Killing remote processes...");
>>> +
>>> +    for (i = 0; i < nprocs; i++) {
>>> +    if(0 == (spawned_pid[i] = fork())) {
>>> +        char kill_cmd[80];
>>> +
>>> +        if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
>>> +
>>> +        snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
>>> +        plist[i].remote_pid);
>>> +
>>> +        if(use_rsh) {
>>> +        execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
>>> +        }
>>> +
>>> +        else {
>>> +        execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
>>> +            plist[i].hostname, kill_cmd, NULL);
>>> +        }
>>> +
>>> +        perror(NULL);
>>> +        exit(EXIT_FAILURE);
>>> +    }
>>>      }
>>>  
>>> -    fprintf(stderr, "done.\n");
>>> +    while(1) {
>>> +    static int iteration = 0;
>>> +    tryagain = 0;
>>> +
>>> +    sleep(1 << iteration);
>>> +
>>> +    for (i = 0; i < nprocs; i++) {
>>> +        if(spawned_pid[i]) {
>>> +        if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, 
>>> WNOHANG))) {
>>> +            tryagain = 1;
>>> +        }
>>> +        }
>>> +    }
>>>  
>>> -    exit(1);
>>> +    if(++iteration == 5 || !tryagain) {
>>> +        fprintf(stderr, "DONE\n");
>>> +        break;
>>> +    }
>>> +    }
>>>  
>>> +    if(tryagain) {
>>> +    fprintf(stderr, "The following processes may have not been 
>>> killed:\n");
>>> +    for (i = 0; i < nprocs; i++) {
>>> +        if(spawned_pid[i]) {
>>> +        fprintf(stderr, "%s [%d]\n", plist[i].hostname,
>>> +            plist[i].remote_pid);
>>> +        }
>>> +    }
>>> +    }
>>>  }
>>>  
>>>  
>>> @@ -1457,9 +1813,13 @@
>>>  
>>>  void alarm_handler(int signal)
>>>  {
>>> +    extern const char * alarm_msg;
>>> +
>>>      if (use_totalview) {
>>>      fprintf(stderr, "Timeout alarm signaled\n");
>>>      }
>>> +
>>> +    if(alarm_msg) fprintf(stderr, alarm_msg);
>>>      cleanup();
>>>  }
>>>  
>>> @@ -1467,19 +1827,21 @@
>>>  void child_handler(int signal)
>>>  {
>>>      int status, i, child, pid;
>>> -    int exitstatus = 0;
>>> +    int exitstatus = EXIT_SUCCESS;
>>>  
>>>      if (use_totalview) {
>>>      fprintf(stderr, "mpirun: child died. Waiting for others.\n"); 
>>>      }
>>>      alarm(10);
>>> +    alarm_msg = "Child died. Timeout while waiting for others.\n";
>>> +
>>>      for (i = 0; i < nprocs; i++) {
>>>          pid = wait(&status);
>>>          if (pid == -1) {
>>>              perror("wait");
>>> -            exitstatus = 1;
>>> +            exitstatus = EXIT_FAILURE;
>>>          } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
>>> -            exitstatus = 1;
>>> +            exitstatus = EXIT_FAILURE;
>>>          }
>>>          for (child = 0; child < nprocs; child++) {
>>>              if (plist[child].pid == pid) {
>>> @@ -1489,9 +1851,11 @@
>>>          }
>>>          if (child == nprocs) {
>>>              fprintf(stderr, "Unable to find child %d!\n", pid);
>>> -            exitstatus = 1;
>>> +            exitstatus = EXIT_FAILURE;
>>>          }
>>>      }
>>>      alarm(0);
>>>      exit(exitstatus);
>>>  }
>>> +
>>> +/* vi:set sw=4 sts=4 tw=80: */
>>> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h exp1/mpid/ch_gen2/process/pmgr_client.h
>>> --- 0.9.9/mpid/ch_gen2/process/pmgr_client.h    2007-05-29 03:47:10.000000000 -0400
>>> +++ exp1/mpid/ch_gen2/process/pmgr_client.h    2007-07-02 12:59:51.000000000 -0400
>>> @@ -108,6 +108,6 @@
>>>   * of the spawner, e.g. mpirun_rsh, to check that it understands
>>>   * the version of the executable.
>>>   */
>>> -#define PMGR_VERSION 5
>>> +#define PMGR_VERSION 6
>>>  
>>>  #endif
>>> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
>>> --- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c    2007-05-29 03:47:10.000000000 -0400
>>> +++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c    2007-07-02 12:59:51.000000000 -0400
>>> @@ -171,6 +171,9 @@
>>>      int  nwritten;
>>>      int version;
>>>      struct sockaddr_in sockaddr;
>>> +
>>> +    if(phase != 0) return;
>>> +
>>>     /*
>>>       * Exchange information with the mpirun program. Send it our
>>>       * socket address, get back addresses for our siblings.
>>> @@ -208,14 +211,12 @@
>>>       */
>>>  
>>>      version = PMGR_VERSION;
>>> -    if (0 == phase) {
>>> -        /* first, send a version number */
>>> -        nwritten = write(mpirun_socket, &version, sizeof(version));
>>> -        if (nwritten != sizeof(version)) {
>>> -            sleep(2);
>>> -            perror("write");
>>> -            exit(1);
>>> -        }
>>> +    /* first, send a version number */
>>> +    nwritten = write(mpirun_socket, &version, sizeof(version));
>>> +    if (nwritten != sizeof(version)) {
>>> +    sleep(2);
>>> +    perror("write");
>>> +    exit(1);
>>>      }
>>>           /* next, send our rank */
>>> @@ -264,7 +265,6 @@
>>>          tot_nread = tot_nread + nread;
>>>      }
>>>      fflush(stdout);
>>> -    close(mpirun_socket);
>>>      return 1;
>>>  }
>>>  
>>> @@ -280,7 +280,6 @@
>>>      pid_t *ppids = (pid_t *)pallpids;
>>>      pid_t *allpids = NULL;
>>>  
>>> -    pmgr_init_connection(1);
>>>      /* next, send size of addr */
>>>      nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
>>>      if (nwritten != sizeof(addrlen)) {
>>> @@ -314,7 +313,7 @@
>>>         exit(1);
>>>      }
>>>  
>>> -    /* next, send size of addr */
>>> +    /* next, send size of pid */
>>>      nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
>>>      if (nwritten != sizeof(mypid_len)) {
>>>        sleep(2);
>>> @@ -322,6 +321,7 @@
>>>        exit(1);
>>>      }
>>>  
>>> +    /* next, send our pid */
>>>      if (pidlen != 0) {
>>>        nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
>>>        if (nwritten != pidlen) {
>>> @@ -345,7 +345,7 @@
>>>  
>>>      if (pidlen != 0) {
>>>             tot_nread=0;
>>> -           /* finally, read addresses from all processes */
>>> +           /* finally, read pids from all processes */
>>>             while (tot_nread < pmgr_nprocs*pidlen) {
>>>          nread = read(mpirun_socket, (void*)((char 
>>> *)allpids+tot_nread),                      (size_t) 
>>> ((pmgr_nprocs*pidlen)-tot_nread));
>>
> 

-- 
***********************************
 >> Mark J. Potts, PhD
 >>
 >> HPC Applications Inc.
 >> phone: 410-992-8360 Bus
 >>        410-313-9318 Home
 >>        443-418-4375 Cell
 >> email: potts at hpcapplications.com
 >>        potts at excray.com
***********************************


More information about the mvapich-discuss mailing list