[mvapich-discuss] mvapich jobs cleanup
Mark Potts
potts at hpcapplications.com
Fri Jul 13 21:21:03 EDT 2007
Sayantan,
No luck yet with that mpirun_rsh patch.
I'm getting:
"mpirun: executable version 5 does not match our version 6"
"Killing remote processes...DONE"
as soon as I attempt to start up the new mpirun_rsh.
I can see that the patch changes the version number but can't yet
understand to whom "executable version" and "our" refer.
Hints? Directions?
regards,
Sayantan Sur wrote:
> Hi Mark,
>
> We have a patch to solve this stray process issue with MVAPICH-0.9.9.
> I'm attaching the patch with this email. To apply the patch please
> follow these steps:
>
> $ cd mvapich-0.9.9
> $ #save mpirun_rsh_patch to this directory
> $ patch -p1 < mpirun_rsh_patch
>
> Could you please let us know if this patch solves the problem for you?
>
> Thanks,
> Sayantan.
>
>
> Mark Potts wrote:
>> Hi,
>> We are observing a number of cases in which MVAPICH-0.9.9
>> jobs launched with mpirun_rsh leave stray processes on some
>> nodes when the job terminates abnormally. Those stray
>> processes continue to run forever and require recognition
>> and killing.
>>
>> Is there a reason this happens with MVAPICH, and is there a
>> way to prevent it? This doesn't seem to be the behavior
>> that occurs for abnormally terminated Voltaire MPI or Intel
>> MPI jobs.
>> regards,
>
>
>
> ------------------------------------------------------------------------
>
> diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c exp1/mpid/ch_gen2/process/mpirun_rsh.c
> --- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c 2007-05-29 03:47:10.000000000 -0400
> +++ exp1/mpid/ch_gen2/process/mpirun_rsh.c 2007-07-09 11:56:32.000000000 -0400
> @@ -59,6 +59,7 @@
> #define _GNU_SOURCE
> #include <getopt.h>
> #include <stdlib.h>
> +#include <stddef.h>
> #include <stdio.h>
> #include <signal.h>
> #include <unistd.h>
> @@ -91,20 +92,34 @@
> typedef struct {
> char *hostname;
> char *device;
> - int pid;
> + pid_t pid;
> + pid_t remote_pid;
> int port;
> int control_socket;
> process_state state;
> } process;
>
> +typedef struct {
> + const char * hostname;
> + pid_t * pids;
> + size_t npids, npids_allocated;
> +} process_group;
> +
> +typedef struct {
> + process_group * data;
> + process_group ** index;
> + size_t npgs, npgs_allocated;
> +} process_groups;
> +
> #define RUNNING(i) ((plist[i].state == P_STARTED || \
> plist[i].state == P_CONNECTED || \
> plist[i].state == P_RUNNING) ? 1 : 0)
>
> /* other information: a.out and rank are implicit. */
>
> -process *plist;
> -int nprocs;
> +process_groups * pglist = NULL;
> +process * plist = NULL;
> +int nprocs = 0;
> int aout_index, port;
> #define MAX_WD_LEN 256
> char wd[MAX_WD_LEN]; /* working directory of current process */
> @@ -112,11 +127,19 @@
> char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
> /* xxx need to add checking for string overflow, do this more carefully ... */
>
> +/*
> + * Message notifying user of what timed out
> + */
> +static const char * alarm_msg = NULL;
>
> #define COMMAND_LEN 2000
> #define SEPARATOR ':'
>
> -
> +void free_memory(void);
> +void pglist_print(void);
> +void pglist_insert(const char * const, const pid_t const);
> +void rkill_fast(void);
> +void rkill_linear(void);
> void cleanup_handler(int);
> void nostop_handler(int);
> void alarm_handler(int);
> @@ -239,15 +262,19 @@
> int hostname_len = 0;
> totalview_cmd[199] = 0;
> display[0]='\0';
> -
> + pidglen = sizeof(pid_t);
> +
> /* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 ... hN] a.out [args] */
>
> + atexit(free_memory);
> +
> do {
> c = getopt_long_only(argc, argv, "+", option_table, &option_index);
> switch (c) {
> case '?':
> case ':':
> usage();
> + exit(EXIT_FAILURE);
> break;
> case EOF:
> break;
> @@ -255,8 +282,10 @@
> switch (option_index) {
> case 0:
> nprocs = atoi(optarg);
> - if (nprocs < 1)
> + if (nprocs < 1) {
> usage();
> + exit(EXIT_FAILURE);
> + }
> break;
> case 1:
> debug_on = 1;
> @@ -290,11 +319,11 @@
> case 8:
> show_version();
> usage();
> - exit(0);
> + exit(EXIT_SUCCESS);
> break;
> case 9:
> show_version();
> - exit(0);
> + exit(EXIT_SUCCESS);
> break;
> case 10:
> use_totalview = 1;
> @@ -311,17 +340,19 @@
> break;
> case 11:
> usage();
> - exit(0);
> + exit(EXIT_SUCCESS);
> break;
> default:
> fprintf(stderr, "Unknown option\n");
> usage();
> + exit(EXIT_FAILURE);
> break;
> }
> break;
> default:
> fprintf(stderr, "Unreachable statement!\n");
> usage();
> + exit(EXIT_FAILURE);
> break;
> }
> } while (c != EOF);
> @@ -332,7 +363,7 @@
> fprintf(stderr, "Without hostfile option, hostnames must be "
> "specified on command line.\n");
> usage();
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> aout_index = nprocs + optind;
> } else {
> @@ -361,13 +392,14 @@
> plist = malloc(nprocs * sizeof(process));
> if (plist == NULL) {
> perror("malloc");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> for (i = 0; i < nprocs; i++) {
> plist[i].state = P_NOTSTARTED;
> plist[i].device = NULL;
> plist[i].port = -1;
> + plist[i].remote_pid = 0;
> }
>
> /* grab hosts from command line or file */
> @@ -376,7 +408,7 @@
> hostname_len = read_hostfile(hostfile);
> } else {
> for (i = 0; i < nprocs; i++) {
> - plist[i].hostname = argv[optind + i];
> + plist[i].hostname = (char *)strndup(argv[optind + i], 100);
> hostname_len = hostname_len > strlen(plist[i].hostname) ?
> hostname_len : strlen(plist[i].hostname);
> }
> @@ -388,7 +420,7 @@
>
> if (!mpirun_processes) {
> perror("malloc");
> - exit(1);
> + exit(EXIT_FAILURE);
> } else {
> memset(mpirun_processes, 0, nprocs * (hostname_len + 4));
> }
> @@ -412,18 +444,18 @@
> s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
> if (s < 0) {
> perror("socket");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> sockaddr.sin_addr.s_addr = INADDR_ANY;
> sockaddr.sin_port = 0;
> if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
> perror("bind");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) < 0) {
> perror("getsockname");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> port = (int) ntohs(sockaddr.sin_port);
> @@ -431,14 +463,31 @@
>
>
> if (!show_on) {
> - signal(SIGHUP, cleanup_handler);
> - signal(SIGINT, cleanup_handler);
> - signal(SIGTSTP, nostop_handler);
> - signal(SIGCHLD, child_handler);
> - signal(SIGALRM, alarm_handler);
> + struct sigaction signal_handler;
> + signal_handler.sa_handler = cleanup_handler;
> + sigfillset(&signal_handler.sa_mask);
> + signal_handler.sa_flags = 0;
> +
> + sigaction(SIGHUP, &signal_handler, NULL);
> + sigaction(SIGINT, &signal_handler, NULL);
> + sigaction(SIGTERM, &signal_handler, NULL);
> +
> + signal_handler.sa_handler = nostop_handler;
> +
> + sigaction(SIGTSTP, &signal_handler, NULL);
> +
> + signal_handler.sa_handler = alarm_handler;
> +
> + sigaction(SIGALRM, &signal_handler, NULL);
> +
> + signal_handler.sa_handler = child_handler;
> + sigemptyset(&signal_handler.sa_mask);
> +
> + sigaction(SIGCHLD, &signal_handler, NULL);
> }
>
> alarm(1000);
> + alarm_msg = "Timeout during client startup.\n";
> /* long timeout for testing, where process may be stopped in debugger */
>
> #ifdef USE_DDD
> @@ -511,7 +560,7 @@
> }
>
> if (show_on)
> - exit(0);
> + exit(EXIT_SUCCESS);
>
> /*Hostid exchange start */
> /* accept incoming connections, read port numbers */
> @@ -522,6 +571,9 @@
> ACCEPT_HID:
> sockaddr_len = sizeof(sockaddr);
> s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
> +
> + alarm_msg = "Timeout during hostid exchange.\n";
> +
> if (s1 < 0) {
> if (errno == EINTR)
> goto ACCEPT_HID;
> @@ -592,7 +644,7 @@
> hostids = (int *) malloc(hostidlen * nprocs);
> if (hostids == NULL) {
> perror("malloc");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> }
>
> @@ -626,66 +678,33 @@
> }
> }
>
> - /* close all opend sockets */
> - for (i = 0; i < nprocs; i++) {
> - close(plist[i].control_socket);
> - }
> -
> alarm(1000);
> - /* let enbale the timer again*/
> + alarm_msg = "Timeout during address exchange.\n";
> + /* lets enable the timer again*/
>
> /* Lets read all other information, LID QP,etc..*/
>
> /* accept incoming connections, read port numbers */
> for (i = 0; i < nprocs; i++) {
> - int version, rank, nread;
> - char pidstr[12];
> -ACCEPT:
> - sockaddr_len = sizeof(sockaddr);
> - s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
> - if (s1 < 0) {
> - if (errno == EINTR)
> - goto ACCEPT;
> - perror("accept");
> - cleanup();
> - }
> + int nread;
>
> /*
> * protocol:
> - * We don't need version number,
> - * 0. read rank of process
> - * 1. read address length
> - * 2. read address itself
> - * 3. send array of all addresses
> + * We don't need the version number or the rank,
> + * 0. read address length
> + * 1. read address itself
> + * 2. send array of all addresses
> */
>
> - /* 0. Find out who we're talking to */
> - nread = read(s1, &rank, sizeof(rank));
> - if (nread != sizeof(rank)) {
> - perror("read");
> - cleanup();
> - }
> -
> - if (rank < 0 || rank >= nprocs
> - || plist[rank].state != P_STARTED) {
> - fprintf(stderr, "mpirun: invalid rank received. \n");
> - cleanup();
> - }
> -
> - plist[rank].control_socket = s1;
> - plist[rank].state = P_CONNECTED;
> + plist[i].state = P_CONNECTED;
>
> /* Let us know connection was established
> * printf("MPIRUN_RSH: Process rank %d connected\n",rank);
> */
>
> /* 1. Find out length of the data */
> - nread = read(s1, &addrlen, sizeof(addrlen));
> + nread = read(plist[i].control_socket, &addrlen, sizeof(addrlen));
> if (nread != sizeof(addrlen)) {
> - /* nread == 0 is not actually an error! */
> - if (nread == 0)
> - continue;
> -
> perror("read");
> cleanup();
> }
> @@ -707,21 +726,20 @@
> alladdrs = (int *) malloc(addrlen * nprocs);
> if (alladdrs == NULL) {
> perror("malloc");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> }
>
> /* 2. Read info from each process */
>
> /* for byte location */
> - alladdrs_char = (char *) &alladdrs[rank * addrlen / sizeof(int)];
> + alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
>
> tot_nread = 0;
>
> while (tot_nread < addrlen) {
> - nread =
> - read(s1, (void *) (alladdrs_char + tot_nread),
> - addrlen - tot_nread);
> + nread = read(plist[i].control_socket,
> + (void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
>
> if (nread < 0) {
> perror("read");
> @@ -733,36 +751,32 @@
>
> read_pid:
> /* 3. Find out length of the data */
> - nread = read(s1, &pidlen, sizeof(pidlen));
> + nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
> if (nread != sizeof(pidlen)) {
> perror("read");
> cleanup();
> }
>
> /*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, nread);*/
> - if (i == 0) {
> - pidglen = pidlen;
> - } else {
> - if (pidlen != pidglen) {
> - fprintf(stderr, "Pid lengths %d and %d do not match\n",
> - pidlen, pidglen);
> - cleanup();
> - }
> - }
> + if (pidlen != pidglen) {
> + fprintf(stderr, "Pid lengths %d and %d do not match\n",
> + pidlen, pidglen);
> + cleanup();
> + }
>
> if (i == 0) {
> - /* allocate as soon as we know the address length */
> + /* allocate as soon as we know the pid length */
> allpids = (char *)malloc(pidlen * nprocs);
> if (allpids == NULL) {
> perror("malloc");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> }
>
> tot_nread=0;
> while(tot_nread < pidlen) {
> - nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread),
> - pidlen - tot_nread);
> + nread = read(plist[i].control_socket,
> + (void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
> /*fprintf(stderr, "read length %d \n", nread);*/
> if(nread < 0) {
> perror("read");
> @@ -770,6 +784,9 @@
> }
> tot_nread += nread;
> }
> +
> + plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
> + pglist_insert(plist[i].hostname, plist[i].remote_pid);
> }
>
>
> @@ -795,7 +812,7 @@
> out_addrs = (int *) malloc(out_addrs_len);
> if (out_addrs == NULL) {
> perror("malloc");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> for (i = 0; i < nprocs; i++) {
> @@ -876,8 +893,7 @@
> sleep(100);
> }
> close(s);
> - exit(0);
> -
> + exit(EXIT_SUCCESS);
> }
>
> int start_process(int i, char *command_name, char *env)
> @@ -925,12 +941,12 @@
> if ((remote_command = malloc(str_len)) == NULL) {
> fprintf(stderr, "Failed to malloc %d bytes for remote_command\n",
> str_len);
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> if ((xterm_command = malloc(str_len)) == NULL) {
> fprintf(stderr, "Failed to malloc %d bytes for xterm_command\n",
> str_len);
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
>
> @@ -1010,7 +1026,7 @@
> if (!show_on) {
> perror("RSH/SSH command failed!");
> }
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> free(remote_command);
> @@ -1189,8 +1205,6 @@
> fprintf(stderr, "\ta.out => " "name of MPI binary\n");
> fprintf(stderr, "\targs => " "arguments for MPI binary\n");
> fprintf(stderr, "\n");
> -
> - exit(1);
> }
>
> /* finds first non-whitespace char in input string */
> @@ -1221,7 +1235,7 @@
> if (hf == NULL) {
> fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
> perror("open");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> for (i = 0; i < nprocs; i++) {
> @@ -1287,7 +1301,7 @@
> } else {
> fprintf(stderr, "End of file reached on "
> "hostfile at %d of %d hostnames\n", i, nprocs);
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> }
> fclose(hf);
> @@ -1321,14 +1335,14 @@
> if ((pf = fopen(paramfile, "r")) == NULL) {
> sprintf(errstr, "Cant open paramfile = %s", paramfile);
> perror(errstr);
> - exit(1);
> + exit(EXIT_FAILURE);
> }
>
> if ( strlen(env) == 0 ){
> /* Allocating space for env first time */
> if ((env = malloc(ENV_LEN)) == NULL) {
> fprintf(stderr, "Malloc of env failed in read_param_file\n");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> env_left = ENV_LEN - 1;
> }else{
> @@ -1367,7 +1381,7 @@
> (ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + strlen(env);
> if ((env = realloc(env, newlen)) == NULL) {
> fprintf(stderr, "realloc failed in read_param_file\n");
> - exit(1);
> + exit(EXIT_FAILURE);
> }
> if (param_debug) {
> printf("realloc to %d\n", newlen);
> @@ -1395,15 +1409,213 @@
> }
> cleanup();
>
> - exit(1);
> + exit(EXIT_FAILURE);
> +}
> +
> +void pglist_print(void) {
> + if(pglist) {
> + int i, j;
> + size_t npids = 0, npids_allocated = 0;
> +
> + fprintf(stderr, "\n--pglist--\ndata:\n");
> + for(i = 0; i < pglist->npgs; i++) {
> + fprintf(stderr, "%p - %s:", &pglist->data[i],
> + pglist->data[i].hostname);
> +
> + for(j = 0; j < pglist->data[i].npids; j++) {
> + fprintf(stderr, " %d", pglist->data[i].pids[j]);
> + }
> +
> + fprintf(stderr, "\n");
> + npids += pglist->data[i].npids;
> + npids_allocated += pglist->data[i].npids_allocated;
> + }
> +
> + fprintf(stderr, "\nindex:");
> + for(i = 0; i < pglist->npgs; i++) {
> + fprintf(stderr, " %p", pglist->index[i]);
> + }
> +
> + fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
> + pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
> + pglist->npgs / pglist->npgs_allocated : 100.));
> + fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
> + npids_allocated, (int)(npids_allocated ? 100. * npids /
> + npids_allocated : 100.));
> + fprintf(stderr, "--pglist--\n\n");
> + }
> +}
> +
> +void pglist_insert(const char * const hostname, const pid_t const pid) {
> + const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
> + size_t index = 0, bottom = 0, top;
> + static size_t alloc_error = 0;
> + int i, strcmp_result;
> + process_group * pg;
> + void * backup_ptr;
> +
> + if(alloc_error) return;
> + if(pglist == NULL) goto init_pglist;
> +
> + top = pglist->npgs - 1;
> + index = (top + bottom) / 2;
> +
> + while(strcmp_result = strcmp(hostname, pglist->index[index]->hostname)) {
> + if(bottom >= top) break;
> +
> + if(strcmp_result > 0) {
> + bottom = index + 1;
> + }
> +
> + else {
> + top = index - 1;
> + }
> +
> + index = (top + bottom) / 2;
> + }
> +
> + if(!strcmp_result) goto insert_pid;
> + if(strcmp_result > 0) index++;
> +
> + goto add_process_group;
> +
> +init_pglist:
> + pglist = malloc(sizeof(process_groups));
> +
> + if(pglist) {
> + pglist->data = NULL;
> + pglist->index = NULL;
> + pglist->npgs = 0;
> + pglist->npgs_allocated = 0;
> + }
> +
> + else {
> + goto register_alloc_error;
> + }
> +
> +add_process_group:
> + if(pglist->npgs == pglist->npgs_allocated) {
> + process_group * pglist_data_backup = pglist->data;
> + process_group ** pglist_index_backup = pglist->index;
> + ptrdiff_t offset;
> +
> + pglist->npgs_allocated += increment;
> +
> + backup_ptr = pglist->data;
> + pglist->data = realloc(pglist->data, sizeof(process_group) *
> + pglist->npgs_allocated);
> +
> + if(pglist->data == NULL) {
> + pglist->data = backup_ptr;
> + goto register_alloc_error;
> + }
> +
> + backup_ptr = pglist->index;
> + pglist->index = realloc(pglist->index, sizeof(process_group *) *
> + pglist->npgs_allocated);
> +
> + if(pglist->index == NULL) {
> + pglist->index = backup_ptr;
> + goto register_alloc_error;
> + }
> +
> + if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) {
> + for(i = 0; i < pglist->npgs; i++) {
> + pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
> + offset);
> + }
> + }
> + }
> +
> + for(i = pglist->npgs; i > index; i--) {
> + pglist->index[i] = pglist->index[i-1];
> + }
> +
> + pglist->data[pglist->npgs].hostname = hostname;
> + pglist->data[pglist->npgs].pids = NULL;
> + pglist->data[pglist->npgs].npids = 0;
> + pglist->data[pglist->npgs].npids_allocated = 0;
> +
> + pglist->index[index] = &pglist->data[pglist->npgs++];
> +
> +insert_pid:
> + pg = pglist->index[index];
> +
> + if(pg->npids == pg->npids_allocated) {
> + if(pg->npids_allocated) {
> + pg->npids_allocated <<= 1;
> +
> + if(pg->npids_allocated < pg->npids) pg->npids_allocated = SIZE_MAX;
> + if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
> + }
> +
> + else {
> + pg->npids_allocated = 1;
> + }
> +
> + backup_ptr = pg->pids;
> + pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
> +
> + if(pg->pids == NULL) {
> + pg->pids = backup_ptr;
> + goto register_alloc_error;
> + }
> + }
> +
> + pg->pids[pg->npids++] = pid;
> +
> + return;
> +
> +register_alloc_error:
> + if(pglist) {
> + if(pglist->data) {
> + process_group * pg = pglist->data;
> +
> + while(pglist->npgs--) {
> + if(pg->pids) free((pg++)->pids);
> + }
> +
> + free(pglist->data);
> + }
> +
> + if(pglist->index) free(pglist->index);
> +
> + free(pglist);
> + }
> +
> + alloc_error = 1;
> +}
> +
> +void free_memory(void) {
> + if(pglist) {
> + if(pglist->data) {
> + process_group * pg = pglist->data;
> +
> + while(pglist->npgs--) {
> + if(pg->pids) free((pg++)->pids);
> + }
> +
> + free(pglist->data);
> + }
> +
> + if(pglist->index) free(pglist->index);
> +
> + free(pglist);
> + }
> +
> + if(plist) {
> + while(nprocs--) {
> + if(plist[nprocs].device) free(plist[nprocs].device);
> + if(plist[nprocs].hostname) free(plist[nprocs].hostname);
> + }
> +
> + free(plist);
> + }
> }
>
> void cleanup(void)
> {
> int i;
> - /* could walk through list of processes, but it looks
> - like we can just send the signal to the process group
> - */
>
> if (use_totalview) {
> fprintf(stderr, "Cleaning up all processes ...");
> @@ -1417,36 +1629,180 @@
> }
>
> for (i = 0; i < nprocs; i++) {
> - if (RUNNING(i)) {
> - /* send terminal interrupt, which will hopefully
> - propagate to the other side. (not sure what xterm will
> - do here.
> - */
> - kill(plist[i].pid, SIGINT);
> - }
> + if (RUNNING(i)) {
> + /* send terminal interrupt, which will hopefully
> + propagate to the other side. (not sure what xterm will
> + do here.
> + */
> + kill(plist[i].pid, SIGINT);
> + }
> }
> +
> sleep(1);
>
> for (i = 0; i < nprocs; i++) {
> - if (plist[i].state != P_NOTSTARTED) {
> - /* send regular interrupt to rsh */
> - kill(plist[i].pid, SIGTERM);
> - }
> + if (plist[i].state != P_NOTSTARTED) {
> + /* send regular interrupt to rsh */
> + kill(plist[i].pid, SIGTERM);
> + }
> }
>
> sleep(1);
>
> for (i = 0; i < nprocs; i++) {
> - if (plist[i].state != P_NOTSTARTED) {
> - /* Kill the processes */
> - kill(plist[i].pid, SIGKILL);
> - }
> + if (plist[i].state != P_NOTSTARTED) {
> + /* Kill the processes */
> + kill(plist[i].pid, SIGKILL);
> + }
> + }
> +
> + if(pglist) {
> + rkill_fast();
> + }
> +
> + else {
> + rkill_linear();
> + }
> +
> + exit(EXIT_FAILURE);
> +}
> +
> +void rkill_fast(void) {
> + int i, j, tryagain, spawned_pid[pglist->npgs];
> +
> + fprintf(stderr, "Killing remote processes...");
> +
> + for(i = 0; i < pglist->npgs; i++) {
> + if(0 == (spawned_pid[i] = fork())) {
> + if(pglist->index[i]->npids) {
> + const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
> + const process_group * pg = pglist->index[i];
> + char kill_cmd[bufsize], tmp[10];
> +
> + kill_cmd[0] = '\0';
> + strcat(kill_cmd, "kill -s SIGKILL");
> +
> + for(j = 0; j < pg->npids; j++) {
> + snprintf(tmp, 10, " %d", pg->pids[j]);
> + strcat(kill_cmd, tmp);
> + }
> +
> + strcat(kill_cmd, " >&/dev/null");
> +
> + if(use_rsh) {
> + execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
> + }
> +
> + else {
> + execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
> + kill_cmd, NULL);
> + }
> +
> + perror(NULL);
> + exit(EXIT_FAILURE);
> + }
> +
> + else {
> + exit(EXIT_SUCCESS);
> + }
> + }
> + }
> +
> + while(1) {
> + static int iteration = 0;
> + tryagain = 0;
> +
> + sleep(1 << iteration);
> +
> + for (i = 0; i < pglist->npgs; i++) {
> + if(spawned_pid[i]) {
> + if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
> + tryagain = 1;
> + }
> + }
> + }
> +
> + if(++iteration == 5 || !tryagain) {
> + fprintf(stderr, "DONE\n");
> + break;
> + }
> + }
> +
> + if(tryagain) {
> + fprintf(stderr, "The following processes may have not been killed:\n");
> + for (i = 0; i < pglist->npgs; i++) {
> + if(spawned_pid[i]) {
> + const process_group * pg = pglist->index[i];
> +
> + fprintf(stderr, "%s:", pg->hostname);
> +
> + for (j = 0; j < pg->npids; j++) {
> + fprintf(stderr, " %d", pg->pids[j]);
> + }
> +
> + fprintf(stderr, "\n");
> + }
> + }
> + }
> +}
> +
> +void rkill_linear(void) {
> + int i, j, tryagain, spawned_pid[nprocs];
> +
> + fprintf(stderr, "Killing remote processes...");
> +
> + for (i = 0; i < nprocs; i++) {
> + if(0 == (spawned_pid[i] = fork())) {
> + char kill_cmd[80];
> +
> + if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
> +
> + snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
> + plist[i].remote_pid);
> +
> + if(use_rsh) {
> + execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
> + }
> +
> + else {
> + execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
> + plist[i].hostname, kill_cmd, NULL);
> + }
> +
> + perror(NULL);
> + exit(EXIT_FAILURE);
> + }
> }
>
> - fprintf(stderr, "done.\n");
> + while(1) {
> + static int iteration = 0;
> + tryagain = 0;
> +
> + sleep(1 << iteration);
> +
> + for (i = 0; i < nprocs; i++) {
> + if(spawned_pid[i]) {
> + if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL, WNOHANG))) {
> + tryagain = 1;
> + }
> + }
> + }
>
> - exit(1);
> + if(++iteration == 5 || !tryagain) {
> + fprintf(stderr, "DONE\n");
> + break;
> + }
> + }
>
> + if(tryagain) {
> + fprintf(stderr, "The following processes may have not been killed:\n");
> + for (i = 0; i < nprocs; i++) {
> + if(spawned_pid[i]) {
> + fprintf(stderr, "%s [%d]\n", plist[i].hostname,
> + plist[i].remote_pid);
> + }
> + }
> + }
> }
>
>
> @@ -1457,9 +1813,13 @@
>
> void alarm_handler(int signal)
> {
> + extern const char * alarm_msg;
> +
> if (use_totalview) {
> fprintf(stderr, "Timeout alarm signaled\n");
> }
> +
> + if(alarm_msg) fprintf(stderr, alarm_msg);
> cleanup();
> }
>
> @@ -1467,19 +1827,21 @@
> void child_handler(int signal)
> {
> int status, i, child, pid;
> - int exitstatus = 0;
> + int exitstatus = EXIT_SUCCESS;
>
> if (use_totalview) {
> fprintf(stderr, "mpirun: child died. Waiting for others.\n");
> }
> alarm(10);
> + alarm_msg = "Child died. Timeout while waiting for others.\n";
> +
> for (i = 0; i < nprocs; i++) {
> pid = wait(&status);
> if (pid == -1) {
> perror("wait");
> - exitstatus = 1;
> + exitstatus = EXIT_FAILURE;
> } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
> - exitstatus = 1;
> + exitstatus = EXIT_FAILURE;
> }
> for (child = 0; child < nprocs; child++) {
> if (plist[child].pid == pid) {
> @@ -1489,9 +1851,11 @@
> }
> if (child == nprocs) {
> fprintf(stderr, "Unable to find child %d!\n", pid);
> - exitstatus = 1;
> + exitstatus = EXIT_FAILURE;
> }
> }
> alarm(0);
> exit(exitstatus);
> }
> +
> +/* vi:set sw=4 sts=4 tw=80: */
> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h exp1/mpid/ch_gen2/process/pmgr_client.h
> --- 0.9.9/mpid/ch_gen2/process/pmgr_client.h 2007-05-29 03:47:10.000000000 -0400
> +++ exp1/mpid/ch_gen2/process/pmgr_client.h 2007-07-02 12:59:51.000000000 -0400
> @@ -108,6 +108,6 @@
> * of the spawner, e.g. mpirun_rsh, to check that it understands
> * the version of the executable.
> */
> -#define PMGR_VERSION 5
> +#define PMGR_VERSION 6
>
> #endif
> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
> --- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 2007-05-29 03:47:10.000000000 -0400
> +++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 2007-07-02 12:59:51.000000000 -0400
> @@ -171,6 +171,9 @@
> int nwritten;
> int version;
> struct sockaddr_in sockaddr;
> +
> + if(phase != 0) return;
> +
> /*
> * Exchange information with the mpirun program. Send it our
> * socket address, get back addresses for our siblings.
> @@ -208,14 +211,12 @@
> */
>
> version = PMGR_VERSION;
> - if (0 == phase) {
> - /* first, send a version number */
> - nwritten = write(mpirun_socket, &version, sizeof(version));
> - if (nwritten != sizeof(version)) {
> - sleep(2);
> - perror("write");
> - exit(1);
> - }
> + /* first, send a version number */
> + nwritten = write(mpirun_socket, &version, sizeof(version));
> + if (nwritten != sizeof(version)) {
> + sleep(2);
> + perror("write");
> + exit(1);
> }
>
> /* next, send our rank */
> @@ -264,7 +265,6 @@
> tot_nread = tot_nread + nread;
> }
> fflush(stdout);
> - close(mpirun_socket);
> return 1;
> }
>
> @@ -280,7 +280,6 @@
> pid_t *ppids = (pid_t *)pallpids;
> pid_t *allpids = NULL;
>
> - pmgr_init_connection(1);
> /* next, send size of addr */
> nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
> if (nwritten != sizeof(addrlen)) {
> @@ -314,7 +313,7 @@
> exit(1);
> }
>
> - /* next, send size of addr */
> + /* next, send size of pid */
> nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
> if (nwritten != sizeof(mypid_len)) {
> sleep(2);
> @@ -322,6 +321,7 @@
> exit(1);
> }
>
> + /* next, send our pid */
> if (pidlen != 0) {
> nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
> if (nwritten != pidlen) {
> @@ -345,7 +345,7 @@
>
> if (pidlen != 0) {
> tot_nread=0;
> - /* finally, read addresses from all processes */
> + /* finally, read pids from all processes */
> while (tot_nread < pmgr_nprocs*pidlen) {
> nread = read(mpirun_socket, (void*)((char *)allpids+tot_nread),
> (size_t) ((pmgr_nprocs*pidlen)-tot_nread));
--
***********************************
>> Mark J. Potts, PhD
>>
>> HPC Applications Inc.
>> phone: 410-992-8360 Bus
>> 410-313-9318 Home
>> 443-418-4375 Cell
>> email: potts at hpcapplications.com
>> potts at excray.com
***********************************
More information about the mvapich-discuss
mailing list