[Fwd: Re: [mvapich-discuss] error closing socket at end of mpirun_rsh]
Mark Potts
potts at hpcapplications.com
Fri Oct 12 16:42:21 EDT 2007
Re-send to include mvapich-discuss.
regards,
-------- Original Message --------
Subject: Re: [mvapich-discuss] error closing socket at end of mpirun_rsh
Date: Fri, 12 Oct 2007 16:20:25 -0400
From: Mark Potts <potts at hpcapplications.com>
Reply-To: potts at hpcapplications.com
Organization: HPC Applications Inc.
To: Jonathan L. Perkins <perkinjo at cse.ohio-state.edu>
References: <470D3936.1050403 at hpcapplications.com>
<470E4CFB.8000102 at cse.ohio-state.edu>
Jonathan,
I've checked, and the "Termination socket read failed: Bad file
descriptor" message is emitted under the previous OS version as well.
So it appears that our patched mpirun_rsh has been generating this
intermittent message (if not an actual error) for some time
now.
To close the loop on whether we have the correct patches correctly
installed, I've attached the patched mpirun_rsh.c used to build our
installation RPM. This patched file is used together with the
MVAPICH 0.9.9 source contained in OFED 1.2 build(?) 1326 -- in case
there are any cross conflicts with other OFED MVAPICH bits.
You asked about example code to generate a failure. Below is a
really simple program that fairly consistently produces a single
error message when run with, say, -np 10, but much less often
with -np 2.
#include <stdio.h>
#include <mpi.h>
int main (int argc, char **argv)
{
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    printf ("Rank=%d present and calling MPI_Finalize\n", rank);
    MPI_Finalize();
    printf ("Rank=%d bailing, nicely\n", rank);
    return (0);
}
regards,
Jonathan L. Perkins wrote:
> Mark Potts wrote:
>> Hi,
>> Can you explain the functioning of the wait_for_errors() function
>> in .../mpid/ch_gen2/processes/mpirun_rsh.c in MVAPICH 0.9.9 and
>> what might be happening to cause even small (2-process) jobs to
>> frequently fail with the message
>> "Termination socket read failed: Bad file descriptor"?
>> I'm not clear what the socket s/s1 does and therefore how we
>> could be getting the above error message upon reading either
>> "flag" or "local_id" in this code.
>>
>> This error occurs frequently but not for every job and is
>> emitted following full, proper termination of the processes on
>> the client nodes. We are using MVAPICH 0.9.9 ch_gen2.
>>
>> Thanks.
>> regards,
>
> This function provides information about which host an abort originated
> from. You shouldn't get this error unless one of the clients (MPI
> processes) tried to open up a connection to tell mpirun_rsh about an abort.
>
> We haven't seen this issue during internal testing. Is there a
> particular base case program that you could send us that should
> reproduce the problem?
>
> Also, when did you first start experiencing this problem? Was it after
> applying one of the mpirun_rsh patches that we sent you?
>
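One possible path to this message (an assumption, not something confirmed
in this thread): wait_for_errors() in the attached mpirun_rsh.c passes the
return value of accept() straight to read() without checking it, and the
SIGCHLD handler is installed with sa_flags = 0, so SA_RESTART is not
requested. If accept() were interrupted and returned -1, the read() that
follows would fail with EBADF, which perror() reports as "Bad file
descriptor". The sketch below shows that symptom and a retry wrapper.
read_int_or_errno() and accept_retry() are hypothetical helper names used
only for illustration; they are not MVAPICH functions.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: read one int from fd.
 * Returns 0 on success, the errno value if read() fails,
 * or -1 on EOF or a short read. */
int read_int_or_errno(int fd, int *out)
{
    ssize_t n;

    assert(out != NULL);
    n = read(fd, out, sizeof(*out));
    if (n == -1)
        return errno;   /* EBADF when fd is -1, e.g. a failed accept() */
    return (n == (ssize_t) sizeof(*out)) ? 0 : -1;
}

/* Hypothetical helper: accept() that retries when a signal interrupts it.
 * Returns the connected descriptor, or -1 with errno set for any
 * failure other than EINTR. */
int accept_retry(int listen_fd)
{
    int fd;

    do {
        fd = accept(listen_fd, NULL, NULL);
    } while (fd < 0 && errno == EINTR);
    return fd;
}
```

Calling read_int_or_errno(-1, &flag) returns EBADF, which matches the text
of the reported message. If interrupted accepts do turn out to be the
cause, a wrapper along the lines of accept_retry(s), followed by a check
for a negative result before any read(), would avoid reading from an
invalid descriptor.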
--
***********************************
>> Mark J. Potts, PhD
>>
>> HPC Applications Inc.
>> phone: 410-992-8360 Bus
>> 410-313-9318 Home
>> 443-418-4375 Cell
>> email: potts at hpcapplications.com
>> potts at excray.com
***********************************
-------------- next part --------------
/*RAM
* Copyright (C) 1999-2001 The Regents of the University of California
* (through E.O. Lawrence Berkeley National Laboratory), subject to
* approval by the U.S. Department of Energy.
*
* Use of this software is under license. The license agreement is included
* in the file MVICH_LICENSE.TXT.
*
* Developed at Berkeley Lab as part of MVICH.
*
* Authors: Bill Saphir <wcsaphir at lbl.gov>
* Michael Welcome <mlwelcome at lbl.gov>
*/
/* Copyright (c) 2002-2007, The Ohio State University. All rights
* reserved.
*
* This file is part of the MVAPICH software package developed by the
* team members of The Ohio State University's Network-Based Computing
* Laboratory (NBCL), headed by Professor Dhabaleswar K. (DK) Panda.
*
* For detailed copyright and licensing information, please refer to the
* copyright file COPYRIGHT_MVAPICH in the top level MPICH directory.
*
*/
/*
* ==================================================================
* This file contains the source for a simple MPI process manager
* used by MVICH.
* It simply collects the arguments and execs either RSH or SSH
* to execute the processes on the remote (or local) hosts.
* Some critical information is passed to the remote processes
* through environment variables using the "env" utility.
*
* The information passed through the environment variables is:
* MPIRUN_HOST = host running this mpirun_rsh command
* MPIRUN_PORT = port number mpirun_rsh is listening on for TCP connection
* MPIRUN_RANK = numerical MPI rank of remote process
* MPIRUN_NPROCS = number of processes in application
* MPIRUN_ID = pid of the mpirun_rsh process
*
* The remote processes use this to establish TCP connections to
* this mpirun_rsh process. The TCP connections are used to exchange
* address data needed to establish the VI connections.
* The TCP connections are also used for a simple barrier synchronization
* at process termination time.
*
* MVICH allows for the specification of certain tuning parameters
* at run-time. These parameters are read by mpirun_rsh from a
* file given on the command line. Currently, these parameters are
* passed to the remote processes through environment variables, but
* they could be sent as a string over the TCP connection. It was
* thought that using environment variables might be more portable
* to other process managers.
* ==================================================================
*/
#define _GNU_SOURCE
#include <getopt.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netdb.h>
#include <string.h>
#ifdef MAC_OSX
#include <sys/wait.h>
#else
#include <wait.h>
#endif
#include <assert.h>
#include <fcntl.h>
#include "pmgr_client.h"
#define BASE_ENV_LEN 17
typedef enum {
P_NOTSTARTED,
P_STARTED,
P_CONNECTED,
P_DISCONNECTED,
P_RUNNING,
P_FINISHED,
P_EXITED
} process_state;
typedef struct {
char *hostname;
char *device;
pid_t pid;
pid_t remote_pid;
int port;
int control_socket;
process_state state;
} process;
typedef struct {
const char * hostname;
pid_t * pids;
size_t npids, npids_allocated;
} process_group;
typedef struct {
process_group * data;
process_group ** index;
size_t npgs, npgs_allocated;
} process_groups;
#define RUNNING(i) ((plist[i].state == P_STARTED || \
plist[i].state == P_CONNECTED || \
plist[i].state == P_RUNNING) ? 1 : 0)
/* other information: a.out and rank are implicit. */
process_groups * pglist = NULL;
process * plist = NULL;
int nprocs = 0;
int aout_index, port;
#define MAX_WD_LEN 256
char wd[MAX_WD_LEN]; /* working directory of current process */
#define MAX_HOST_LEN 256
char mpirun_host[MAX_HOST_LEN]; /* hostname of current process */
/* xxx need to add checking for string overflow, do this more carefully ... */
/*
* Message notifying user of what timed out
*/
static const char * alarm_msg = NULL;
#define COMMAND_LEN 2000
#define SEPARATOR ':'
void free_memory(void);
void pglist_print(void);
void pglist_insert(const char * const, const pid_t);
void rkill_fast(void);
void rkill_linear(void);
void cleanup_handler(int);
void nostop_handler(int);
void alarm_handler(int);
void child_handler(int);
void usage(void);
int start_process(int i, char *command_name, char *env);
void cleanup(void);
char *skip_white(char *s);
char *read_param_file(char *paramfile,char *env);
void process_termination(void);
void wait_for_errors(int s,struct sockaddr_in *sockaddr,unsigned int sockaddr_len);
int set_fds(fd_set * rfds, fd_set * efds);
static int read_hostfile(char *hostfile_name);
#define RSH_CMD "/usr/bin/rsh"
#define SSH_CMD "/usr/bin/ssh"
#ifndef PARAM_GLOBAL
#define PARAM_GLOBAL "/etc/mvapich.conf"
#endif
#ifndef LD_LIBRARY_PATH_MPI
#define LD_LIBRARY_PATH_MPI "/usr/mvapich/lib/shared"
#endif
#ifdef USE_SSH
int use_rsh = 0;
#else
int use_rsh = 1;
#endif
#define SSH_ARG "-q"
#define SH_NAME_LEN (128)
char sh_cmd[SH_NAME_LEN];
#define XTERM "/usr/X11R6/bin/xterm"
#define ENV_CMD "/usr/bin/env"
#define TOTALVIEW_CMD "/usr/totalview/bin/totalview"
static struct option option_table[] = {
{"np", required_argument, 0, 0},
{"debug", no_argument, 0, 0},
{"xterm", no_argument, 0, 0},
{"hostfile", required_argument, 0, 0},
{"paramfile", required_argument, 0, 0},
{"show", no_argument, 0, 0},
{"rsh", no_argument, 0, 0},
{"ssh", no_argument, 0, 0},
{"help", no_argument, 0, 0},
{"v", no_argument, 0, 0},
{"tv", no_argument, 0, 0},
{0, 0, 0, 0}
};
#define MVAPICH_VERSION "0.9.9"
#ifndef MVAPICH_BUILDID
#define MVAPICH_BUILDID "custom"
#endif
static void show_version(void)
{
fprintf(stderr,"OSU MVAPICH VERSION %s-SingleRail\n"
"Build-ID: %s\n", MVAPICH_VERSION, MVAPICH_BUILDID);
}
int debug_on = 0, xterm_on = 0, show_on = 0;
int param_debug = 0;
int use_totalview = 0;
char * mpirun_processes;
static char display[200];
static void get_display_str(void)
{
char *p;
if ( (p = getenv( "DISPLAY" ) ) != NULL ) {
/* For X11 programs; snprintf truncates instead of overflowing display[] */
snprintf(display, sizeof(display), "DISPLAY=%s", p);
}
}
int main(int argc, char *argv[])
{
int i, s, s1, c, option_index;
int hostfile_on = 0;
#define HOSTFILE_LEN 256
char hostfile[HOSTFILE_LEN + 1];
int paramfile_on = 0;
#define PARAMFILE_LEN 256
char paramfile[PARAMFILE_LEN + 1];
char *param_env;
FILE *hf;
struct sockaddr_in sockaddr;
unsigned int sockaddr_len = sizeof(sockaddr);
int addrlen, global_addrlen = 0;
char *env = "\0";
int tot_nread = 0;
int *alladdrs = NULL;
char *alladdrs_char = NULL; /* for byte location */
int *out_addrs;
int out_addrs_len;
int j;
char *allpids = NULL;
int pidlen, pidglen;
char command_name[COMMAND_LEN];
char command_name_tv[COMMAND_LEN];
char totalview_cmd[200];
char *tv_env;
int hostidlen, global_hostidlen = 0;
int *hostids = NULL;
int hostname_len = 0;
totalview_cmd[199] = 0;
display[0]='\0';
pidglen = sizeof(pid_t);
/* mpirun [-debug] [-xterm] -np N [-hostfile hfile | h1 h2 h3 ... hN] a.out [args] */
atexit(free_memory);
do {
c = getopt_long_only(argc, argv, "+", option_table, &option_index);
switch (c) {
case '?':
case ':':
usage();
exit(EXIT_FAILURE);
break;
case EOF:
break;
case 0:
switch (option_index) {
case 0:
nprocs = atoi(optarg);
if (nprocs < 1) {
usage();
exit(EXIT_FAILURE);
}
break;
case 1:
debug_on = 1;
xterm_on = 1;
break;
case 2:
xterm_on = 1;
break;
case 3:
hostfile_on = 1;
strncpy(hostfile, optarg, HOSTFILE_LEN);
if (strlen(optarg) >= HOSTFILE_LEN - 1)
hostfile[HOSTFILE_LEN] = '\0';
break;
case 4:
paramfile_on = 1;
strncpy(paramfile, optarg, PARAMFILE_LEN);
if (strlen(optarg) >= PARAMFILE_LEN - 1) {
paramfile[PARAMFILE_LEN] = '\0';
}
break;
case 5:
show_on = 1;
break;
case 6:
use_rsh = 1;
break;
case 7:
use_rsh = 0;
break;
case 8:
show_version();
usage();
exit(EXIT_SUCCESS);
break;
case 9:
show_version();
exit(EXIT_SUCCESS);
break;
case 10:
use_totalview = 1;
debug_on = 1;
tv_env = getenv("TOTALVIEW");
if(tv_env != NULL) {
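/* totalview_cmd[199] was set to 0 above, so a bounded copy stays terminated */
strncpy(totalview_cmd, tv_env, sizeof(totalview_cmd) - 1);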
} else {
fprintf(stderr,
"TOTALVIEW env is NULL, use default: %s\n",
TOTALVIEW_CMD);
sprintf(totalview_cmd, "%s", TOTALVIEW_CMD);
}
break;
case 11:
usage();
exit(EXIT_SUCCESS);
break;
default:
fprintf(stderr, "Unknown option\n");
usage();
exit(EXIT_FAILURE);
break;
}
break;
default:
fprintf(stderr, "Unreachable statement!\n");
usage();
exit(EXIT_FAILURE);
break;
}
} while (c != EOF);
if (!hostfile_on) {
/* get hostnames from argument list */
if (argc - optind < nprocs + 1) {
fprintf(stderr, "Without hostfile option, hostnames must be "
"specified on command line.\n");
usage();
exit(EXIT_FAILURE);
}
aout_index = nprocs + optind;
} else {
aout_index = optind;
}
/* reading default param file */
if ( 0 == (access(PARAM_GLOBAL, R_OK))) {
env=read_param_file(PARAM_GLOBAL,env);
}
/* reading file specified by user env */
if (( param_env = getenv("MVAPICH_DEF_PARAMFILE")) != NULL ){
env = read_param_file(param_env, env);
}
if (paramfile_on) {
/* construct a string of environment variable definitions from
* the entries in the paramfile. These environment variables
* will be available to the remote processes, which
* will use them to over-ride default parameter settings
*/
env = read_param_file(paramfile, env);
}
plist = malloc(nprocs * sizeof(process));
if (plist == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
for (i = 0; i < nprocs; i++) {
plist[i].state = P_NOTSTARTED;
plist[i].device = NULL;
plist[i].port = -1;
plist[i].remote_pid = 0;
}
/* grab hosts from command line or file */
if (hostfile_on) {
hostname_len = read_hostfile(hostfile);
} else {
for (i = 0; i < nprocs; i++) {
plist[i].hostname = (char *)strndup(argv[optind + i], 100);
hostname_len = hostname_len > strlen(plist[i].hostname) ?
hostname_len : strlen(plist[i].hostname);
}
}
if(use_totalview) {
mpirun_processes = (char*) malloc(sizeof(char)
* nprocs * (hostname_len + 4));
if (!mpirun_processes) {
perror("malloc");
exit(EXIT_FAILURE);
} else {
memset(mpirun_processes, 0, nprocs * (hostname_len + 4));
}
for (i = 0; i < nprocs; ++i) {
strcat(mpirun_processes, plist[i].hostname);
strcat(mpirun_processes, ":");
}
} else {
/* If we are not using Totalview, then we
* need not do much */
mpirun_processes = (char *) malloc(sizeof(char));
mpirun_processes[0] = '\0';
}
getcwd(wd, MAX_WD_LEN);
gethostname(mpirun_host, MAX_HOST_LEN);
get_display_str();
s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s < 0) {
perror("socket");
exit(EXIT_FAILURE);
}
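/* sockaddr is an uninitialized local: clear it and set the address family */
memset(&sockaddr, 0, sizeof(sockaddr));
sockaddr.sin_family = AF_INET;
sockaddr.sin_addr.s_addr = INADDR_ANY;
sockaddr.sin_port = 0;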
if (bind(s, (struct sockaddr *) &sockaddr, sockaddr_len) < 0) {
perror("bind");
exit(EXIT_FAILURE);
}
if (getsockname(s, (struct sockaddr *) &sockaddr, &sockaddr_len) < 0) {
perror("getsockname");
exit(EXIT_FAILURE);
}
port = (int) ntohs(sockaddr.sin_port);
listen(s, nprocs);
if (!show_on) {
struct sigaction signal_handler;
signal_handler.sa_handler = cleanup_handler;
sigfillset(&signal_handler.sa_mask);
signal_handler.sa_flags = 0;
sigaction(SIGHUP, &signal_handler, NULL);
sigaction(SIGINT, &signal_handler, NULL);
sigaction(SIGTERM, &signal_handler, NULL);
signal_handler.sa_handler = nostop_handler;
sigaction(SIGTSTP, &signal_handler, NULL);
signal_handler.sa_handler = alarm_handler;
sigaction(SIGALRM, &signal_handler, NULL);
signal_handler.sa_handler = child_handler;
sigemptyset(&signal_handler.sa_mask);
sigaction(SIGCHLD, &signal_handler, NULL);
}
alarm(1000);
alarm_msg = "Timeout during client startup.\n";
/* long timeout for testing, where process may be stopped in debugger */
#ifdef USE_DDD
#define DEBUGGER "/usr/bin/ddd"
#else
#define DEBUGGER "gdb"
#endif
if (debug_on) {
char keyval_list[COMMAND_LEN];
sprintf(keyval_list, "%s", " ");
/* Take more env variables if present */
while (strchr(argv[aout_index], '=')) {
strcat(keyval_list, argv[aout_index]);
strcat(keyval_list, " ");
aout_index ++;
}
if(use_totalview) {
sprintf(command_name_tv, "%s %s %s", keyval_list,
totalview_cmd, argv[aout_index]);
sprintf(command_name, "%s %s ", keyval_list, argv[aout_index]);
} else {
sprintf(command_name, "%s %s %s", keyval_list,
DEBUGGER, argv[aout_index]);
}
} else {
sprintf(command_name, "%s", argv[aout_index]);
}
if(use_totalview) {
/* Only needed for root */
strcat(command_name_tv, " -a ");
}
/* add the arguments */
for (i = aout_index + 1; i < argc; i++) {
strcat(command_name, " ");
strcat(command_name, argv[i]);
}
if(use_totalview) {
/* Complete the command for non-root processes */
strcat(command_name, " -mpichtv");
/* Complete the command for root process */
for (i = aout_index + 1; i < argc; i++) {
strcat(command_name_tv, " ");
strcat(command_name_tv, argv[i]);
}
strcat(command_name_tv, " -mpichtv");
}
/* start all processes */
for (i = 0; i < nprocs; i++) {
if((use_totalview) && (i == 0)) {
if (start_process(i, command_name_tv, env) < 0) {
fprintf(stderr,
"Unable to start process %d on %s. Aborting.\n",
i, plist[i].hostname);
cleanup();
}
} else {
if (start_process(i, command_name, env) < 0) {
fprintf(stderr,
"Unable to start process %d on %s. Aborting.\n",
i, plist[i].hostname);
cleanup();
}
}
}
if (show_on)
exit(EXIT_SUCCESS);
/*Hostid exchange start */
/* accept incoming connections, read port numbers */
for (i = 0; i < nprocs; i++) {
int version, rank, nread;
char pidstr[12];
ACCEPT_HID:
sockaddr_len = sizeof(sockaddr);
s1 = accept(s, (struct sockaddr *) &sockaddr, &sockaddr_len);
alarm_msg = "Timeout during hostid exchange.\n";
if (s1 < 0) {
if (errno == EINTR)
goto ACCEPT_HID;
perror("accept");
cleanup();
}
/*
* protocol:
* 0. read protocol version number
* 1. read rank of process
* 2. read hostid length
* 3. read hostid itself
* 4. send array of all addresses
*/
/* 0. Find out what version of the startup protocol the executable
* was compiled to use. */
nread = read(s1, &version, sizeof(version));
if (nread != sizeof(version)) {
perror("read");
cleanup();
}
if (version != PMGR_VERSION) {
fprintf(stderr, "mpirun: executable version %d does not match"
" our version %d.\n", version, PMGR_VERSION);
cleanup();
}
/* 1. Find out who we're talking to */
nread = read(s1, &rank, sizeof(rank));
if (nread != sizeof(rank)) {
perror("read");
cleanup();
}
if (rank < 0 || rank >= nprocs || plist[rank].state != P_STARTED) {
fprintf(stderr, "mpirun: invalid rank received. \n");
cleanup();
}
plist[rank].control_socket = s1;
/* 2. Find out length of the data */
nread = read(s1, &hostidlen, sizeof(hostidlen));
if (nread != sizeof(hostidlen)) {
/* nread == 0 is not actually an error! */
if (nread == 0)
continue;
perror("read");
cleanup();
}
if (i == 0)
global_hostidlen = hostidlen;
else if (hostidlen != global_hostidlen) {
fprintf(stderr, "Address lengths %d and %d do not match\n",
hostidlen, global_hostidlen);
cleanup();
}
if (i == 0) {
/* allocate as soon as we know the address length */
hostids = (int *) malloc(hostidlen * nprocs);
if (hostids == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
}
/* 3. Read info from each process */
tot_nread = 0;
while(tot_nread < hostidlen) {
/* byte offsets: one hostidlen-sized slot per rank, plus bytes already read */
nread = read(s1, (void*)((char *)hostids + rank * hostidlen + tot_nread),
hostidlen- tot_nread);
if(nread < 0) {
perror("read");
cleanup();
}
tot_nread += nread;
}
}
/* at this point, all processes have checked in hostids */
/* cancel the timeout */
alarm(0);
/* Lets write back all hostids */
for (i = 0; i < nprocs; i++) {
int nwritten;
nwritten = write(plist[i].control_socket, hostids,
nprocs * hostidlen);
if (nwritten != nprocs * hostidlen ) {
perror("write");
cleanup();
}
}
alarm(1000);
alarm_msg = "Timeout during address exchange.\n";
/* lets enable the timer again*/
/* Lets read all other information, LID QP,etc..*/
/* accept incoming connections, read port numbers */
for (i = 0; i < nprocs; i++) {
int nread;
/*
* protocol:
* We don't need the version number or the rank,
* 0. read address length
* 1. read address itself
* 2. send array of all addresses
*/
plist[i].state = P_CONNECTED;
/* Let us know connection was established
* printf("MPIRUN_RSH: Process rank %d connected\n",rank);
*/
/* 1. Find out length of the data */
nread = read(plist[i].control_socket, &addrlen, sizeof(addrlen));
if (nread != sizeof(addrlen)) {
perror("read");
cleanup();
}
if (i == 0) {
global_addrlen = addrlen;
} else if (addrlen != global_addrlen) {
fprintf(stderr, "Address lengths %d and %d do not match\n",
addrlen, global_addrlen);
cleanup();
}
if (addrlen == 0) {
goto read_pid;
}
if (i == 0) {
/* allocate as soon as we know the address length */
alladdrs = (int *) malloc(addrlen * nprocs);
if (alladdrs == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
}
/* 2. Read info from each process */
/* for byte location */
alladdrs_char = (char *) &alladdrs[i * addrlen / sizeof(int)];
tot_nread = 0;
while (tot_nread < addrlen) {
nread = read(plist[i].control_socket,
(void *) (alladdrs_char + tot_nread), addrlen - tot_nread);
if (nread < 0) {
perror("read");
cleanup();
}
tot_nread += nread;
}
read_pid:
/* 3. Find out length of the data */
nread = read(plist[i].control_socket, &pidlen, sizeof(pidlen));
if (nread != sizeof(pidlen)) {
perror("read");
cleanup();
}
/*fprintf(stderr, "read Pid lengths %d and %d \n", pidlen, nread);*/
if (pidlen != pidglen) {
fprintf(stderr, "Pid lengths %d and %d do not match\n",
pidlen, pidglen);
cleanup();
}
if (i == 0) {
/* allocate as soon as we know the pid length */
allpids = (char *)malloc(pidlen * nprocs);
if (allpids == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
}
tot_nread=0;
while(tot_nread < pidlen) {
nread = read(plist[i].control_socket,
(void*)(allpids+i*pidlen+tot_nread), pidlen - tot_nread);
/*fprintf(stderr, "read length %d \n", nread);*/
if(nread < 0) {
perror("read");
cleanup();
}
tot_nread += nread;
}
plist[i].remote_pid = *((pid_t *)(allpids+i*pidlen));
pglist_insert(plist[i].hostname, plist[i].remote_pid);
}
/* at this point, all processes have checked in. */
/* cancel the timeout */
alarm(0);
/* send ports to all but highest ranking process, as it needs none */
#ifdef DEBUG
for (i = 0; i < nprocs; i++) {
for (j = 0; j < addrlen / sizeof(int); j++) {
fprintf(stderr, "%d\t",
alladdrs[i * addrlen / sizeof(int) + j]);
}
fprintf(stderr, "\n");
}
fprintf(stderr, "\n");
#endif
out_addrs_len = 3 * nprocs * sizeof(int);
out_addrs = (int *) malloc(out_addrs_len);
if (out_addrs == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
for (i = 0; i < nprocs; i++) {
/* put hca_lid information at the beginning */
out_addrs[i] = alladdrs[i * addrlen / sizeof(int) + i];
/* put host id information in the third block */
out_addrs[2 * nprocs + i] =
alladdrs[i * addrlen / sizeof(int) + nprocs];
}
for (i = 0; i < nprocs; i++) {
int nwritten;
/* personalized address information for each process */
for (j = 0; j < nprocs; j++) {
/* put qp information here */
if (i == j)
/* No QP is allocated for a process itself,
* If you change this, please change viainit.cc:1514 too */
out_addrs[nprocs + j] = -1;
else
out_addrs[nprocs + j] =
alladdrs[j * addrlen / sizeof(int) + i];
}
#ifdef DEBUG
for (j = 0; j < out_addrs_len / sizeof(int); j++)
fprintf(stderr, "%d\t", out_addrs[j]);
fprintf(stderr, "\n");
#endif
nwritten =
write(plist[i].control_socket, out_addrs, out_addrs_len);
if (nwritten != out_addrs_len) {
perror("write");
cleanup();
}
if(pidlen != 0) {
nwritten = 0;
nwritten = write(plist[i].control_socket, allpids, nprocs*pidlen);
if (nwritten != nprocs*pidlen) {
perror("write");
cleanup();
}
}
plist[i].state = P_RUNNING;
}
#ifndef USE_VIADEV_BARRIER
/* Not using the VI barrier. Implement the termination
* barrier using the socket network we have already
* established.
*/
process_termination();
#endif
/* shut down all our ports */
/*close(s);
for (i = 0; i < nprocs; i++)
close(plist[i].control_socket);
*/
/* close all opened sockets */
for (i = 0; i < nprocs; i++)
close(plist[i].control_socket);
/* clients have all information now. Just sit and wait for them
to die, which we will detect via sockets or a signal from ssh/rsh.
*/
wait_for_errors(s,&sockaddr,sockaddr_len);
/* this while is unused now. We are in block wait in wait_for_errors */
while (1) {
sleep(100);
}
close(s);
exit(EXIT_SUCCESS);
}
int start_process(int i, char *command_name, char *env)
{
char *remote_command;
char *xterm_command;
char xterm_title[100];
char *ld_library_path;
char *device_port_env = NULL;
int id = getpid();
int str_len;
/* Room for "VIADEV_DEFAULT_PORT=", a decimal int, a trailing space and the NUL */
#define PORT_ENV_LEN 40
if (plist[i].device != NULL && strlen(plist[i].device) != 0){
device_port_env = (char * )malloc(BASE_ENV_LEN + strlen(plist[i].device) + 1);
if (device_port_env == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
sprintf(device_port_env, "VIADEV_DEVICE=%s ", plist[i].device);
}
if (plist[i].port != -1){
/* realloc(NULL, n) behaves like malloc(n), so one path covers both cases */
size_t used = (device_port_env != NULL) ? strlen(device_port_env) : 0;
char *grown = (char *)realloc(device_port_env, used + PORT_ENV_LEN);
if (grown == NULL) {
perror("realloc");
exit(EXIT_FAILURE);
}
device_port_env = grown;
snprintf(device_port_env + used, PORT_ENV_LEN, "VIADEV_DEFAULT_PORT=%d ",
plist[i].port);
}
if (device_port_env==NULL) {
device_port_env=strdup("");
}
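/* Count every variable-length piece that is later formatted into the
 * command, including LD_LIBRARY_PATH, so the sprintf() calls do not overrun. */
ld_library_path = getenv("LD_LIBRARY_PATH");
str_len = strlen(command_name) + strlen(env) + strlen(wd) +
strlen(mpirun_host) + strlen(display) + strlen(LD_LIBRARY_PATH_MPI) +
(ld_library_path != NULL ? strlen(ld_library_path) : 0) +
strlen(device_port_env) + 530;
if(use_totalview) {
str_len += strlen(mpirun_processes);
}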
if ((remote_command = malloc(str_len)) == NULL) {
fprintf(stderr, "Failed to malloc %d bytes for remote_command\n",
str_len);
exit(EXIT_FAILURE);
}
if ((xterm_command = malloc(str_len)) == NULL) {
fprintf(stderr, "Failed to malloc %d bytes for xterm_command\n",
str_len);
exit(EXIT_FAILURE);
}
/*
* this is the remote command we execute whether we were are using
* an xterm or using rsh directly
*/
if ((ld_library_path = getenv( "LD_LIBRARY_PATH" ) ) != NULL ) {
sprintf(remote_command, "cd %s; %s LD_LIBRARY_PATH=%s:%s "
"MPIRUN_MPD=0 MPIRUN_HOST=%s MPIRUN_PORT=%d "
"MPIRUN_RANK=%d MPIRUN_NPROCS=%d MPIRUN_ID=%d %s %s %s",
wd, ENV_CMD,LD_LIBRARY_PATH_MPI,ld_library_path,
mpirun_host, port, i,
nprocs, id, display,env,device_port_env);
} else {
sprintf(remote_command, "cd %s; %s LD_LIBRARY_PATH=%s "
"MPIRUN_MPD=0 MPIRUN_HOST=%s MPIRUN_PORT=%d "
"MPIRUN_RANK=%d MPIRUN_NPROCS=%d MPIRUN_ID=%d %s %s %s",
wd, ENV_CMD,LD_LIBRARY_PATH_MPI, mpirun_host, port, i,
nprocs, id, display,env,device_port_env);
}
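/* Append in place: passing remote_command as both the destination and a
 * source argument of sprintf() is undefined behavior. */
if(use_totalview) {
sprintf(remote_command + strlen(remote_command), " MPIRUN_PROCESSES='%s' %s ", mpirun_processes, command_name);
} else {
sprintf(remote_command + strlen(remote_command), " NOT_USE_TOTALVIEW=1 %s ", command_name);
}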
if (xterm_on) {
sprintf(xterm_command, "%s; echo process exited", remote_command);
sprintf(xterm_title, "\"mpirun process %d of %d\"", i, nprocs);
}
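plist[i].pid = fork();
if (plist[i].pid < 0) {
/* report the failure so the caller's "< 0" check can take effect */
perror("fork");
free(remote_command);
free(xterm_command);
return (-1);
}
plist[i].state = P_STARTED;
/* putting after fork() avoids a race */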
if (plist[i].pid == 0) {
if (i != 0) {
int fd = open("/dev/null", O_RDWR, 0);
(void) dup2(fd, STDIN_FILENO);
}
if (use_rsh)
strcpy(sh_cmd, RSH_CMD);
else
strcpy(sh_cmd, SSH_CMD);
if (xterm_on) {
if (show_on) {
printf("command: %s -T %s -e %s %s %s %s\n", XTERM,
xterm_title, sh_cmd, use_rsh ? "" : SSH_ARG,
plist[i].hostname, xterm_command);
} else {
if (use_rsh) {
execl(XTERM, XTERM, "-T", xterm_title, "-e",
sh_cmd, plist[i].hostname, xterm_command, NULL);
} else {
execl(XTERM, XTERM, "-T", xterm_title, "-e",
sh_cmd, SSH_ARG, plist[i].hostname,
xterm_command, NULL);
}
}
} else {
if (show_on) {
printf("command: %s %s %s\n", sh_cmd, plist[i].hostname,
remote_command);
} else {
if (use_rsh) {
execl(sh_cmd, sh_cmd, plist[i].hostname,
remote_command, NULL);
} else {
execl(sh_cmd, sh_cmd, SSH_ARG, plist[i].hostname,
remote_command, NULL);
}
}
}
if (!show_on) {
perror("RSH/SSH command failed!");
}
exit(EXIT_FAILURE);
}
free(remote_command);
free(xterm_command);
return (0);
}
void wait_for_errors(int s,struct sockaddr_in *sockaddr,unsigned int sockaddr_len){
int nread,remote_id,local_id,s1,i,flag;
s1 = accept(s,(struct sockaddr *) sockaddr,&sockaddr_len);
nread = read(s1, &flag, sizeof(flag));
if (nread == -1) {
perror("Termination socket read failed");
} else if (nread == 0) {
} else if (nread != sizeof(flag)) {
printf("Invalid termination socket on read\n");
cleanup();
} else {
nread = read(s1, &local_id, sizeof(local_id));
if (nread == -1) {
perror("Termination socket read failed");
} else if (nread == 0) {
} else if (nread != sizeof(local_id)) {
printf("Invalid termination socket on read\n");
cleanup();
} else if (flag > -1) {
remote_id=flag;
printf("mpirun_rsh: Abort signaled from [%d : %s] remote host is [%d : %s ]\n",local_id,plist[local_id].hostname,remote_id, plist[remote_id].hostname);
close(s);
close(s1);
cleanup();
}
else
{
printf("mpirun_rsh: Abort signaled from [%d]\n",local_id);
close(s);
close(s1);
cleanup();
}
}
}
void process_termination()
{
int i;
int rval;
int send_val = 1000;
int remote_id;
#ifdef MCST_SUPPORT
int my_rank;
int mcs_read, mcs_written;
char grp_info[GRP_INFO_LEN];
int s;
#endif
/* we want to wait until all processes have reached a point
* in their termination where they have cancelled the VIA async
* error handlers.
* We implement a simple barrier by waiting in a read on each
* socket until the process has cancelled its error handler.
* NOTE: If EOF is returned, this implies the remote process has
* failed and the connection is broken.
* Once all processes respond, we inform them to continue by
* writing a value back down the socket.
*/
for (i = 0; i < nprocs; i++) {
int s = plist[i].control_socket;
int nread;
remote_id = -1;
nread = read(s, &remote_id, sizeof(remote_id));
if (nread == -1) {
perror("termination socket read failed");
plist[i].state = P_DISCONNECTED;
} else if (nread == 0) {
plist[i].state = P_DISCONNECTED;
} else if (nread != sizeof(remote_id)) {
printf("Invalid termination socket read on [%d] "
"returned [%d] got [%d]\n", i, nread, remote_id);
cleanup();
} else {
plist[i].state = P_FINISHED;
}
}
/* now, everyone who is still alive has responded */
for (i = 0; i < nprocs; i++) {
int s = plist[i].control_socket;
if (plist[i].state == P_FINISHED) {
int nwritten = write(s, &send_val, sizeof(send_val));
if (nwritten != sizeof(send_val)) {
perror("socket write");
cleanup();
}
}
}
#ifdef MCST_SUPPORT
/* First, receive root's group information. Root=0 */
s = plist[0].control_socket;
mcs_read = read(s, grp_info, GRP_INFO_LEN);
if (mcs_read != GRP_INFO_LEN) {
fprintf(stderr, "mpirun_rsh: read grp info failed "
"(len %d %s)\n", mcs_read, grp_info);
cleanup();
}
/* Second, send grp info to all others */
for (i = 1; i < nprocs; i++) {
s = plist[i].control_socket;
mcs_written = write(s, grp_info, GRP_INFO_LEN);
if (mcs_written != GRP_INFO_LEN) {
fprintf(stderr, "mpirun_rsh: write grp info failed %d\n",
mcs_written);
cleanup();
}
}
/* Third, receive ack from all non-root nodes */
for (i = 1; i < nprocs; i++) {
s = plist[i].control_socket;
mcs_read = read(s, &my_rank, sizeof(my_rank));
if (mcs_read != sizeof(my_rank)) {
fprintf(stderr, "mpirun_rsh: read grp ack failed %d\n",
mcs_read);
perror("mpirun_rsh: read grp ack");
cleanup();
}
}
/* Fourth, write ack to the root */
my_rank = 0;
s = plist[0].control_socket;
mcs_written = write(s, &my_rank, sizeof(my_rank));
if (mcs_written != sizeof(my_rank)) {
perror("pmgr_exchange_mcs_group_sync: mpirun_rsh "
"write ack to root");
cleanup();
}
#endif
}
void usage(void)
{
fprintf(stderr, "usage: mpirun_rsh [-v] [-rsh|-ssh] "
"[-paramfile=pfile] "
"[-debug] [-tv] [-xterm] [-show] -np N "
"(-hostfile hfile | h1 h2 ... hN) a.out args\n");
fprintf(stderr, "Where:\n");
fprintf(stderr, "\tv => Show version and exit\n");
fprintf(stderr, "\trsh => " "to use rsh for connecting\n");
fprintf(stderr, "\tssh => " "to use ssh for connecting\n");
fprintf(stderr, "\tparamfile => "
"file containing run-time MVICH parameters\n");
fprintf(stderr, "\tdebug => "
"run each process under the control of gdb\n");
fprintf(stderr,"\ttv => "
"run each process under the control of totalview\n");
fprintf(stderr, "\txterm => "
"run remote processes under xterm\n");
fprintf(stderr, "\tshow => "
"show command for remote execution but don't run it\n");
fprintf(stderr, "\tnp => "
"specify the number of processes\n");
fprintf(stderr, "\th1 h2... => "
"names of hosts where processes should run\n");
fprintf(stderr, "or\thostfile => "
"name of file containing hosts, one per line\n");
fprintf(stderr, "\ta.out => " "name of MPI binary\n");
fprintf(stderr, "\targs => " "arguments for MPI binary\n");
fprintf(stderr, "\n");
}
/* finds first non-whitespace char in input string */
char *skip_white(char *s)
{
int len;
/* return pointer to first non-whitespace char in string */
/* Assumes string is null terminated */
/* Clean from start */
while ((*s == ' ') || (*s == '\t'))
s++;
/* Clean from end */
len = (int) strlen(s) - 1;
/* Test len before indexing so an empty string never reads s[-1] */
while ((len >= 0) && ((s[len] == ' ')
|| (s[len] == '\t'))) {
s[len]='\0';
len--;
}
return s;
}
/* Read hostfile */
static int read_hostfile(char *hostfile_name)
{
size_t i, hostname_len = 0;
FILE *hf = fopen(hostfile_name, "r");
if (hf == NULL) {
fprintf(stderr, "Can't open hostfile %s\n", hostfile_name);
perror("open");
exit(EXIT_FAILURE);
}
for (i = 0; i < nprocs; i++) {
char line[100];
char *trimmed_line;
int separator_count = 0,j = 0,prev_j = 0;
if (fgets(line, 100, hf) != NULL) {
int len = strlen(line);
if (len > 0 && line[len - 1] == '\n') {
line[len - 1] = '\0';
}
/* Remove comments and empty lines*/
if (strchr(line, '#') != NULL) {
line[strlen(line) - strlen(strchr(line, '#'))] = '\0';
}
trimmed_line = skip_white(line);
if (strlen(trimmed_line) == 0) {
/* The line is empty, drop it */
i--;
continue;
}
/*Update len and continue patch ?! move it to func ?*/
len = strlen(trimmed_line);
/* Parsing format:
* hostname SEPARATOR hca_name SEPARATOR port
*/
for (j =0; j < len; j++){
if ( trimmed_line[j] == SEPARATOR && separator_count == 0){
plist[i].hostname = (char *)strndup(trimmed_line, j + 1);
plist[i].hostname[j] = '\0';
prev_j = j;
separator_count++;
hostname_len = hostname_len > len ? hostname_len : len;
continue;
}
if ( trimmed_line[j] == SEPARATOR && separator_count == 1){
plist[i].device = (char *)strndup(&trimmed_line[prev_j + 1],
j - prev_j);
plist[i].device[j-prev_j-1] = '\0';
separator_count++;
continue;
}
if ( separator_count == 2){
plist[i].port = atoi(&trimmed_line[j]);
break;
}
}
if (0 == separator_count) {
plist[i].hostname = strdup(trimmed_line);
hostname_len = hostname_len > len ? hostname_len : len;
}
if (1 == separator_count) {
plist[i].device = (char*)strdup(&trimmed_line[prev_j+1]);
}
} else {
fprintf(stderr, "End of file reached on "
"hostfile at %d of %d hostnames\n", (int)i, nprocs);
exit(EXIT_FAILURE);
}
}
fclose(hf);
return hostname_len;
}
/*
* reads the param file and constructs the environment strings
* for each of the environment variables.
* The caller is responsible for de-allocating the returned string.
*
* NOTE: we can't just append these to our current environment because
* RSH and SSH do not export our environment vars to the remote host.
* Rather, the RSH command that starts the remote process looks
* something like:
* rsh remote_host "cd workdir; env ENVNAME=value ... command"
*/
#define ENV_LEN 1024
#define LINE_LEN 256
char *read_param_file(char *paramfile,char *env)
{
FILE *pf;
char errstr[256];
char name[128], value[256]; /* value must hold the 192 chars + NUL that %192s may store */
char buf[384];
char line[LINE_LEN];
char *p;
int num, e_len;
int env_left = 0;
if ((pf = fopen(paramfile, "r")) == NULL) {
snprintf(errstr, sizeof(errstr), "Can't open paramfile = %s", paramfile);
perror(errstr);
exit(EXIT_FAILURE);
}
if ( env == NULL || strlen(env) == 0 ){
/* Allocating space for env first time */
if ((env = malloc(ENV_LEN)) == NULL) {
fprintf(stderr, "Malloc of env failed in read_param_file\n");
exit(EXIT_FAILURE);
}
env[0] = '\0'; /* strcat() and strlen() below need a terminated string */
env_left = ENV_LEN - 1;
}else{
/* already allocated */
env_left = ENV_LEN - (strlen(env) + 1) - 1;
}
while (fgets(line, LINE_LEN, pf) != NULL) {
p = skip_white(line);
if (*p == '#' || *p == '\n') {
/* a comment or a blank line, ignore it */
continue;
}
/* look for NAME = VALUE, where NAME == MVICH_... */
name[0] = value[0] = '\0';
if (param_debug) {
printf("Scanning: %s\n", p);
}
if ((num = sscanf(p, "%64[A-Z_] = %192s", name, value)) != 2) {
/* debug */
if (param_debug) {
printf("FAILED: matched = %d, name = %s, "
"value = %s in \n\t%s\n", num, name, value, p);
}
continue;
}
/* construct the environment string */
buf[0] = '\0';
sprintf(buf, "%s=%s ", name, value);
/* concat to actual environment string */
e_len = strlen(buf);
if (e_len > env_left) {
/* oops, need to grow env string */
int newlen =
(ENV_LEN > e_len + 1 ? ENV_LEN : e_len + 1) + strlen(env);
if ((env = realloc(env, newlen)) == NULL) {
fprintf(stderr, "realloc failed in read_param_file\n");
exit(EXIT_FAILURE);
}
if (param_debug) {
printf("realloc to %d\n", newlen);
}
env_left = ENV_LEN - 1;
}
strcat(env, buf);
env_left -= e_len;
if (param_debug) {
printf("Added: [%s]\n", buf);
printf("env len = %d, env left = %d\n", (int)strlen(env), env_left);
}
}
fclose(pf);
return env;
}
void cleanup_handler(int sig)
{
static int already_called = 0;
printf("Signal %d received.\n", sig);
if (already_called == 0) {
already_called = 1;
}
cleanup();
exit(EXIT_FAILURE);
}
void pglist_print(void) {
if(pglist) {
int i, j;
size_t npids = 0, npids_allocated = 0;
fprintf(stderr, "\n--pglist--\ndata:\n");
for(i = 0; i < pglist->npgs; i++) {
fprintf(stderr, "%p - %s:", &pglist->data[i],
pglist->data[i].hostname);
for(j = 0; j < pglist->data[i].npids; j++) {
fprintf(stderr, " %d", pglist->data[i].pids[j]);
}
fprintf(stderr, "\n");
npids += pglist->data[i].npids;
npids_allocated += pglist->data[i].npids_allocated;
}
fprintf(stderr, "\nindex:");
for(i = 0; i < pglist->npgs; i++) {
fprintf(stderr, " %p", pglist->index[i]);
}
fprintf(stderr, "\nnpgs/allocated: %d/%d (%d%%)\n", (int)pglist->npgs,
(int)pglist->npgs_allocated, (int)(pglist->npgs_allocated ? 100. *
pglist->npgs / pglist->npgs_allocated : 100.));
fprintf(stderr, "npids/allocated: %d/%d (%d%%)\n", (int)npids,
(int)npids_allocated, (int)(npids_allocated ? 100. * npids /
npids_allocated : 100.));
fprintf(stderr, "--pglist--\n\n");
}
}
void pglist_insert(const char * const hostname, const pid_t pid) {
const size_t increment = nprocs > 4 ? nprocs / 4 : 1;
size_t index = 0, bottom = 0, top;
static size_t alloc_error = 0;
int i, strcmp_result;
process_group * pg;
void * backup_ptr;
if(alloc_error) return;
if(pglist == NULL) goto init_pglist;
top = pglist->npgs - 1;
index = (top + bottom) / 2;
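while((strcmp_result = strcmp(hostname, pglist->index[index]->hostname)) != 0) {
if(bottom >= top) break;
if(strcmp_result > 0) {
bottom = index + 1;
}
else {
/* index is unsigned: stop here rather than let index - 1 wrap around */
if(index == 0) break;
top = index - 1;
}
index = (top + bottom) / 2;
}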
if(!strcmp_result) goto insert_pid;
if(strcmp_result > 0) index++;
goto add_process_group;
init_pglist:
pglist = malloc(sizeof(process_groups));
if(pglist) {
pglist->data = NULL;
pglist->index = NULL;
pglist->npgs = 0;
pglist->npgs_allocated = 0;
}
else {
goto register_alloc_error;
}
add_process_group:
if(pglist->npgs == pglist->npgs_allocated) {
process_group * pglist_data_backup = pglist->data;
process_group ** pglist_index_backup = pglist->index;
ptrdiff_t offset;
pglist->npgs_allocated += increment;
backup_ptr = pglist->data;
pglist->data = realloc(pglist->data, sizeof(process_group) *
pglist->npgs_allocated);
if(pglist->data == NULL) {
pglist->data = backup_ptr;
goto register_alloc_error;
}
backup_ptr = pglist->index;
pglist->index = realloc(pglist->index, sizeof(process_group *) *
pglist->npgs_allocated);
if(pglist->index == NULL) {
pglist->index = backup_ptr;
goto register_alloc_error;
}
if(offset = (size_t)pglist->data - (size_t)pglist_data_backup) {
for(i = 0; i < pglist->npgs; i++) {
pglist->index[i] = (process_group *)((size_t)pglist->index[i] +
offset);
}
}
}
for(i = pglist->npgs; i > index; i--) {
pglist->index[i] = pglist->index[i-1];
}
pglist->data[pglist->npgs].hostname = hostname;
pglist->data[pglist->npgs].pids = NULL;
pglist->data[pglist->npgs].npids = 0;
pglist->data[pglist->npgs].npids_allocated = 0;
pglist->index[index] = &pglist->data[pglist->npgs++];
insert_pid:
pg = pglist->index[index];
if(pg->npids == pg->npids_allocated) {
if(pg->npids_allocated) {
pg->npids_allocated <<= 1;
if(pg->npids_allocated < pg->npids) pg->npids_allocated = SIZE_MAX;
if(pg->npids_allocated > nprocs) pg->npids_allocated = nprocs;
}
else {
pg->npids_allocated = 1;
}
backup_ptr = pg->pids;
pg->pids = realloc(pg->pids, pg->npids_allocated * sizeof(pid_t));
if(pg->pids == NULL) {
pg->pids = backup_ptr;
goto register_alloc_error;
}
}
pg->pids[pg->npids++] = pid;
return;
register_alloc_error:
if(pglist) {
if(pglist->data) {
process_group * pg = pglist->data;
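while(pglist->npgs--) {
free(pg->pids); /* free(NULL) is a no-op; always advance pg */
pg++;
}
free(pglist->data);
}
if(pglist->index) free(pglist->index);
free(pglist);
pglist = NULL; /* cleanup() and free_memory() test this pointer */
}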
alloc_error = 1;
}
void free_memory(void) {
if(pglist) {
if(pglist->data) {
process_group * pg = pglist->data;
while(pglist->npgs--) {
free(pg->pids); /* free(NULL) is a no-op; always advance pg */
pg++;
}
free(pglist->data);
}
if(pglist->index) free(pglist->index);
free(pglist);
}
if(plist) {
while(nprocs--) {
if(plist[nprocs].device) free(plist[nprocs].device);
if(plist[nprocs].hostname) free(plist[nprocs].hostname);
}
free(plist);
}
}
void cleanup(void)
{
int i;
if (use_totalview) {
fprintf(stderr, "Cleaning up all processes ...");
}
#ifdef MAC_OSX
for (i = 0; i < NSIG; i++) {
#else
for (i = 0; i < _NSIG; i++) {
#endif
signal(i, SIG_DFL);
}
for (i = 0; i < nprocs; i++) {
if (RUNNING(i)) {
/* send terminal interrupt, which will hopefully
propagate to the other side (not sure what xterm will
do here).
*/
kill(plist[i].pid, SIGINT);
}
}
sleep(1);
for (i = 0; i < nprocs; i++) {
if (plist[i].state != P_NOTSTARTED) {
/* send regular interrupt to rsh */
kill(plist[i].pid, SIGTERM);
}
}
sleep(1);
for (i = 0; i < nprocs; i++) {
if (plist[i].state != P_NOTSTARTED) {
/* Kill the processes */
kill(plist[i].pid, SIGKILL);
}
}
if(pglist) {
rkill_fast();
}
else {
rkill_linear();
}
exit(EXIT_FAILURE);
}
void rkill_fast(void) {
int i, j, tryagain, spawned_pid[pglist->npgs];
fprintf(stderr, "Killing remote processes...");
for(i = 0; i < pglist->npgs; i++) {
if(0 == (spawned_pid[i] = fork())) {
if(pglist->index[i]->npids) {
const size_t bufsize = 40 + 10 * pglist->index[i]->npids;
const process_group * pg = pglist->index[i];
char kill_cmd[bufsize], tmp[10];
kill_cmd[0] = '\0';
strcat(kill_cmd, "kill -s 9");
for(j = 0; j < pg->npids; j++) {
snprintf(tmp, 10, " %d", pg->pids[j]);
strcat(kill_cmd, tmp);
}
strcat(kill_cmd, " >&/dev/null");
if(use_rsh) {
execl(RSH_CMD, RSH_CMD, pg->hostname, kill_cmd, NULL);
}
else {
execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x", pg->hostname,
kill_cmd, NULL);
}
perror(NULL);
exit(EXIT_FAILURE);
}
else {
exit(EXIT_SUCCESS);
}
}
}
while(1) {
static int iteration = 0;
tryagain = 0;
sleep(1 << iteration);
for (i = 0; i < pglist->npgs; i++) {
if(spawned_pid[i] > 0) {
if(waitpid(spawned_pid[i], NULL, WNOHANG) == 0) {
tryagain = 1; /* child is still running */
}
else {
spawned_pid[i] = 0; /* reaped: drop it from the report below */
}
}
}
if(++iteration == 5 || !tryagain) {
fprintf(stderr, "DONE\n");
break;
}
}
if(tryagain) {
fprintf(stderr, "The following processes may not have been killed:\n");
for (i = 0; i < pglist->npgs; i++) {
if(spawned_pid[i]) {
const process_group * pg = pglist->index[i];
fprintf(stderr, "%s:", pg->hostname);
for (j = 0; j < pg->npids; j++) {
fprintf(stderr, " %d", pg->pids[j]);
}
fprintf(stderr, "\n");
}
}
}
}
void rkill_linear(void) {
int i, j, tryagain, spawned_pid[nprocs];
fprintf(stderr, "Killing remote processes...");
for (i = 0; i < nprocs; i++) {
if(0 == (spawned_pid[i] = fork())) {
char kill_cmd[80];
if(!plist[i].remote_pid) exit(EXIT_SUCCESS);
snprintf(kill_cmd, 80, "kill -s 9 %d >&/dev/null",
plist[i].remote_pid);
if(use_rsh) {
execl(RSH_CMD, RSH_CMD, plist[i].hostname, kill_cmd, NULL);
}
else {
execl(SSH_CMD, SSH_CMD, SSH_ARG, "-x",
plist[i].hostname, kill_cmd, NULL);
}
perror(NULL);
exit(EXIT_FAILURE);
}
}
while(1) {
static int iteration = 0;
tryagain = 0;
sleep(1 << iteration);
for (i = 0; i < nprocs; i++) {
if(spawned_pid[i] > 0) {
if(waitpid(spawned_pid[i], NULL, WNOHANG) == 0) {
tryagain = 1; /* child is still running */
}
else {
spawned_pid[i] = 0; /* reaped: drop it from the report below */
}
}
}
if(++iteration == 5 || !tryagain) {
fprintf(stderr, "DONE\n");
break;
}
}
if(tryagain) {
fprintf(stderr, "The following processes may not have been killed:\n");
for (i = 0; i < nprocs; i++) {
if(spawned_pid[i]) {
fprintf(stderr, "%s [%d]\n", plist[i].hostname,
plist[i].remote_pid);
}
}
}
}
void nostop_handler(int signal)
{
printf("Stopping from the terminal not allowed\n");
}
void alarm_handler(int signal)
{
extern const char * alarm_msg;
if (use_totalview) {
fprintf(stderr, "Timeout alarm signaled\n");
}
if(alarm_msg) fprintf(stderr, "%s", alarm_msg);
cleanup();
}
void child_handler(int signal)
{
static int num_exited = 0;
int status, i, pid;
while(1) {
pid = waitpid(-1, &status, WNOHANG);
if(pid == 0) break;
if(pid != -1 && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
if(++num_exited == nprocs) exit(WEXITSTATUS(status));
}
else {
fprintf(stderr, "\nChild exited abnormally!\n");
cleanup();
}
}
}
/* vi:set sw=4 sts=4 tw=80: */
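
For anyone reading the attachment, the hostfile line format noted in read_hostfile() ("hostname SEPARATOR hca_name SEPARATOR port") can be sketched on its own as below. This is only an illustration, not the code from mpirun_rsh.c: SKETCH_SEPARATOR, struct host_entry, copy_field() and parse_host_line() are hypothetical names, and ':' is an assumed separator, since the real SEPARATOR macro is defined outside the part of the file shown here. Returning -1 for a missing port is also a choice made for the sketch.

```c
#include <stdlib.h>
#include <string.h>

/* Assumption: ':' stands in for the SEPARATOR macro, which is defined
 * elsewhere in mpirun_rsh.c and is not visible in this excerpt. */
#define SKETCH_SEPARATOR ':'

/* Hypothetical container standing in for the plist[] fields
 * (hostname, device, port) filled in by read_hostfile(). */
struct host_entry {
    char hostname[64];
    char device[64];
    int  port;              /* -1 when the line gives no port */
};

/* Copy at most dstsize-1 bytes of src[0..n) into dst and terminate it. */
static void copy_field(char *dst, size_t dstsize, const char *src, size_t n)
{
    if (n >= dstsize)
        n = dstsize - 1;
    memcpy(dst, src, n);
    dst[n] = '\0';
}

/* Parse "hostname[:device[:port]]" from an already trimmed line.
 * Returns 0 on success, -1 if the hostname field is empty. */
static int parse_host_line(const char *line, struct host_entry *out)
{
    const char *first, *second;

    out->hostname[0] = '\0';
    out->device[0] = '\0';
    out->port = -1;

    /* No separator at all: the whole line is the hostname. */
    first = strchr(line, SKETCH_SEPARATOR);
    if (first == NULL) {
        copy_field(out->hostname, sizeof out->hostname, line, strlen(line));
        return out->hostname[0] ? 0 : -1;
    }

    copy_field(out->hostname, sizeof out->hostname, line,
               (size_t)(first - line));
    if (out->hostname[0] == '\0')
        return -1;

    /* One separator: the rest of the line is the device name. */
    second = strchr(first + 1, SKETCH_SEPARATOR);
    if (second == NULL) {
        copy_field(out->device, sizeof out->device, first + 1,
                   strlen(first + 1));
        return 0;
    }

    /* Two separators: device sits between them, port follows. */
    copy_field(out->device, sizeof out->device, first + 1,
               (size_t)(second - first - 1));
    if (second[1] != '\0')
        out->port = atoi(second + 1);
    return 0;
}
```

Calling parse_host_line() on a line such as "node01:mthca0:2" (a made-up example) would fill in the three fields. Fixed-size buffers are used here so that nothing needs freeing, whereas the attached code allocates each field with strndup()/strdup().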