[mvapich-discuss] mvapich jobs cleanup
Sayantan Sur
surs at cse.ohio-state.edu
Tue Jul 10 13:58:55 EDT 2007
Hi Mark,
We have a patch to solve this stray process issue with MVAPICH-0.9.9.
I'm attaching the patch to this email. To apply the patch, please
follow these steps:
$ cd mvapich-0.9.9
$ #save mpirun_rsh_patch to this directory
$ patch -p1 < mpirun_rsh_patch
Could you please let us know if this patch solves the problem for you?
Thanks,
Sayantan.
Mark Potts wrote:
> Hi,
> We are observing a number of cases in which MVAPICH-0.9.9
> jobs launched with mpirun_rsh leave stray processes on some
> nodes when the job terminates abnormally. Those stray
> processes continue to run forever and require recognition
> and killing.
>
> Is there a reason this happens with MVAPICH, and is there a
> way to prevent it? This doesn't seem to be the behavior
> that occurs for abnormally terminated Voltaire MPI or Intel
> MPI jobs.
> regards,
--
http://www.cse.ohio-state.edu/~surs
-------------- next part --------------
diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c exp1/mpid/ch_gen2/process/mpirun_rsh.c
--- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c 2007-05-29 03:47:10.000000000 -0400
+++ exp1/mpid/ch_gen2/process/mpirun_rsh.c 2007-07-09 11:56:32.000000000 -0400
@@ -59,6 +59,7 @@
#define _GNU_SOURCE
#include <getopt.h>
#include <stdlib.h>
+#include <stddef.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
@@ -91,20 +92,34 @@
typedef struct {
char *hostname;
char *device;
- int pid;
+ pid_t pid;
+ pid_t remote_pid;
int port;
int control_socket;
process_state state;
} process;
+typedef struct {
+ const char * hostname;
+ pid_t * pids;
+ size_t npids, npids_allocated;
+} process_group;
+
+typedef struct {
+ process_group * data;
+ process_group ** index;
+ size_t npgs, npgs_allocated;
+} process_groups;
+
#define RUNNING(i) ((plist[i].state == P_STARTED || \
plist[i].state == P_CONNECTED || \
plist[i].state == P_RUNNING) ? 1 : 0)
/* other information: a.out and rank are implicit. */
-process *plist;
-int nprocs;
+process_groups * pglist = NULL;
+process * plist = NULL;
+int nprocs = 0;
int aout_index, port;
#define MAX_WD_LEN 256
char wd[MAX_WD_LEN]; /* working directory of current process */
@@ -112,11 +127,19 @@
char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
/* xxx need to add checking for string overflow, do this more carefully ... */
+/*
+ * Message notifying user of what timed out
+ */
+static const char * alarm_msg = NULL;
#define COMMAND_LEN 2000
#define SEPARATOR ':'
-
+void free_memory(void);
+void pglist_print(void);
+void pglist_insert(const char * const, const pid_t const);
+void rkill_fast(void);
+void rkill_linear(void);
void cleanup_handler(int);
void nostop_handler(int);
void alarm_handler(int);
@@ -239,15 +262,19 @@
int hostname_len = 0;
totalview_cmd[199] = 0;
display[0]='\0';
-
+ pidglen = sizeof(pid_t);
+
/* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 ... hN] a.out [args] */
+ atexit(free_memory);
+
do {
c = getopt_long_only(argc, argv, "+", option_table, &option_index);
switch (c) {
case '?':
case ':':
usage();
+ exit(EXIT_FAILURE);
break;
case EOF:
break;
@@ -255,8 +282,10 @@
switch (option_index) {
case 0:
nprocs = atoi(optarg);
- if (nprocs < 1)
+ if (nprocs < 1) {
usage();
+ exit(EXIT_FAILURE);
+ }
break;
case 1:
debug_on = 1;
@@ -290,11 +319,11 @@
case 8:
show_version();
usage();
- exit(0);
+ exit(EXIT_SUCCESS);
break;
case 9:
show_version();
- exit(0);
+ exit(EXIT_SUCCESS);
break;
case 10:
use_totalview = 1;
@@ -311,17 +340,19 @@
break;
case 11:
usage();
- exit(0);
+ exit(EXIT_SUCCESS);
break;
default:
fprintf(stderr, "Unknown option\n");
usage();
+ exit(EXIT_FAILURE);
break;
}
break;
default:
fprintf(stderr, "Unreachable statement!\n");
usage();
+ exit(EXIT_FAILURE);
break;
}
} while (c != EOF);
@@ -332,7 +363,7 @@
fprintf(stderr, "Without hostfile option, hostnames must be "
"specified on command line.\n");
usage();
- exit(1);
+ exit(EXIT_FAILURE);
}
aout_index = nprocs + optind;
} else {
@@ -361,13 +392,14 @@
plist = malloc(nprocs * sizeof(process));
if (plist == NULL) {
perror("malloc");
- exit(1);
+ exit(EXIT_FAILURE);
}
for (i = 0; i < nprocs; i++) {
plist[i].state = P_NOTSTARTED;
plist[i].device = NULL;
plist[i].port = -1;
+ plist[i].remote_pid = 0;
}
/* grab hosts from command line or file */
@@ -376,7 +408,7 @@
hostname_len = read_hostfile(hostfile);
} else {
for (i = 0; i < nprocs; i++) {
- plist[i].hostname = argv[optind + i];
+ plist[i].hostname = (char *)strndup(argv[optind + i], 100);
hostname_len = hostname_len > strlen(plist[i].hostname) ?
hostname_len : strlen(plist[i].hostname);
}
@@ -388,7 +420,7 @@
if (!mpirun_processes) {
perror("malloc");
- exit(1);
+ exit(EXIT_FAILURE);
} else {
memset(mpirun_processes, 0, nprocs * (hostname_len + 4));
}
@@ -412,18 +444,18 @@
s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s < 0) {
perror("socket");
- exit(1);
+ exit(EXIT_FAILURE);
}
sockaddr.sin_addr.s_addr = INADDR_ANY;
sockaddr.sin_port = 0;
if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
perror("bind");
- exit(1);
+ exit(EXIT_FAILURE);
}
if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) < 0) {
perror("getsockname");
- exit(1);
+ exit(EXIT_FAILURE);
}
port = (int) ntohs(sockaddr.sin_port);
@@ -431,14 +463,31 @@
if (!show_on) {
- signal(SIGHUP, cleanup_handler);
- signal(SIGINT, cleanup_handler);
- signal(SIGTSTP, nostop_handler);
- signal(SIGCHLD, child_handler);
- signal(SIGALRM, alarm_handler);
+ struct sigaction signal_handler;
+ signal_handler.sa_handler = cleanup_handler;
+ sigfillset(&signal_handler.sa_mask);
+ signal_handler.sa_flags = 0;
+
+ sigaction(SIGHUP, &signal_handler, NULL);
+ sigaction(SIGINT, &signal_handler, NULL);
+ sigaction(SIGTERM, &signal_handler, NULL);
+
+ signal_handler.sa_handler = nostop_handler;
+
+ sigaction(SIGTSTP, &signal_handler, NULL);
+
+ signal_handler.sa_handler = alarm_handler;
+
+ sigaction(SIGALRM, &signal_handler, NULL);
+
+ signal_handler.sa_handler = child_handler;
+ sigemptyset(&signal_handler.sa_mask);
+
+ sigaction(SIGCHLD, &signal_handler, NULL);
}
alarm(1000);
+ alarm_msg = "Timeout during client startup.\n";
/* long timeout for testing, where process may be stopped in debugger */
#ifdef USE_DDD
@@ -511,7 +560,7 @@
}
if (show_on)
- exit(0);
+ exit(EXIT_SUCCESS);
/*Hostid exchange start */
/* accept incoming connections, read port numbers */
@@ -522,6 +571,9 @@
ACCEPT_HID:
sockaddr_len = sizeof(sockaddr);
s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
+
+ alarm_msg = "Timeout during hostid exchange.\n";
+
if (s1 < 0) {
if (errno == EINTR)
goto ACCEPT_HID;
@@ -592,7 +644,7 @@
hostids = (int *) malloc(hostidlen * nprocs);
if (hostids == NULL) {
perror("malloc");
- exit(1);
+ exit(EXIT_FAILURE);
}
}
@@ -626,66 +678,33 @@
}
}
- /* close all opend sockets */
- for (i = 0; i < nprocs; i++) {
- close(plist[i].control_socket);
- }
-
alarm(1000);
- /* let enbale the timer again*/
+ alarm_msg = "Timeout during address exchange.\n";
+ /* lets enable the timer again*/
/* Lets read all other information, LID QP,etc..*/
/* accept incoming connections, read port numbers */
for (i = 0; i < nprocs; i++) {
- int version, rank, nread;
- char pidstr[12];
-ACCEPT:
- sockaddr_len = sizeof(sockaddr);
- s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
- if (s1 < 0) {
- if (errno == EINTR)
- goto ACCEPT;
- perror("accept");
- cleanup();
- }
+ int nread;
/*
* protocol:
- * We don't need version number,
- * 0. read rank of process
- * 1. read address length
- * 2. read address itself
- * 3. send array of all addresses
+ * We don't need the version number or the rank,
+ * 0. read address length
+ * 1. read address itself
+ * 2. send array of all addresses
*/
- /* 0. Find out who we're talking to */
- nread = read(s1, &rank, sizeof(rank));
- if (nread != sizeof(rank)) {
- perror("read");
- cleanup();
- }
-
- if (rank < 0 || rank >= nprocs
- || plist[rank].state != P_STARTED) {
- fprintf(stderr, "mpirun: invalid rank received. \n");
- cleanup();
- }
-
- plist[rank].control_socket = s1;
- plist[rank].state = P_CONNECTED;
+ plist[i].state = P_CONNECTED;
/* Let us know connection was established
* printf("MPIRUN_RSH: Process rank %d connected\n",rank);
*/
/* 1. Find out length of the data */
- nread = read(s1, &addrlen, sizeof(addrlen));
+ nread = read(plist[i].control_socket, &addrlen, sizeof(addrlen));
if (nread != sizeof(addrlen)) {
- /* nread == 0 is not actually an error! */
- if (nread == 0)
- continue;
-
perror("read");
cleanup();
}
@@ -707,21 +726,20 @@
alladdrs = (int *) malloc(addrlen * nprocs);
if (alladdrs == NULL) {
perror("malloc");
- exit(1);
+ exit(EXIT_FAILURE);
}
}
/* 2. Read info from each process */
/* for byte location */
- alladdrs_char = (char *) &alladdrs[rank * addrlen / sizeof(int)];
+ alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
tot_nread = 0;
while (tot_nread < addrlen) {
- nread =
- read(s1, (void *) (alladdrs_char + tot_nread),
- addrlen - tot_nread);
+ nread = read(plist[i].control_socket,
+ (void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
if (nread < 0) {
perror("read");
@@ -733,36 +751,32 @@
read_pid:
/* 3. Find out length of the data */
- nread = read(s1, &pidlen, sizeof(pidlen));
+ nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
if (nread != sizeof(pidlen)) {
perror("read");
cleanup();
}
/*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, nread);*/
- if (i == 0) {
- pidglen = pidlen;
- } else {
- if (pidlen != pidglen) {
- fprintf(stderr, "Pid lengths %d and %d do not match\n",
- pidlen, pidglen);
- cleanup();
- }
- }
+ if (pidlen != pidglen) {
+ fprintf(stderr, "Pid lengths %d and %d do not match\n",
+ pidlen, pidglen);
+ cleanup();
+ }
if (i == 0) {
- /* allocate as soon as we know the address length */
+ /* allocate as soon as we know the pid length */
allpids = (char *)malloc(pidlen * nprocs);
if (allpids == NULL) {
perror("malloc");
- exit(1);
+ exit(EXIT_FAILURE);
}
}
tot_nread=0;
while(tot_nread < pidlen) {
- nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread),
- pidlen - tot_nread);
+ nread = read(plist[i].control_socket,
+ (void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
/*fprintf(stderr, "read length %d \n", nread);*/
if(nread < 0) {
perror("read");
@@ -770,6 +784,9 @@
}
tot_nread += nread;
}
+
+ plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
+ pglist_insert(plist[i].hostname, plist[i].remote_pid);
}
@@ -795,7 +812,7 @@
out_addrs = (int *) malloc(out_addrs_len);
if (out_addrs == NULL) {
perror("malloc");
- exit(1);
+ exit(EXIT_FAILURE);
}
for (i = 0; i < nprocs; i++) {
@@ -876,8 +893,7 @@
sleep(100);
}
close(s);
- exit(0);
-
+ exit(EXIT_SUCCESS);
}
int start_process(int i, char *command_name, char *env)
@@ -925,12 +941,12 @@
if ((remote_command = malloc(str_len)) == NULL) {
fprintf(stderr, "Failed to malloc %d bytes for remote_command\n",
str_len);
- exit(1);
+ exit(EXIT_FAILURE);
}
if ((xterm_command = malloc(str_len)) == NULL) {
fprintf(stderr, "Failed to malloc %d bytes for xterm_command\n",
str_len);
- exit(1);
+ exit(EXIT_FAILURE);
}
@@ -1010,7 +1026,7 @@
if (!show_on) {
perror("RSH/SSH command failed!");
}
- exit(1);
+ exit(EXIT_FAILURE);
}
free(remote_command);
@@ -1189,8 +1205,6 @@
fprintf(stderr, "\ta.out => " "name of MPI binary\n");
fprintf(stderr, "\targs => " "arguments for MPI binary\n");
fprintf(stderr, "\n");
-
- exit(1);
}
/* finds first non-whitespace char in input string */
@@ -1221,7 +1235,7 @@
if (hf == NULL) {
fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
perror("open");
- exit(1);
+ exit(EXIT_FAILURE);
}
for (i = 0; i < nprocs; i++) {
@@ -1287,7 +1301,7 @@
} else {
fprintf(stderr, "End of file reached on "
"hostfile at %d of %d hostnames\n", i, nprocs);
- exit(1);
+ exit(EXIT_FAILURE);
}
}
fclose(hf);
@@ -1321,14 +1335,14 @@
if ((pf = fopen(paramfile, "r")) == NULL) {
sprintf(errstr, "Cant open paramfile = %s", paramfile);
perror(errstr);
- exit(1);
+ exit(EXIT_FAILURE);
}
if ( strlen(env) == 0 ){
/* Allocating space for env first time */
if ((env = malloc(ENV_LEN)) == NULL) {
fprintf(stderr, "Malloc of env failed in read_param_file\n");
- exit(1);
+ exit(EXIT_FAILURE);
}
env_left = ENV_LEN - 1;
}else{
@@ -1367,7 +1381,7 @@
(ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + strlen(env);
if ((env = realloc(env, newlen)) == NULL) {
fprintf(stderr, "realloc failed in read_param_file\n");
- exit(1);
+ exit(EXIT_FAILURE);
}
if (param_debug) {
printf("realloc to %d\n", newlen);
@@ -1395,15 +1409,213 @@
}
cleanup();
- exit(1);
+ exit(EXIT_FAILURE);
+}
+
+void pglist_print(void) {
+ if(pglist) {
+ int i, j;
+ size_t npids = 0, npids_allocated = 0;
+
+ fprintf(stderr, "\n--pglist--\ndata:\n");
+ for(i = 0; i < pglist->npgs; i++) {
+ fprintf(stderr, "%p - %s:", &pglist->data[i],
+ pglist->data[i].hostname);
+
+ for(j = 0; j < pglist->data[i].npids; j++) {
+ fprintf(stderr, " %d", pglist->data[i].pids[j]);
+ }
+
+ fprintf(stderr, "\n");
+ npids += pglist->data[i].npids;
+ npids_allocated += pglist->data[i].npids_allocated;
+ }
+
+ fprintf(stderr, "\nindex:");
+ for(i = 0; i < pglist->npgs; i++) {
+ fprintf(stderr, " %p", pglist->index[i]);
+ }
+
+ fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
+ pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
+ pglist->npgs / pglist->npgs_allocated : 100.));
+ fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
+ npids_allocated, (int)(npids_allocated ? 100. * npids /
+ npids_allocated : 100.));
+ fprintf(stderr, "--pglist--\n\n");
+ }
+}
+
+void pglist_insert(const char * const hostname, const pid_t const pid) {
+ const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
+ size_t index = 0, bottom = 0, top;
+ static size_t alloc_error = 0;
+ int i, strcmp_result;
+ process_group * pg;
+ void * backup_ptr;
+
+ if(alloc_error) return;
+ if(pglist == NULL) goto init_pglist;
+
+ top = pglist->npgs - 1;
+ index = (top + bottom) / 2;
+
+ while(strcmp_result = strcmp(hostname, pglist->index[index]->hostname)) {
+ if(bottom >= top) break;
+
+ if(strcmp_result > 0) {
+ bottom = index + 1;
+ }
+
+ else {
+ top = index - 1;
+ }
+
+ index = (top + bottom) / 2;
+ }
+
+ if(!strcmp_result) goto insert_pid;
+ if(strcmp_result > 0) index++;
+
+ goto add_process_group;
+
+init_pglist:
+ pglist = malloc(sizeof(process_groups));
+
+ if(pglist) {
+ pglist->data = NULL;
+ pglist->index = NULL;
+ pglist->npgs = 0;
+ pglist->npgs_allocated = 0;
+ }
+
+ else {
+ goto register_alloc_error;
+ }
+
+add_process_group:
+ if(pglist->npgs == pglist->npgs_allocated) {
+ process_group * pglist_data_backup = pglist->data;
+ process_group ** pglist_index_backup = pglist->index;
+ ptrdiff_t offset;
+
+ pglist->npgs_allocated += increment;
+
+ backup_ptr = pglist->data;
+ pglist->data = realloc(pglist->data, sizeof(process_group) *
+ pglist->npgs_allocated);
+
+ if(pglist->data == NULL) {
+ pglist->data = backup_ptr;
+ goto register_alloc_error;
+ }
+
+ backup_ptr = pglist->index;
+ pglist->index = realloc(pglist->index, sizeof(process_group *) *
+ pglist->npgs_allocated);
+
+ if(pglist->index == NULL) {
+ pglist->index = backup_ptr;
+ goto register_alloc_error;
+ }
+
+ if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) {
+ for(i = 0; i < pglist->npgs; i++) {
+ pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
+ offset);
+ }
+ }
+ }
+
+ for(i = pglist->npgs; i > index; i--) {
+ pglist->index[i] = pglist->index[i-1];
+ }
+
+ pglist->data[pglist->npgs].hostname = hostname;
+ pglist->data[pglist->npgs].pids = NULL;
+ pglist->data[pglist->npgs].npids = 0;
+ pglist->data[pglist->npgs].npids_allocated = 0;
+
+ pglist->index[index] = &pglist->data[pglist->npgs++];
+
+insert_pid:
+ pg = pglist->index[index];
+
+ if(pg->npids == pg->npids_allocated) {
+ if(pg->npids_allocated) {
+ pg->npids_allocated <<= 1;
+
+ if(pg->npids_allocated < pg->npids) pg->npids_allocated = SIZE_MAX;
+ if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
+ }
+
+ else {
+ pg->npids_allocated = 1;
+ }
+
+ backup_ptr = pg->pids;
+ pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
+
+ if(pg->pids == NULL) {
+ pg->pids = backup_ptr;
+ goto register_alloc_error;
+ }
+ }
+
+ pg->pids[pg->npids++] = pid;
+
+ return;
+
+register_alloc_error:
+ if(pglist) {
+ if(pglist->data) {
+ process_group * pg = pglist->data;
+
+ while(pglist->npgs--) {
+ if(pg->pids) free((pg++)->pids);
+ }
+
+ free(pglist->data);
+ }
+
+ if(pglist->index) free(pglist->index);
+
+ free(pglist);
+ }
+
+ alloc_error = 1;
+}
+
+void free_memory(void) {
+ if(pglist) {
+ if(pglist->data) {
+ process_group * pg = pglist->data;
+
+ while(pglist->npgs--) {
+ if(pg->pids) free((pg++)->pids);
+ }
+
+ free(pglist->data);
+ }
+
+ if(pglist->index) free(pglist->index);
+
+ free(pglist);
+ }
+
+ if(plist) {
+ while(nprocs--) {
+ if(plist[nprocs].device) free(plist[nprocs].device);
+ if(plist[nprocs].hostname) free(plist[nprocs].hostname);
+ }
+
+ free(plist);
+ }
}
void cleanup(void)
{
int i;
- /* could walk through list of processes, but it looks
- like we can just send the signal to the process group
- */
if (use_totalview) {
fprintf(stderr, "Cleaning up all processes ...");
@@ -1417,36 +1629,180 @@
}
for (i = 0; i < nprocs; i++) {
- if (RUNNING(i)) {
- /* send terminal interrupt, which will hopefully
- propagate to the other side. (not sure what xterm will
- do here.
- */
- kill(plist[i].pid, SIGINT);
- }
+ if (RUNNING(i)) {
+ /* send terminal interrupt, which will hopefully
+ propagate to the other side. (not sure what xterm will
+ do here.
+ */
+ kill(plist[i].pid, SIGINT);
+ }
}
+
sleep(1);
for (i = 0; i < nprocs; i++) {
- if (plist[i].state != P_NOTSTARTED) {
- /* send regular interrupt to rsh */
- kill(plist[i].pid, SIGTERM);
- }
+ if (plist[i].state != P_NOTSTARTED) {
+ /* send regular interrupt to rsh */
+ kill(plist[i].pid, SIGTERM);
+ }
}
sleep(1);
for (i = 0; i < nprocs; i++) {
- if (plist[i].state != P_NOTSTARTED) {
- /* Kill the processes */
- kill(plist[i].pid, SIGKILL);
- }
+ if (plist[i].state != P_NOTSTARTED) {
+ /* Kill the processes */
+ kill(plist[i].pid, SIGKILL);
+ }
+ }
+
+ if(pglist) {
+ rkill_fast();
+ }
+
+ else {
+ rkill_linear();
+ }
+
+ exit(EXIT_FAILURE);
+}
+
+void rkill_fast(void) {
+ int i, j, tryagain, spawned_pid[pglist->npgs];
+
+ fprintf(stderr, "Killing remote processes...");
+
+ for(i = 0; i < pglist->npgs; i++) {
+ if(0 == (spawned_pid[i] = fork())) {
+ if(pglist->index[i]->npids) {
+ const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
+ const process_group * pg = pglist->index[i];
+ char kill_cmd[bufsize], tmp[10];
+
+ kill_cmd[0] = '\0';
+ strcat(kill_cmd, "kill -s SIGKILL");
+
+ for(j = 0; j < pg->npids; j++) {
+ snprintf(tmp, 10, " %d", pg->pids[j]);
+ strcat(kill_cmd, tmp);
+ }
+
+ strcat(kill_cmd, " >&/dev/null");
+
+ if(use_rsh) {
+ execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
+ }
+
+ else {
+ execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
+ kill_cmd, NULL);
+ }
+
+ perror(NULL);
+ exit(EXIT_FAILURE);
+ }
+
+ else {
+ exit(EXIT_SUCCESS);
+ }
+ }
+ }
+
+ while(1) {
+ static int iteration = 0;
+ tryagain = 0;
+
+ sleep(1 << iteration);
+
+ for (i = 0; i < pglist->npgs; i++) {
+ if(spawned_pid[i]) {
+ if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
+ tryagain = 1;
+ }
+ }
+ }
+
+ if(++iteration == 5 || !tryagain) {
+ fprintf(stderr, "DONE\n");
+ break;
+ }
+ }
+
+ if(tryagain) {
+ fprintf(stderr, "The following processes may have not been killed:\n");
+ for (i = 0; i < pglist->npgs; i++) {
+ if(spawned_pid[i]) {
+ const process_group * pg = pglist->index[i];
+
+ fprintf(stderr, "%s:", pg->hostname);
+
+ for (j = 0; j < pg->npids; j++) {
+ fprintf(stderr, " %d", pg->pids[j]);
+ }
+
+ fprintf(stderr, "\n");
+ }
+ }
+ }
+}
+
+void rkill_linear(void) {
+ int i, j, tryagain, spawned_pid[nprocs];
+
+ fprintf(stderr, "Killing remote processes...");
+
+ for (i = 0; i < nprocs; i++) {
+ if(0 == (spawned_pid[i] = fork())) {
+ char kill_cmd[80];
+
+ if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
+
+ snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
+ plist[i].remote_pid);
+
+ if(use_rsh) {
+ execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
+ }
+
+ else {
+ execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
+ plist[i].hostname, kill_cmd, NULL);
+ }
+
+ perror(NULL);
+ exit(EXIT_FAILURE);
+ }
}
- fprintf(stderr, "done.\n");
+ while(1) {
+ static int iteration = 0;
+ tryagain = 0;
+
+ sleep(1 << iteration);
+
+ for (i = 0; i < nprocs; i++) {
+ if(spawned_pid[i]) {
+ if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
+ tryagain = 1;
+ }
+ }
+ }
- exit(1);
+ if(++iteration == 5 || !tryagain) {
+ fprintf(stderr, "DONE\n");
+ break;
+ }
+ }
+ if(tryagain) {
+ fprintf(stderr, "The following processes may have not been killed:\n");
+ for (i = 0; i < nprocs; i++) {
+ if(spawned_pid[i]) {
+ fprintf(stderr, "%s [%d]\n", plist[i].hostname,
+ plist[i].remote_pid);
+ }
+ }
+ }
}
@@ -1457,9 +1813,13 @@
void alarm_handler(int signal)
{
+ extern const char * alarm_msg;
+
if (use_totalview) {
fprintf(stderr, "Timeout alarm signaled\n");
}
+
+ if(alarm_msg) fprintf(stderr, alarm_msg);
cleanup();
}
@@ -1467,19 +1827,21 @@
void child_handler(int signal)
{
int status, i, child, pid;
- int exitstatus = 0;
+ int exitstatus = EXIT_SUCCESS;
if (use_totalview) {
fprintf(stderr, "mpirun: child died. Waiting for others.\n");
}
alarm(10);
+ alarm_msg = "Child died. Timeout while waiting for others.\n";
+
for (i = 0; i < nprocs; i++) {
pid = wait(&status);
if (pid == -1) {
perror("wait");
- exitstatus = 1;
+ exitstatus = EXIT_FAILURE;
} else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
- exitstatus = 1;
+ exitstatus = EXIT_FAILURE;
}
for (child = 0; child < nprocs; child++) {
if (plist[child].pid == pid) {
@@ -1489,9 +1851,11 @@
}
if (child == nprocs) {
fprintf(stderr, "Unable to find child %d!\n", pid);
- exitstatus = 1;
+ exitstatus = EXIT_FAILURE;
}
}
alarm(0);
exit(exitstatus);
}
+
+/* vi:set sw=4 sts=4 tw=80: */
diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h exp1/mpid/ch_gen2/process/pmgr_client.h
--- 0.9.9/mpid/ch_gen2/process/pmgr_client.h 2007-05-29 03:47:10.000000000 -0400
+++ exp1/mpid/ch_gen2/process/pmgr_client.h 2007-07-02 12:59:51.000000000 -0400
@@ -108,6 +108,6 @@
* of the spawner, e.g. mpirun_rsh, to check that it understands
* the version of the executable.
*/
-#define PMGR_VERSION 5
+#define PMGR_VERSION 6
#endif
diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
--- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 2007-05-29 03:47:10.000000000 -0400
+++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 2007-07-02 12:59:51.000000000 -0400
@@ -171,6 +171,9 @@
int nwritten;
int version;
struct sockaddr_in sockaddr;
+
+ if(phase != 0) return;
+
/*
* Exchange information with the mpirun program. Send it our
* socket address, get back addresses for our siblings.
@@ -208,14 +211,12 @@
*/
version = PMGR_VERSION;
- if (0 == phase) {
- /* first, send a version number */
- nwritten = write(mpirun_socket, &version, sizeof(version));
- if (nwritten != sizeof(version)) {
- sleep(2);
- perror("write");
- exit(1);
- }
+ /* first, send a version number */
+ nwritten = write(mpirun_socket, &version, sizeof(version));
+ if (nwritten != sizeof(version)) {
+ sleep(2);
+ perror("write");
+ exit(1);
}
/* next, send our rank */
@@ -264,7 +265,6 @@
tot_nread = tot_nread + nread;
}
fflush(stdout);
- close(mpirun_socket);
return 1;
}
@@ -280,7 +280,6 @@
pid_t *ppids = (pid_t *)pallpids;
pid_t *allpids = NULL;
- pmgr_init_connection(1);
/* next, send size of addr */
nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
if (nwritten != sizeof(addrlen)) {
@@ -314,7 +313,7 @@
exit(1);
}
- /* next, send size of addr */
+ /* next, send size of pid */
nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
if (nwritten != sizeof(mypid_len)) {
sleep(2);
@@ -322,6 +321,7 @@
exit(1);
}
+ /* next, send our pid */
if (pidlen != 0) {
nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
if (nwritten != pidlen) {
@@ -345,7 +345,7 @@
if (pidlen != 0) {
tot_nread=0;
- /* finally, read addresses from all processes */
+ /* finally, read pids from all processes */
while (tot_nread < pmgr_nprocs*pidlen) {
nread = read(mpirun_socket, (void*)((char *)allpids+tot_nread),
(size_t) ((pmgr_nprocs*pidlen)-tot_nread));
More information about the mvapich-discuss
mailing list