YARP
Yet Another Robot Platform
MpiComm.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
#include "MpiComm.h"

#include <yarp/os/Log.h>
#include <yarp/os/NetType.h>
#include <mpi.h>

#include <cstdio>
#include <cstdlib>
#include <utility>
#include <unistd.h>
19 
20 using namespace yarp::os;
21 
22 /* --------------------------------------- */
23 /* MpiControlThread */
24 
26 
27 void finalizeMPI() {
28  if (MpiControl) {
30  delete MpiControl;
31  MpiControl = nullptr;
32  }
33  int ct = 0;
34  int finalized;
35  while (ct < 5) {
36  sleep(1);
37  MPI_Finalized(&finalized);
38  if (finalized) {
39  return;
40  }
41  ct++;
42  }
43  yCError(MPI_CARRIER, "MpiControlThread: Finalizing MPI failed! Calling MPI_Abort");
44  MPI_Abort(MPI_COMM_WORLD,1);
45 }
46 
48  yCInfo(MPI_CARRIER, "MpiControlThread: Trying to finalize MPI...");
49  MPI_Finalize();
50  yCInfo(MPI_CARRIER,"MpiControlThread: Successfully finalized MPI...");
51 }
52 
54  // We have to finalize MPI at process termination
55  atexit(finalizeMPI);
56 
57  yCDebug(MPI_CARRIER,"[MpiControl] Initialize");
58 
59  int provided;
60  // We need full multithread support for MPI
61  int requested = MPI_THREAD_MULTIPLE;
62  // Passing NULL for argc/argv pointers is fine for MPI-2
63  int err = MPI_Init_thread(nullptr, nullptr, requested , &provided);
64  if (err != MPI_SUCCESS ) {
65  yCError(MPI_CARRIER, "MpiControlThread: Couldn't initialize MPI");
66  return false;
67  }
68 
69  if (provided >= requested) {
70  return true;
71  }
72  else {
73  MPI_Finalize();
74  yCError(MPI_CARRIER, "MpiControlThread: MPI implementation doesn't provide required thread safety: requested %s, provided %s", NetType::toString(requested).c_str(), NetType::toString(provided).c_str());
75  return false;
76  }
77 }
78 
79 
80 /* --------------------------------------- */
81 /* MpiComm */
82 
83 MpiComm::MpiComm(std::string name) :
84  name(std::move(name))
85 {
86  if (MpiControl == nullptr) {
88  }
89  if (! MpiControl->isRunning()) {
90  MpiControl->start();
91  }
92 
93  // Complicated way of doing comm = MPI_COMM_SELF;
94  // but safer
95  MPI_Group self_group;
96  MPI_Comm_group( MPI_COMM_SELF, &self_group );
97  MPI_Comm_create( MPI_COMM_SELF, self_group, &comm );
98 
99 
100  // Create a unique identifier to prevent intra-process use of MPI
101  int length = 0;
102  MPI_Get_processor_name(unique_id, &length);
103  sprintf(unique_id + length, "____pid____%d", getpid());
104  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Unique id: %s", name.c_str(), unique_id);
105 }
106 
107 //TODO: replace by static variable check??!?
108 bool MpiComm::notLocal(std::string other) {
109  if (other == std::string(unique_id)) {
110  yCError(MPI_CARRIER, "MPI does not support process local communication");
111  return false;
112  }
113  return true;
114 }
115 
116 bool MpiComm::connect(std::string port) {
117 
118  char* port_name = new char[port.length()+1];
119  memcpy(port_name, port.c_str(), port.length());
120  port_name[port.length()] = '\0';
121 
122  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for accept", name.c_str());
123 
124  MPI_Comm intercomm;
125  MPI_Comm_set_errhandler(comm, MPI_ERRORS_RETURN);
126  int err = MPI_Comm_connect( port_name, MPI_INFO_NULL, 0, comm, &intercomm );
127  MPI_Comm_set_errhandler(comm, MPI_ERRORS_ARE_FATAL);
128 
129  if (err != MPI_SUCCESS ) {
130  yCError(MPI_CARRIER, "MpiCarrier: Couldn't create connection");
131  return false;
132  }
133 
134  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection established", name.c_str());
135 
136  bool high = true;
137  MPI_Intercomm_merge(intercomm, high, &comm);
138  MPI_Comm_disconnect(&intercomm);
139 
140  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
141 
142  delete[] port_name;
143 
144  return true;
145 }
147  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for connect", name.c_str());
148 
149  MPI_Comm intercomm, newintra;
150  MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, comm, &intercomm );
151 
152  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection accepted", name.c_str());
153 
154  bool high = false;
155  // Complicated way of doing comm = Merge(intercomm)
156  // but necessary
157  MPI_Intercomm_merge(intercomm, high, &newintra);
158  MPI_Comm_disconnect(&intercomm);
159  MPI_Comm_disconnect(&comm);
160  comm = newintra;
161 
162  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
163 
164  return true;
165 }
166 
167 
168 void MpiComm::disconnect(bool disconn) {
169  yCDebug(MPI_CARRIER, "[MpiComm @ %s] split from group : %d", name.c_str(), disconn);
170  MPI_Comm new_comm;
171  MPI_Comm_split(comm, disconn, rank(), &new_comm);
172  MPI_Comm_disconnect(&comm);
173  comm = new_comm;
174  yCDebug(MPI_CARRIER, "[MpiComm @ %s] new rank : %d", name.c_str(), rank());
175 }
MpiControlThread::finalize
void finalize()
Definition: MpiComm.h:31
MpiComm::connect
bool connect(std::string port)
Definition: MpiComm.cpp:116
MpiComm::comm
MPI_Comm comm
Definition: MpiComm.h:59
yarp::os::getpid
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:94
MpiComm::rank
int rank()
Definition: MpiComm.h:79
MpiComm::accept
bool accept()
Definition: MpiComm.cpp:146
NetType.h
yarp::os::Thread::isRunning
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:108
Log.h
MpiControlThread
Definition: MpiComm.h:27
MpiComm::unique_id
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition: MpiComm.h:58
MpiComm.h
MpiComm::notLocal
bool notLocal(std::string other)
Definition: MpiComm.cpp:108
MpiControlThread::threadRelease
void threadRelease() override
Release method.
Definition: MpiComm.cpp:47
MpiControl
MpiControlThread * MpiControl
Definition: MpiComm.cpp:25
finalizeMPI
void finalizeMPI()
Definition: MpiComm.cpp:27
MpiComm::MpiComm
MpiComm(std::string name)
Definition: MpiComm.cpp:83
yarp::os::NetType::toString
static std::string toString(int x)
Definition: NetType.cpp:138
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yCInfo
#define yCInfo(component,...)
Definition: LogComponent.h:135
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
MpiComm::disconnect
void disconnect(bool disconn)
Definition: MpiComm.cpp:168
MpiComm::port_name
char port_name[MPI_MAX_PORT_NAME]
Definition: MpiComm.h:57
yarp::os::Thread::start
bool start()
Start the new thread running.
Definition: Thread.cpp:96
MpiControlThread::threadInit
bool threadInit() override
Initialization method.
Definition: MpiComm.cpp:53
MPI_CARRIER
const yarp::os::LogComponent & MPI_CARRIER()
Definition: MpiLogComponent.cpp:16