YARP
Yet Another Robot Platform
LocalCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
13 #include <yarp/os/Portable.h>
14 #include <yarp/os/Route.h>
15 #include <yarp/os/SizedWriter.h>
16 
18 
19 using namespace yarp::os;
20 
21 namespace {
    // Log component declared inside an anonymous namespace; it is the first
    // argument of every yCDebug / yCError / yCAssert call in this listing.
22 YARP_OS_LOG_COMPONENT(LOCALCARRIER, "yarp.os.impl.LocalCarrier")
23 } // namespace
24 
26 
28  senderMutex(),
29  receiverMutex(),
30  received(0),
31  sender(nullptr),
32  receiver(nullptr)
33 {
    // LocalCarrierManager constructor: starts with no registered sender or
    // receiver, and `received` constructed from 0.
    // NOTE(review): `received` is used with wait()/post() elsewhere in this file,
    // so it is presumably a semaphore with an initial count of 0 - confirm in LocalCarrier.h.
34 }
35 
37 {
    // LocalCarrierManager::setSender: records `sender` as the pending sender.
    // senderMutex is locked here and is NOT unlocked before returning; the
    // matching unlock() calls visible in this file are in getReceiver() and revoke().
38  senderMutex.lock();
39  this->sender = sender;
40 }
41 
43 {
    // LocalCarrierManager::getReceiver: blocks on `received` (posted by getSender()),
    // then returns the receiver that was stored there.
    // Clears the pending sender and unlocks senderMutex, which this function
    // itself never locks (setSender() is the only lock() site in this file).
44  received.wait();
45  LocalCarrier* result = receiver;
46  sender = nullptr;
47  senderMutex.unlock();
48  return result;
49 }
50 
52 {
    // LocalCarrierManager::getSender: called with the receiving carrier.
    // Under receiverMutex it stores `receiver`, snapshots the current `sender`
    // and posts `received` so a thread waiting in getReceiver() can continue.
    // The returned pointer is whatever `sender` holds at that moment; it is
    // nullptr when no sender is registered (the constructor initializes it to nullptr).
53  receiverMutex.lock();
54  this->receiver = receiver;
55  LocalCarrier* result = sender;
56  received.post();
57  receiverMutex.unlock();
58  return result;
59 }
60 
62 {
    // LocalCarrierManager::revoke: if `carrier` is the currently registered sender,
    // release senderMutex (locked by setSender()). Any other carrier is ignored.
    // NOTE(review): `sender` itself is not reset to nullptr here - confirm this is intended.
63  if (sender == carrier) {
64  senderMutex.unlock();
65  }
66 }
67 
68 
70 {
    // LocalCarrierStream::attach: remembers the owning carrier and whether this
    // stream is on the sender side, and marks the stream as not done (isOk() == true).
71  this->owner = owner;
72  this->sender = sender;
73  done = false;
74 }
75 
77 {
    // LocalCarrierStream::getInputStream: the stream object itself is returned
    // as the input stream.
78  return *this;
79 }
80 
82 {
    // LocalCarrierStream::getOutputStream: the stream object itself is returned
    // as the output stream.
83  return *this;
84 }
85 
87 {
    // LocalCarrierStream::getLocalAddress: returns the stored `localAddress`.
    // Nothing in this listing assigns it.
88  return localAddress;
89 }
90 
92 {
    // LocalCarrierStream::getRemoteAddress: returns the stored `remoteAddress`.
    // Nothing in this listing assigns it.
93  return remoteAddress;
94 }
95 
97 {
    // LocalCarrierStream::setTypeOfService: `tos` is ignored and success is
    // always reported.
98  YARP_UNUSED(tos);
99  return true;
100 }
101 
103 {
    // NOTE(review): the signature line is missing from this listing; this is
    // presumably LocalCarrierStream::read taking a byte buffer `b` - confirm.
    // The assertion is unconditionally false, so reaching this function is
    // treated as a programming error. If the assertion does not abort, the
    // buffer's length is returned.
104  yCAssert(LOCALCARRIER, false);
105  return b.length();
106 }
107 
109 {
    // LocalCarrierStream::write: `b` is ignored and the assertion is
    // unconditionally false, so reaching this function is treated as a
    // programming error.
110  YARP_UNUSED(b);
111  yCAssert(LOCALCARRIER, false);
112 }
113 
115 {
    // LocalCarrierStream::reset: intentionally empty - no state is changed.
116 }
117 
119 {
    // LocalCarrierStream::beginPacket: no-op.
120 }
121 
123 {
    // LocalCarrierStream::endPacket: no-op.
124 }
125 
127 {
    // LocalCarrierStream::interrupt: marks the stream as done, so isOk()
    // returns false from now on. The owning carrier is not touched.
128  done = true;
129 }
130 
132 {
    // LocalCarrierStream::close: shuts down the owning carrier (if any) and
    // marks the stream as done.
    // `owner` is cleared before shutdown() is called, so a second call to
    // close() skips the shutdown branch.
133  if (owner != nullptr) {
134  LocalCarrier* owned = owner;
135  owner = nullptr;
136  owned->shutdown();
137  }
138  done = true;
139 }
140 
142 {
    // LocalCarrierStream::isOk: true until interrupt() or close() sets `done`.
143  return !done;
144 }
145 
146 
148  peerMutex(), sent(0), received(0)
149 {
    // LocalCarrier constructor: no object reference pending, no peer attached,
    // and not yet shut down. `sent` and `received` are both constructed from 0.
    // NOTE(review): they are used with wait()/post() in expectIndex()/accept(),
    // so presumably semaphores with initial count 0 - confirm in LocalCarrier.h.
150  ref = nullptr;
151  peer = nullptr;
152  doomed = false;
153 }
154 
156 {
    // LocalCarrier destructor: runs shutdown(), which detaches from the peer.
    // shutdown() does nothing if `doomed` is already set.
157  shutdown();
158 }
159 
161 {
    // LocalCarrier::create: factory method returning a fresh heap-allocated
    // LocalCarrier.
    // NOTE(review): ownership presumably passes to the caller - confirm against
    // the Carrier::create contract.
162  return new LocalCarrier();
163 }
164 
166 {
    // LocalCarrier::removePeer: forgets the peer pointer under peerMutex.
    // Skipped entirely when this carrier is already doomed (shutdown() sets
    // `doomed` and clears `peer` itself).
    // Within this file it is called by the peer's shutdown().
167  if (!doomed) {
168  peerMutex.lock();
169  peer = nullptr;
170  peerMutex.unlock();
171  }
172 }
173 
175 {
    // LocalCarrier::shutdown: one-shot teardown guarded by `doomed`; only the
    // first call does any work.
    // If a peer is attached, all under this carrier's peerMutex:
    //   1. peer->accept(nullptr) hands the peer a null reference, which its
    //      expectIndex() reports as "shutdown" and answers by returning false;
    //   2. the local `peer` pointer is cleared;
    //   3. the former peer is told to drop its pointer back to this carrier.
    // NOTE(review): `doomed` is read and written outside peerMutex - confirm
    // shutdown() cannot race with itself or with accept().
176  if (!doomed) {
177  doomed = true;
178  peerMutex.lock();
179  if (peer != nullptr) {
180  peer->accept(nullptr);
181  LocalCarrier* wasPeer = peer;
182  peer = nullptr;
183  wasPeer->removePeer();
184  }
185  peerMutex.unlock();
186  }
187 }
188 
190 {
    // LocalCarrier::getName: the name of this connection type is "local".
191  return "local";
192 }
193 
195 {
    // LocalCarrier::requireAck: this carrier does not require sent messages to
    // be acknowledged by the recipient.
196  return false;
197 }
198 
200 {
    // LocalCarrier::isConnectionless: reports false, i.e. the carrier is
    // connection based.
201  return false;
202 }
203 
205 {
    // LocalCarrier::canEscape: reports false, i.e. the carrier cannot encode
    // administrative messages, only user data.
206  return false;
207 }
208 
210 {
    // LocalCarrier::isLocal: reports true, i.e. the carrier operates within a
    // single process.
211  return true;
212 }
213 
215 {
    // LocalCarrier::getSpecifierName: the 8-character tag that checkHeader()
    // compares against and getHeader() writes.
216  return "LOCALITY";
217 }
218 
220 {
    // LocalCarrier::checkHeader: returns true only when `header` is exactly
    // 8 bytes long and each byte equals the corresponding character of
    // getSpecifierName(). Any other length is rejected without inspecting the bytes.
221  if (header.length() == 8) {
222  std::string target = getSpecifierName();
223  for (int i = 0; i < 8; i++) {
224  if (!(target[i] == header.get()[i])) {
225  return false;
226  }
227  }
228  return true;
229  }
230  return false;
231 }
232 
234 {
    // LocalCarrier::getHeader: fills an 8-byte `header` with the first 8
    // characters of getSpecifierName().
    // A header of any other length is left untouched, with no error reported.
235  if (header.length() == 8) {
236  std::string target = getSpecifierName();
237  for (int i = 0; i < 8; i++) {
238  header.get()[i] = target[i];
239  }
240  }
241 }
242 
244 {
    // LocalCarrier::setParameters: the header bytes are ignored; nothing is
    // configured from them.
245  YARP_UNUSED(header);
246 }
247 
249 {
    // LocalCarrier::sendHeader (sender side of the handshake).
    // Steps, in order:
    //   1. remember the route's source name as this carrier's portName;
    //   2. register with the shared manager as the pending sender (setSender()
    //      locks the manager's senderMutex);
    //   3. send the default header via defaultSendHeader();
    //   4. under peerMutex, fetch the receiving carrier from the manager;
    //      getReceiver() waits until a receiver has called getSender().
    // Always returns true.
    // NOTE(review): the return value of defaultSendHeader(proto) is ignored -
    // confirm a failed header send should still reach getReceiver().
250  portName = proto.getRoute().getFromName();
251 
252  manager.setSender(this);
253 
254  defaultSendHeader(proto);
255  // now switch over to some local structure to communicate
256  peerMutex.lock();
257  peer = manager.getReceiver();
258  yCDebug(LOCALCARRIER,
259  "sender %p sees receiver %p",
260  this,
261  peer);
262  peerMutex.unlock();
263 
264  return true;
265 }
266 
268 {
    // LocalCarrier::expectExtraHeader (receiver side of the handshake).
    // Remembers the route's destination name as this carrier's portName, then
    // under peerMutex asks the manager for the pending sender, announcing
    // itself as the receiver. The route's source name is then rewritten to the
    // sender's portName. Always returns true.
    // NOTE(review): manager.getSender() returns nullptr when no sender is
    // registered, but `peer` is dereferenced below (peer->portName) without a
    // null check - confirm a sender is always registered before this runs.
269  portName = proto.getRoute().getToName();
270  // switch over to some local structure to communicate
271  peerMutex.lock();
272  peer = manager.getSender(this);
273  yCDebug(LOCALCARRIER,
274  "receiver %p (%s) sees sender %p (%s)",
275  this,
276  portName.c_str(),
277  peer,
278  peer->portName.c_str());
279  Route route = proto.getRoute();
280  route.setFromName(peer->portName);
281  proto.setRoute(route);
282  peerMutex.unlock();
283 
284  return true;
285 }
286 
288 {
    // LocalCarrier::becomeLocal: creates a LocalCarrierStream attached to this
    // carrier (with the given `sender` flag) and hands it to the connection
    // via takeStreams(). Always returns true.
    // NOTE(review): a plain `new` expression does not return nullptr on
    // failure, so the null check below looks redundant.
    // NOTE(review): takeStreams() presumably takes ownership of `stream` - confirm.
289  auto* stream = new LocalCarrierStream();
290  if (stream != nullptr) {
291  stream->attach(this, sender);
292  }
293  proto.takeStreams(stream);
294  return true;
295 }
296 
298 {
    // LocalCarrier::write: sends a message by passing the writer's object
    // pointer (writer.getReference()) straight to the peer's accept(); no
    // bytes are written. `proto` is unused.
    // peerMutex is held for the whole accept() call, and accept() waits for
    // the peer's receipt when the reference is non-null and this carrier is
    // not doomed.
    // NOTE(review): both failure paths (no reference, no peer) only log an
    // error; the function still returns true - confirm callers do not rely on
    // the return value to detect a failed send.
299  YARP_UNUSED(proto);
300  yarp::os::Portable* ref = writer.getReference();
301  if (ref != nullptr) {
302  peerMutex.lock();
303  if (peer != nullptr) {
304  peer->accept(ref);
305  } else {
306  yCError(LOCALCARRIER, "local send failed - write without peer");
307  }
308  peerMutex.unlock();
309  } else {
310  yCError(LOCALCARRIER, "local send failed - no object");
311  }
312 
313  return true;
314 }
315 
317 {
    // LocalCarrier::respondToHeader: installs the local stream on the
    // connection with the sender flag set to false.
318  // I am the receiver
319  return becomeLocal(proto, false);
320 }
321 
322 
324 {
    // LocalCarrier::expectReplyToHeader: installs the local stream on the
    // connection with the sender flag set to true.
325  // I am the sender
326  return becomeLocal(proto, true);
327 }
328 
330 {
    // LocalCarrier::expectIndex (receiver side of one message).
    // Waits on `sent` until accept() has stored a reference, passes that
    // reference to the connection via setReference(), then posts `received`
    // to release a sender waiting in accept().
    // A null reference is the shutdown signal (see shutdown(), which calls
    // accept(nullptr)): the input stream is interrupted and false is returned.
    // Returns true when a real object reference was received.
331 
332  yCDebug(LOCALCARRIER, "local recv: wait send");
333  sent.wait();
334  yCDebug(LOCALCARRIER, "local recv: got send");
335  proto.setReference(ref);
336  received.post();
337  if (ref != nullptr) {
338  yCDebug(LOCALCARRIER, "local recv: received");
339  } else {
340  yCDebug(LOCALCARRIER, "local recv: shutdown");
341  proto.is().interrupt();
342  return false;
343  }
344 
345  return true;
346 }
347 
349 {
    // LocalCarrier::accept: called on the receiving carrier to deliver `ref`.
    // Stores the pointer and posts `sent` so expectIndex() can pick it up.
    // For a real object (non-null) on a carrier that is not doomed, the caller
    // then blocks on `received` until expectIndex() has passed the reference on.
    // A null `ref` (the shutdown signal) or a doomed carrier returns
    // immediately without waiting.
    // NOTE(review): `ref` is written without a lock here and read in
    // expectIndex(); ordering presumably relies on the sent/received
    // handshake - confirm.
350  this->ref = ref;
351  yCDebug(LOCALCARRIER, "local send: send ref");
352  sent.post();
353  if (ref != nullptr && !doomed) {
354  yCDebug(LOCALCARRIER, "local send: wait receipt");
355  received.wait();
356  yCDebug(LOCALCARRIER, "local send: received");
357  }
358 }
yarp::os::Portable
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:29
yarp::os::impl::LocalCarrierManager::getSender
LocalCarrier * getSender(LocalCarrier *receiver)
Definition: LocalCarrier.cpp:51
yarp::os::impl::LocalCarrier::getSpecifierName
virtual std::string getSpecifierName() const
Definition: LocalCarrier.cpp:214
yarp::os::ConnectionState::setReference
virtual void setReference(yarp::os::Portable *ref)=0
Give a direct pointer to an object being sent on the connection.
yarp::os::impl::LocalCarrierManager::getReceiver
LocalCarrier * getReceiver()
Definition: LocalCarrier.cpp:42
yarp::os::Carrier
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
yarp::os::impl::LocalCarrier::respondToHeader
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
Definition: LocalCarrier.cpp:316
yarp::os::impl::LocalCarrier::create
Carrier * create() const override
Factory method.
Definition: LocalCarrier.cpp:160
yarp::os::impl::LocalCarrier::accept
void accept(yarp::os::Portable *ref)
Definition: LocalCarrier.cpp:348
yarp::os::OutputStream
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
yarp::os::impl::LocalCarrier::ref
yarp::os::Portable * ref
Definition: LocalCarrier.h:123
yarp::os::InputStream::interrupt
virtual void interrupt()
Interrupt the stream.
Definition: InputStream.cpp:45
Portable.h
yarp::os::impl::LocalCarrierStream::attach
void attach(LocalCarrier *owner, bool sender)
Definition: LocalCarrier.cpp:69
yarp::os::impl::LocalCarrierManager
Coordinate ports communicating locally within a process.
Definition: LocalCarrier.h:32
yarp::os::impl::LocalCarrier::expectExtraHeader
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
Definition: LocalCarrier.cpp:267
yarp::os::impl::LocalCarrierStream::setTypeOfService
bool setTypeOfService(int tos) override
Definition: LocalCarrier.cpp:96
YARP_UNUSED
#define YARP_UNUSED(var)
Definition: api.h:159
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:32
LogComponent.h
yarp::os::impl::LocalCarrierManager::LocalCarrierManager
LocalCarrierManager()
Definition: LocalCarrier.cpp:27
yarp::os::impl::LocalCarrierStream::write
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
Definition: LocalCarrier.cpp:108
yarp::os::impl::LocalCarrier::LocalCarrier
LocalCarrier()
Definition: LocalCarrier.cpp:147
yarp::os::impl::LocalCarrierManager::revoke
void revoke(LocalCarrier *carrier)
Definition: LocalCarrier.cpp:61
yarp::os::impl::LocalCarrier::~LocalCarrier
virtual ~LocalCarrier()
Definition: LocalCarrier.cpp:155
yarp::os::impl::LocalCarrier
A carrier for communicating locally within a process.
Definition: LocalCarrier.h:91
yarp::os::impl::LocalCarrier::isConnectionless
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
Definition: LocalCarrier.cpp:199
yarp::os::SizedWriter::getReference
virtual Portable * getReference()=0
yarp::os::ConnectionState::takeStreams
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
yarp::os::ConnectionState::getRoute
virtual const Route & getRoute() const =0
Get the route associated with this connection.
yarp::os::impl::LocalCarrierStream::interrupt
void interrupt() override
Interrupt the stream.
Definition: LocalCarrier.cpp:126
Route.h
ConnectionState.h
yarp::os::impl::LocalCarrier::expectReplyToHeader
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
Definition: LocalCarrier.cpp:323
yarp::os::impl::LocalCarrierStream::beginPacket
void beginPacket() override
Mark the beginning of a logical packet.
Definition: LocalCarrier.cpp:118
yarp::os::impl::LocalCarrierStream::getLocalAddress
const Contact & getLocalAddress() const override
Get the address of the local side of the stream.
Definition: LocalCarrier.cpp:86
yarp::os::impl::LocalCarrierStream::getOutputStream
OutputStream & getOutputStream() override
Get an OutputStream to write to.
Definition: LocalCarrier.cpp:81
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::ConnectionState::setRoute
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
yarp::os::impl::LocalCarrier::setParameters
void setParameters(const Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
Definition: LocalCarrier.cpp:243
LocalCarrier.h
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::impl::LocalCarrierStream::getRemoteAddress
const Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
Definition: LocalCarrier.cpp:91
yarp::os::impl::LocalCarrier::write
bool write(ConnectionState &proto, SizedWriter &writer) override
Write a message.
Definition: LocalCarrier.cpp:297
yarp::os::impl::LocalCarrierStream::endPacket
void endPacket() override
Mark the end of a logical packet (see beginPacket).
Definition: LocalCarrier.cpp:122
yarp::os::impl::LocalCarrierStream
A stream for communicating locally within a process.
Definition: LocalCarrier.h:56
yarp::os::impl::LocalCarrier::sendHeader
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
Definition: LocalCarrier.cpp:248
yarp::os::impl::LocalCarrier::peer
LocalCarrier * peer
Definition: LocalCarrier.h:124
yarp::os::impl::LocalCarrier::manager
static LocalCarrierManager manager
Definition: LocalCarrier.h:130
yarp::os::impl::LocalCarrierManager::setSender
void setSender(LocalCarrier *sender)
Definition: LocalCarrier.cpp:36
yarp::os::impl::LocalCarrier::canEscape
bool canEscape() const override
Check if carrier can encode administrative messages, as opposed to just user data.
Definition: LocalCarrier.cpp:204
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
yarp::os::impl::LocalCarrierStream::reset
void reset() override
Reset the stream.
Definition: LocalCarrier.cpp:114
yarp::os::impl::LocalCarrier::doomed
bool doomed
Definition: LocalCarrier.h:122
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
yarp::os::ConnectionState
The basic state of a connection - route, streams in use, etc.
Definition: ConnectionState.h:31
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::impl::LocalCarrierStream::isOk
bool isOk() const override
Check if the stream is ok or in an error state.
Definition: LocalCarrier.cpp:141
yarp::os::impl::LocalCarrierStream::close
void close() override
Terminate the stream.
Definition: LocalCarrier.cpp:131
yarp::os::InputStream::read
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::impl::LocalCarrier::getHeader
void getHeader(Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
Definition: LocalCarrier.cpp:233
yarp::os::Route::setFromName
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:101
yarp::os::ConnectionState::is
InputStream & is()
Shorthand for getInputStream()
Definition: ConnectionState.h:125
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
yarp::os::Route::getToName
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
yarp::os::impl::LocalCarrier::isLocal
bool isLocal() const override
Check if carrier operates within a single process.
Definition: LocalCarrier.cpp:209
yarp::os::impl::LocalCarrierStream::getInputStream
InputStream & getInputStream() override
Get an InputStream to read from.
Definition: LocalCarrier.cpp:76
yarp::os::impl::LocalCarrier::requireAck
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
Definition: LocalCarrier.cpp:194
yarp::os::impl::LocalCarrier::removePeer
void removePeer()
Definition: LocalCarrier.cpp:165
yarp::os::impl::LocalCarrier::getName
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
Definition: LocalCarrier.cpp:189
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
yarp::os::InputStream
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:29
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl::LocalCarrier::becomeLocal
virtual bool becomeLocal(ConnectionState &proto, bool sender)
Definition: LocalCarrier.cpp:287
yarp::os::impl::LocalCarrier::checkHeader
bool checkHeader(const Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
Definition: LocalCarrier.cpp:219
SizedWriter.h
yarp::os::impl::LocalCarrier::expectIndex
bool expectIndex(ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
Definition: LocalCarrier.cpp:329
yarp::os::SizedWriter
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:36
yarp::os::impl::LocalCarrier::shutdown
void shutdown()
Definition: LocalCarrier.cpp:174