[mvapich-discuss] mvapich & mpiexec

Pete Wyckoff pw at osc.edu
Mon May 28 12:40:09 EDT 2007


koop at cse.ohio-state.edu wrote on Wed, 09 May 2007 14:56 -0400:
> > Does mvapich work with mpiexec? I use MVAPICH 0.9.9-beta and mpiexec 0.82.
> > When I try to run tasks I see this message:
> >      mpiexec: Warning: read_ib_one: protocol version 5 not known, but
> > might still work. But nothing happens. Is the latest mvapich version not
> > supported by mpiexec? Or are there other ways to run MPI tasks under a
> > batch system? I use the latest Torque batch system. Batch scripts using
> > mpirun work, but that requires installing the mvapich distribution on
> > all nodes.
> 
> Currently there is no way to enable the old startup protocol in 0.9.9.
> 
> mpiexec will need to be updated to accommodate the new startup protocol
> that is used in 0.9.9.

Matt,

With Jan's encouragement and testing, we may have working support
for mvapich 0.9.9 in mpiexec.  It is a more complex change than we
had imagined.

As in previous mvapich versions, each task in the parallel job
contacts the job launcher and gives it some information; then, after
all tasks have connected, the job launcher sends the entire set
of information back to every process.  That is typical for MPI job
startup.  What's new in this version is an entire second phase, in
which each task connects again to request more information, followed
by a global scatter.

Before I check it in, can you provide some commentary on the reason
for the two sets of accept(), read(), write() for every task?  On
the surface this would seem like a major barrier to scalability.
There must be a good reason to get the hostids out first, then have
the tasks come back for the addresses.

The comment so far is:

* Version 5:
*   Added another phase, with socket close/reaccept between the two.  No
*   clue why this is necessary.  First phase distributes hostids:
*   ...

Egor, 

The current working patch is below.  It applies against SVN, but
should probably work okay against 0.82 too.  If you would be willing
to verify that this works on your system, I can check it in with
confidence and put a notice on the web page for others.

		-- Pete


Index: ib.c
===================================================================
--- ib.c	(revision 402)
+++ ib.c	(working copy)
@@ -30,6 +30,7 @@ static char *address = 0;
 static int address_size = 0;
 static char *pids = 0;
 static int pids_size = 0;
+static int phase = 0;  /* for two-phase version 5 */
 
 /* state of all the sockets */
 static int num_waiting_to_accept;  /* first accept all numtasks */
@@ -150,6 +151,22 @@ prepare_ib_startup_port(int *fd)
  *   Same as 2 with addition of pid array.  We send back the
  *   entire array of pids (unpersonalized) after the addresses array.
  *
+ * Version 5:
+ *   Added another phase, with socket close/reaccept between the two.  No
+ *   clue why this is necessary.  First phase distributes hostids:
+ *     version   # 5
+ *     rank      # 0..np-1
+ *     hostidlen # 4 bytes
+ *     hostid    # <hostidlen> bytes
+ *   Write back entire hostid[] array.
+ *   Close fds, go to phase 2.  At each accept, gather:
+ *     rank      # 0..np-1
+ *     addrlen   # 4 bytes, could be 0
+ *     addrs[]   # <addrlen> bytes
+ *     pidlen    # 4 bytes
+ *     pids[]    # <pidlen> bytes
+ *   Write back personalized out_addrs[] and full pids[].
+ *
  * Return negative on error, or new rank number for success.
  */
 static int read_ib_one(int fd)
@@ -159,8 +176,13 @@ static int read_ib_one(int fd)
     int j, ret = -1;
     pid_t pidlen;
 
-    if (read_full_ret(fd, &testvers, sizeof(int)) != sizeof(int))
-	goto out;
+    if (version == 5 && phase == 1) {
+	/* no version again on second phase */
+	testvers = version;
+    } else {
+	if (read_full_ret(fd, &testvers, sizeof(int)) != sizeof(int))
+	    goto out;
+    }
     if (read_full_ret(fd, &rank, sizeof(int)) != sizeof(int))
 	goto out;
     if (read_full_ret(fd, &addrlen, sizeof(int)) != sizeof(int))
@@ -194,11 +216,11 @@ static int read_ib_one(int fd)
 
     if (version == -1) {
 	version = testvers;
-	if (!(version == 1 || version == 2 || version == 3)) {
+	if (!(version == 1 || version == 2 || version == 3 || version == 5)) {
 	    warning(
 	      "%s: protocol version %d not known, but might still work",
 	      __func__, version);
-	    version = 3;  /* guess the latest still works */
+	    version = 5;  /* guess the latest still works */
 	}
 	debug(1, "%s: version %d startup%s", __func__, version,
 	  non_versioned_092 ? " (unversioned)" : "");
@@ -238,7 +260,7 @@ static int read_ib_one(int fd)
 	    goto out;
     }
 
-    if (version >= 3) {
+    if (version == 3 || (version == 5 && phase == 1)) {
 	read_full(fd, &pidlen, sizeof(pidlen));
 	if (!pids) {
 	    pids_size = pidlen;
@@ -291,6 +313,7 @@ read_ib_startup_ports(void)
     int numleft;
     int ret = 0;
 
+next_phase:
     debug(1, "%s: waiting for checkin: %d to accept, %d to read", __func__,
       num_waiting_to_accept, num_waiting_to_read);
 
@@ -309,17 +332,6 @@ read_ib_startup_ports(void)
     }
 
     /*
-     * Put listen socket back in blocking, and give it to the stdio listener.
-     */
-    flags = fcntl(mport_fd, F_GETFL);
-    if (flags < 0)
-	error_errno("%s: get socket flags", __func__);
-    if (fcntl(mport_fd, F_SETFL, flags & ~O_NONBLOCK) < 0)
-	error_errno("%s: set listen socket blocking", __func__);
-    close(mport_fd);
-    stdio_msg_parent_say_abort_fd(0);
-
-    /*
      * Now send the information back to all of them.
      */
     if (version == 1) {
@@ -352,10 +364,70 @@ read_ib_startup_ports(void)
 	    }
 	    free(pids);
 	}
+    } else if (version == 5) {
+	if (phase == 0) {
+	    /* These are actually the hostids, in mvapich parlance.  Next
+	     * phase will be the personalized addresses. */
+	    for (i=0; i<numtasks; i++) {
+		if (write_full(fds[i], address, numtasks * address_size) < 0)
+		    error_errno("%s: write addresses to rank %d", __func__, i);
+	    }
+	    phase = 1;
+	    for (i=0; i<numtasks; i++) {
+		close(fds[i]);
+		fds[i] = -1;
+	    }
+	    address_size = 0;
+	    free(address);
+	    address = NULL;
+	    num_waiting_to_accept = numtasks;
+	    goto next_phase;
+	} else if (phase == 1) {
+	    /*
+	     * Very similar to version 3, but with -1 for i == j in
+	     * outaddrs.  Not sure if that matters.
+	     */
+	    int outsize = 3 * numtasks * sizeof(int);
+	    int *outaddrs = Malloc(outsize);
+	    int *inaddrs = (int *) (unsigned long) address;
+	    int inaddrs_size = address_size / sizeof(int);
+	    /* fill in the common information first: lids, hostids */
+	    for (i=0; i<numtasks; i++)
+		outaddrs[i] = inaddrs[i*inaddrs_size + i];
+	    for (i=0; i<numtasks; i++)
+		outaddrs[2*numtasks+i] = inaddrs[i*inaddrs_size + numtasks];
+	    /* personalize the array with qp info for each */
+	    for (i=0; i<numtasks; i++) {
+		for (j=0; j<numtasks; j++)
+		    outaddrs[numtasks+j] = inaddrs[j*inaddrs_size + i];
+		outaddrs[numtasks + i] = -1;
+		if (write_full(fds[i], outaddrs, outsize) < 0)
+		    error_errno("%s: write addresses to rank %d", __func__, i);
+	    }
+	    free(outaddrs);
+	    for (i=0; i<numtasks; i++) {
+		if (write_full(fds[i], pids, pids_size * numtasks) < 0)
+		    error_errno("%s: write pids to rank %d", __func__, i);
+	    }
+	    free(pids);
+	} else
+	    error("%s: programmer error, unknown version 5 phase %d", __func__,
+		  phase);
     } else
 	error("%s: programmer error, unknown version %d", __func__, version);
 
     /*
+     * Put listen socket back in blocking, and give it to the stdio listener.
+     */
+    flags = fcntl(mport_fd, F_GETFL);
+    if (flags < 0)
+	error_errno("%s: get socket flags", __func__);
+    if (fcntl(mport_fd, F_SETFL, flags & ~O_NONBLOCK) < 0)
+	error_errno("%s: set listen socket blocking", __func__);
+    close(mport_fd);
+    stdio_msg_parent_say_abort_fd(0);
+
+    /*
      * Finally, implement a simple barrier.  Use a select loop to avoid
      * hanging on a sequential read from #0 which is always quite busy and
      * slow to respond.


More information about the mvapich-discuss mailing list