Receiving from multiple intercomms in the same node

jaume m 1 Reputation point
2021-11-24T08:46:48.02+00:00

Hi, I have an MPI setup in which a single node establishes intercomms with two different groups, with the intention of receiving from both intercomms at the same time. I have experienced that in this scenario none of the intercomms receives any messages, although the senders seem to work fine. Besides, I have seen that if I attach and detach a few times to that very node, it starts to receive normally.

I have isolated the problem in the following code:

/* manager */

include "mpi.h"

include <string>

include <stdlib.h>

include <thread>

include <chrono>

int main(int argc,char* argv[])
{
int world_size = 0, universe_size = 1;
MPI_Comm everyone1, everyone2,everyone3; /* intercommunicator */
char worker_program[100];

printf("Initializing master\n");

MPI_Init(NULL,NULL);

char** newargv;
newargv = (char**)malloc(1 * sizeof(char*));
char a = 'a';
newargv[0] = NULL;

MPI_Comm_spawn("worker.exe",MPI_ARGV_NULL,1,
          MPI_INFO_NULL,0,MPI_COMM_WORLD,&everyone1,
          MPI_ERRCODES_IGNORE);


printf("first child: %d\n",everyone1);

MPI_Comm merged1;
MPI_Intercomm_merge(everyone1,0,&merged1);

int myRank1;
MPI_Comm_rank(merged1,&myRank1);
int otherRank = (myRank1 == 0) ? 1 : 0;

std::string mergedStr = std::to_string(merged1);
char** _argv = (char**)malloc(3 * sizeof(char*));
_argv[0] = (char*)malloc(mergedStr.size()+1);
memset(_argv[0],0,mergedStr.size() + 1);
std::memcpy(_argv[0],mergedStr.data(),mergedStr.size());
_argv[1] = (char*)malloc(2);
mergedStr = "1";
memset(_argv[1],0,2);
std::memcpy(_argv[1],mergedStr.data(),mergedStr.size());
_argv[2] = NULL;

printf("launching child 2\n");

MPI_Comm_spawn("worker.exe",_argv,1,
      MPI_INFO_NULL,0,MPI_COMM_WORLD,&everyone2,
      MPI_ERRCODES_IGNORE);

MPI_Comm merged2;
MPI_Intercomm_merge(everyone2,0,&merged2);

MPI_Comm newInterComm;
MPI_Comm newIntraComm;
MPI_Intercomm_create(merged1,0,everyone2,0,0,&newInterComm);

std::string _mergedStr = std::to_string(merged2);
char** _argv_ = (char**)malloc(3 * sizeof(char*));
_argv_[0] = (char*)malloc(_mergedStr.size() + 1);
memset(_argv_[0],0,_mergedStr.size() + 1);
std::memcpy(_argv_[0],_mergedStr.data(),_mergedStr.size());
_argv_[1] = (char*)malloc(2);
memset(_argv_[1],0,2);
std::memcpy(_argv_[1],"2",1);
_argv_[2] = NULL;

MPI_Comm_spawn("worker.exe",_argv_,1,
  MPI_INFO_NULL,0,MPI_COMM_WORLD,&everyone3,
  MPI_ERRCODES_IGNORE);

MPI_Comm _newInterComm;
int mergedRank = 0;
printf("about to merge\n");
MPI_Comm_rank(merged2,&mergedRank);
MPI_Intercomm_create(merged2,mergedRank,everyone3,0,0,&_newInterComm);
printf("done last merge\n");
//MPI_Intercomm_merge(newInterComm,0,&newIntraComm);

//now receive both ranks
MPI_Comm rank1, rank2, rank3;
MPI_Status status1, status2,status3;

MPI_Finalize();
return 0;

}

include "mpi.h"

include <stdio.h>

include <string>

int main(int argc,char* argv[])
{
int size;
MPI_Comm parent;
MPI_Comm merged;
MPI_Comm peer = MPI_COMM_NULL;

if(argc == 3)
{
    peer = std::stoul(argv[1]);
    size = std::stoul(argv[2]);
}

printf("Initializing worker\n");
MPI_Init(&argc,&argv);

MPI_Comm_get_parent(&parent);

MPI_Comm newInterComm,_newInterComm;
MPI_Comm newIntraComm;
if(peer != MPI_COMM_NULL)
{
    if(size == 1)
    {
        MPI_Intercomm_merge(parent,1,&merged);

        MPI_Intercomm_create(MPI_COMM_SELF,0,parent,0,0,&newInterComm);

        MPI_Intercomm_create(merged,0,MPI_COMM_NULL,1,0,&_newInterComm);
    }
    else
    {
        MPI_Intercomm_create(MPI_COMM_SELF,0,parent,0,0,&newInterComm);
    }
}
else
{
    MPI_Comm_get_parent(&parent);

    MPI_Intercomm_merge(parent,1,&merged);

    MPI_Intercomm_create(merged,0,MPI_COMM_NULL,1,0,&newInterComm);
}

const size_t dataSize = 1024 * 1024;
char* data = static_cast<char*>(malloc(dataSize));
while(true)
{
    if(peer == MPI_COMM_NULL)
    {
        MPI_Request request;
        MPI_Isend(data,dataSize,MPI_CHAR,0,0,newInterComm,&request);

        MPI_Wait(&request,MPI_STATUS_IGNORE);
    }
    else
    {
        if(size == 1)
        {
            MPI_Request request;
            MPI_Status status;
            int probError = MPI_Probe(MPI_ANY_SOURCE,0,newInterComm,&status);

            if(probError == MPI_SUCCESS)
            {
                const int channel = status.MPI_SOURCE;
                int number_amount = 0;

                  MPI_Irecv(data,dataSize,MPI_CHAR,MPI_ANY_SOURCE,0,newInterComm,&request);

                  MPI_Wait(&request,MPI_STATUS_IGNORE);
            }

            probError = MPI_Probe(MPI_ANY_SOURCE,0,_newInterComm,&status);

            if(probError == MPI_SUCCESS)
            {
                const int channel = status.MPI_SOURCE;
                int number_amount = 0;

                MPI_Request request;
                MPI_Irecv(data,dataSize,MPI_CHAR,MPI_ANY_SOURCE,0,_newInterComm,&request);

                MPI_Wait(&request,MPI_STATUS_IGNORE);
            }
        }
        else
        {
            MPI_Request request;

            MPI_Isend(data,dataSize,MPI_CHAR,0,0,newInterComm,&request);

            MPI_Wait(&request,MPI_STATUS_IGNORE);
        }
    }
}

MPI_Finalize();
return 0;

}

C++
C++
A high-level, general-purpose programming language, created as an extension of the C programming language, that has object-oriented, generic, and functional features in addition to facilities for low-level memory manipulation.
3,451 questions
{count} votes