[mvapich-discuss] mvapich jobs cleanup

Mark Potts potts at hpcapplications.com
Fri Jul 13 21:21:03 EDT 2007


Sayantan,
    No luck yet with that mpirun_rsh patch.
    I'm getting:
    "mpirun: executable version 5 does not match our version 6"
    "Killing remote processes...DONE"
    as soon as I attempt to start up the new mpirun_rsh.

    I can see that the patch changes the version number but can't yet
    understand to whom "executable version" and "our" refer.
    Hints?  Directions?

            regards,

Sayantan Sur wrote:
> Hi Mark,
> 
> We have a patch to solve this stray process issue with MVAPICH-0.9.9. 
> I'm attaching the patch with this email. To apply the patch please 
> follow these steps:
> 
> $ cd mvapich-0.9.9
> $ #save mpirun_rsh_patch to this directory
> $ patch -p1 < mpirun_rsh_patch
> 
> Could you please let us know if this patch solves the problem for you?
> 
> Thanks,
> Sayantan.
> 
> 
> Mark Potts wrote:
>> Hi,
>>    We are observing a number of cases in which MVAPICH-0.9.9
>>    jobs launched with mpirun_rsh leave stray processes on some
>>    nodes when the job terminates abnormally.  Those stray
>>    processes continue to run forever and require recognition
>>    and killing.
>>
>>    Is there a reason this happens with MVAPICH, and is there a
>>    way to prevent it?  This doesn't seem to be the behavior
>>    that occurs for abnormally terminated Voltaire MPI or Intel
>>    MPI jobs.
>>          regards,
> 
> 
> 
> ------------------------------------------------------------------------
> 
> diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c exp1/mpid/ch_gen2/process/mpirun_rsh.c
> --- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c	2007-05-29 03:47:10.000000000 -0400
> +++ exp1/mpid/ch_gen2/process/mpirun_rsh.c	2007-07-09 11:56:32.000000000 -0400
> @@ -59,6 +59,7 @@
>  #define _GNU_SOURCE
>  #include <getopt.h>
>  #include <stdlib.h>
> +#include <stddef.h>
>  #include <stdio.h>
>  #include <signal.h>
>  #include <unistd.h>
> @@ -91,20 +92,34 @@
>  typedef struct {
>      char *hostname;
>      char *device;
> -    int pid;
> +    pid_t pid;
> +    pid_t remote_pid;
>      int port;
>      int control_socket;
>      process_state state;
>  } process;
>  
> +typedef struct {
> +    const char * hostname;
> +    pid_t * pids;
> +    size_t npids, npids_allocated;
> +} process_group;
> +
> +typedef struct {
> +    process_group * data;
> +    process_group ** index;
> +    size_t npgs, npgs_allocated;
> +} process_groups;
> +
>  #define RUNNING(i) ((plist[i].state == P_STARTED ||                 \
>              plist[i].state == P_CONNECTED ||                        \
>              plist[i].state == P_RUNNING) ? 1 : 0)
>  
>  /* other information: a.out and rank are implicit. */
>  
> -process *plist;
> -int nprocs;
> +process_groups * pglist = NULL;
> +process * plist = NULL;
> +int nprocs = 0;
>  int aout_index, port;
>  #define MAX_WD_LEN 256
>  char wd[MAX_WD_LEN];            /* working directory of current process */
> @@ -112,11 +127,19 @@
>  char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
>  /* xxx need to add checking for string overflow, do this more carefully ... */
>  
> +/*
> + * Message notifying user of what timed out
> + */
> +static const char * alarm_msg = NULL;
>  
>  #define COMMAND_LEN 2000
>  #define SEPARATOR ':'
>  
> -
> +void free_memory(void);
> +void pglist_print(void);
> +void pglist_insert(const char * const, const pid_t const);
> +void rkill_fast(void);
> +void rkill_linear(void);
>  void cleanup_handler(int);
>  void nostop_handler(int);
>  void alarm_handler(int);
> @@ -239,15 +262,19 @@
>      int    hostname_len = 0;
>      totalview_cmd[199] = 0;
>      display[0]='\0';	
> -     
> +    pidglen = sizeof(pid_t); 
> +
>      /* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 ... hN] a.out [args] */
>  
> +    atexit(free_memory);
> +
>      do {
>          c = getopt_long_only(argc, argv, "+", option_table, &option_index);
>          switch (c) {
>          case '?':
>          case ':':
>              usage();
> +	    exit(EXIT_FAILURE);
>              break;
>          case EOF:
>              break;
> @@ -255,8 +282,10 @@
>              switch (option_index) {
>              case 0:
>                  nprocs = atoi(optarg);
> -                if (nprocs < 1)
> +                if (nprocs < 1) {
>                      usage();
> +		    exit(EXIT_FAILURE);
> +		}
>                  break;
>              case 1:
>                  debug_on = 1;
> @@ -290,11 +319,11 @@
>              case 8:
>                  show_version();
>                  usage();
> -                exit(0);
> +                exit(EXIT_SUCCESS);
>                  break;
>              case 9:
>                  show_version();
> -                exit(0);
> +                exit(EXIT_SUCCESS);
>                  break;
>   	    case 10:
>   		use_totalview = 1;
> @@ -311,17 +340,19 @@
>    		break;
>              case 11:
>                  usage();
> -                exit(0);
> +                exit(EXIT_SUCCESS);
>                  break;
>              default:
>                  fprintf(stderr, "Unknown option\n");
>                  usage();
> +		exit(EXIT_FAILURE);
>                  break;
>              }
>              break;
>          default:
>              fprintf(stderr, "Unreachable statement!\n");
>              usage();
> +	    exit(EXIT_FAILURE);
>              break;
>          }
>      } while (c != EOF);
> @@ -332,7 +363,7 @@
>              fprintf(stderr, "Without hostfile option, hostnames must be "
>                      "specified on command line.\n");
>              usage();
> -            exit(1);
> +            exit(EXIT_FAILURE);
>          }
>          aout_index = nprocs + optind;
>      } else {
> @@ -361,13 +392,14 @@
>      plist = malloc(nprocs * sizeof(process));
>      if (plist == NULL) {
>          perror("malloc");
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>      for (i = 0; i < nprocs; i++) {
>          plist[i].state = P_NOTSTARTED;
>          plist[i].device = NULL;
>          plist[i].port = -1;
> +	plist[i].remote_pid = 0;
>      }
>  
>      /* grab hosts from command line or file */
> @@ -376,7 +408,7 @@
>          hostname_len = read_hostfile(hostfile);
>      } else {
>          for (i = 0; i < nprocs; i++) {
> -            plist[i].hostname = argv[optind + i];
> +            plist[i].hostname = (char *)strndup(argv[optind + i], 100);
>      	    hostname_len = hostname_len > strlen(plist[i].hostname) ?
>      		    hostname_len : strlen(plist[i].hostname); 
>          }
> @@ -388,7 +420,7 @@
>          
>          if (!mpirun_processes) {
>              perror("malloc");
> -            exit(1);
> +            exit(EXIT_FAILURE);
>          } else { 
>              memset(mpirun_processes, 0, nprocs * (hostname_len + 4));
>          }
> @@ -412,18 +444,18 @@
>      s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
>      if (s < 0) {
>          perror("socket");
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>      sockaddr.sin_addr.s_addr = INADDR_ANY;
>      sockaddr.sin_port = 0;
>      if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
>          perror("bind");
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>      if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) < 0) {
>          perror("getsockname");
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>      port = (int) ntohs(sockaddr.sin_port);
> @@ -431,14 +463,31 @@
>  
>  
>      if (!show_on) {
> -	signal(SIGHUP, cleanup_handler);
> -	signal(SIGINT, cleanup_handler);
> -	signal(SIGTSTP, nostop_handler);
> -	signal(SIGCHLD, child_handler);
> -	signal(SIGALRM, alarm_handler);
> +	struct sigaction signal_handler;
> +	signal_handler.sa_handler = cleanup_handler;
> +	sigfillset(&signal_handler.sa_mask);
> +	signal_handler.sa_flags = 0;
> +
> +	sigaction(SIGHUP, &signal_handler, NULL);
> +	sigaction(SIGINT, &signal_handler, NULL);
> +	sigaction(SIGTERM, &signal_handler, NULL);
> +
> +	signal_handler.sa_handler = nostop_handler;
> +
> +	sigaction(SIGTSTP, &signal_handler, NULL);
> +
> +	signal_handler.sa_handler = alarm_handler;
> +
> +	sigaction(SIGALRM, &signal_handler, NULL);
> +
> +	signal_handler.sa_handler = child_handler;
> +	sigemptyset(&signal_handler.sa_mask);
> +
> +	sigaction(SIGCHLD, &signal_handler, NULL);
>      }
>  
>      alarm(1000);
> +    alarm_msg = "Timeout during client startup.\n";
>      /* long timeout for testing, where process may be stopped in debugger */
>  
>  #ifdef USE_DDD
> @@ -511,7 +560,7 @@
>      }
>  
>      if (show_on)
> -        exit(0);
> +        exit(EXIT_SUCCESS);
>      
>      /*Hostid exchange start */
>      /* accept incoming connections, read port numbers */
> @@ -522,6 +571,9 @@
>  ACCEPT_HID:
>          sockaddr_len = sizeof(sockaddr);
>          s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
> +
> +	alarm_msg = "Timeout during hostid exchange.\n";
> +
>          if (s1 < 0) {
>              if (errno == EINTR)
>                  goto ACCEPT_HID;
> @@ -592,7 +644,7 @@
>              hostids = (int *) malloc(hostidlen * nprocs);
>              if (hostids == NULL) {
>                  perror("malloc");
> -                exit(1);
> +                exit(EXIT_FAILURE);
>              }
>          }
>  
> @@ -626,66 +678,33 @@
>          }
>      }
>  
> -    /* close all opend sockets */
> -    for (i = 0; i < nprocs; i++) {
> -        close(plist[i].control_socket);
> -    }
> -
>      alarm(1000); 
> -    /* let enbale the timer again*/
> +    alarm_msg = "Timeout during address exchange.\n";
> +    /* lets enable the timer again*/
>  
>      /* Lets read all other information, LID QP,etc..*/
>  
>      /* accept incoming connections, read port numbers */
>      for (i = 0; i < nprocs; i++) {
> -        int version, rank, nread;
> -        char pidstr[12];
> -ACCEPT:
> -        sockaddr_len = sizeof(sockaddr);
> -        s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
> -        if (s1 < 0) {
> -            if (errno == EINTR)
> -                goto ACCEPT;
> -            perror("accept");
> -            cleanup();
> -        }
> +        int nread;
>  
>          /* 
>           * protocol: 
> -         *  We don't need version number,
> -         *  0. read rank of process
> -         *  1. read address length
> -         *  2. read address itself
> -         *  3. send array of all addresses
> +         *  We don't need the version number or the rank,
> +         *  0. read address length
> +         *  1. read address itself
> +         *  2. send array of all addresses
>           */
>  
> -        /* 0. Find out who we're talking to */
> -        nread = read(s1, &rank, sizeof(rank));
> -        if (nread != sizeof(rank)) {
> -            perror("read");
> -            cleanup();
> -        }
> -
> -        if (rank < 0 || rank >= nprocs 
> -                || plist[rank].state != P_STARTED) {
> -            fprintf(stderr, "mpirun: invalid rank received. \n");
> -            cleanup();
> -        }
> -
> -        plist[rank].control_socket = s1;
> -        plist[rank].state = P_CONNECTED;
> +        plist[i].state = P_CONNECTED;
>  
>          /* Let us know connection was established
>           * printf("MPIRUN_RSH: Process rank %d connected\n",rank);
>           */
>  
>          /* 1. Find out length of the data */
> -        nread = read(s1, &addrlen, sizeof(addrlen));
> +        nread = read(plist[i].control_socket, &addrlen, sizeof(addrlen));
>          if (nread != sizeof(addrlen)) {
> -            /* nread == 0 is not actually an error! */
> -            if (nread == 0) 
> -                continue;
> -
>              perror("read");
>              cleanup();
>          }
> @@ -707,21 +726,20 @@
>              alladdrs = (int *) malloc(addrlen * nprocs);
>              if (alladdrs == NULL) {
>                  perror("malloc");
> -                exit(1);
> +                exit(EXIT_FAILURE);
>              }
>          }
>  
>          /* 2. Read info from each process */
>  
>          /* for byte location */
> -        alladdrs_char = (char *) &alladdrs[rank * addrlen / sizeof(int)];
> +        alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
>  
>          tot_nread = 0;
>  
>          while (tot_nread < addrlen) {
> -            nread =
> -                read(s1, (void *) (alladdrs_char + tot_nread),
> -                        addrlen - tot_nread);
> +            nread = read(plist[i].control_socket,
> +		(void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
>  
>              if (nread < 0) {
>                  perror("read");
> @@ -733,36 +751,32 @@
>  
>  read_pid:
>          /* 3. Find out length of the data */
> -        nread = read(s1, &pidlen, sizeof(pidlen));
> +        nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
>          if (nread != sizeof(pidlen)) {
>              perror("read");
>              cleanup();
>          }
>  
>          /*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, nread);*/
> -        if (i == 0) {
> -            pidglen = pidlen;
> -        } else {
> -            if (pidlen != pidglen) {
> -                fprintf(stderr, "Pid lengths %d and %d do not match\n", 
> -                        pidlen, pidglen);
> -                cleanup();
> -            }
> -        }
> +	if (pidlen != pidglen) {
> +	  fprintf(stderr, "Pid lengths %d and %d do not match\n", 
> +	      pidlen, pidglen);
> +	  cleanup();
> +	}
>  
>          if (i == 0) {
> -            /* allocate as soon as we know the address length */
> +            /* allocate as soon as we know the pid length */
>              allpids = (char *)malloc(pidlen * nprocs);
>              if (allpids == NULL) {
>                  perror("malloc");
> -                exit(1);
> +                exit(EXIT_FAILURE);
>              }
>          }
>  
>          tot_nread=0;
>          while(tot_nread < pidlen) {
> -            nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread), 
> -                    pidlen - tot_nread);
> +            nread = read(plist[i].control_socket,
> +		(void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
>              /*fprintf(stderr, "read length %d \n", nread);*/
>              if(nread < 0) {
>                  perror("read");
> @@ -770,6 +784,9 @@
>              }
>              tot_nread += nread;
>          }
> +
> +	plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
> +	pglist_insert(plist[i].hostname, plist[i].remote_pid);
>      }
>  
>  
> @@ -795,7 +812,7 @@
>      out_addrs = (int *) malloc(out_addrs_len);
>      if (out_addrs == NULL) {
>          perror("malloc");
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>      for (i = 0; i < nprocs; i++) {
> @@ -876,8 +893,7 @@
>          sleep(100);
>      }
>      close(s);
> -    exit(0);
> -
> +    exit(EXIT_SUCCESS);
>  }
>  
>  int start_process(int i, char *command_name, char *env)
> @@ -925,12 +941,12 @@
>      if ((remote_command = malloc(str_len)) == NULL) {
>          fprintf(stderr, "Failed to malloc %d bytes for remote_command\n",
>                  str_len);
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>      if ((xterm_command = malloc(str_len)) == NULL) {
>          fprintf(stderr, "Failed to malloc %d bytes for xterm_command\n",
>                  str_len);
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>  
> @@ -1010,7 +1026,7 @@
>          if (!show_on) {
>              perror("RSH/SSH command failed!");
>          }
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>      free(remote_command);
> @@ -1189,8 +1205,6 @@
>      fprintf(stderr, "\ta.out      => " "name of MPI binary\n");
>      fprintf(stderr, "\targs       => " "arguments for MPI binary\n");
>      fprintf(stderr, "\n");
> -
> -    exit(1);
>  }
>  
>  /* finds first non-whitespace char in input string */
> @@ -1221,7 +1235,7 @@
>      if (hf == NULL) {
>          fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
>          perror("open");
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>      
>      for (i = 0; i < nprocs; i++) {
> @@ -1287,7 +1301,7 @@
>          } else {
>              fprintf(stderr, "End of file reached on "
>                      "hostfile at %d of %d hostnames\n", i, nprocs);
> -            exit(1);
> +            exit(EXIT_FAILURE);
>          }
>      }
>      fclose(hf);
> @@ -1321,14 +1335,14 @@
>      if ((pf = fopen(paramfile, "r")) == NULL) {
>          sprintf(errstr, "Cant open paramfile = %s", paramfile);
>          perror(errstr);
> -        exit(1);
> +        exit(EXIT_FAILURE);
>      }
>  
>      if ( strlen(env) == 0 ){
>  	    /* Allocating space for env first time */
>  	    if ((env = malloc(ENV_LEN)) == NULL) {
>  		    fprintf(stderr, "Malloc of env failed in read_param_file\n");
> -		    exit(1);
> +		    exit(EXIT_FAILURE);
>  	    }
>  	    env_left = ENV_LEN - 1;
>      }else{
> @@ -1367,7 +1381,7 @@
>                  (ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + strlen(env);
>              if ((env = realloc(env, newlen)) == NULL) {
>                  fprintf(stderr, "realloc failed in read_param_file\n");
> -                exit(1);
> +                exit(EXIT_FAILURE);
>              }
>              if (param_debug) {
>                  printf("realloc to %d\n", newlen);
> @@ -1395,15 +1409,213 @@
>      }
>      cleanup();
>  
> -    exit(1);
> +    exit(EXIT_FAILURE);
> +}
> +
> +void pglist_print(void) {
> +    if(pglist) {
> +	int i, j;
> +	size_t npids = 0, npids_allocated = 0;
> +
> +	fprintf(stderr, "\n--pglist--\ndata:\n");
> +	for(i = 0; i < pglist->npgs; i++) {
> +	    fprintf(stderr, "%p - %s:", &pglist->data[i],
> +		    pglist->data[i].hostname);
> +
> +	    for(j = 0; j < pglist->data[i].npids; j++) {
> +		fprintf(stderr, " %d", pglist->data[i].pids[j]);
> +	    }
> +
> +	    fprintf(stderr, "\n");
> +	    npids	    += pglist->data[i].npids;
> +	    npids_allocated += pglist->data[i].npids_allocated;
> +	}
> +
> +	fprintf(stderr, "\nindex:");
> +	for(i = 0; i < pglist->npgs; i++) {
> +	    fprintf(stderr, " %p", pglist->index[i]);
> +	}
> +
> +	fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
> +		pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
> +		    pglist->npgs / pglist->npgs_allocated : 100.));
> +	fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
> +		npids_allocated, (int)(npids_allocated ? 100. * npids /
> +		    npids_allocated : 100.));
> +	fprintf(stderr, "--pglist--\n\n");
> +    }
> +}
> +
> +void pglist_insert(const char * const hostname, const pid_t const pid) {
> +    const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
> +    size_t index = 0, bottom = 0, top;
> +    static size_t alloc_error = 0;
> +    int i, strcmp_result;
> +    process_group * pg;
> +    void * backup_ptr;
> +
> +    if(alloc_error) return;
> +    if(pglist == NULL) goto init_pglist;
> +
> +    top = pglist->npgs - 1;
> +    index = (top + bottom) / 2;
> +
> +    while(strcmp_result = strcmp(hostname, pglist->index[index]->hostname)) {
> +	if(bottom >= top) break;
> +
> +	if(strcmp_result > 0) {
> +	    bottom = index + 1;
> +	}
> +
> +	else {
> +	    top = index - 1;
> +	}
> +
> +	index = (top + bottom) / 2;
> +    }
> +
> +    if(!strcmp_result) goto insert_pid;
> +    if(strcmp_result > 0) index++;
> +
> +    goto add_process_group;
> +
> +init_pglist:
> +    pglist = malloc(sizeof(process_groups));
> +
> +    if(pglist) {
> +	pglist->data		= NULL;
> +	pglist->index		= NULL;
> +	pglist->npgs		= 0;
> +	pglist->npgs_allocated	= 0;
> +    }
> +
> +    else {
> +	goto register_alloc_error;
> +    }
> +
> +add_process_group:
> +    if(pglist->npgs == pglist->npgs_allocated) {
> +	process_group * pglist_data_backup	= pglist->data;
> +	process_group ** pglist_index_backup	= pglist->index;
> +	ptrdiff_t offset;
> +
> +	pglist->npgs_allocated += increment;
> +
> +	backup_ptr = pglist->data;
> +	pglist->data = realloc(pglist->data, sizeof(process_group) *
> +		pglist->npgs_allocated);
> +
> +	if(pglist->data == NULL) {
> +	    pglist->data = backup_ptr;
> +	    goto register_alloc_error;
> +	}
> +
> +	backup_ptr = pglist->index;
> +	pglist->index = realloc(pglist->index, sizeof(process_group *) *
> +		pglist->npgs_allocated);
> +
> +	if(pglist->index == NULL) {
> +	    pglist->index = backup_ptr;
> +	    goto register_alloc_error;
> +	}
> +
> +	if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) { 
> +	    for(i = 0; i < pglist->npgs; i++) {
> +		pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
> +			offset);
> +	    }
> +	}
> +    }
> +
> +    for(i = pglist->npgs; i > index; i--) {
> +	pglist->index[i] = pglist->index[i-1];
> +    }
> +
> +    pglist->data[pglist->npgs].hostname		= hostname;
> +    pglist->data[pglist->npgs].pids		= NULL;
> +    pglist->data[pglist->npgs].npids		= 0;
> +    pglist->data[pglist->npgs].npids_allocated	= 0;
> +
> +    pglist->index[index] = &pglist->data[pglist->npgs++];
> +
> +insert_pid:
> +    pg = pglist->index[index];
> +
> +    if(pg->npids == pg->npids_allocated) {
> +	if(pg->npids_allocated) {
> +	    pg->npids_allocated <<= 1;
> +
> +	    if(pg->npids_allocated < pg->npids) pg->npids_allocated = SIZE_MAX;
> +	    if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
> +	}
> +
> +	else {
> +	    pg->npids_allocated = 1;
> +	}
> +
> +	backup_ptr = pg->pids;
> +	pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
> +
> +	if(pg->pids == NULL) {
> +	    pg->pids = backup_ptr;
> +	    goto register_alloc_error;
> +	}
> +    }
> +
> +    pg->pids[pg->npids++] = pid;
> +
> +    return;
> +
> +register_alloc_error:
> +    if(pglist) {
> +	if(pglist->data) {
> +	    process_group * pg = pglist->data;
> +
> +	    while(pglist->npgs--) {
> +		if(pg->pids) free((pg++)->pids);
> +	    }
> +
> +	    free(pglist->data);
> +	}
> +
> +	if(pglist->index) free(pglist->index);
> +
> +	free(pglist);
> +    }
> +
> +    alloc_error = 1;
> +}
> +
> +void free_memory(void) {
> +    if(pglist) {
> +	if(pglist->data) {
> +	    process_group * pg = pglist->data;
> +
> +	    while(pglist->npgs--) {
> +		if(pg->pids) free((pg++)->pids);
> +	    }
> +
> +	    free(pglist->data);
> +	}
> +
> +	if(pglist->index) free(pglist->index);
> +
> +	free(pglist);
> +    }
> +
> +    if(plist) {
> +	while(nprocs--) {
> +	    if(plist[nprocs].device) free(plist[nprocs].device);
> +	    if(plist[nprocs].hostname) free(plist[nprocs].hostname);
> +	}
> +
> +	free(plist);
> +    }
>  }
>  
>  void cleanup(void)
>  {
>      int i;
> -    /* could walk through list of processes, but it looks
> -       like we can just send the signal to the process group
> -     */
>  
>      if (use_totalview) {
>  	fprintf(stderr, "Cleaning up all processes ...");
> @@ -1417,36 +1629,180 @@
>      }
>  
>      for (i = 0; i < nprocs; i++) {
> -        if (RUNNING(i)) {
> -            /* send terminal interrupt, which will hopefully 
> -               propagate to the other side. (not sure what xterm will
> -               do here.
> -             */
> -            kill(plist[i].pid, SIGINT);
> -        }
> +	if (RUNNING(i)) {
> +	    /* send terminal interrupt, which will hopefully 
> +	       propagate to the other side. (not sure what xterm will
> +	       do here.
> +	     */
> +	    kill(plist[i].pid, SIGINT);
> +	}
>      }
> +
>      sleep(1);
>  
>      for (i = 0; i < nprocs; i++) {
> -        if (plist[i].state != P_NOTSTARTED) {
> -            /* send regular interrupt to rsh */
> -            kill(plist[i].pid, SIGTERM);
> -        }
> +	if (plist[i].state != P_NOTSTARTED) {
> +	    /* send regular interrupt to rsh */
> +	    kill(plist[i].pid, SIGTERM);
> +	}
>      }
>  
>      sleep(1);
>  
>      for (i = 0; i < nprocs; i++) {
> -        if (plist[i].state != P_NOTSTARTED) {
> -            /* Kill the processes */
> -            kill(plist[i].pid, SIGKILL);
> -        }
> +	if (plist[i].state != P_NOTSTARTED) {
> +	    /* Kill the processes */
> +	    kill(plist[i].pid, SIGKILL);
> +	}
> +    }
> +
> +    if(pglist) {
> +	rkill_fast();
> +    }
> +
> +    else {
> +	rkill_linear();
> +    }
> +
> +    exit(EXIT_FAILURE);
> +}
> +
> +void rkill_fast(void) {
> +    int i, j, tryagain, spawned_pid[pglist->npgs];
> +
> +    fprintf(stderr, "Killing remote processes...");
> +
> +    for(i = 0; i < pglist->npgs; i++) {
> +	if(0 == (spawned_pid[i] = fork())) {
> +	    if(pglist->index[i]->npids) {
> +		const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
> +		const process_group * pg = pglist->index[i];
> +		char kill_cmd[bufsize], tmp[10];
> +
> +		kill_cmd[0] = '\0';
> +		strcat(kill_cmd, "kill -s SIGKILL");
> +
> +		for(j = 0; j < pg->npids; j++) {
> +		    snprintf(tmp, 10, " %d", pg->pids[j]);
> +		    strcat(kill_cmd, tmp);
> +		}
> +
> +		strcat(kill_cmd, " >&/dev/null");
> +
> +		if(use_rsh) {
> +		    execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
> +		}
> +
> +		else {
> +		    execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
> +			    kill_cmd, NULL);
> +		}
> +
> +		perror(NULL);
> +		exit(EXIT_FAILURE);
> +	    }
> +
> +	    else {
> +		exit(EXIT_SUCCESS);
> +	    }
> +	}
> +    }
> +
> +    while(1) {
> +	static int iteration = 0;
> +	tryagain = 0;
> +
> +	sleep(1 << iteration);
> +
> +	for (i = 0; i < pglist->npgs; i++) {
> +	    if(spawned_pid[i]) {
> +		if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
> +		    tryagain = 1;
> +		}
> +	    }
> +	}
> +
> +	if(++iteration == 5 || !tryagain) {
> +	    fprintf(stderr, "DONE\n");
> +	    break;
> +	}
> +    }
> +
> +    if(tryagain) {
> +	fprintf(stderr, "The following processes may have not been killed:\n");
> +	for (i = 0; i < pglist->npgs; i++) {
> +	    if(spawned_pid[i]) {
> +		const process_group * pg = pglist->index[i];
> +
> +		fprintf(stderr, "%s:", pg->hostname);
> +
> +		for (j = 0; j < pg->npids; j++) {
> +		    fprintf(stderr, " %d", pg->pids[j]);
> +		}
> +
> +		fprintf(stderr, "\n");
> +	    }
> +	}
> +    }
> +}
> +
> +void rkill_linear(void) {
> +    int i, j, tryagain, spawned_pid[nprocs];
> +
> +    fprintf(stderr, "Killing remote processes...");
> +
> +    for (i = 0; i < nprocs; i++) {
> +	if(0 == (spawned_pid[i] = fork())) {
> +	    char kill_cmd[80];
> +
> +	    if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
> +
> +	    snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
> +		plist[i].remote_pid);
> +
> +	    if(use_rsh) {
> +		execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
> +	    }
> +
> +	    else {
> +		execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
> +			plist[i].hostname, kill_cmd, NULL);
> +	    }
> +
> +	    perror(NULL);
> +	    exit(EXIT_FAILURE);
> +	}
>      }
>  
> -    fprintf(stderr, "done.\n");
> +    while(1) {
> +	static int iteration = 0;
> +	tryagain = 0;
> +
> +	sleep(1 << iteration);
> +
> +	for (i = 0; i < nprocs; i++) {
> +	    if(spawned_pid[i]) {
> +		if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
> +		    tryagain = 1;
> +		}
> +	    }
> +	}
>  
> -    exit(1);
> +	if(++iteration == 5 || !tryagain) {
> +	    fprintf(stderr, "DONE\n");
> +	    break;
> +	}
> +    }
>  
> +    if(tryagain) {
> +	fprintf(stderr, "The following processes may have not been killed:\n");
> +	for (i = 0; i < nprocs; i++) {
> +	    if(spawned_pid[i]) {
> +		fprintf(stderr, "%s [%d]\n", plist[i].hostname,
> +			plist[i].remote_pid);
> +	    }
> +	}
> +    }
>  }
>  
>  
> @@ -1457,9 +1813,13 @@
>  
>  void alarm_handler(int signal)
>  {
> +    extern const char * alarm_msg;
> +
>      if (use_totalview) {
>  	fprintf(stderr, "Timeout alarm signaled\n");
>      }
> +
> +    if(alarm_msg) fprintf(stderr, alarm_msg);
>      cleanup();
>  }
>  
> @@ -1467,19 +1827,21 @@
>  void child_handler(int signal)
>  {
>      int status, i, child, pid;
> -    int exitstatus = 0;
> +    int exitstatus = EXIT_SUCCESS;
>  
>      if (use_totalview) {
>  	fprintf(stderr, "mpirun: child died. Waiting for others.\n"); 
>      }
>      alarm(10);
> +    alarm_msg = "Child died. Timeout while waiting for others.\n";
> +
>      for (i = 0; i < nprocs; i++) {
>          pid = wait(&status);
>          if (pid == -1) {
>              perror("wait");
> -            exitstatus = 1;
> +            exitstatus = EXIT_FAILURE;
>          } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
> -            exitstatus = 1;
> +            exitstatus = EXIT_FAILURE;
>          }
>          for (child = 0; child < nprocs; child++) {
>              if (plist[child].pid == pid) {
> @@ -1489,9 +1851,11 @@
>          }
>          if (child == nprocs) {
>              fprintf(stderr, "Unable to find child %d!\n", pid);
> -            exitstatus = 1;
> +            exitstatus = EXIT_FAILURE;
>          }
>      }
>      alarm(0);
>      exit(exitstatus);
>  }
> +
> +/* vi:set sw=4 sts=4 tw=80: */
> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h exp1/mpid/ch_gen2/process/pmgr_client.h
> --- 0.9.9/mpid/ch_gen2/process/pmgr_client.h	2007-05-29 03:47:10.000000000 -0400
> +++ exp1/mpid/ch_gen2/process/pmgr_client.h	2007-07-02 12:59:51.000000000 -0400
> @@ -108,6 +108,6 @@
>   * of the spawner, e.g. mpirun_rsh, to check that it understands
>   * the version of the executable.
>   */
> -#define PMGR_VERSION 5
> +#define PMGR_VERSION 6
>  
>  #endif
> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
> --- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c	2007-05-29 03:47:10.000000000 -0400
> +++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c	2007-07-02 12:59:51.000000000 -0400
> @@ -171,6 +171,9 @@
>      int  nwritten;
>      int version;
>      struct sockaddr_in sockaddr;
> +
> +    if(phase != 0) return;
> +
>     /*
>       * Exchange information with the mpirun program. Send it our
>       * socket address, get back addresses for our siblings.
> @@ -208,14 +211,12 @@
>       */
>  
>      version = PMGR_VERSION;
> -    if (0 == phase) {
> -        /* first, send a version number */
> -        nwritten = write(mpirun_socket, &version, sizeof(version));
> -        if (nwritten != sizeof(version)) {
> -            sleep(2);
> -            perror("write");
> -            exit(1);
> -        }
> +    /* first, send a version number */
> +    nwritten = write(mpirun_socket, &version, sizeof(version));
> +    if (nwritten != sizeof(version)) {
> +	sleep(2);
> +	perror("write");
> +	exit(1);
>      }
>      
>      /* next, send our rank */
> @@ -264,7 +265,6 @@
>          tot_nread = tot_nread + nread;
>      }
>      fflush(stdout);
> -    close(mpirun_socket);
>      return 1;
>  }
>  
> @@ -280,7 +280,6 @@
>      pid_t *ppids = (pid_t *)pallpids;
>      pid_t *allpids = NULL;
>  
> -    pmgr_init_connection(1);
>      /* next, send size of addr */
>      nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
>      if (nwritten != sizeof(addrlen)) {
> @@ -314,7 +313,7 @@
>         exit(1);
>      }
>  
> -    /* next, send size of addr */
> +    /* next, send size of pid */
>      nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
>      if (nwritten != sizeof(mypid_len)) {
>        sleep(2);
> @@ -322,6 +321,7 @@
>        exit(1);
>      }
>  
> +    /* next, send our pid */
>      if (pidlen != 0) {
>        nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
>        if (nwritten != pidlen) {
> @@ -345,7 +345,7 @@
>  
>      if (pidlen != 0) {
>         	tot_nread=0;
> -       	/* finally, read addresses from all processes */
> +       	/* finally, read pids from all processes */
>         	while (tot_nread < pmgr_nprocs*pidlen) {
>  	    nread = read(mpirun_socket, (void*)((char *)allpids+tot_nread), 
>                      (size_t) ((pmgr_nprocs*pidlen)-tot_nread));

-- 
***********************************
 >> Mark J. Potts, PhD
 >>
 >> HPC Applications Inc.
 >> phone: 410-992-8360 Bus
 >>        410-313-9318 Home
 >>        443-418-4375 Cell
 >> email: potts at hpcapplications.com
 >>        potts at excray.com
***********************************


More information about the mvapich-discuss mailing list