[mvapich-discuss] multithreaded mpi_get performance

Thiago Ize thiago at sci.utah.edu
Mon Nov 8 23:34:09 EST 2010


Skipped content of type multipart/alternative
-------------- next part --------------
#include <mpi.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

using namespace std;

// Number of worker threads requested on the command line (assigned in main).
int numThreads=0;
// Number of MPI_Get operations each worker thread performs per timed run.
const unsigned long NUM_GETS = 100000; // something big
// Size in bytes of the exposed buffer and of every individual get.
const unsigned long BUFFER_SIZE = 4096;
// Memory exposed through every RMA window created in main. Value-initialized
// (zeroed) so the bytes other ranks fetch are deterministic, not indeterminate.
char* sendBuffer = new char[BUFFER_SIZE]();
// One RMA window per worker thread (array allocated in main); thread N uses win[N].
MPI::Win* win = NULL;

/**
 * Benchmark worker: performs NUM_GETS MPI_Get operations of BUFFER_SIZE bytes
 * from rank 0. Each get is wrapped in its own shared-lock access epoch.
 *
 * @param threadID pointer to an int holding this thread's index; the index
 *                 selects which window in the global `win` array is used.
 * @return always NULL (main passes NULL to pthread_join and ignores it).
 */
void* runThread(void* threadID) {
  const int proc = *((int*) threadID);
  // Zero-initialized; the MPI path overwrites it on every get.
  char* receiveBuffer = new char[BUFFER_SIZE]();
  // Every get targets rank 0's buffer through win[proc].
  const int owner = 0;

  for (unsigned long i = 0; i < NUM_GETS; ++i) {
#if 1 // Set to 0 to verify that this code is actually running on separate cores.
    int err = MPI_Win_lock(MPI_LOCK_SHARED, owner, 0, win[proc]);
    if (err != MPI_SUCCESS) cout << "ERROR lock: " <<err<<endl;

    err = MPI_Get(receiveBuffer, static_cast<int>(BUFFER_SIZE), MPI_CHAR,
                  owner, 0, static_cast<int>(BUFFER_SIZE), MPI_CHAR, win[proc]);
    if (err != MPI_SUCCESS) cout << "ERROR get: " <<err<<endl;

    // Unlock ends the epoch; the get is only guaranteed complete after this.
    err = MPI_Win_unlock(owner, win[proc]);
    if (err != MPI_SUCCESS) cout << "ERROR unlock: " <<err<<endl;
#else
    for (int k=0; k < 1000; ++k) // do lots of work
      receiveBuffer[i%BUFFER_SIZE] = k*234/(k+(double)receiveBuffer[(i-1)%BUFFER_SIZE]);
    if (receiveBuffer[i%BUFFER_SIZE]==-1)
      printf("This will never get printed, but don't tell the optimizer that\n");
#endif
  }

  delete[] receiveBuffer;
  // A pthread start routine must return a value; falling off the end of a
  // non-void function is undefined behavior.
  return NULL;
}

int main(int argc, char* argv[]) {

  int provided=-1;
  int requested = MPI_THREAD_MULTIPLE;
  MPI_Init_thread(&argc, &argv, requested, &provided);

  const int rank = MPI::COMM_WORLD.Get_rank();

  if (rank == 0)
    cout << "multithreading support level requested/provided: " <<requested << "  " <<provided<<endl;

  if (argc != 2) {
    cerr << "wrong input.\n"; return 1;
  }

  numThreads = atoi(argv[1]);

  win = new MPI::Win[numThreads];
  for (int i=0; i < numThreads; ++i) {
    win[i] = MPI::Win::Create(sendBuffer, BUFFER_SIZE, 1, MPI_INFO_NULL, MPI_COMM_WORLD);
  }

  pthread_t* threads = new pthread_t[numThreads];
  int *threadIDs = new int[numThreads];

  for (int i=1; i <= numThreads; ++i) {
    if (rank==0)
      printf("running with %d threads\n", i);

    double startTime = MPI::Wtime();
    for (int j=0; j < i; ++j) {
      threadIDs[j] = j;
      pthread_create(&threads[j], NULL, runThread, &threadIDs[j]);
    }

    for (int j=0; j < i; ++j) {
      pthread_join( threads[j], NULL);
    }

    double endTime = MPI::Wtime();
    double sec = (endTime-startTime);
    unsigned long bitsGot = i*8*BUFFER_SIZE*NUM_GETS;
    float GbitsGot = bitsGot/1073741824;
    printf("rank %d did %.3lf Gb/s in %.3lf seconds \n",rank, (GbitsGot/sec), sec);
    if (rank == 0) {
      double startTime = MPI::Wtime();
      while(true) {
        if (MPI::Wtime()-startTime > 2)
          break;
      }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    sleep(1); // so that the output is in sync and looks pretty
  }

  MPI::Finalize();
  return 0;
}


More information about the mvapich-discuss mailing list