YARP
Yet Another Robot Platform
MpiBcastCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "MpiBcastCarrier.h"
11 
12 #include <yarp/os/Network.h>
13 #include <yarp/os/Log.h>
14 
15 using namespace yarp::os;
16 
18  yCTrace(MPI_CARRIER, "[MpiBcastCarrier @ %s] Destructor", name.c_str());
19 }
20 
22  yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Closing carrier", name.c_str() );
23  if (electionMember) {
24  getCaster().remove(name, this);
25  MpiBcastCarrier* elect = getCaster().getElect(name);
26  if (elect == nullptr) {
27  delete comm;
28  }
29  } else {
30  delete comm;
31  }
32 }
33 
34 void MpiBcastCarrier::createStream(bool sender) {
35  if (sender) {
36  MpiBcastCarrier* elect = getCaster().getElect(name);
37  if (elect != nullptr) {
38  comm = elect->comm;
39  }
40  else {
41  comm = new MpiComm(name+"->bcast");
42  }
43  stream = new MpiBcastStream(name+"->bcast", comm);
44  auto* mpiStream = dynamic_cast<MpiBcastStream*> (stream);
45  if(mpiStream)
46  mpiStream->startJoin();
47  getCaster().add(name, this);
48  electionMember = true;
49  } else {
50  comm = new MpiComm(route);
51  stream = new MpiBcastStream(route, comm);
52  }
53 
54 }
55 
57  comm->sema.wait();
58  yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Disconnect : %s", name.c_str(), other.c_str());
59  int cmd = CMD_DISCONNECT;
60  MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
61  int length = other.length() + name.length() + 3;
62  char* remote_c = new char[length];
63  strcpy(remote_c, (other+"<-"+name).c_str());
64  MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
65  MPI_Bcast(remote_c, length, MPI_CHAR, 0,comm->comm);
66  delete [] remote_c;
67  comm->disconnect(false);
68  comm->sema.post();
69 
70  //dynamic_cast<MpiBcastStream*> (stream)->disconnect(other);
71  }
72 
73 
74 
75 /*
76  * Adopted from MCastCarrier
77  * ----------------------------
78  */
79 ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> > *MpiBcastCarrier::caster = nullptr;
80 
81 ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> >& MpiBcastCarrier::getCaster() {
83  if (caster==nullptr) {
86  if (caster==nullptr) {
87  yCError(MPI_CARRIER, "No memory for MpiBcastCarrier::caster");
88  std::exit(1);
89  }
90  } else {
92  }
93  return *caster;
94 }
96  MpiBcastCarrier *elect = getCaster().getElect(name);
97  return elect==this || elect==nullptr;
98 }
99 
101  return isElect();
102 }
103 
104 /*
105  * ----------------------------
106  */
Network.h
yarp::os::NetworkBase::lock
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1462
MpiBcastStream
Implements communication via MPI broadcast.
Definition: MpiBcastStream.h:25
MpiCarrier::comm
MpiComm * comm
Definition: MpiCarrier.h:32
CMD_DISCONNECT
#define CMD_DISCONNECT
Definition: MpiBcastStream.h:18
MpiBcastCarrier::close
void close() override
Close the carrier.
Definition: MpiBcastCarrier.cpp:21
MpiBcastCarrier::prepareDisconnect
void prepareDisconnect() override
Do cleanup and preparation for the coming disconnect, if necessary.
Definition: MpiBcastCarrier.cpp:56
MpiBcastCarrier
Carrier for port communicating via MPI broadcast.
Definition: MpiBcastCarrier.h:31
Log.h
MpiBcastCarrier::isActive
bool isActive() const override
Check if carrier is alive and error free.
Definition: MpiBcastCarrier.cpp:100
MpiBcastStream::startJoin
void startJoin()
Definition: MpiBcastStream.cpp:15
MpiBcastCarrier::isElect
virtual bool isElect() const
Definition: MpiBcastCarrier.cpp:95
yarp::os::ElectionOf
Pick one of a set of peers to be "active".
Definition: Election.h:64
MpiBcastCarrier.h
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
MpiBcastCarrier::~MpiBcastCarrier
virtual ~MpiBcastCarrier()
Definition: MpiBcastCarrier.cpp:17
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::NetworkBase::unlock
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1467
MpiComm
Wrapper for MPI_Comm communicator.
Definition: MpiComm.h:53
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
MPI_CARRIER
const yarp::os::LogComponent & MPI_CARRIER()
Definition: MpiLogComponent.cpp:16
MpiBcastCarrier::createStream
void createStream(bool sender) override
Definition: MpiBcastCarrier.cpp:34