[mvapich-discuss] mvapich jobs cleanup

Sayantan Sur surs at cse.ohio-state.edu
Sat Jul 14 14:09:45 EDT 2007


Hi Mark,

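Thanks for trying out the patch. Did you apply the patch and then
recompile everything (the MPI libraries and the benchmarks)? The patch
bumps PMGR_VERSION from 5 to 6, so in that message "our version" is the
new mpirun_rsh and "executable version" is the MPI program it launched;
seeing version 5 there suggests the program is still linked against the
unpatched library.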

Thanks,
Sayantan.

Mark Potts wrote:
> Sayantan,
>    No luck yet with that mpirun_rsh patch.
>    I'm getting:
>    "mpirun: executable version 5 does not match our version 6"
>    "Killing remote processes...DONE"
>    as soon as I attempt to start up the new mpirun_rsh.
>
>    I can see that the patch changes the version number but can't yet
>    understand to whom "executable version" and "our" refer.
>    Hints?  Directions?
>
>            regards,
>
> Sayantan Sur wrote:
>> Hi Mark,
>>
>> We have a patch to solve this stray process issue with MVAPICH-0.9.9. 
>> I'm attaching the patch with this email. To apply the patch please 
>> follow these steps:
>>
>> $ cd mvapich-0.9.9
>> $ #save mpirun_rsh_patch to this directory
>> $ patch -p1 < mpirun_rsh_patch
>>
>> Could you please let us know if this patch solves the problem for you?
>>
>> Thanks,
>> Sayantan.
>>
>>
>> Mark Potts wrote:
>>> Hi,
>>>    We are observing a number of cases in which MVAPICH-0.9.9
>>>    jobs launched with mpirun_rsh leave stray processes on some
>>>    nodes when the job terminates abnormally.  Those stray
>>>    processes continue to run forever and have to be identified
>>>    and killed.
>>>
>>>    Is there a reason this happens with MVAPICH, and is there a
>>>    way to prevent it?  This doesn't seem to be the behavior
>>>    that occurs for abnormally terminated Voltaire MPI or Intel
>>>    MPI jobs.
>>>          regards,
>>
>>
>>
>> ------------------------------------------------------------------------
>>
>> diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c 
>> exp1/mpid/ch_gen2/process/mpirun_rsh.c
>> --- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c    2007-05-29 
>> 03:47:10.000000000 -0400
>> +++ exp1/mpid/ch_gen2/process/mpirun_rsh.c    2007-07-09 
>> 11:56:32.000000000 -0400
>> @@ -59,6 +59,7 @@
>>  #define _GNU_SOURCE
>>  #include <getopt.h>
>>  #include <stdlib.h>
>> +#include <stddef.h>
>>  #include <stdio.h>
>>  #include <signal.h>
>>  #include <unistd.h>
>> @@ -91,20 +92,34 @@
>>  typedef struct {
>>      char *hostname;
>>      char *device;
>> -    int pid;
>> +    pid_t pid;
>> +    pid_t remote_pid;
>>      int port;
>>      int control_socket;
>>      process_state state;
>>  } process;
>>  
>> +typedef struct {
>> +    const char * hostname;
>> +    pid_t * pids;
>> +    size_t npids, npids_allocated;
>> +} process_group;
>> +
>> +typedef struct {
>> +    process_group * data;
>> +    process_group ** index;
>> +    size_t npgs, npgs_allocated;
>> +} process_groups;
>> +
>>  #define RUNNING(i) ((plist[i].state == P_STARTED ||                 \
>>              plist[i].state == P_CONNECTED ||                        \
>>              plist[i].state == P_RUNNING) ? 1 : 0)
>>  
>>  /* other information: a.out and rank are implicit. */
>>  
>> -process *plist;
>> -int nprocs;
>> +process_groups * pglist = NULL;
>> +process * plist = NULL;
>> +int nprocs = 0;
>>  int aout_index, port;
>>  #define MAX_WD_LEN 256
>>  char wd[MAX_WD_LEN];            /* working directory of current 
>> process */
>> @@ -112,11 +127,19 @@
>>  char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
>>  /* xxx need to add checking for string overflow, do this more 
>> carefully ... */
>>  
>> +/*
>> + * Message notifying user of what timed out
>> + */
>> +static const char * alarm_msg = NULL;
>>  
>>  #define COMMAND_LEN 2000
>>  #define SEPARATOR ':'
>>  
>> -
>> +void free_memory(void);
>> +void pglist_print(void);
>> +void pglist_insert(const char * const, const pid_t const);
>> +void rkill_fast(void);
>> +void rkill_linear(void);
>>  void cleanup_handler(int);
>>  void nostop_handler(int);
>>  void alarm_handler(int);
>> @@ -239,15 +262,19 @@
>>      int    hostname_len = 0;
>>      totalview_cmd[199] = 0;
>>      display[0]='\0';   
>> -     
>> +    pidglen = sizeof(pid_t); 
>> +
>>      /* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 
>> ... hN] a.out [args] */
>>  
>> +    atexit(free_memory);
>> +
>>      do {
>>          c = getopt_long_only(argc, argv, "+", option_table, 
>> &option_index);
>>          switch (c) {
>>          case '?':
>>          case ':':
>>              usage();
>> +        exit(EXIT_FAILURE);
>>              break;
>>          case EOF:
>>              break;
>> @@ -255,8 +282,10 @@
>>              switch (option_index) {
>>              case 0:
>>                  nprocs = atoi(optarg);
>> -                if (nprocs < 1)
>> +                if (nprocs < 1) {
>>                      usage();
>> +            exit(EXIT_FAILURE);
>> +        }
>>                  break;
>>              case 1:
>>                  debug_on = 1;
>> @@ -290,11 +319,11 @@
>>              case 8:
>>                  show_version();
>>                  usage();
>> -                exit(0);
>> +                exit(EXIT_SUCCESS);
>>                  break;
>>              case 9:
>>                  show_version();
>> -                exit(0);
>> +                exit(EXIT_SUCCESS);
>>                  break;
>>           case 10:
>>           use_totalview = 1;
>> @@ -311,17 +340,19 @@
>>            break;
>>              case 11:
>>                  usage();
>> -                exit(0);
>> +                exit(EXIT_SUCCESS);
>>                  break;
>>              default:
>>                  fprintf(stderr, "Unknown option\n");
>>                  usage();
>> +        exit(EXIT_FAILURE);
>>                  break;
>>              }
>>              break;
>>          default:
>>              fprintf(stderr, "Unreachable statement!\n");
>>              usage();
>> +        exit(EXIT_FAILURE);
>>              break;
>>          }
>>      } while (c != EOF);
>> @@ -332,7 +363,7 @@
>>              fprintf(stderr, "Without hostfile option, hostnames must 
>> be "
>>                      "specified on command line.\n");
>>              usage();
>> -            exit(1);
>> +            exit(EXIT_FAILURE);
>>          }
>>          aout_index = nprocs + optind;
>>      } else {
>> @@ -361,13 +392,14 @@
>>      plist = malloc(nprocs * sizeof(process));
>>      if (plist == NULL) {
>>          perror("malloc");
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>      for (i = 0; i < nprocs; i++) {
>>          plist[i].state = P_NOTSTARTED;
>>          plist[i].device = NULL;
>>          plist[i].port = -1;
>> +    plist[i].remote_pid = 0;
>>      }
>>  
>>      /* grab hosts from command line or file */
>> @@ -376,7 +408,7 @@
>>          hostname_len = read_hostfile(hostfile);
>>      } else {
>>          for (i = 0; i < nprocs; i++) {
>> -            plist[i].hostname = argv[optind + i];
>> +            plist[i].hostname = (char *)strndup(argv[optind + i], 100);
>>              hostname_len = hostname_len > strlen(plist[i].hostname) ?
>>                  hostname_len : strlen(plist[i].hostname); 
>>          }
>> @@ -388,7 +420,7 @@
>>      
>>          if (!mpirun_processes) {
>>              perror("malloc");
>> -            exit(1);
>> +            exit(EXIT_FAILURE);
>>          } else { 
>>              memset(mpirun_processes, 0, nprocs * 
>> (hostname_len + 4));
>>          }
>> @@ -412,18 +444,18 @@
>>      s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
>>      if (s < 0) {
>>          perror("socket");
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>      sockaddr.sin_addr.s_addr = INADDR_ANY;
>>      sockaddr.sin_port = 0;
>>      if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
>>          perror("bind");
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>      if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) 
>> < 0) {
>>          perror("getsockname");
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>      port = (int) ntohs(sockaddr.sin_port);
>> @@ -431,14 +463,31 @@
>>  
>>  
>>      if (!show_on) {
>> -    signal(SIGHUP, cleanup_handler);
>> -    signal(SIGINT, cleanup_handler);
>> -    signal(SIGTSTP, nostop_handler);
>> -    signal(SIGCHLD, child_handler);
>> -    signal(SIGALRM, alarm_handler);
>> +    struct sigaction signal_handler;
>> +    signal_handler.sa_handler = cleanup_handler;
>> +    sigfillset(&signal_handler.sa_mask);
>> +    signal_handler.sa_flags = 0;
>> +
>> +    sigaction(SIGHUP, &signal_handler, NULL);
>> +    sigaction(SIGINT, &signal_handler, NULL);
>> +    sigaction(SIGTERM, &signal_handler, NULL);
>> +
>> +    signal_handler.sa_handler = nostop_handler;
>> +
>> +    sigaction(SIGTSTP, &signal_handler, NULL);
>> +
>> +    signal_handler.sa_handler = alarm_handler;
>> +
>> +    sigaction(SIGALRM, &signal_handler, NULL);
>> +
>> +    signal_handler.sa_handler = child_handler;
>> +    sigemptyset(&signal_handler.sa_mask);
>> +
>> +    sigaction(SIGCHLD, &signal_handler, NULL);
>>      }
>>  
>>      alarm(1000);
>> +    alarm_msg = "Timeout during client startup.\n";
>>      /* long timeout for testing, where process may be stopped in 
>> debugger */
>>  
>>  #ifdef USE_DDD
>> @@ -511,7 +560,7 @@
>>      }
>>  
>>      if (show_on)
>> -        exit(0);
>> +        exit(EXIT_SUCCESS);
>>      
>>      /*Hostid exchange start */
>>      /* accept incoming connections, read port numbers */
>> @@ -522,6 +571,9 @@
>>  ACCEPT_HID:
>>          sockaddr_len = sizeof(sockaddr);
>>          s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
>> +
>> +    alarm_msg = "Timeout during hostid exchange.\n";
>> +
>>          if (s1 < 0) {
>>              if (errno == EINTR)
>>                  goto ACCEPT_HID;
>> @@ -592,7 +644,7 @@
>>              hostids = (int *) malloc(hostidlen * nprocs);
>>              if (hostids == NULL) {
>>                  perror("malloc");
>> -                exit(1);
>> +                exit(EXIT_FAILURE);
>>              }
>>          }
>>  
>> @@ -626,66 +678,33 @@
>>          }
>>      }
>>  
>> -    /* close all opend sockets */
>> -    for (i = 0; i < nprocs; i++) {
>> -        close(plist[i].control_socket);
>> -    }
>> -
>>      alarm(1000); 
>> -    /* let enbale the timer again*/
>> +    alarm_msg = "Timeout during address exchange.\n";
>> +    /* lets enable the timer again*/
>>  
>>      /* Lets read all other information, LID QP,etc..*/
>>  
>>      /* accept incoming connections, read port numbers */
>>      for (i = 0; i < nprocs; i++) {
>> -        int version, rank, nread;
>> -        char pidstr[12];
>> -ACCEPT:
>> -        sockaddr_len = sizeof(sockaddr);
>> -        s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
>> -        if (s1 < 0) {
>> -            if (errno == EINTR)
>> -                goto ACCEPT;
>> -            perror("accept");
>> -            cleanup();
>> -        }
>> +        int nread;
>>  
>>          /* 
>>           * protocol: 
>> -         *  We don't need version 
>> number,
>> -         *  0. read rank of process
>> -         *  1. read address length
>> -         *  2. read address itself
>> -         *  3. send array of all addresses
>> +         *  We don't need the version number or the rank,
>> +         *  0. read address length
>> +         *  1. read address itself
>> +         *  2. send array of all addresses
>>           */
>>  
>> -        /* 0. Find out who we're talking to */
>> -        nread = read(s1, &rank, sizeof(rank));
>> -        if (nread != sizeof(rank)) {
>> -            perror("read");
>> -            cleanup();
>> -        }
>> -
>> -        if (rank < 0 || rank >= nprocs 
>> -                || 
>> plist[rank].state != P_STARTED) {
>> -            fprintf(stderr, "mpirun: invalid rank received. \n");
>> -            cleanup();
>> -        }
>> -
>> -        plist[rank].control_socket = s1;
>> -        plist[rank].state = P_CONNECTED;
>> +        plist[i].state = P_CONNECTED;
>>  
>>          /* Let us know connection was established
>>           * printf("MPIRUN_RSH: Process rank %d connected\n",rank);
>>           */
>>  
>>          /* 1. Find out length of the data */
>> -        nread = read(s1, &addrlen, sizeof(addrlen));
>> +        nread = read(plist[i].control_socket, &addrlen, 
>> sizeof(addrlen));
>>          if (nread != sizeof(addrlen)) {
>> -            /* nread == 0 is not actually an error! */
>> -            if (nread == 0) 
>> -                continue;
>> -
>>              perror("read");
>>              cleanup();
>>          }
>> @@ -707,21 +726,20 @@
>>              alladdrs = (int *) malloc(addrlen * nprocs);
>>              if (alladdrs == NULL) {
>>                  perror("malloc");
>> -                exit(1);
>> +                exit(EXIT_FAILURE);
>>              }
>>          }
>>  
>>          /* 2. Read info from each process */
>>  
>>          /* for byte location */
>> -        alladdrs_char = (char *) &alladdrs[rank * addrlen / 
>> sizeof(int)];
>> +        alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
>>  
>>          tot_nread = 0;
>>  
>>          while (tot_nread < addrlen) {
>> -            nread =
>> -                read(s1, (void *) (alladdrs_char + tot_nread),
>> -                        addrlen - tot_nread);
>> +            nread = read(plist[i].control_socket,
>> +        (void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
>>  
>>              if (nread < 0) {
>>                  perror("read");
>> @@ -733,36 +751,32 @@
>>  
>>  read_pid:
>>          /* 3. Find out length of the data */
>> -        nread = read(s1, &pidlen, sizeof(pidlen));
>> +        nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
>>          if (nread != sizeof(pidlen)) {
>>              perror("read");
>>              cleanup();
>>          }
>>  
>>          /*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, 
>> nread);*/
>> -        if (i == 0) {
>> -            pidglen = pidlen;
>> -        } else {
>> -            if (pidlen != pidglen) {
>> -                fprintf(stderr, "Pid lengths %d and %d do not 
>> match\n", 
>> -                        pidlen, pidglen);
>> -                cleanup();
>> -            }
>> -        }
>> +    if (pidlen != pidglen) {
>> +      fprintf(stderr, "Pid lengths %d and %d do not match\n", 
>> +          pidlen, pidglen);
>> +      cleanup();
>> +    }
>>  
>>          if (i == 0) {
>> -            /* allocate as soon as we know the address length */
>> +            /* allocate as soon as we know the pid length */
>>              allpids = (char *)malloc(pidlen * nprocs);
>>              if (allpids == NULL) {
>>                  perror("malloc");
>> -                exit(1);
>> +                exit(EXIT_FAILURE);
>>              }
>>          }
>>  
>>          tot_nread=0;
>>          while(tot_nread < pidlen) {
>> -            nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread), 
>> -                    pidlen - tot_nread);
>> +            nread = read(plist[i].control_socket,
>> +        (void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
>>              /*fprintf(stderr, "read length %d \n", nread);*/
>>              if(nread < 0) {
>>                  perror("read");
>> @@ -770,6 +784,9 @@
>>              }
>>              tot_nread += nread;
>>          }
>> +
>> +    plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
>> +    pglist_insert(plist[i].hostname, plist[i].remote_pid);
>>      }
>>  
>>  
>> @@ -795,7 +812,7 @@
>>      out_addrs = (int *) malloc(out_addrs_len);
>>      if (out_addrs == NULL) {
>>          perror("malloc");
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>      for (i = 0; i < nprocs; i++) {
>> @@ -876,8 +893,7 @@
>>          sleep(100);
>>      }
>>      close(s);
>> -    exit(0);
>> -
>> +    exit(EXIT_SUCCESS);
>>  }
>>  
>>  int start_process(int i, char *command_name, char *env)
>> @@ -925,12 +941,12 @@
>>      if ((remote_command = malloc(str_len)) == NULL) {
>>          fprintf(stderr, "Failed to malloc %d bytes for 
>> remote_command\n",
>>                  str_len);
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>      if ((xterm_command = malloc(str_len)) == NULL) {
>>          fprintf(stderr, "Failed to malloc %d bytes for 
>> xterm_command\n",
>>                  str_len);
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>  
>> @@ -1010,7 +1026,7 @@
>>          if (!show_on) {
>>              perror("RSH/SSH command failed!");
>>          }
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>      free(remote_command);
>> @@ -1189,8 +1205,6 @@
>>      fprintf(stderr, "\ta.out      => " "name of MPI binary\n");
>>      fprintf(stderr, "\targs       => " "arguments for MPI binary\n");
>>      fprintf(stderr, "\n");
>> -
>> -    exit(1);
>>  }
>>  
>>  /* finds first non-whitespace char in input string */
>> @@ -1221,7 +1235,7 @@
>>      if (hf == NULL) {
>>          fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
>>          perror("open");
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>      
>>      for (i = 0; i < nprocs; i++) {
>> @@ -1287,7 +1301,7 @@
>>          } else {
>>              fprintf(stderr, "End of file reached on "
>>                      "hostfile at %d of %d hostnames\n", i, nprocs);
>> -            exit(1);
>> +            exit(EXIT_FAILURE);
>>          }
>>      }
>>      fclose(hf);
>> @@ -1321,14 +1335,14 @@
>>      if ((pf = fopen(paramfile, "r")) == NULL) {
>>          sprintf(errstr, "Cant open paramfile = %s", paramfile);
>>          perror(errstr);
>> -        exit(1);
>> +        exit(EXIT_FAILURE);
>>      }
>>  
>>      if ( strlen(env) == 0 ){
>>          /* Allocating space for env first time */
>>          if ((env = malloc(ENV_LEN)) == NULL) {
>>              fprintf(stderr, "Malloc of env failed in 
>> read_param_file\n");
>> -            exit(1);
>> +            exit(EXIT_FAILURE);
>>          }
>>          env_left = ENV_LEN - 1;
>>      }else{
>> @@ -1367,7 +1381,7 @@
>>                  (ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + 
>> strlen(env);
>>              if ((env = realloc(env, newlen)) == NULL) {
>>                  fprintf(stderr, "realloc failed in read_param_file\n");
>> -                exit(1);
>> +                exit(EXIT_FAILURE);
>>              }
>>              if (param_debug) {
>>                  printf("realloc to %d\n", newlen);
>> @@ -1395,15 +1409,213 @@
>>      }
>>      cleanup();
>>  
>> -    exit(1);
>> +    exit(EXIT_FAILURE);
>> +}
>> +
>> +void pglist_print(void) {
>> +    if(pglist) {
>> +    int i, j;
>> +    size_t npids = 0, npids_allocated = 0;
>> +
>> +    fprintf(stderr, "\n--pglist--\ndata:\n");
>> +    for(i = 0; i < pglist->npgs; i++) {
>> +        fprintf(stderr, "%p - %s:", &pglist->data[i],
>> +            pglist->data[i].hostname);
>> +
>> +        for(j = 0; j < pglist->data[i].npids; j++) {
>> +        fprintf(stderr, " %d", pglist->data[i].pids[j]);
>> +        }
>> +
>> +        fprintf(stderr, "\n");
>> +        npids        += pglist->data[i].npids;
>> +        npids_allocated += pglist->data[i].npids_allocated;
>> +    }
>> +
>> +    fprintf(stderr, "\nindex:");
>> +    for(i = 0; i < pglist->npgs; i++) {
>> +        fprintf(stderr, " %p", pglist->index[i]);
>> +    }
>> +
>> +    fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
>> +        pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
>> +            pglist->npgs / pglist->npgs_allocated : 100.));
>> +    fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
>> +        npids_allocated, (int)(npids_allocated ? 100. * npids /
>> +            npids_allocated : 100.));
>> +    fprintf(stderr, "--pglist--\n\n");
>> +    }
>> +}
>> +
>> +void pglist_insert(const char * const hostname, const pid_t const 
>> pid) {
>> +    const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
>> +    size_t index = 0, bottom = 0, top;
>> +    static size_t alloc_error = 0;
>> +    int i, strcmp_result;
>> +    process_group * pg;
>> +    void * backup_ptr;
>> +
>> +    if(alloc_error) return;
>> +    if(pglist == NULL) goto init_pglist;
>> +
>> +    top = pglist->npgs - 1;
>> +    index = (top + bottom) / 2;
>> +
>> +    while(strcmp_result = strcmp(hostname, 
>> pglist->index[index]->hostname)) {
>> +    if(bottom >= top) break;
>> +
>> +    if(strcmp_result > 0) {
>> +        bottom = index + 1;
>> +    }
>> +
>> +    else {
>> +        top = index - 1;
>> +    }
>> +
>> +    index = (top + bottom) / 2;
>> +    }
>> +
>> +    if(!strcmp_result) goto insert_pid;
>> +    if(strcmp_result > 0) index++;
>> +
>> +    goto add_process_group;
>> +
>> +init_pglist:
>> +    pglist = malloc(sizeof(process_groups));
>> +
>> +    if(pglist) {
>> +    pglist->data        = NULL;
>> +    pglist->index        = NULL;
>> +    pglist->npgs        = 0;
>> +    pglist->npgs_allocated    = 0;
>> +    }
>> +
>> +    else {
>> +    goto register_alloc_error;
>> +    }
>> +
>> +add_process_group:
>> +    if(pglist->npgs == pglist->npgs_allocated) {
>> +    process_group * pglist_data_backup    = pglist->data;
>> +    process_group ** pglist_index_backup    = pglist->index;
>> +    ptrdiff_t offset;
>> +
>> +    pglist->npgs_allocated += increment;
>> +
>> +    backup_ptr = pglist->data;
>> +    pglist->data = realloc(pglist->data, sizeof(process_group) *
>> +        pglist->npgs_allocated);
>> +
>> +    if(pglist->data == NULL) {
>> +        pglist->data = backup_ptr;
>> +        goto register_alloc_error;
>> +    }
>> +
>> +    backup_ptr = pglist->index;
>> +    pglist->index = realloc(pglist->index, sizeof(process_group *) *
>> +        pglist->npgs_allocated);
>> +
>> +    if(pglist->index == NULL) {
>> +        pglist->index = backup_ptr;
>> +        goto register_alloc_error;
>> +    }
>> +
>> +    if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) { 
>> +        for(i = 0; i < pglist->npgs; i++) {
>> +        pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
>> +            offset);
>> +        }
>> +    }
>> +    }
>> +
>> +    for(i = pglist->npgs; i > index; i--) {
>> +    pglist->index[i] = pglist->index[i-1];
>> +    }
>> +
>> +    pglist->data[pglist->npgs].hostname        = hostname;
>> +    pglist->data[pglist->npgs].pids        = NULL;
>> +    pglist->data[pglist->npgs].npids        = 0;
>> +    pglist->data[pglist->npgs].npids_allocated    = 0;
>> +
>> +    pglist->index[index] = &pglist->data[pglist->npgs++];
>> +
>> +insert_pid:
>> +    pg = pglist->index[index];
>> +
>> +    if(pg->npids == pg->npids_allocated) {
>> +    if(pg->npids_allocated) {
>> +        pg->npids_allocated <<= 1;
>> +
>> +        if(pg->npids_allocated < pg->npids) pg->npids_allocated = 
>> SIZE_MAX;
>> +        if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
>> +    }
>> +
>> +    else {
>> +        pg->npids_allocated = 1;
>> +    }
>> +
>> +    backup_ptr = pg->pids;
>> +    pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
>> +
>> +    if(pg->pids == NULL) {
>> +        pg->pids = backup_ptr;
>> +        goto register_alloc_error;
>> +    }
>> +    }
>> +
>> +    pg->pids[pg->npids++] = pid;
>> +
>> +    return;
>> +
>> +register_alloc_error:
>> +    if(pglist) {
>> +    if(pglist->data) {
>> +        process_group * pg = pglist->data;
>> +
>> +        while(pglist->npgs--) {
>> +        if(pg->pids) free((pg++)->pids);
>> +        }
>> +
>> +        free(pglist->data);
>> +    }
>> +
>> +    if(pglist->index) free(pglist->index);
>> +
>> +    free(pglist);
>> +    }
>> +
>> +    alloc_error = 1;
>> +}
>> +
>> +void free_memory(void) {
>> +    if(pglist) {
>> +    if(pglist->data) {
>> +        process_group * pg = pglist->data;
>> +
>> +        while(pglist->npgs--) {
>> +        if(pg->pids) free((pg++)->pids);
>> +        }
>> +
>> +        free(pglist->data);
>> +    }
>> +
>> +    if(pglist->index) free(pglist->index);
>> +
>> +    free(pglist);
>> +    }
>> +
>> +    if(plist) {
>> +    while(nprocs--) {
>> +        if(plist[nprocs].device) free(plist[nprocs].device);
>> +        if(plist[nprocs].hostname) free(plist[nprocs].hostname);
>> +    }
>> +
>> +    free(plist);
>> +    }
>>  }
>>  
>>  void cleanup(void)
>>  {
>>      int i;
>> -    /* could walk through list of processes, but it looks
>> -       like we can just send the signal to the process group
>> -     */
>>  
>>      if (use_totalview) {
>>      fprintf(stderr, "Cleaning up all processes ...");
>> @@ -1417,36 +1629,180 @@
>>      }
>>  
>>      for (i = 0; i < nprocs; i++) {
>> -        if (RUNNING(i)) {
>> -            /* send terminal interrupt, which will hopefully 
>> -               propagate to the other side. (not sure what xterm will
>> -               do here.
>> -             */
>> -            kill(plist[i].pid, SIGINT);
>> -        }
>> +    if (RUNNING(i)) {
>> +        /* send terminal interrupt, which will hopefully 
>> +           propagate to the other side. (not sure what xterm will
>> +           do here.
>> +         */
>> +        kill(plist[i].pid, SIGINT);
>> +    }
>>      }
>> +
>>      sleep(1);
>>  
>>      for (i = 0; i < nprocs; i++) {
>> -        if (plist[i].state != P_NOTSTARTED) {
>> -            /* send regular interrupt to rsh */
>> -            kill(plist[i].pid, SIGTERM);
>> -        }
>> +    if (plist[i].state != P_NOTSTARTED) {
>> +        /* send regular interrupt to rsh */
>> +        kill(plist[i].pid, SIGTERM);
>> +    }
>>      }
>>  
>>      sleep(1);
>>  
>>      for (i = 0; i < nprocs; i++) {
>> -        if (plist[i].state != P_NOTSTARTED) {
>> -            /* Kill the processes */
>> -            kill(plist[i].pid, SIGKILL);
>> -        }
>> +    if (plist[i].state != P_NOTSTARTED) {
>> +        /* Kill the processes */
>> +        kill(plist[i].pid, SIGKILL);
>> +    }
>> +    }
>> +
>> +    if(pglist) {
>> +    rkill_fast();
>> +    }
>> +
>> +    else {
>> +    rkill_linear();
>> +    }
>> +
>> +    exit(EXIT_FAILURE);
>> +}
>> +
>> +void rkill_fast(void) {
>> +    int i, j, tryagain, spawned_pid[pglist->npgs];
>> +
>> +    fprintf(stderr, "Killing remote processes...");
>> +
>> +    for(i = 0; i < pglist->npgs; i++) {
>> +    if(0 == (spawned_pid[i] = fork())) {
>> +        if(pglist->index[i]->npids) {
>> +        const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
>> +        const process_group * pg = pglist->index[i];
>> +        char kill_cmd[bufsize], tmp[10];
>> +
>> +        kill_cmd[0] = '\0';
>> +        strcat(kill_cmd, "kill -s SIGKILL");
>> +
>> +        for(j = 0; j < pg->npids; j++) {
>> +            snprintf(tmp, 10, " %d", pg->pids[j]);
>> +            strcat(kill_cmd, tmp);
>> +        }
>> +
>> +        strcat(kill_cmd, " >&/dev/null");
>> +
>> +        if(use_rsh) {
>> +            execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
>> +        }
>> +
>> +        else {
>> +            execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
>> +                kill_cmd, NULL);
>> +        }
>> +
>> +        perror(NULL);
>> +        exit(EXIT_FAILURE);
>> +        }
>> +
>> +        else {
>> +        exit(EXIT_SUCCESS);
>> +        }
>> +    }
>> +    }
>> +
>> +    while(1) {
>> +    static int iteration = 0;
>> +    tryagain = 0;
>> +
>> +    sleep(1 << iteration);
>> +
>> +    for (i = 0; i < pglist->npgs; i++) {
>> +        if(spawned_pid[i]) {
>> +        if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, 
>> WNOHANG))) {
>> +            tryagain = 1;
>> +        }
>> +        }
>> +    }
>> +
>> +    if(++iteration == 5 || !tryagain) {
>> +        fprintf(stderr, "DONE\n");
>> +        break;
>> +    }
>> +    }
>> +
>> +    if(tryagain) {
>> +    fprintf(stderr, "The following processes may have not been 
>> killed:\n");
>> +    for (i = 0; i < pglist->npgs; i++) {
>> +        if(spawned_pid[i]) {
>> +        const process_group * pg = pglist->index[i];
>> +
>> +        fprintf(stderr, "%s:", pg->hostname);
>> +
>> +        for (j = 0; j < pg->npids; j++) {
>> +            fprintf(stderr, " %d", pg->pids[j]);
>> +        }
>> +
>> +        fprintf(stderr, "\n");
>> +        }
>> +    }
>> +    }
>> +}
>> +
>> +void rkill_linear(void) {
>> +    int i, j, tryagain, spawned_pid[nprocs];
>> +
>> +    fprintf(stderr, "Killing remote processes...");
>> +
>> +    for (i = 0; i < nprocs; i++) {
>> +    if(0 == (spawned_pid[i] = fork())) {
>> +        char kill_cmd[80];
>> +
>> +        if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
>> +
>> +        snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
>> +        plist[i].remote_pid);
>> +
>> +        if(use_rsh) {
>> +        execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
>> +        }
>> +
>> +        else {
>> +        execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
>> +            plist[i].hostname, kill_cmd, NULL);
>> +        }
>> +
>> +        perror(NULL);
>> +        exit(EXIT_FAILURE);
>> +    }
>>      }
>>  
>> -    fprintf(stderr, "done.\n");
>> +    while(1) {
>> +    static int iteration = 0;
>> +    tryagain = 0;
>> +
>> +    sleep(1 << iteration);
>> +
>> +    for (i = 0; i < nprocs; i++) {
>> +        if(spawned_pid[i]) {
>> +        if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, 
>> WNOHANG))) {
>> +            tryagain = 1;
>> +        }
>> +        }
>> +    }
>>  
>> -    exit(1);
>> +    if(++iteration == 5 || !tryagain) {
>> +        fprintf(stderr, "DONE\n");
>> +        break;
>> +    }
>> +    }
>>  
>> +    if(tryagain) {
>> +    fprintf(stderr, "The following processes may have not been 
>> killed:\n");
>> +    for (i = 0; i < nprocs; i++) {
>> +        if(spawned_pid[i]) {
>> +        fprintf(stderr, "%s [%d]\n", plist[i].hostname,
>> +            plist[i].remote_pid);
>> +        }
>> +    }
>> +    }
>>  }
>>  
>>  
>> @@ -1457,9 +1813,13 @@
>>  
>>  void alarm_handler(int signal)
>>  {
>> +    extern const char * alarm_msg;
>> +
>>      if (use_totalview) {
>>      fprintf(stderr, "Timeout alarm signaled\n");
>>      }
>> +
>> +    if(alarm_msg) fprintf(stderr, alarm_msg);
>>      cleanup();
>>  }
>>  
>> @@ -1467,19 +1827,21 @@
>>  void child_handler(int signal)
>>  {
>>      int status, i, child, pid;
>> -    int exitstatus = 0;
>> +    int exitstatus = EXIT_SUCCESS;
>>  
>>      if (use_totalview) {
>>      fprintf(stderr, "mpirun: child died. Waiting for others.\n"); 
>>      }
>>      alarm(10);
>> +    alarm_msg = "Child died. Timeout while waiting for others.\n";
>> +
>>      for (i = 0; i < nprocs; i++) {
>>          pid = wait(&status);
>>          if (pid == -1) {
>>              perror("wait");
>> -            exitstatus = 1;
>> +            exitstatus = EXIT_FAILURE;
>>          } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
>> -            exitstatus = 1;
>> +            exitstatus = EXIT_FAILURE;
>>          }
>>          for (child = 0; child < nprocs; child++) {
>>              if (plist[child].pid == pid) {
>> @@ -1489,9 +1851,11 @@
>>          }
>>          if (child == nprocs) {
>>              fprintf(stderr, "Unable to find child %d!\n", pid);
>> -            exitstatus = 1;
>> +            exitstatus = EXIT_FAILURE;
>>          }
>>      }
>>      alarm(0);
>>      exit(exitstatus);
>>  }
>> +
>> +/* vi:set sw=4 sts=4 tw=80: */
>> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h 
>> exp1/mpid/ch_gen2/process/pmgr_client.h
>> --- 0.9.9/mpid/ch_gen2/process/pmgr_client.h    2007-05-29 
>> 03:47:10.000000000 -0400
>> +++ exp1/mpid/ch_gen2/process/pmgr_client.h    2007-07-02 
>> 12:59:51.000000000 -0400
>> @@ -108,6 +108,6 @@
>>   * of the spawner, e.g. mpirun_rsh, to check that it understands
>>   * the version of the executable.
>>   */
>> -#define PMGR_VERSION 5
>> +#define PMGR_VERSION 6
>>  
>>  #endif
>> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 
>> exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
>> --- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c    2007-05-29 
>> 03:47:10.000000000 -0400
>> +++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c    2007-07-02 
>> 12:59:51.000000000 -0400
>> @@ -171,6 +171,9 @@
>>      int  nwritten;
>>      int version;
>>      struct sockaddr_in sockaddr;
>> +
>> +    if(phase != 0) return;
>> +
>>     /*
>>       * Exchange information with the mpirun program. Send it our
>>       * socket address, get back addresses for our siblings.
>> @@ -208,14 +211,12 @@
>>       */
>>  
>>      version = PMGR_VERSION;
>> -    if (0 == phase) {
>> -        /* first, send a version number */
>> -        nwritten = write(mpirun_socket, &version, sizeof(version));
>> -        if (nwritten != sizeof(version)) {
>> -            sleep(2);
>> -            perror("write");
>> -            exit(1);
>> -        }
>> +    /* first, send a version number */
>> +    nwritten = write(mpirun_socket, &version, sizeof(version));
>> +    if (nwritten != sizeof(version)) {
>> +    sleep(2);
>> +    perror("write");
>> +    exit(1);
>>      }
>>      
>>      /* next, send our rank */
>> @@ -264,7 +265,6 @@
>>          tot_nread = tot_nread + nread;
>>      }
>>      fflush(stdout);
>> -    close(mpirun_socket);
>>      return 1;
>>  }
>>  
>> @@ -280,7 +280,6 @@
>>      pid_t *ppids = (pid_t *)pallpids;
>>      pid_t *allpids = NULL;
>>  
>> -    pmgr_init_connection(1);
>>      /* next, send size of addr */
>>      nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
>>      if (nwritten != sizeof(addrlen)) {
>> @@ -314,7 +313,7 @@
>>         exit(1);
>>      }
>>  
>> -    /* next, send size of addr */
>> +    /* next, send size of pid */
>>      nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
>>      if (nwritten != sizeof(mypid_len)) {
>>        sleep(2);
>> @@ -322,6 +321,7 @@
>>        exit(1);
>>      }
>>  
>> +    /* next, send our pid */
>>      if (pidlen != 0) {
>>        nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
>>        if (nwritten != pidlen) {
>> @@ -345,7 +345,7 @@
>>  
>>      if (pidlen != 0) {
>>             tot_nread=0;
>> -           /* finally, read addresses from all processes */
>> +           /* finally, read pids from all processes */
>>             while (tot_nread < pmgr_nprocs*pidlen) {
>>          nread = read(mpirun_socket, (void*)((char 
>> *)allpids+tot_nread), 
>>                      (size_t) 
>> ((pmgr_nprocs*pidlen)-tot_nread));
>

-- 
http://www.cse.ohio-state.edu/~surs



More information about the mvapich-discuss mailing list