[mvapich-discuss] mvapich jobs cleanup
Sayantan Sur
surs at cse.ohio-state.edu
Sat Jul 14 14:09:45 EDT 2007
Hi Mark,
Thanks for trying out the patch. I'm wondering if you applied the
patch and recompiled everything (MPI libraries and benchmarks)?
Thanks,
Sayantan.
Mark Potts wrote:
> Sayantan,
> No luck yet with that mpirun_rsh patch.
> I'm getting:
> "mpirun: executable version 5 does not match our version 6"
> "Killing remote processes...DONE"
> as soon as I attempt to start up the new mpirun_rsh.
>
> I can see that the patch changes the version number but can't yet
> understand to whom "executable version" and "our" refer.
> Hints? Directions?
>
> regards,
>
> Sayantan Sur wrote:
>> Hi Mark,
>>
>> We have a patch to solve this stray process issue with MVAPICH-0.9.9.
>> I'm attaching the patch with this email. To apply the patch please
>> follow these steps:
>>
>> $ cd mvapich-0.9.9
>> $ #save mpirun_rsh_patch to this directory
>> $ patch -p1 < mpirun_rsh_patch
>>
>> Could you please let us know if this patch solves the problem for you?
>>
>> Thanks,
>> Sayantan.
>>
>>
>> Mark Potts wrote:
>>> Hi,
>>> We are observing a number of cases in which MVAPICH-0.9.9
>>> jobs launched with mpirun_rsh leave stray processes on some
>>> nodes when the job terminates abnormally. Those stray
>>> processes continue to run forever and require recognition
>>> and killing.
>>>
>>> Is there a reason this happens with MVAPICH, and is there a
>>> way to prevent it? This doesn't seem to be the behavior
>>> that occurs for abnormally terminated Voltaire MPI or Intel
>>> MPI jobs.
>>> regards,
>>
>>
>>
>> ------------------------------------------------------------------------
>>
>> diff -ruN 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c
>> exp1/mpid/ch_gen2/process/mpirun_rsh.c
>> --- 0.9.9/mpid/ch_gen2/process/mpirun_rsh.c 2007-05-29
>> 03:47:10.000000000 -0400
>> +++ exp1/mpid/ch_gen2/process/mpirun_rsh.c 2007-07-09
>> 11:56:32.000000000 -0400
>> @@ -59,6 +59,7 @@
>> #define _GNU_SOURCE
>> #include <getopt.h>
>> #include <stdlib.h>
>> +#include <stddef.h>
>> #include <stdio.h>
>> #include <signal.h>
>> #include <unistd.h>
>> @@ -91,20 +92,34 @@
>> typedef struct {
>> char *hostname;
>> char *device;
>> - int pid;
>> + pid_t pid;
>> + pid_t remote_pid;
>> int port;
>> int control_socket;
>> process_state state;
>> } process;
>>
>> +typedef struct {
>> + const char * hostname;
>> + pid_t * pids;
>> + size_t npids, npids_allocated;
>> +} process_group;
>> +
>> +typedef struct {
>> + process_group * data;
>> + process_group ** index;
>> + size_t npgs, npgs_allocated;
>> +} process_groups;
>> +
>> #define RUNNING(i) ((plist[i].state == P_STARTED || \
>> plist[i].state == P_CONNECTED || \
>> plist[i].state == P_RUNNING) ? 1 : 0)
>>
>> /* other information: a.out and rank are implicit. */
>>
>> -process *plist;
>> -int nprocs;
>> +process_groups * pglist = NULL;
>> +process * plist = NULL;
>> +int nprocs = 0;
>> int aout_index, port;
>> #define MAX_WD_LEN 256
>> char wd[MAX_WD_LEN]; /* working directory of current
>> process */
>> @@ -112,11 +127,19 @@
>> char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
>> /* xxx need to add checking for string overflow, do this more
>> carefully ... */
>>
>> +/*
>> + * Message notifying user of what timed out
>> + */
>> +static const char * alarm_msg = NULL;
>>
>> #define COMMAND_LEN 2000
>> #define SEPARATOR ':'
>>
>> -
>> +void free_memory(void);
>> +void pglist_print(void);
>> +void pglist_insert(const char * const, const pid_t const);
>> +void rkill_fast(void);
>> +void rkill_linear(void);
>> void cleanup_handler(int);
>> void nostop_handler(int);
>> void alarm_handler(int);
>> @@ -239,15 +262,19 @@
>> int hostname_len = 0;
>> totalview_cmd[199] = 0;
>> display[0]='\0';
>> -
>> + pidglen = sizeof(pid_t);
>> +
>> /* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3
>> ... hN] a.out [args] */
>>
>> + atexit(free_memory);
>> +
>> do {
>> c = getopt_long_only(argc, argv, "+", option_table,
>> &option_index);
>> switch (c) {
>> case '?':
>> case ':':
>> usage();
>> + exit(EXIT_FAILURE);
>> break;
>> case EOF:
>> break;
>> @@ -255,8 +282,10 @@
>> switch (option_index) {
>> case 0:
>> nprocs = atoi(optarg);
>> - if (nprocs < 1)
>> + if (nprocs < 1) {
>> usage();
>> + exit(EXIT_FAILURE);
>> + }
>> break;
>> case 1:
>> debug_on = 1;
>> @@ -290,11 +319,11 @@
>> case 8:
>> show_version();
>> usage();
>> - exit(0);
>> + exit(EXIT_SUCCESS);
>> break;
>> case 9:
>> show_version();
>> - exit(0);
>> + exit(EXIT_SUCCESS);
>> break;
>> case 10:
>> use_totalview = 1;
>> @@ -311,17 +340,19 @@
>> break;
>> case 11:
>> usage();
>> - exit(0);
>> + exit(EXIT_SUCCESS);
>> break;
>> default:
>> fprintf(stderr, "Unknown option\n");
>> usage();
>> + exit(EXIT_FAILURE);
>> break;
>> }
>> break;
>> default:
>> fprintf(stderr, "Unreachable statement!\n");
>> usage();
>> + exit(EXIT_FAILURE);
>> break;
>> }
>> } while (c != EOF);
>> @@ -332,7 +363,7 @@
>> fprintf(stderr, "Without hostfile option, hostnames must
>> be "
>> "specified on command line.\n");
>> usage();
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> aout_index = nprocs + optind;
>> } else {
>> @@ -361,13 +392,14 @@
>> plist = malloc(nprocs * sizeof(process));
>> if (plist == NULL) {
>> perror("malloc");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>> for (i = 0; i < nprocs; i++) {
>> plist[i].state = P_NOTSTARTED;
>> plist[i].device = NULL;
>> plist[i].port = -1;
>> + plist[i].remote_pid = 0;
>> }
>>
>> /* grab hosts from command line or file */
>> @@ -376,7 +408,7 @@
>> hostname_len = read_hostfile(hostfile);
>> } else {
>> for (i = 0; i < nprocs; i++) {
>> - plist[i].hostname = argv[optind + i];
>> + plist[i].hostname = (char *)strndup(argv[optind + i], 100);
>> hostname_len = hostname_len > strlen(plist[i].hostname) ?
>> hostname_len : strlen(plist[i].hostname); }
>> @@ -388,7 +420,7 @@
>> if (!mpirun_processes) {
>> perror("malloc");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> } else { memset(mpirun_processes, 0, nprocs *
>> (hostname_len + 4));
>> }
>> @@ -412,18 +444,18 @@
>> s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
>> if (s < 0) {
>> perror("socket");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> sockaddr.sin_addr.s_addr = INADDR_ANY;
>> sockaddr.sin_port = 0;
>> if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
>> perror("bind");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>> if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len)
>> < 0) {
>> perror("getsockname");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>> port = (int) ntohs(sockaddr.sin_port);
>> @@ -431,14 +463,31 @@
>>
>>
>> if (!show_on) {
>> - signal(SIGHUP, cleanup_handler);
>> - signal(SIGINT, cleanup_handler);
>> - signal(SIGTSTP, nostop_handler);
>> - signal(SIGCHLD, child_handler);
>> - signal(SIGALRM, alarm_handler);
>> + struct sigaction signal_handler;
>> + signal_handler.sa_handler = cleanup_handler;
>> + sigfillset(&signal_handler.sa_mask);
>> + signal_handler.sa_flags = 0;
>> +
>> + sigaction(SIGHUP, &signal_handler, NULL);
>> + sigaction(SIGINT, &signal_handler, NULL);
>> + sigaction(SIGTERM, &signal_handler, NULL);
>> +
>> + signal_handler.sa_handler = nostop_handler;
>> +
>> + sigaction(SIGTSTP, &signal_handler, NULL);
>> +
>> + signal_handler.sa_handler = alarm_handler;
>> +
>> + sigaction(SIGALRM, &signal_handler, NULL);
>> +
>> + signal_handler.sa_handler = child_handler;
>> + sigemptyset(&signal_handler.sa_mask);
>> +
>> + sigaction(SIGCHLD, &signal_handler, NULL);
>> }
>>
>> alarm(1000);
>> + alarm_msg = "Timeout during client startup.\n";
>> /* long timeout for testing, where process may be stopped in
>> debugger */
>>
>> #ifdef USE_DDD
>> @@ -511,7 +560,7 @@
>> }
>>
>> if (show_on)
>> - exit(0);
>> + exit(EXIT_SUCCESS);
>> /*Hostid exchange start */
>> /* accept incoming connections, read port numbers */
>> @@ -522,6 +571,9 @@
>> ACCEPT_HID:
>> sockaddr_len = sizeof(sockaddr);
>> s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
>> +
>> + alarm_msg = "Timeout during hostid exchange.\n";
>> +
>> if (s1 < 0) {
>> if (errno == EINTR)
>> goto ACCEPT_HID;
>> @@ -592,7 +644,7 @@
>> hostids = (int *) malloc(hostidlen * nprocs);
>> if (hostids == NULL) {
>> perror("malloc");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> }
>>
>> @@ -626,66 +678,33 @@
>> }
>> }
>>
>> - /* close all opend sockets */
>> - for (i = 0; i < nprocs; i++) {
>> - close(plist[i].control_socket);
>> - }
>> -
>> alarm(1000);
>> - /* let enbale the timer again*/
>> + alarm_msg = "Timeout during address exchange.\n";
>> + /* lets enable the timer again*/
>>
>> /* Lets read all other information, LID QP,etc..*/
>>
>> /* accept incoming connections, read port numbers */
>> for (i = 0; i < nprocs; i++) {
>> - int version, rank, nread;
>> - char pidstr[12];
>> -ACCEPT:
>> - sockaddr_len = sizeof(sockaddr);
>> - s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
>> - if (s1 < 0) {
>> - if (errno == EINTR)
>> - goto ACCEPT;
>> - perror("accept");
>> - cleanup();
>> - }
>> + int nread;
>>
>> /*
>> * protocol:
>> - * We don't need version number,
>> - * 0. read rank of process
>> - * 1. read address length
>> - * 2. read address itself
>> - * 3. send array of all addresses
>> + * We don't need the version number or the rank,
>> + * 0. read address length
>> + * 1. read address itself
>> + * 2. send array of all addresses
>> */
>>
>> - /* 0. Find out who we're talking to */
>> - nread = read(s1, &rank, sizeof(rank));
>> - if (nread != sizeof(rank)) {
>> - perror("read");
>> - cleanup();
>> - }
>> -
>> - if (rank < 0 || rank >= nprocs
>> - || plist[rank].state != P_STARTED) {
>> - fprintf(stderr, "mpirun: invalid rank received. \n");
>> - cleanup();
>> - }
>> -
>> - plist[rank].control_socket = s1;
>> - plist[rank].state = P_CONNECTED;
>> + plist[i].state = P_CONNECTED;
>>
>> /* Let us know connection was established
>> * printf("MPIRUN_RSH: Process rank %d connected\n",rank);
>> */
>>
>> /* 1. Find out length of the data */
>> - nread = read(s1, &addrlen, sizeof(addrlen));
>> + nread = read(plist[i].control_socket, &addrlen,
>> sizeof(addrlen));
>> if (nread != sizeof(addrlen)) {
>> - /* nread == 0 is not actually an error! */
>> - if (nread == 0)
>> - continue;
>> -
>> perror("read");
>> cleanup();
>> }
>> @@ -707,21 +726,20 @@
>> alladdrs = (int *) malloc(addrlen * nprocs);
>> if (alladdrs == NULL) {
>> perror("malloc");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> }
>>
>> /* 2. Read info from each process */
>>
>> /* for byte location */
>> - alladdrs_char = (char *) &alladdrs[rank * addrlen /
>> sizeof(int)];
>> + alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
>>
>> tot_nread = 0;
>>
>> while (tot_nread < addrlen) {
>> - nread =
>> - read(s1, (void *) (alladdrs_char + tot_nread),
>> - addrlen - tot_nread);
>> + nread = read(plist[i].control_socket,
>> + (void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
>>
>> if (nread < 0) {
>> perror("read");
>> @@ -733,36 +751,32 @@
>>
>> read_pid:
>> /* 3. Find out length of the data */
>> - nread = read(s1, &pidlen, sizeof(pidlen));
>> + nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
>> if (nread != sizeof(pidlen)) {
>> perror("read");
>> cleanup();
>> }
>>
>> /*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen,
>> nread);*/
>> - if (i == 0) {
>> - pidglen = pidlen;
>> - } else {
>> - if (pidlen != pidglen) {
>> - fprintf(stderr, "Pid lengths %d and %d do not match\n",
>> - pidlen, pidglen);
>> - cleanup();
>> - }
>> - }
>> + if (pidlen != pidglen) {
>> + fprintf(stderr, "Pid lengths %d and %d do not match\n",
>> + pidlen, pidglen);
>> + cleanup();
>> + }
>>
>> if (i == 0) {
>> - /* allocate as soon as we know the address length */
>> + /* allocate as soon as we know the pid length */
>> allpids = (char *)malloc(pidlen * nprocs);
>> if (allpids == NULL) {
>> perror("malloc");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> }
>>
>> tot_nread=0;
>> while(tot_nread < pidlen) {
>> - nread = read(s1, (void*)(allpids+rank*pidlen+tot_nread),
>> - pidlen - tot_nread);
>> + nread = read(plist[i].control_socket,
>> + (void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
>> /*fprintf(stderr, "read length %d \n", nread);*/
>> if(nread < 0) {
>> perror("read");
>> @@ -770,6 +784,9 @@
>> }
>> tot_nread += nread;
>> }
>> +
>> + plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
>> + pglist_insert(plist[i].hostname, plist[i].remote_pid);
>> }
>>
>>
>> @@ -795,7 +812,7 @@
>> out_addrs = (int *) malloc(out_addrs_len);
>> if (out_addrs == NULL) {
>> perror("malloc");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>> for (i = 0; i < nprocs; i++) {
>> @@ -876,8 +893,7 @@
>> sleep(100);
>> }
>> close(s);
>> - exit(0);
>> -
>> + exit(EXIT_SUCCESS);
>> }
>>
>> int start_process(int i, char *command_name, char *env)
>> @@ -925,12 +941,12 @@
>> if ((remote_command = malloc(str_len)) == NULL) {
>> fprintf(stderr, "Failed to malloc %d bytes for
>> remote_command\n",
>> str_len);
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> if ((xterm_command = malloc(str_len)) == NULL) {
>> fprintf(stderr, "Failed to malloc %d bytes for
>> xterm_command\n",
>> str_len);
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>>
>> @@ -1010,7 +1026,7 @@
>> if (!show_on) {
>> perror("RSH/SSH command failed!");
>> }
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>> free(remote_command);
>> @@ -1189,8 +1205,6 @@
>> fprintf(stderr, "\ta.out => " "name of MPI binary\n");
>> fprintf(stderr, "\targs => " "arguments for MPI binary\n");
>> fprintf(stderr, "\n");
>> -
>> - exit(1);
>> }
>>
>> /* finds first non-whitespace char in input string */
>> @@ -1221,7 +1235,7 @@
>> if (hf == NULL) {
>> fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
>> perror("open");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> for (i = 0; i < nprocs; i++) {
>> @@ -1287,7 +1301,7 @@
>> } else {
>> fprintf(stderr, "End of file reached on "
>> "hostfile at %d of %d hostnames\n", i, nprocs);
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> }
>> fclose(hf);
>> @@ -1321,14 +1335,14 @@
>> if ((pf = fopen(paramfile, "r")) == NULL) {
>> sprintf(errstr, "Cant open paramfile = %s", paramfile);
>> perror(errstr);
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>>
>> if ( strlen(env) == 0 ){
>> /* Allocating space for env first time */
>> if ((env = malloc(ENV_LEN)) == NULL) {
>> fprintf(stderr, "Malloc of env failed in
>> read_param_file\n");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> env_left = ENV_LEN - 1;
>> }else{
>> @@ -1367,7 +1381,7 @@
>> (ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) +
>> strlen(env);
>> if ((env = realloc(env, newlen)) == NULL) {
>> fprintf(stderr, "realloc failed in read_param_file\n");
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> }
>> if (param_debug) {
>> printf("realloc to %d\n", newlen);
>> @@ -1395,15 +1409,213 @@
>> }
>> cleanup();
>>
>> - exit(1);
>> + exit(EXIT_FAILURE);
>> +}
>> +
>> +void pglist_print(void) {
>> + if(pglist) {
>> + int i, j;
>> + size_t npids = 0, npids_allocated = 0;
>> +
>> + fprintf(stderr, "\n--pglist--\ndata:\n");
>> + for(i = 0; i < pglist->npgs; i++) {
>> + fprintf(stderr, "%p - %s:", &pglist->data[i],
>> + pglist->data[i].hostname);
>> +
>> + for(j = 0; j < pglist->data[i].npids; j++) {
>> + fprintf(stderr, " %d", pglist->data[i].pids[j]);
>> + }
>> +
>> + fprintf(stderr, "\n");
>> + npids += pglist->data[i].npids;
>> + npids_allocated += pglist->data[i].npids_allocated;
>> + }
>> +
>> + fprintf(stderr, "\nindex:");
>> + for(i = 0; i < pglist->npgs; i++) {
>> + fprintf(stderr, " %p", pglist->index[i]);
>> + }
>> +
>> + fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", pglist->npgs,
>> + pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
>> + pglist->npgs / pglist->npgs_allocated : 100.));
>> + fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", npids,
>> + npids_allocated, (int)(npids_allocated ? 100. * npids /
>> + npids_allocated : 100.));
>> + fprintf(stderr, "--pglist--\n\n");
>> + }
>> +}
>> +
>> +void pglist_insert(const char * const hostname, const pid_t const
>> pid) {
>> + const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
>> + size_t index = 0, bottom = 0, top;
>> + static size_t alloc_error = 0;
>> + int i, strcmp_result;
>> + process_group * pg;
>> + void * backup_ptr;
>> +
>> + if(alloc_error) return;
>> + if(pglist == NULL) goto init_pglist;
>> +
>> + top = pglist->npgs - 1;
>> + index = (top + bottom) / 2;
>> +
>> + while(strcmp_result = strcmp(hostname,
>> pglist->index[index]->hostname)) {
>> + if(bottom >= top) break;
>> +
>> + if(strcmp_result > 0) {
>> + bottom = index + 1;
>> + }
>> +
>> + else {
>> + top = index - 1;
>> + }
>> +
>> + index = (top + bottom) / 2;
>> + }
>> +
>> + if(!strcmp_result) goto insert_pid;
>> + if(strcmp_result > 0) index++;
>> +
>> + goto add_process_group;
>> +
>> +init_pglist:
>> + pglist = malloc(sizeof(process_groups));
>> +
>> + if(pglist) {
>> + pglist->data = NULL;
>> + pglist->index = NULL;
>> + pglist->npgs = 0;
>> + pglist->npgs_allocated = 0;
>> + }
>> +
>> + else {
>> + goto register_alloc_error;
>> + }
>> +
>> +add_process_group:
>> + if(pglist->npgs == pglist->npgs_allocated) {
>> + process_group * pglist_data_backup = pglist->data;
>> + process_group ** pglist_index_backup = pglist->index;
>> + ptrdiff_t offset;
>> +
>> + pglist->npgs_allocated += increment;
>> +
>> + backup_ptr = pglist->data;
>> + pglist->data = realloc(pglist->data, sizeof(process_group) *
>> + pglist->npgs_allocated);
>> +
>> + if(pglist->data == NULL) {
>> + pglist->data = backup_ptr;
>> + goto register_alloc_error;
>> + }
>> +
>> + backup_ptr = pglist->index;
>> + pglist->index = realloc(pglist->index, sizeof(process_group *) *
>> + pglist->npgs_allocated);
>> +
>> + if(pglist->index == NULL) {
>> + pglist->index = backup_ptr;
>> + goto register_alloc_error;
>> + }
>> +
>> + if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) {
>> + for(i = 0; i < pglist->npgs; i++) {
>> + pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
>> + offset);
>> + }
>> + }
>> + }
>> +
>> + for(i = pglist->npgs; i > index; i--) {
>> + pglist->index[i] = pglist->index[i-1];
>> + }
>> +
>> + pglist->data[pglist->npgs].hostname = hostname;
>> + pglist->data[pglist->npgs].pids = NULL;
>> + pglist->data[pglist->npgs].npids = 0;
>> + pglist->data[pglist->npgs].npids_allocated = 0;
>> +
>> + pglist->index[index] = &pglist->data[pglist->npgs++];
>> +
>> +insert_pid:
>> + pg = pglist->index[index];
>> +
>> + if(pg->npids == pg->npids_allocated) {
>> + if(pg->npids_allocated) {
>> + pg->npids_allocated <<= 1;
>> +
>> + if(pg->npids_allocated < pg->npids) pg->npids_allocated =
>> SIZE_MAX;
>> + if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
>> + }
>> +
>> + else {
>> + pg->npids_allocated = 1;
>> + }
>> +
>> + backup_ptr = pg->pids;
>> + pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
>> +
>> + if(pg->pids == NULL) {
>> + pg->pids = backup_ptr;
>> + goto register_alloc_error;
>> + }
>> + }
>> +
>> + pg->pids[pg->npids++] = pid;
>> +
>> + return;
>> +
>> +register_alloc_error:
>> + if(pglist) {
>> + if(pglist->data) {
>> + process_group * pg = pglist->data;
>> +
>> + while(pglist->npgs--) {
>> + if(pg->pids) free((pg++)->pids);
>> + }
>> +
>> + free(pglist->data);
>> + }
>> +
>> + if(pglist->index) free(pglist->index);
>> +
>> + free(pglist);
>> + }
>> +
>> + alloc_error = 1;
>> +}
>> +
>> +void free_memory(void) {
>> + if(pglist) {
>> + if(pglist->data) {
>> + process_group * pg = pglist->data;
>> +
>> + while(pglist->npgs--) {
>> + if(pg->pids) free((pg++)->pids);
>> + }
>> +
>> + free(pglist->data);
>> + }
>> +
>> + if(pglist->index) free(pglist->index);
>> +
>> + free(pglist);
>> + }
>> +
>> + if(plist) {
>> + while(nprocs--) {
>> + if(plist[nprocs].device) free(plist[nprocs].device);
>> + if(plist[nprocs].hostname) free(plist[nprocs].hostname);
>> + }
>> +
>> + free(plist);
>> + }
>> }
>>
>> void cleanup(void)
>> {
>> int i;
>> - /* could walk through list of processes, but it looks
>> - like we can just send the signal to the process group
>> - */
>>
>> if (use_totalview) {
>> fprintf(stderr, "Cleaning up all processes ...");
>> @@ -1417,36 +1629,180 @@
>> }
>>
>> for (i = 0; i < nprocs; i++) {
>> - if (RUNNING(i)) {
>> - /* send terminal interrupt, which will hopefully
>> - propagate to the other side. (not sure what xterm will
>> - do here.
>> - */
>> - kill(plist[i].pid, SIGINT);
>> - }
>> + if (RUNNING(i)) {
>> + /* send terminal interrupt, which will hopefully
>> + propagate to the other side. (not sure what xterm will
>> + do here.
>> + */
>> + kill(plist[i].pid, SIGINT);
>> + }
>> }
>> +
>> sleep(1);
>>
>> for (i = 0; i < nprocs; i++) {
>> - if (plist[i].state != P_NOTSTARTED) {
>> - /* send regular interrupt to rsh */
>> - kill(plist[i].pid, SIGTERM);
>> - }
>> + if (plist[i].state != P_NOTSTARTED) {
>> + /* send regular interrupt to rsh */
>> + kill(plist[i].pid, SIGTERM);
>> + }
>> }
>>
>> sleep(1);
>>
>> for (i = 0; i < nprocs; i++) {
>> - if (plist[i].state != P_NOTSTARTED) {
>> - /* Kill the processes */
>> - kill(plist[i].pid, SIGKILL);
>> - }
>> + if (plist[i].state != P_NOTSTARTED) {
>> + /* Kill the processes */
>> + kill(plist[i].pid, SIGKILL);
>> + }
>> + }
>> +
>> + if(pglist) {
>> + rkill_fast();
>> + }
>> +
>> + else {
>> + rkill_linear();
>> + }
>> +
>> + exit(EXIT_FAILURE);
>> +}
>> +
>> +void rkill_fast(void) {
>> + int i, j, tryagain, spawned_pid[pglist->npgs];
>> +
>> + fprintf(stderr, "Killing remote processes...");
>> +
>> + for(i = 0; i < pglist->npgs; i++) {
>> + if(0 == (spawned_pid[i] = fork())) {
>> + if(pglist->index[i]->npids) {
>> + const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
>> + const process_group * pg = pglist->index[i];
>> + char kill_cmd[bufsize], tmp[10];
>> +
>> + kill_cmd[0] = '\0';
>> + strcat(kill_cmd, "kill -s SIGKILL");
>> +
>> + for(j = 0; j < pg->npids; j++) {
>> + snprintf(tmp, 10, " %d", pg->pids[j]);
>> + strcat(kill_cmd, tmp);
>> + }
>> +
>> + strcat(kill_cmd, " >&/dev/null");
>> +
>> + if(use_rsh) {
>> + execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
>> + }
>> +
>> + else {
>> + execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
>> + kill_cmd, NULL);
>> + }
>> +
>> + perror(NULL);
>> + exit(EXIT_FAILURE);
>> + }
>> +
>> + else {
>> + exit(EXIT_SUCCESS);
>> + }
>> + }
>> + }
>> +
>> + while(1) {
>> + static int iteration = 0;
>> + tryagain = 0;
>> +
>> + sleep(1 << iteration);
>> +
>> + for (i = 0; i < pglist->npgs; i++) {
>> + if(spawned_pid[i]) {
>> + if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL,
>> WNOHANG))) {
>> + tryagain = 1;
>> + }
>> + }
>> + }
>> +
>> + if(++iteration == 5 || !tryagain) {
>> + fprintf(stderr, "DONE\n");
>> + break;
>> + }
>> + }
>> +
>> + if(tryagain) {
>> + fprintf(stderr, "The following processes may have not been
>> killed:\n");
>> + for (i = 0; i < pglist->npgs; i++) {
>> + if(spawned_pid[i]) {
>> + const process_group * pg = pglist->index[i];
>> +
>> + fprintf(stderr, "%s:", pg->hostname);
>> +
>> + for (j = 0; j < pg->npids; j++) {
>> + fprintf(stderr, " %d", pg->pids[j]);
>> + }
>> +
>> + fprintf(stderr, "\n");
>> + }
>> + }
>> + }
>> +}
>> +
>> +void rkill_linear(void) {
>> + int i, j, tryagain, spawned_pid[nprocs];
>> +
>> + fprintf(stderr, "Killing remote processes...");
>> +
>> + for (i = 0; i < nprocs; i++) {
>> + if(0 == (spawned_pid[i] = fork())) {
>> + char kill_cmd[80];
>> +
>> + if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
>> +
>> + snprintf(kill_cmd, 80, "kill -s SIGKILL %d >&/dev/null",
>> + plist[i].remote_pid);
>> +
>> + if(use_rsh) {
>> + execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
>> + }
>> +
>> + else {
>> + execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
>> + plist[i].hostname, kill_cmd, NULL);
>> + }
>> +
>> + perror(NULL);
>> + exit(EXIT_FAILURE);
>> + }
>> }
>>
>> - fprintf(stderr, "done.\n");
>> + while(1) {
>> + static int iteration = 0;
>> + tryagain = 0;
>> +
>> + sleep(1 << iteration);
>> +
>> + for (i = 0; i < nprocs; i++) {
>> + if(spawned_pid[i]) {
>> + if(!(spawned_pid[i] = waitpid(spawned_pid[i], NULL,
>> WNOHANG))) {
>> + tryagain = 1;
>> + }
>> + }
>> + }
>>
>> - exit(1);
>> + if(++iteration == 5 || !tryagain) {
>> + fprintf(stderr, "DONE\n");
>> + break;
>> + }
>> + }
>>
>> + if(tryagain) {
>> + fprintf(stderr, "The following processes may have not been
>> killed:\n");
>> + for (i = 0; i < nprocs; i++) {
>> + if(spawned_pid[i]) {
>> + fprintf(stderr, "%s [%d]\n", plist[i].hostname,
>> + plist[i].remote_pid);
>> + }
>> + }
>> + }
>> }
>>
>>
>> @@ -1457,9 +1813,13 @@
>>
>> void alarm_handler(int signal)
>> {
>> + extern const char * alarm_msg;
>> +
>> if (use_totalview) {
>> fprintf(stderr, "Timeout alarm signaled\n");
>> }
>> +
>> + if(alarm_msg) fprintf(stderr, alarm_msg);
>> cleanup();
>> }
>>
>> @@ -1467,19 +1827,21 @@
>> void child_handler(int signal)
>> {
>> int status, i, child, pid;
>> - int exitstatus = 0;
>> + int exitstatus = EXIT_SUCCESS;
>>
>> if (use_totalview) {
>> fprintf(stderr, "mpirun: child died. Waiting for others.\n");
>> }
>> alarm(10);
>> + alarm_msg = "Child died. Timeout while waiting for others.\n";
>> +
>> for (i = 0; i < nprocs; i++) {
>> pid = wait(&status);
>> if (pid == -1) {
>> perror("wait");
>> - exitstatus = 1;
>> + exitstatus = EXIT_FAILURE;
>> } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
>> - exitstatus = 1;
>> + exitstatus = EXIT_FAILURE;
>> }
>> for (child = 0; child < nprocs; child++) {
>> if (plist[child].pid == pid) {
>> @@ -1489,9 +1851,11 @@
>> }
>> if (child == nprocs) {
>> fprintf(stderr, "Unable to find child %d!\n", pid);
>> - exitstatus = 1;
>> + exitstatus = EXIT_FAILURE;
>> }
>> }
>> alarm(0);
>> exit(exitstatus);
>> }
>> +
>> +/* vi:set sw=4 sts=4 tw=80: */
>> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client.h
>> exp1/mpid/ch_gen2/process/pmgr_client.h
>> --- 0.9.9/mpid/ch_gen2/process/pmgr_client.h 2007-05-29
>> 03:47:10.000000000 -0400
>> +++ exp1/mpid/ch_gen2/process/pmgr_client.h 2007-07-02
>> 12:59:51.000000000 -0400
>> @@ -108,6 +108,6 @@
>> * of the spawner, e.g. mpirun_rsh, to check that it understands
>> * the version of the executable.
>> */
>> -#define PMGR_VERSION 5
>> +#define PMGR_VERSION 6
>>
>> #endif
>> diff -ruN 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
>> exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c
>> --- 0.9.9/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 2007-05-29
>> 03:47:10.000000000 -0400
>> +++ exp1/mpid/ch_gen2/process/pmgr_client_mpirun_rsh.c 2007-07-02
>> 12:59:51.000000000 -0400
>> @@ -171,6 +171,9 @@
>> int nwritten;
>> int version;
>> struct sockaddr_in sockaddr;
>> +
>> + if(phase != 0) return;
>> +
>> /*
>> * Exchange information with the mpirun program. Send it our
>> * socket address, get back addresses for our siblings.
>> @@ -208,14 +211,12 @@
>> */
>>
>> version = PMGR_VERSION;
>> - if (0 == phase) {
>> - /* first, send a version number */
>> - nwritten = write(mpirun_socket, &version, sizeof(version));
>> - if (nwritten != sizeof(version)) {
>> - sleep(2);
>> - perror("write");
>> - exit(1);
>> - }
>> + /* first, send a version number */
>> + nwritten = write(mpirun_socket, &version, sizeof(version));
>> + if (nwritten != sizeof(version)) {
>> + sleep(2);
>> + perror("write");
>> + exit(1);
>> }
>> /* next, send our rank */
>> @@ -264,7 +265,6 @@
>> tot_nread = tot_nread + nread;
>> }
>> fflush(stdout);
>> - close(mpirun_socket);
>> return 1;
>> }
>>
>> @@ -280,7 +280,6 @@
>> pid_t *ppids = (pid_t *)pallpids;
>> pid_t *allpids = NULL;
>>
>> - pmgr_init_connection(1);
>> /* next, send size of addr */
>> nwritten = write(mpirun_socket, &addrlen, sizeof(addrlen));
>> if (nwritten != sizeof(addrlen)) {
>> @@ -314,7 +313,7 @@
>> exit(1);
>> }
>>
>> - /* next, send size of addr */
>> + /* next, send size of pid */
>> nwritten = write(mpirun_socket, &pidlen, sizeof(pidlen));
>> if (nwritten != sizeof(mypid_len)) {
>> sleep(2);
>> @@ -322,6 +321,7 @@
>> exit(1);
>> }
>>
>> + /* next, send our pid */
>> if (pidlen != 0) {
>> nwritten = write(mpirun_socket, &my_pid_int, (size_t) pidlen);
>> if (nwritten != pidlen) {
>> @@ -345,7 +345,7 @@
>>
>> if (pidlen != 0) {
>> tot_nread=0;
>> - /* finally, read addresses from all processes */
>> + /* finally, read pids from all processes */
>> while (tot_nread < pmgr_nprocs*pidlen) {
>> nread = read(mpirun_socket, (void*)((char
>> *)allpids+tot_nread), (size_t)
>> ((pmgr_nprocs*pidlen)-tot_nread));
>
--
http://www.cse.ohio-state.edu/~surs
More information about the mvapich-discuss
mailing list