YARP
Yet Another Robot Platform
MpiCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "MpiCarrier.h"
11 
12 #include <yarp/os/Route.h>
13 #include <sys/types.h>
14 
15 using namespace yarp::os;
16 
17 MpiCarrier::MpiCarrier() : stream(nullptr), comm(nullptr) {
18 }
19 
21  yCTrace(MPI_CARRIER, "[MpiCarrier @ %s] Destructor called", route.c_str() );
22 }
23 
24 void MpiCarrier::getHeader(Bytes& header) const {
25  for (size_t i=0; i<8 && i<header.length(); i++) {
26  header.get()[i] = target.c_str()[i];
27  }
28 }
29 
30 bool MpiCarrier::checkHeader(const Bytes& header) {
31  if (header.length()!=8) {
32  return false;
33  }
34  for (int i=0; i<8; i++) {
35  if (header.get()[i] != target.c_str()[i]) {
36  return false;
37  }
38  }
39  return true;
40 }
41 
43  // Send the "magic number" for this carrier
44  ManagedBytes header(8);
45  getHeader(header.bytes());
46  proto.os().write(header.bytes());
47  if (!proto.os().isOk()) return false;
48 
49  // Now we can do whatever we want, as long as somehow
50  // we also send the name of the originating port
51 
52  name = proto.getRoute().getFromName();
53  other = proto.getRoute().getToName();
54  Bytes b2((char*)name.c_str(),name.length());
55  proto.os().write(b2);
56  proto.os().write('\r');
57  proto.os().write('\n');
58 
59  // Sender
60  route = name + "->" + other;
61 
62  createStream(true);
63 
64  if (!MpiControl) return false;
65  if (! MpiControl->isRunning())
66  return false;
67  comm->openPort();
68  char* port = comm->port_name;
69  char* uid = comm->unique_id;
70 
71  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] setting up MpiPort '%s'", route.c_str(), port);
72 
73  Bytes b4(uid,strlen(uid));
74  proto.os().write(b4);
75  proto.os().write('\r');
76  proto.os().write('\n');
77 
78  Bytes b3(port,strlen(port));
79  proto.os().write(b3);
80  proto.os().write('\r');
81  proto.os().write('\n');
82  proto.os().flush();
83 
84  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header sent", route.c_str());
85 
86  return proto.os().isOk();
87 }
88 
89 
90 
92  // interpret everything that sendHeader wrote
93  name = proto.getRoute().getToName();
94 
95  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Waiting for header", route.c_str());
96 
97  other = proto.is().readLine();
98  Route r = proto.getRoute();
99  r.setFromName(other);
100  proto.setRoute(r);
101 
102  // Receiver
103  route = name + "<-" + other;
104 
105  createStream(false);
106  if (!MpiControl) return false;
107  if (! MpiControl->isRunning())
108  return false;
109 
110  std::string other_id = proto.is().readLine();
111  bool notLocal = comm->notLocal(other_id);
112 
113  port = proto.is().readLine();
114 
115  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header received", route.c_str());
116 
117  return notLocal && proto.is().isOk();
118 }
119 
121  // SWITCH TO NEW STREAM TYPE
122  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] trying to connect to MpiPort '%s'", route.c_str(), port.c_str());
123 
124  if (!comm->connect(port)) {
125  delete stream;
126  return false;
127  }
128  proto.takeStreams(stream);
129 
130  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
131 
132  return proto.is().isOk();
133 }
134 
136  // SWITCH TO NEW STREAM TYPE
137  if (!comm->accept()) {
138  delete stream;
139  return false;
140  }
141  proto.takeStreams(stream);
142 
143  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
144 
145  return proto.os().isOk();
146 }
MpiCarrier::createStream
virtual void createStream(bool sender)=0
MpiComm::connect
bool connect(std::string port)
Definition: MpiComm.cpp:116
yarp::os::OutputStream::write
virtual void write(char ch)
Write a single byte to the stream.
Definition: OutputStream.cpp:17
MpiCarrier::checkHeader
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
Definition: MpiCarrier.cpp:30
MpiCarrier::~MpiCarrier
virtual ~MpiCarrier()
Definition: MpiCarrier.cpp:20
MpiCarrier::name
std::string name
Definition: MpiCarrier.h:34
MpiCarrier::port
std::string port
Definition: MpiCarrier.h:33
MpiCarrier::comm
MpiComm * comm
Definition: MpiCarrier.h:32
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:32
MpiCarrier::respondToHeader
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
Definition: MpiCarrier.cpp:120
MpiCarrier::MpiCarrier
MpiCarrier()
Definition: MpiCarrier.cpp:17
MpiComm::accept
bool accept()
Definition: MpiComm.cpp:146
yarp::os::Thread::isRunning
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:108
MpiCarrier::route
std::string route
Definition: MpiCarrier.h:34
yarp::os::ManagedBytes::bytes
const Bytes & bytes() const
Definition: ManagedBytes.cpp:177
MpiCarrier::stream
MpiStream * stream
Definition: MpiCarrier.h:31
yarp::os::ConnectionState::takeStreams
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
yarp::os::ConnectionState::os
OutputStream & os()
Shorthand for getOutputStream()
Definition: ConnectionState.h:117
yarp::os::ConnectionState::getRoute
virtual const Route & getRoute() const =0
Get the route associated with this connection.
yarp::os::ManagedBytes
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:25
Route.h
MpiCarrier::getHeader
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to se...
Definition: MpiCarrier.cpp:24
MpiComm::unique_id
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition: MpiComm.h:58
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::ConnectionState::setRoute
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
MpiComm::openPort
void openPort()
Definition: MpiComm.h:73
MpiCarrier::target
std::string target
Definition: MpiCarrier.h:35
MpiComm::notLocal
bool notLocal(std::string other)
Definition: MpiComm.cpp:108
yarp::os::InputStream::readLine
std::string readLine(const char terminal='\n', bool *success=nullptr)
Read a block of text terminated with a specific marker (or EOF).
Definition: InputStream.cpp:57
yarp::os::OutputStream::flush
virtual void flush()
Make sure all pending write operations are finished.
Definition: OutputStream.cpp:28
MpiControl
MpiControlThread * MpiControl
Definition: MpiComm.cpp:25
MpiCarrier.h
yarp::os::InputStream::isOk
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
yarp::os::ConnectionState
The basic state of a connection - route, streams in use, etc.
Definition: ConnectionState.h:31
yarp::os::OutputStream::isOk
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
MpiCarrier::sendHeader
bool sendHeader(yarp::os::ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
Definition: MpiCarrier.cpp:42
yarp::os::Route::setFromName
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:101
yarp::os::ConnectionState::is
InputStream & is()
Shorthand for getInputStream()
Definition: ConnectionState.h:125
yarp::os::Route::getToName
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
MpiComm::port_name
char port_name[MPI_MAX_PORT_NAME]
Definition: MpiComm.h:57
MpiCarrier::expectReplyToHeader
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
Definition: MpiCarrier.cpp:135
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
MPI_CARRIER
const yarp::os::LogComponent & MPI_CARRIER()
Definition: MpiLogComponent.cpp:16
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
MpiCarrier::other
std::string other
Definition: MpiCarrier.h:34
MpiCarrier::expectSenderSpecifier
bool expectSenderSpecifier(yarp::os::ConnectionState &proto) override
Expect the name of the sending port.
Definition: MpiCarrier.cpp:91