Receiving from multiple intercomms in the same node
Hi, I have an MPI node setup in which the same node establishes intercomms with two different groups, with the aim of receiving from both intercomms at the same time. I have experienced that in this scenario neither intercomm receives messages, although the sender seems to work fine. Besides, I have seen that if I attach and detach a few times to that very node, it starts to receive normally.
I have isolated the problem in the following code:
/* manager */
include "mpi.h"
include <string>
include <stdlib.h>
include <thread>
include <chrono>
int main(int argc,char* argv[])
{
int world_size = 0, universe_size = 1;
MPI_Comm everyone1, everyone2,everyone3; /* intercommunicator */
char worker_program[100];
printf("Initializing master\n");
MPI_Init(NULL,NULL);
char** newargv;
newargv = (char**)malloc(1 * sizeof(char*));
char a = 'a';
newargv[0] = NULL;
MPI_Comm_spawn("worker.exe",MPI_ARGV_NULL,1,
MPI_INFO_NULL,0,MPI_COMM_WORLD,&everyone1,
MPI_ERRCODES_IGNORE);
printf("first child: %d\n",everyone1);
MPI_Comm merged1;
MPI_Intercomm_merge(everyone1,0,&merged1);
int myRank1;
MPI_Comm_rank(merged1,&myRank1);
int otherRank = (myRank1 == 0) ? 1 : 0;
std::string mergedStr = std::to_string(merged1);
char** _argv = (char**)malloc(3 * sizeof(char*));
_argv[0] = (char*)malloc(mergedStr.size()+1);
memset(_argv[0],0,mergedStr.size() + 1);
std::memcpy(_argv[0],mergedStr.data(),mergedStr.size());
_argv[1] = (char*)malloc(2);
mergedStr = "1";
memset(_argv[1],0,2);
std::memcpy(_argv[1],mergedStr.data(),mergedStr.size());
_argv[2] = NULL;
printf("launching child 2\n");
MPI_Comm_spawn("worker.exe",_argv,1,
MPI_INFO_NULL,0,MPI_COMM_WORLD,&everyone2,
MPI_ERRCODES_IGNORE);
MPI_Comm merged2;
MPI_Intercomm_merge(everyone2,0,&merged2);
MPI_Comm newInterComm;
MPI_Comm newIntraComm;
MPI_Intercomm_create(merged1,0,everyone2,0,0,&newInterComm);
std::string _mergedStr = std::to_string(merged2);
char** _argv_ = (char**)malloc(3 * sizeof(char*));
_argv_[0] = (char*)malloc(_mergedStr.size() + 1);
memset(_argv_[0],0,_mergedStr.size() + 1);
std::memcpy(_argv_[0],_mergedStr.data(),_mergedStr.size());
_argv_[1] = (char*)malloc(2);
memset(_argv_[1],0,2);
std::memcpy(_argv_[1],"2",1);
_argv_[2] = NULL;
MPI_Comm_spawn("worker.exe",_argv_,1,
MPI_INFO_NULL,0,MPI_COMM_WORLD,&everyone3,
MPI_ERRCODES_IGNORE);
MPI_Comm _newInterComm;
int mergedRank = 0;
printf("about to merge\n");
MPI_Comm_rank(merged2,&mergedRank);
MPI_Intercomm_create(merged2,mergedRank,everyone3,0,0,&_newInterComm);
printf("done last merge\n");
//MPI_Intercomm_merge(newInterComm,0,&newIntraComm);
//now receive both ranks
MPI_Comm rank1, rank2, rank3;
MPI_Status status1, status2,status3;
MPI_Finalize();
return 0;
}
include "mpi.h"
include <stdio.h>
include <string>
int main(int argc,char* argv[])
{
int size;
MPI_Comm parent;
MPI_Comm merged;
MPI_Comm peer = MPI_COMM_NULL;
if(argc == 3)
{
peer = std::stoul(argv[1]);
size = std::stoul(argv[2]);
}
printf("Initializing worker\n");
MPI_Init(&argc,&argv);
MPI_Comm_get_parent(&parent);
MPI_Comm newInterComm,_newInterComm;
MPI_Comm newIntraComm;
if(peer != MPI_COMM_NULL)
{
if(size == 1)
{
MPI_Intercomm_merge(parent,1,&merged);
MPI_Intercomm_create(MPI_COMM_SELF,0,parent,0,0,&newInterComm);
MPI_Intercomm_create(merged,0,MPI_COMM_NULL,1,0,&_newInterComm);
}
else
{
MPI_Intercomm_create(MPI_COMM_SELF,0,parent,0,0,&newInterComm);
}
}
else
{
MPI_Comm_get_parent(&parent);
MPI_Intercomm_merge(parent,1,&merged);
MPI_Intercomm_create(merged,0,MPI_COMM_NULL,1,0,&newInterComm);
}
const size_t dataSize = 1024 * 1024;
char* data = static_cast<char*>(malloc(dataSize));
while(true)
{
if(peer == MPI_COMM_NULL)
{
MPI_Request request;
MPI_Isend(data,dataSize,MPI_CHAR,0,0,newInterComm,&request);
MPI_Wait(&request,MPI_STATUS_IGNORE);
}
else
{
if(size == 1)
{
MPI_Request request;
MPI_Status status;
int probError = MPI_Probe(MPI_ANY_SOURCE,0,newInterComm,&status);
if(probError == MPI_SUCCESS)
{
const int channel = status.MPI_SOURCE;
int number_amount = 0;
MPI_Irecv(data,dataSize,MPI_CHAR,MPI_ANY_SOURCE,0,newInterComm,&request);
MPI_Wait(&request,MPI_STATUS_IGNORE);
}
probError = MPI_Probe(MPI_ANY_SOURCE,0,_newInterComm,&status);
if(probError == MPI_SUCCESS)
{
const int channel = status.MPI_SOURCE;
int number_amount = 0;
MPI_Request request;
MPI_Irecv(data,dataSize,MPI_CHAR,MPI_ANY_SOURCE,0,_newInterComm,&request);
MPI_Wait(&request,MPI_STATUS_IGNORE);
}
}
else
{
MPI_Request request;
MPI_Isend(data,dataSize,MPI_CHAR,0,0,newInterComm,&request);
MPI_Wait(&request,MPI_STATUS_IGNORE);
}
}
}
MPI_Finalize();
return 0;
}