[mvapich-discuss] mvapich jobs cleanup

Sayantan Sur surs at cse.ohio-state.edu
Tue Jul 10 13:58:55 EDT 2007


Hi Mark,

We have a patch that fixes this stray-process issue in MVAPICH-0.9.9.
I'm attaching the patch to this email. To apply it, please
follow these steps:

$ cd mvapich-0.9.9
$ #save mpirun_rsh_patch to this directory
$ patch -p1 < mpirun_rsh_patch

Could you please let us know if this patch solves the problem for you?

Thanks,
Sayantan.


Mark Potts wrote:
> Hi,
>    We are observing a number of cases in which MVAPICH-0.9.9
>    jobs launched with mpirun_rsh leave stray processes on some
>    nodes when the job terminates abnormally.  Those stray
>    processes continue to run forever and must be identified
>    and killed.
>
>    Is there a reason this happens with MVAPICH, and is there a
>    way to prevent it?  This doesn't seem to be the behavior
>    that occurs for abnormally terminated Voltaire MPI or Intel
>    MPI jobs.
>          regards,


-- 
http://www.cse.ohio-state.edu/~surs

-------------- next part --------------
diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c exp1/mpid/ch_gen2/process/mpirun_rsh.c
--- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c	2007-05-29 03:47:10.000000000 -0400
+++ exp1/mpid/ch_gen2/process/mpirun_rsh.c	2007-07-09 11:56:32.000000000 -0400
@@ -59,6 +59,7 @@
 #define _GNU_SOURCE
 #include <getopt.h>
 #include <stdlib.h>
+#include <stddef.h>
 #include <stdio.h>
 #include <signal.h>
 #include <unistd.h>
@@ -91,20 +92,34 @@
 typedef struct {
     char *hostname;
     char *device;
-    int pid;
+    pid_t pid;
+    pid_t remote_pid;
     int port;
     int control_socket;
     process_state state;
 } process;
 
+typedef struct {
+    const char * hostname;
+    pid_t * pids;
+    size_t npids, npids_allocated;
+} process_group;
+
+typedef struct {
+    process_group * data;
+    process_group ** index;
+    size_t npgs, npgs_allocated;
+} process_groups;
+
 #define RUNNING(i) ((plist[i].state == P_STARTED ||                 \
             plist[i].state == P_CONNECTED ||                        \
             plist[i].state == P_RUNNING) ? 1 : 0)
 
 /* other information: a.out and rank are implicit. */
 
-process *plist;
-int nprocs;
+process_groups * pglist = NULL;
+process * plist = NULL;
+int nprocs = 0;
 int aout_index, port;
 #define MAX_WD_LEN 256
 char wd[MAX_WD_LEN];            /* working directory of current process */
@@ -112,11 +127,19 @@
 char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
 /* xxx need to add checking for string overflow, do this more carefully ... */
 
+/*
+ * Message notifying user of what timed out
+ */
+static const char * alarm_msg = NULL;
 
 #define COMMAND_LEN 2000
 #define SEPARATOR ':'
 
-
+void free_memory(void);
+void pglist_print(void);
+void pglist_insert(const char * const, const pid_t const);
+void rkill_fast(void);
+void rkill_linear(void);
 void cleanup_handler(int);
 void nostop_handler(int);
 void alarm_handler(int);
@@ -239,15 +262,19 @@
     int    hostname_len = 0;
     totalview_cmd[199] = 0;
     display[0]='\0';	
-     
+    pidglen = sizeof(pid_t); 
+
     /* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 ... hN] a.out [args] */
 
+    atexit(free_memory);
+
     do {
         c = getopt_long_only(argc, argv, "+", option_table, &option_index);
         switch (c) {
         case '?':
         case ':':
             usage();
+	    exit(EXIT_FAILURE);
             break;
         case EOF:
             break;
@@ -255,8 +282,10 @@
             switch (option_index) {
             case 0:
                 nprocs = atoi(optarg);
-                if (nprocs < 1)
+                if (nprocs < 1) {
                     usage();
+		    exit(EXIT_FAILURE);
+		}
                 break;
             case 1:
                 debug_on = 1;
@@ -290,11 +319,11 @@
             case 8:
                 show_version();
                 usage();
-                exit(0);
+                exit(EXIT_SUCCESS);
                 break;
             case 9:
                 show_version();
-                exit(0);
+                exit(EXIT_SUCCESS);
                 break;
  	    case 10:
  		use_totalview = 1;
@@ -311,17 +340,19 @@
   		break;
             case 11:
                 usage();
-                exit(0);
+                exit(EXIT_SUCCESS);
                 break;
             default:
                 fprintf(stderr, "Unknown option\n");
                 usage();
+		exit(EXIT_FAILURE);
                 break;
             }
             break;
         default:
             fprintf(stderr, "Unreachable statement!\n");
             usage();
+	    exit(EXIT_FAILURE);
             break;
         }
     } while (c != EOF);
@@ -332,7 +363,7 @@
             fprintf(stderr, "Without hostfile option, hostnames must be "
                     "specified on command line.\n");
             usage();
-            exit(1);
+            exit(EXIT_FAILURE);
         }
         aout_index = nprocs + optind;
     } else {
@@ -361,13 +392,14 @@
     plist = malloc(nprocs * sizeof(process));
     if (plist == NULL) {
         perror("malloc");
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
     for (i = 0; i < nprocs; i++) {
         plist[i].state = P_NOTSTARTED;
         plist[i].device = NULL;
         plist[i].port = -1;
+	plist[i].remote_pid = 0;
     }
 
     /* grab hosts from command line or file */
@@ -376,7 +408,7 @@
         hostname_len = read_hostfile(hostfile);
     } else {
         for (i = 0; i < nprocs; i++) {
-            plist[i].hostname = argv[optind + i];
+            plist[i].hostname = (char *)strndup(argv[optind + i], 100);
     	    hostname_len = hostname_len > strlen(plist[i].hostname) ?
     		    hostname_len : strlen(plist[i].hostname); 
         }
@@ -388,7 +420,7 @@
         
         if (!mpirun_processes) {
             perror("malloc");
-            exit(1);
+            exit(EXIT_FAILURE);
         } else { 
             memset(mpirun_processes, 0, nprocs * (hostname_len + 4));
         }
@@ -412,18 +444,18 @@
     s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     if (s < 0) {
         perror("socket");
-        exit(1);
+        exit(EXIT_FAILURE);
     }
     sockaddr.sin_addr.s_addr = INADDR_ANY;
     sockaddr.sin_port = 0;
     if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
         perror("bind");
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
     if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) < 0) {
         perror("getsockname");
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
     port = (int) ntohs(sockaddr.sin_port);
@@ -431,14 +463,31 @@
 
 
     if (!show_on) {
-	signal(SIGHUP, cleanup_handler);
-	signal(SIGINT, cleanup_handler);
-	signal(SIGTSTP, nostop_handler);
-	signal(SIGCHLD, child_handler);
-	signal(SIGALRM, alarm_handler);
+	struct sigaction signal_handler;
+	signal_handler.sa_handler = cleanup_handler;
+	sigfillset(&signal_handler.sa_mask);
+	signal_handler.sa_flags = 0;
+
+	sigaction(SIGHUP, &signal_handler, NULL);
+	sigaction(SIGINT, &signal_handler, NULL);
+	sigaction(SIGTERM, &signal_handler, NULL);
+
+	signal_handler.sa_handler = nostop_handler;
+
+	sigaction(SIGTSTP, &signal_handler, NULL);
+
+	signal_handler.sa_handler = alarm_handler;
+
+	sigaction(SIGALRM, &signal_handler, NULL);
+
+	signal_handler.sa_handler = child_handler;
+	sigemptyset(&signal_handler.sa_mask);
+
+	sigaction(SIGCHLD, &signal_handler, NULL);
     }
 
     alarm(1000);
+    alarm_msg = "Timeout during client startup.\n";
     /* long timeout for testing, where process may be stopped in debugger */
 
 #ifdef USE_DDD
@@ -511,7 +560,7 @@
     }
 
     if (show_on)
-        exit(0);
+        exit(EXIT_SUCCESS);
     
     /*Hostid exchange start */
     /* accept incoming connections, read port numbers */
@@ -522,6 +571,9 @@
 ACCEPT_HID:
         sockaddr_len = sizeof(sockaddr);
         s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
+
+	alarm_msg = "Timeout during hostid exchange.\n";
+
         if (s1 < 0) {
             if (errno == EINTR)
                 goto ACCEPT_HID;
@@ -592,7 +644,7 @@
             hostids = (int *) malloc(hostidlen * nprocs);
             if (hostids == NULL) {
                 perror("malloc");
-                exit(1);
+                exit(EXIT_FAILURE);
             }
         }
 
@@ -626,66 +678,33 @@
         }
     }
 
-    /* close all opend sockets */
-    for (i = 0; i < nprocs; i++) {
-        close(plist[i].control_socket);
-    }
-
     alarm(1000); 
-    /* let enbale the timer again*/
+    alarm_msg = "Timeout during address exchange.\n";
+    /* lets enable the timer again*/
 
     /* Lets read all other information, LID QP,etc..*/
 
     /* accept incoming connections, read port numbers */
     for (i = 0; i < nprocs; i++) {
-        int version, rank, nread;
-        char pidstr[12];
-ACCEPT:
-        sockaddr_len = sizeof(sockaddr);
-        s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
-        if (s1 < 0) {
-            if (errno == EINTR)
-                goto ACCEPT;
-            perror("accept");
-            cleanup();
-        }
+        int nread;
 
         /* 
          * protocol: 
-         *  We don't need version number,
-         *  0. read rank of process
-         *  1. read address length
-         *  2. read address itself
-         *  3. send array of all addresses
+         *  We don't need the version number or the rank,
+         *  0. read address length
+         *  1. read address itself
+         *  2. send array of all addresses
          */
 
-        /* 0. Find out who we're talking to */
-        nread = read(s1, &rank, sizeof(rank));
-        if (nread != sizeof(rank)) {
-            perror("read");
-            cleanup();
-        }
-
-        if (rank < 0 || rank >= nprocs 
-                || plist[rank].state != P_STARTED) {
-            fprintf(stderr, "mpirun: invalid rank received. \n");
-            cleanup();
-        }
-
-        plist[rank].control_socket = s1;
-        plist[rank].state = P_CONNECTED;
+        plist[i].state = P_CONNECTED;
 
         /* Let us know connection was established
          * printf("MPIRUN_RSH: Process rank %d connected\n",rank);
          */
 
         /* 1. Find out length of the data */
-        nread = read(s1, &addrlen, sizeof(addrlen));
+        nread = read(plist[i].control_socket, &addrlen, sizeof(addrlen));
         if (nread != sizeof(addrlen)) {
-            /* nread == 0 is not actually an error! */
-            if (nread == 0) 
-                continue;
-
             perror("read");
             cleanup();
         }
@@ -707,21 +726,20 @@
             alladdrs = (int *) malloc(addrlen * nprocs);
             if (alladdrs == NULL) {
                 perror("malloc");
-                exit(1);
+                exit(EXIT_FAILURE);
             }
         }
 
         /* 2. Read info from each process */
 
         /* for byte location */
-        alladdrs_char = (char *) &alladdrs[rank * addrlen / sizeof(int)];
+        alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
 
         tot_nread = 0;
 
         while (tot_nread < addrlen) {
-            nread =
-                read(s1, (void *) (alladdrs_char + tot_nread),
-                        addrlen - tot_nread);
+            nread = read(plist[i].control_socket,
+		(void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
 
             if (nread < 0) {
                 perror("read");
@@ -733,36 +751,32 @@
 
 read_pid:
         /* 3. Find out length of the data */
-        nread = read(s1, &pidlen, sizeof(pidlen));
+        nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
         if (nread != sizeof(pidlen)) {
             perror("read");
             cleanup();
         }
 
         /*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, nread);*/
-        if (i == 0) {
-            pidglen = pidlen;
-        } else {
-            if (pidlen != pidglen) {
-                fprintf(stderr, "Pid lengths %d and %d do not match\n", 
-                        pidlen, pidglen);
-                cleanup();
-            }
-        }
+	if (pidlen != pidglen) {
+	  fprintf(stderr, "Pid lengths %d and %d do not match\n", 
+	      pidlen, pidglen);
+	  cleanup();
+	}
 
         if (i == 0) {
-            /* allocate as soon as we know the address length */
+            /* allocate as soon as we know the pid length */
             allpids = (char *)malloc(pidlen * nprocs);
             if (allpids == NULL) {
                 perror("malloc");
-                exit(1);
+                exit(EXIT_FAILURE);
             }
         }
 
         tot_nread=0;
         while(tot_nread < pidlen) {
-            nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread), 
-                    pidlen - tot_nread);
+            nread = read(plist[i].control_socket,
+		(void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
             /*fprintf(stderr, "read length %d \n", nread);*/
             if(nread < 0) {
                 perror("read");
@@ -770,6 +784,9 @@
             }
             tot_nread += nread;
         }
+
+	plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
+	pglist_insert(plist[i].hostname, plist[i].remote_pid);
     }
 
 
@@ -795,7 +812,7 @@
     out_addrs = (int *) malloc(out_addrs_len);
     if (out_addrs == NULL) {
         perror("malloc");
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
     for (i = 0; i < nprocs; i++) {
@@ -876,8 +893,7 @@
         sleep(100);
     }
     close(s);
-    exit(0);
-
+    exit(EXIT_SUCCESS);
 }
 
 int start_process(int i, char *command_name, char *env)
@@ -925,12 +941,12 @@
     if ((remote_command = malloc(str_len)) == NULL) {
         fprintf(stderr, "Failed to malloc %d bytes for remote_command\n",
                 str_len);
-        exit(1);
+        exit(EXIT_FAILURE);
     }
     if ((xterm_command = malloc(str_len)) == NULL) {
         fprintf(stderr, "Failed to malloc %d bytes for xterm_command\n",
                 str_len);
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
 
@@ -1010,7 +1026,7 @@
         if (!show_on) {
             perror("RSH/SSH command failed!");
         }
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
     free(remote_command);
@@ -1189,8 +1205,6 @@
     fprintf(stderr, "\ta.out      => " "name of MPI binary\n");
     fprintf(stderr, "\targs       => " "arguments for MPI binary\n");
     fprintf(stderr, "\n");
-
-    exit(1);
 }
 
 /* finds first non-whitespace char in input string */
@@ -1221,7 +1235,7 @@
     if (hf == NULL) {
         fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
         perror("open");
-        exit(1);
+        exit(EXIT_FAILURE);
     }
     
     for (i = 0; i < nprocs; i++) {
@@ -1287,7 +1301,7 @@
         } else {
             fprintf(stderr, "End of file reached on "
                     "hostfile at %d of %d hostnames\n", i, nprocs);
-            exit(1);
+            exit(EXIT_FAILURE);
         }
     }
     fclose(hf);
@@ -1321,14 +1335,14 @@
     if ((pf = fopen(paramfile, "r")) == NULL) {
         sprintf(errstr, "Cant open paramfile = %s", paramfile);
         perror(errstr);
-        exit(1);
+        exit(EXIT_FAILURE);
     }
 
     if ( strlen(env) == 0 ){
 	    /* Allocating space for env first time */
 	    if ((env = malloc(ENV_LEN)) == NULL) {
 		    fprintf(stderr, "Malloc of env failed in read_param_file\n");
-		    exit(1);
+		    exit(EXIT_FAILURE);
 	    }
 	    env_left = ENV_LEN - 1;
     }else{
@@ -1367,7 +1381,7 @@
                 (ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + strlen(env);
             if ((env = realloc(env, newlen)) == NULL) {
                 fprintf(stderr, "realloc failed in read_param_file\n");
-                exit(1);
+                exit(EXIT_FAILURE);
             }
             if (param_debug) {
                 printf("realloc to %d\n", newlen);
@@ -1395,15 +1409,213 @@
     }
     cleanup();
 
-    exit(1);
+    exit(EXIT_FAILURE);
+}
+
+void pglist_print(void) {
+    if(pglist) {
+	int i, j;
+	size_t npids = 0, npids_allocated = 0;
+
+	fprintf(stderr, "\n--pglist--\ndata:\n");
+	for(i = 0; i < pglist->npgs; i++) {
+	    fprintf(stderr, "%p - %s:", &pglist->data[i],
+		    pglist->data[i].hostname);
+
+	    for(j = 0; j < pglist->data[i].npids; j++) {
+		fprintf(stderr, " %d", pglist->data[i].pids[j]);
+	    }
+
+	    fprintf(stderr, "\n");
+	    npids	    += pglist->data[i].npids;
+	    npids_allocated += pglist->data[i].npids_allocated;
+	}
+
+	fprintf(stderr, "\nindex:");
+	for(i = 0; i < pglist->npgs; i++) {
+	    fprintf(stderr, " %p", pglist->index[i]);
+	}
+
+	fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
+		pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
+		    pglist->npgs / pglist->npgs_allocated : 100.));
+	fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
+		npids_allocated, (int)(npids_allocated ? 100. * npids /
+		    npids_allocated : 100.));
+	fprintf(stderr, "--pglist--\n\n");
+    }
+}
+
+void pglist_insert(const char * const hostname, const pid_t const pid) {
+    const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
+    size_t index = 0, bottom = 0, top;
+    static size_t alloc_error = 0;
+    int i, strcmp_result;
+    process_group * pg;
+    void * backup_ptr;
+
+    if(alloc_error) return;
+    if(pglist == NULL) goto init_pglist;
+
+    top = pglist->npgs - 1;
+    index = (top + bottom) / 2;
+
+    while(strcmp_result = strcmp(hostname, pglist->index[index]->hostname)) {
+	if(bottom >= top) break;
+
+	if(strcmp_result > 0) {
+	    bottom = index + 1;
+	}
+
+	else {
+	    top = index - 1;
+	}
+
+	index = (top + bottom) / 2;
+    }
+
+    if(!strcmp_result) goto insert_pid;
+    if(strcmp_result > 0) index++;
+
+    goto add_process_group;
+
+init_pglist:
+    pglist = malloc(sizeof(process_groups));
+
+    if(pglist) {
+	pglist->data		= NULL;
+	pglist->index		= NULL;
+	pglist->npgs		= 0;
+	pglist->npgs_allocated	= 0;
+    }
+
+    else {
+	goto register_alloc_error;
+    }
+
+add_process_group:
+    if(pglist->npgs == pglist->npgs_allocated) {
+	process_group * pglist_data_backup	= pglist->data;
+	process_group ** pglist_index_backup	= pglist->index;
+	ptrdiff_t offset;
+
+	pglist->npgs_allocated += increment;
+
+	backup_ptr = pglist->data;
+	pglist->data = realloc(pglist->data, sizeof(process_group) *
+		pglist->npgs_allocated);
+
+	if(pglist->data == NULL) {
+	    pglist->data = backup_ptr;
+	    goto register_alloc_error;
+	}
+
+	backup_ptr = pglist->index;
+	pglist->index = realloc(pglist->index, sizeof(process_group *) *
+		pglist->npgs_allocated);
+
+	if(pglist->index == NULL) {
+	    pglist->index = backup_ptr;
+	    goto register_alloc_error;
+	}
+
+	if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) { 
+	    for(i = 0; i < pglist->npgs; i++) {
+		pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
+			offset);
+	    }
+	}
+    }
+
+    for(i = pglist->npgs; i > index; i--) {
+	pglist->index[i] = pglist->index[i-1];
+    }
+
+    pglist->data[pglist->npgs].hostname		= hostname;
+    pglist->data[pglist->npgs].pids		= NULL;
+    pglist->data[pglist->npgs].npids		= 0;
+    pglist->data[pglist->npgs].npids_allocated	= 0;
+
+    pglist->index[index] = &pglist->data[pglist->npgs++];
+
+insert_pid:
+    pg = pglist->index[index];
+
+    if(pg->npids == pg->npids_allocated) {
+	if(pg->npids_allocated) {
+	    pg->npids_allocated <<= 1;
+
+	    if(pg->npids_allocated < pg->npids) pg->npids_allocated = SIZE_MAX;
+	    if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
+	}
+
+	else {
+	    pg->npids_allocated = 1;
+	}
+
+	backup_ptr = pg->pids;
+	pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
+
+	if(pg->pids == NULL) {
+	    pg->pids = backup_ptr;
+	    goto register_alloc_error;
+	}
+    }
+
+    pg->pids[pg->npids++] = pid;
+
+    return;
+
+register_alloc_error:
+    if(pglist) {
+	if(pglist->data) {
+	    process_group * pg = pglist->data;
+
+	    while(pglist->npgs--) {
+		if(pg->pids) free((pg++)->pids);
+	    }
+
+	    free(pglist->data);
+	}
+
+	if(pglist->index) free(pglist->index);
+
+	free(pglist);
+    }
+
+    alloc_error = 1;
+}
+
+void free_memory(void) {
+    if(pglist) {
+	if(pglist->data) {
+	    process_group * pg = pglist->data;
+
+	    while(pglist->npgs--) {
+		if(pg->pids) free((pg++)->pids);
+	    }
+
+	    free(pglist->data);
+	}
+
+	if(pglist->index) free(pglist->index);
+
+	free(pglist);
+    }
+
+    if(plist) {
+	while(nprocs--) {
+	    if(plist[nprocs].device) free(plist[nprocs].device);
+	    if(plist[nprocs].hostname) free(plist[nprocs].hostname);
+	}
+
+	free(plist);
+    }
 }
 
 void cleanup(void)
 {
     int i;
-    /* could walk through list of processes, but it looks
-       like we can just send the signal to the process group
-     */
 
     if (use_totalview) {
 	fprintf(stderr, "Cleaning up all processes ...");
@@ -1417,36 +1629,180 @@
     }
 
     for (i = 0; i < nprocs; i++) {
-        if (RUNNING(i)) {
-            /* send terminal interrupt, which will hopefully 
-               propagate to the other side. (not sure what xterm will
-               do here.
-             */
-            kill(plist[i].pid, SIGINT);
-        }
+	if (RUNNING(i)) {
+	    /* send terminal interrupt, which will hopefully 
+	       propagate to the other side. (not sure what xterm will
+	       do here.
+	     */
+	    kill(plist[i].pid, SIGINT);
+	}
     }
+
     sleep(1);
 
     for (i = 0; i < nprocs; i++) {
-        if (plist[i].state != P_NOTSTARTED) {
-            /* send regular interrupt to rsh */
-            kill(plist[i].pid, SIGTERM);
-        }
+	if (plist[i].state != P_NOTSTARTED) {
+	    /* send regular interrupt to rsh */
+	    kill(plist[i].pid, SIGTERM);
+	}
     }
 
     sleep(1);
 
     for (i = 0; i < nprocs; i++) {
-        if (plist[i].state != P_NOTSTARTED) {
-            /* Kill the processes */
-            kill(plist[i].pid, SIGKILL);
-        }
+	if (plist[i].state != P_NOTSTARTED) {
+	    /* Kill the processes */
+	    kill(plist[i].pid, SIGKILL);
+	}
+    }
+
+    if(pglist) {
+	rkill_fast();
+    }
+
+    else {
+	rkill_linear();
+    }
+
+    exit(EXIT_FAILURE);
+}
+
+void rkill_fast(void) {
+    int i, j, tryagain, spawned_pid[pglist->npgs];
+
+    fprintf(stderr, "Killing remote processes...");
+
+    for(i = 0; i < pglist->npgs; i++) {
+	if(0 == (spawned_pid[i] = fork())) {
+	    if(pglist->index[i]->npids) {
+		const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
+		const process_group * pg = pglist->index[i];
+		char kill_cmd[bufsize], tmp[10];
+
+		kill_cmd[0] = '\0';
+		strcat(kill_cmd, "kill -s SIGKILL");
+
+		for(j = 0; j < pg->npids; j++) {
+		    snprintf(tmp, 10, " %d", pg->pids[j]);
+		    strcat(kill_cmd, tmp);
+		}
+
+		strcat(kill_cmd, " >&/dev/null");
+
+		if(use_rsh) {
+		    execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
+		}
+
+		else {
+		    execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
+			    kill_cmd, NULL);
+		}
+
+		perror(NULL);
+		exit(EXIT_FAILURE);
+	    }
+
+	    else {
+		exit(EXIT_SUCCESS);
+	    }
+	}
+    }
+
+    while(1) {
+	static int iteration = 0;
+	tryagain = 0;
+
+	sleep(1 << iteration);
+
+	for (i = 0; i < pglist->npgs; i++) {
+	    if(spawned_pid[i]) {
+		if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
+		    tryagain = 1;
+		}
+	    }
+	}
+
+	if(++iteration == 5 || !tryagain) {
+	    fprintf(stderr, "DONE\n");
+	    break;
+	}
+    }
+
+    if(tryagain) {
+	fprintf(stderr, "The following processes may have not been killed:\n");
+	for (i = 0; i < pglist->npgs; i++) {
+	    if(spawned_pid[i]) {
+		const process_group * pg = pglist->index[i];
+
+		fprintf(stderr, "%s:", pg->hostname);
+
+		for (j = 0; j < pg->npids; j++) {
+		    fprintf(stderr, " %d", pg->pids[j]);
+		}
+
+		fprintf(stderr, "\n");
+	    }
+	}
+    }
+}
+
+void rkill_linear(void) {
+    int i, j, tryagain, spawned_pid[nprocs];
+
+    fprintf(stderr, "Killing remote processes...");
+
+    for (i = 0; i < nprocs; i++) {
+	if(0 == (spawned_pid[i] = fork())) {
+	    char kill_cmd[80];
+
+	    if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
+
+	    snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
+		plist[i].remote_pid);
+
+	    if(use_rsh) {
+		execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
+	    }
+
+	    else {
+		execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
+			plist[i].hostname, kill_cmd, NULL);
+	    }
+
+	    perror(NULL);
+	    exit(EXIT_FAILURE);
+	}
     }
 
-    fprintf(stderr, "done.\n");
+    while(1) {
+	static int iteration = 0;
+	tryagain = 0;
+
+	sleep(1 << iteration);
+
+	for (i = 0; i < nprocs; i++) {
+	    if(spawned_pid[i]) {
+		if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
+		    tryagain = 1;
+		}
+	    }
+	}
 
-    exit(1);
+	if(++iteration == 5 || !tryagain) {
+	    fprintf(stderr, "DONE\n");
+	    break;
+	}
+    }
 
+    if(tryagain) {
+	fprintf(stderr, "The following processes may have not been killed:\n");
+	for (i = 0; i < nprocs; i++) {
+	    if(spawned_pid[i]) {
+		fprintf(stderr, "%s [%d]\n", plist[i].hostname,
+			plist[i].remote_pid);
+	    }
+	}
+    }
 }
 
 
@@ -1457,9 +1813,13 @@
 
 void alarm_handler(int signal)
 {
+    extern const char * alarm_msg;
+
     if (use_totalview) {
 	fprintf(stderr, "Timeout alarm signaled\n");
     }
+
+    if(alarm_msg) fprintf(stderr, alarm_msg);
     cleanup();
 }
 
@@ -1467,19 +1827,21 @@
 void child_handler(int signal)
 {
     int status, i, child, pid;
-    int exitstatus = 0;
+    int exitstatus = EXIT_SUCCESS;
 
     if (use_totalview) {
 	fprintf(stderr, "mpirun: child died. Waiting for others.\n"); 
     }
     alarm(10);
+    alarm_msg = "Child died. Timeout while waiting for others.\n";
+
     for (i = 0; i < nprocs; i++) {
         pid = wait(&status);
         if (pid == -1) {
             perror("wait");
-            exitstatus = 1;
+            exitstatus = EXIT_FAILURE;
         } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
-            exitstatus = 1;
+            exitstatus = EXIT_FAILURE;
         }
         for (child = 0; child < nprocs; child++) {
             if (plist[child].pid == pid) {
@@ -1489,9 +1851,11 @@
         }
         if (child == nprocs) {
             fprintf(stderr, "Unable to find child %d!\n", pid);
-            exitstatus = 1;
+            exitstatus = EXIT_FAILURE;
         }
     }
     alarm(0);
     exit(exitstatus);
 }
+
+/* vi:set sw=4 sts=4 tw=80: */
diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h exp1/mpid/ch_gen2/process/pmgr_client.h
--- 0.9.9/mpid/ch_gen2/process/pmgr_client.h	2007-05-29 03:47:10.000000000 -0400
+++ exp1/mpid/ch_gen2/process/pmgr_client.h	2007-07-02 12:59:51.000000000 -0400
@@ -108,6 +108,6 @@
  * of the spawner, e.g. mpirun_rsh, to check that it understands
  * the version of the executable.
  */
-#define PMGR_VERSION 5
+#define PMGR_VERSION 6
 
 #endif
diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
--- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c	2007-05-29 03:47:10.000000000 -0400
+++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c	2007-07-02 12:59:51.000000000 -0400
@@ -171,6 +171,9 @@
     int  nwritten;
     int version;
     struct sockaddr_in sockaddr;
+
+    if(phase != 0) return;
+
    /*
      * Exchange information with the mpirun program. Send it our
      * socket address, get back addresses for our siblings.
@@ -208,14 +211,12 @@
      */
 
     version = PMGR_VERSION;
-    if (0 == phase) {
-        /* first, send a version number */
-        nwritten = write(mpirun_socket, &version, sizeof(version));
-        if (nwritten != sizeof(version)) {
-            sleep(2);
-            perror("write");
-            exit(1);
-        }
+    /* first, send a version number */
+    nwritten = write(mpirun_socket, &version, sizeof(version));
+    if (nwritten != sizeof(version)) {
+	sleep(2);
+	perror("write");
+	exit(1);
     }
     
     /* next, send our rank */
@@ -264,7 +265,6 @@
         tot_nread = tot_nread + nread;
     }
     fflush(stdout);
-    close(mpirun_socket);
     return 1;
 }
 
@@ -280,7 +280,6 @@
     pid_t *ppids = (pid_t *)pallpids;
     pid_t *allpids = NULL;
 
-    pmgr_init_connection(1);
     /* next, send size of addr */
     nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
     if (nwritten != sizeof(addrlen)) {
@@ -314,7 +313,7 @@
        exit(1);
     }
 
-    /* next, send size of addr */
+    /* next, send size of pid */
     nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
     if (nwritten != sizeof(mypid_len)) {
       sleep(2);
@@ -322,6 +321,7 @@
       exit(1);
     }
 
+    /* next, send our pid */
     if (pidlen != 0) {
       nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
       if (nwritten != pidlen) {
@@ -345,7 +345,7 @@
 
     if (pidlen != 0) {
        	tot_nread=0;
-       	/* finally, read addresses from all processes */
+       	/* finally, read pids from all processes */
        	while (tot_nread < pmgr_nprocs*pidlen) {
 	    nread = read(mpirun_socket, (void*)((char *)allpids+tot_nread), 
                     (size_t) ((pmgr_nprocs*pidlen)-tot_nread));


More information about the mvapich-discuss mailing list