YARP
Yet Another Robot Platform
PortCoreOutputUnit.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/Name.h>
13 #include <yarp/os/PortInfo.h>
14 #include <yarp/os/PortReport.h>
15 #include <yarp/os/Portable.h>
16 #include <yarp/os/Time.h>
20 
21 namespace {
22 YARP_OS_LOG_COMPONENT(PORTCOREOUTPUTUNIT, "yarp.os.impl.PortCoreOutputUnit")
23 } // namespace
24 
25 using namespace yarp::os::impl;
26 using namespace yarp::os;
27 
29  PortCoreUnit(owner, index),
30  op(op),
31  closing(false),
32  finished(false),
33  running(false),
34  threaded(false),
35  sending(false),
36  phase(1),
37  activate(0),
38  trackerMutex(),
39  cachedWriter(nullptr),
40  cachedReader(nullptr),
41  cachedCallback(nullptr),
42  cachedTracker(nullptr)
43 {
44  yCAssert(PORTCOREOUTPUTUNIT, op != nullptr);
45 }
46 
48 {
49  closeMain();
50 }
51 
52 
54 {
55  phase.wait();
56 
57  if (!threaded) {
58  running = false;
59  sending = false;
61  phase.post();
62  return true;
63  }
64 
65  bool result = PortCoreUnit::start();
66  if (result) {
67  phase.wait();
68  phase.post();
69  } else {
70  phase.post();
71  }
72 
73  return result;
74 }
75 
76 
78 {
79  running = true;
80  sending = false;
81 
82  // By default, we don't start up a thread for outputs.
83 
84  if (!threaded) {
86  phase.post();
87  } else {
88  phase.post();
89  Route r = getRoute();
90  while (!closing) {
91  yCDebug(PORTCOREOUTPUTUNIT, "waiting");
92  activate.wait();
93  yCDebug(PORTCOREOUTPUTUNIT, "woken");
94  if (!closing) {
95  if (sending) {
96  yCDebug(PORTCOREOUTPUTUNIT, "write something in background");
97  sendHelper();
98  yCDebug(PORTCOREOUTPUTUNIT, "wrote something in background");
99  trackerMutex.lock();
100  if (cachedTracker != nullptr) {
101  void* t = cachedTracker;
102  cachedTracker = nullptr;
103  sending = false;
105  } else {
106  sending = false;
107  }
108  trackerMutex.unlock();
109  }
110  }
111  yCDebug(PORTCOREOUTPUTUNIT, "wrote something in background");
112  }
113  yCDebug(PORTCOREOUTPUTUNIT, "thread closing");
114  sending = false;
115  }
116 }
117 
118 
120 {
121  if (op != nullptr) {
122  Route route = op->getRoute();
123  setMode();
124  getOwner().reportUnit(this, true);
125 
126  std::string msg = std::string("Sending output from ") + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
127  if (Name(route.getToName()).isRooted()) {
128  if (Name(route.getFromName()).isRooted()) {
129  yCInfo(PORTCOREOUTPUTUNIT, "%s", msg.c_str());
130  }
131  }
132 
133  // Report the new connection
134  PortInfo info;
135  info.message = msg;
137  info.incoming = false;
138  info.created = true;
139  info.sourceName = route.getFromName();
140  info.targetName = route.getToName();
141  info.portName = info.sourceName;
142  info.carrierName = route.getCarrierName();
143  getOwner().report(info);
144  }
145 
146  // no thread component
147  running = false;
148 }
149 
// Tear down the connection held in `op`: ask the other side to close,
// report the removal of the output to the owning port, then close and
// delete the protocol object. If `op` is already null nothing is done.
//
// NOTE(review): in this listing the original lines 154, 159 and 164 are
// absent (the embedded numbering jumps), so the start of the call whose
// arguments appear at 160-161 and the declaration of `buf` are not visible.
150 void PortCoreOutputUnit::closeBasic()
151 {
 // Set only when a quit command was written to the other side successfully.
152  bool waitForOther = false;
153  if (op != nullptr) {
155  Route route = op->getRoute();
 // Connectionless or broadcast carriers: the other side is asked to
 // close by out-of-band means, as the log message below states.
156  if (op->getConnection().isConnectionless() || op->getConnection().isBroadcast()) {
157  yCInfo(PORTCOREOUTPUTUNIT, "output for route %s asking other side to close by out-of-band means",
158  route.toString().c_str());
160  route.getFromName(),
161  true);
162  } else {
 // Only carriers that can encode administrative messages get the
 // in-band "q" command.
163  if (op->getConnection().canEscape()) {
165  op->getConnection().isBareMode());
166  PortCommand pc('\0', std::string("q"));
167  pc.write(buf);
168  //printf("Asked for %s to close...\n",
169  // op->getRoute().toString().c_str());
170  waitForOther = op->write(buf);
171  }
172  }
173 
174  std::string msg = std::string("Removing output from ") + route.getFromName() + " to " + route.getToName();
175 
 // The removal is logged only when both endpoint names are rooted.
176  if (Name(route.getToName()).isRooted()) {
177  if (Name(route.getFromName()).isRooted()) {
178  yCInfo(PORTCOREOUTPUTUNIT, "%s", msg.c_str());
179  }
180  }
181 
182  getOwner().reportUnit(this, false);
183 
184  // Report the disappearing connection
185  PortInfo info;
186  info.message = msg;
188  info.incoming = false;
189  info.created = false;
190  info.sourceName = route.getFromName();
191  info.targetName = route.getToName();
192  info.portName = info.sourceName;
193  info.carrierName = route.getCarrierName();
194  getOwner().report(info);
195  }
196 
197 
198  if (op != nullptr) {
199  if (waitForOther) {
200  // quit is only acknowledged in certain conditions
201  if (op->getConnection().isTextMode() && op->getConnection().supportReply()) {
 // Read into a one-byte buffer; presumably this consumes the
 // acknowledgement of the quit command - TODO confirm.
202  InputStream& is = op->getInputStream();
203  ManagedBytes dummy(1);
204  is.read(dummy.bytes());
205  }
206  }
 // Close the protocol, free it, and null the pointer so that later
 // `op != nullptr` checks see the unit as disconnected.
207  op->close();
208  delete op;
209  op = nullptr;
210  }
211 }
212 
// Shut this output unit down. If the unit is marked as running, the
// connection is interrupted and the thread is signalled and joined; the
// connection is then released through closeBasic(). Returns immediately
// when the unit is already marked finished, so repeated calls do nothing.
213 void PortCoreOutputUnit::closeMain()
214 {
215  if (finished) {
216  return;
217  }
218 
219  yCDebug(PORTCOREOUTPUTUNIT, "closing");
220 
221  if (running) {
222  // give a kick (unfortunately unavoidable)
223 
224  if (op != nullptr) {
225  op->interrupt();
226  }
227 
 // `closing` is raised before the semaphores are posted, so whatever is
 // waiting on them observes the flag once it wakes up. join() then
 // waits for the thread to finish.
228  closing = true;
229  phase.post();
230  activate.post();
231  join();
232  }
233 
234  yCDebug(PORTCOREOUTPUTUNIT, "internal join");
235 
 // Release the connection, then reset the state flags; `finished` makes
 // any later call return at the guard above.
236  closeBasic();
237  running = false;
238  closing = false;
239  finished = true;
240 
241  yCDebug(PORTCOREOUTPUTUNIT, "closed");
242 }
243 
244 
246 {
247  if (op != nullptr) {
248  Route r = op->getRoute();
249  op->beginWrite();
250  return r;
251  }
252  return PortCoreUnit::getRoute();
253 }
254 
// Write the cached message (cachedWriter, with cachedEnvelope and the
// optional cachedReader as reply handler) to the connection held in `op`.
//
// Returns the result of op->write(buf) when a write was attempted, and
// false when `op` is null, the local-carrier cast fails, or no write took
// place. If the send is judged to be over (`done`), the connection is
// closed and the unit is flagged closing/finished and doomed.
//
// NOTE(review): original line 260 is absent from this listing (numbering
// jumps 258 -> 261); it presumably holds the start of the `buf`
// declaration whose last argument is visible at 261.
255 bool PortCoreOutputUnit::sendHelper()
256 {
257  bool replied = false;
258  if (op != nullptr) {
259  bool done = false;
261  op->getConnection().isBareMode());
262  if (cachedReader != nullptr) {
263  buf.setReplyHandler(*cachedReader);
264  }
265 
 // Let the sender accept/modify the outgoing data first.
 // NOTE(review): cachedWriter is dereferenced here, before the non-null
 // assertion further down - confirm callers always set it.
 // NOTE(review): when the data is not accepted this returns true (the
 // value assigned to `done`), not `replied`, and skips the `done`
 // handling at the end of the function - confirm this is intended.
266  if (op->getSender().modifiesOutgoingData()) {
267  if (op->getSender().acceptOutgoingData(*cachedWriter)) {
268  cachedWriter = &op->getSender().modifyOutgoingData(*cachedWriter);
269  } else {
270  return (done = true);
271  }
272  }
273 
274  if (op->getConnection().isLocal()) {
275  // WARNING Cast away const qualifier.
276  // This may actually cause bugs when using the local carrier
277  // with something that is actually const (i.e. that is using
278  // some parts of memory that cannot be written.
279  auto* pw = const_cast<yarp::os::PortWriter*>(cachedWriter);
280  auto* p = dynamic_cast<yarp::os::Portable*>(pw);
281  if (p == nullptr) {
282  yCError(PORTCOREOUTPUTUNIT, "cast failed.");
283  return false;
284  }
 // Local carrier: hand over a reference to the object instead of
 // writing it into the buffer.
285  buf.setReference(p);
286  } else {
287  yCAssert(PORTCOREOUTPUTUNIT, cachedWriter != nullptr);
 // A failed write of the payload into the buffer ends the send.
288  bool ok = cachedWriter->write(buf);
289  if (!ok) {
290  done = true;
291  }
292 
 // Without a reply handler the upper-case 'D' command forms are used
 // below instead of the lower-case 'd' forms.
293  bool suppressReply = (buf.getReplyHandler() == nullptr);
294 
295  if (!done) {
296  if (!op->getConnection().canEscape()) {
 // Carrier cannot encode administrative messages: pass any
 // envelope to the carrier directly.
297  if (!cachedEnvelope.empty()) {
298  op->getConnection().handleEnvelope(cachedEnvelope);
299  }
300  } else {
301  buf.addToHeader();
302 
 // Command selection: "__ADMIN" envelope -> 'a' command; any
 // other envelope -> "D "/"d " followed by the envelope text; no
 // envelope -> bare 'D'/'d'.
303  if (!cachedEnvelope.empty()) {
304  if (cachedEnvelope == "__ADMIN") {
305  PortCommand pc('a', "");
306  pc.write(buf);
307  } else {
308  PortCommand pc('\0', std::string(suppressReply ? "D " : "d ") + cachedEnvelope);
309  pc.write(buf);
310  }
311  } else {
312  PortCommand pc(suppressReply ? 'D' : 'd', "");
313  pc.write(buf);
314  }
315  }
316  }
317  }
318 
319  if (!done) {
 // Only write when the connection reports itself active; if the sender
 // modifies replies, the cached reader is replaced by the modified one.
320  if (op->getConnection().isActive()) {
321  replied = op->write(buf);
322  if (replied && op->getSender().modifiesReply() && cachedReader != nullptr) {
323  cachedReader = &op->getSender().modifyReply(*cachedReader);
324  }
325  }
326  if (!op->isOk()) {
327  done = true;
328  }
329  }
330 
331  if (buf.dropRequested()) {
332  done = true;
333  }
 // Send is over (write failure, connection not ok, or drop requested):
 // close the connection and mark the unit for shutdown.
334  if (done) {
335  closeBasic();
336  closing = true;
337  finished = true;
338  setDoomed();
339  }
340  }
341 
342 
343  return replied;
344 }
345 
347  yarp::os::PortReader* reader,
348  const yarp::os::PortWriter* callback,
349  void* tracker,
350  const std::string& envelopeString,
351  bool waitAfter,
352  bool waitBefore,
353  bool* gotReply)
354 {
355  bool replied = false;
356 
357  if (op != nullptr) {
358  if (!op->getConnection().isActive()) {
359  return tracker;
360  }
361  }
362 
363  if (!waitBefore || !waitAfter) {
364  if (!running) {
365  // we must have a thread if we're going to be skipping waits
366  threaded = true;
367  yCDebug(PORTCOREOUTPUTUNIT, "starting a thread for output");
368  start();
369  yCDebug(PORTCOREOUTPUTUNIT, "started a thread for output");
370  }
371  }
372 
373  if ((!waitBefore) && waitAfter) {
374  yCError(PORTCOREOUTPUTUNIT, "chosen port wait combination not yet implemented");
375  }
376  if (!sending) {
377  cachedWriter = &writer;
378  cachedReader = reader;
379  cachedCallback = callback;
380  cachedEnvelope = envelopeString;
381 
382  sending = true;
383  if (waitAfter) {
384  replied = sendHelper();
385  sending = false;
386  } else {
387  trackerMutex.lock();
388  void* nextTracker = tracker;
389  tracker = cachedTracker;
390  cachedTracker = nextTracker;
391  activate.post();
392  trackerMutex.unlock();
393  }
394  } else {
395  yCDebug(PORTCOREOUTPUTUNIT, "skipping connection tagged as sending something");
396  }
397 
398  if (waitAfter) {
399  if (gotReply != nullptr) {
400  *gotReply = replied;
401  }
402  }
403 
404  // return tracker that we no longer need
405  return tracker;
406 }
407 
408 
410 {
411  void* tracker = nullptr;
412  trackerMutex.lock();
413  if (!sending) {
414  tracker = cachedTracker;
415  cachedTracker = nullptr;
416  }
417  trackerMutex.unlock();
418  return tracker;
419 }
420 
422 {
423  return sending;
424 }
425 
427 {
428  if (op != nullptr) {
429  op->getConnection().setCarrierParams(params);
430  }
431 }
432 
434 {
435  if (op != nullptr) {
436  op->getConnection().getCarrierParams(params);
437  }
438 }
439 
441 {
442  return op;
443 }
yarp::os::Route::toString
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:141
yarp::os::impl::PortCoreUnit::setDoomed
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:100
yarp::os::Connection::acceptOutgoingData
virtual bool acceptOutgoingData(const PortWriter &writer)=0
Determine whether outgoing data should be accepted.
yarp::os::OutputProtocol::interrupt
virtual void interrupt()=0
yarp::os::Portable
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:29
yarp::os::Route::getCarrierName
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:126
yarp::os::PortInfo::PORTINFO_CONNECTION
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:43
yarp::os::impl::ThreadImpl::start
virtual bool start()
Definition: ThreadImpl.cpp:187
yarp::os::impl::PortCoreUnit
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:30
yarp::os::impl::PortCoreOutputUnit::run
void run() override
The body of a thread managing background sends.
Definition: PortCoreOutputUnit.cpp:77
yarp::os::OutputProtocol
The output side of an active connection between two ports.
Definition: OutputProtocol.h:33
t
float t
Definition: FfmpegWriter.cpp:74
yarp::os::impl::PortCoreOutputUnit::getRoute
Route getRoute() override
Definition: PortCoreOutputUnit.cpp:245
yarp::os::impl::PortCoreOutputUnit::getCarrierParams
void getCarrierParams(yarp::os::Property &params) override
Definition: PortCoreOutputUnit.cpp:433
yarp::os::impl::PortCoreUnit::getRoute
virtual Route getRoute()
Definition: PortCoreUnit.h:83
yarp::os::Connection::isActive
virtual bool isActive() const =0
Check if carrier is alive and error free.
yarp::os::Connection::isBareMode
virtual bool isBareMode() const
Check if carrier excludes type information from payload.
Definition: Connection.cpp:20
Portable.h
yarp::os::impl::PortCoreOutputUnit::start
bool start() override
Prepare to serve this output.
Definition: PortCoreOutputUnit.cpp:53
yarp::os::OutputProtocol::getRoute
virtual const Route & getRoute() const =0
yarp::os::impl::PortCommand
Simple Readable and Writable object representing a command to a YARP port.
Definition: PortCommand.h:29
yarp::os::Connection::isTextMode
virtual bool isTextMode() const =0
Check if carrier is textual in nature.
yarp::os::impl::BufferedConnectionWriter
A helper for creating cached object descriptions.
Definition: BufferedConnectionWriter.h:50
yarp::os::PortInfo::created
bool created
True if a connection is created, false if destroyed.
Definition: PortInfo.h:57
yarp::os::Connection::isLocal
virtual bool isLocal() const =0
Check if carrier operates within a single process.
PortCoreOutputUnit.h
yarp::os::Connection::modifyReply
virtual PortReader & modifyReply(PortReader &reader)=0
Modify reply payload data, if appropriate.
yarp::os::Connection::modifiesReply
virtual bool modifiesReply() const =0
Check if this carrier modifies reply data through the Carrier::modifyReply method.
yarp::os::impl::PortCore
Definition: PortCore.h:155
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:32
LogComponent.h
yarp::os::OutputProtocol::getInputStream
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
yarp::os::Connection::handleEnvelope
virtual void handleEnvelope(const std::string &envelope)=0
Carriers that do not distinguish data from administrative headers (i.e.
yarp::os::Semaphore::wait
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:99
yarp::os::Connection::canEscape
virtual bool canEscape() const =0
Check if carrier can encode administrative messages, as opposed to just user data.
yarp::os::PortInfo
Information about a port connection or event.
Definition: PortInfo.h:29
yarp::os::OutputProtocol::getSender
virtual Connection & getSender()=0
It is possible to chain a basic connection with a modifier.
yarp::os::PortInfo::carrierName
std::string carrierName
Name of protocol type, if relevant.
Definition: PortInfo.h:69
Name.h
yarp::os::impl::PortCore::notifyCompletion
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1437
yarp::os::impl::PortCoreOutputUnit::runSingleThreaded
virtual void runSingleThreaded()
Perform send operations without a separate thread.
Definition: PortCoreOutputUnit.cpp:119
yarp::os::NetworkBase::disconnectInput
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1555
yarp::os::Semaphore::post
void post()
Increment the counter.
Definition: Semaphore.cpp:114
yarp::os::PortInfo::targetName
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:66
PortCommand.h
yarp::os::ManagedBytes
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:25
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
yarp::os::Connection::prepareDisconnect
virtual void prepareDisconnect()=0
Do cleanup and preparation for the coming disconnect, if necessary.
yarp::os::PortInfo::tag
int tag
Type of information.
Definition: PortInfo.h:51
yarp::os::impl::PortCoreOutputUnit::send
void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelopeString, bool waitAfter, bool waitBefore, bool *gotReply) override
Send a message on the connection.
Definition: PortCoreOutputUnit.cpp:346
BufferedConnectionWriter.h
yarp::os::impl::PortCore::reportUnit
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2893
yarp::os::impl::ThreadImpl::join
int join(double seconds=-1)
Definition: ThreadImpl.cpp:123
yarp::os::PortInfo::incoming
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:54
yarp::os::OutputProtocol::getConnection
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
yarp::os::PortInfo::portName
std::string portName
Name of port.
Definition: PortInfo.h:60
yarp::os::Connection::supportReply
virtual bool supportReply() const =0
This flag is used by YARP to determine whether the connection can carry RPC traffic,...
yarp::os::impl::PortCoreOutputUnit::getOutPutProtocol
OutputProtocol * getOutPutProtocol()
Definition: PortCoreOutputUnit.cpp:440
yarp::os::Name
Simple abstraction for a YARP port name.
Definition: Name.h:22
yarp::os::PortInfo::message
std::string message
A human-readable description of contents.
Definition: PortInfo.h:72
yarp::os::impl::PortCoreOutputUnit::takeTracker
void * takeTracker() override
Reacquire a tracker previously passed via send().
Definition: PortCoreOutputUnit.cpp:409
yarp::os::PortWriter::write
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
yarp::os::OutputProtocol::isOk
virtual bool isOk() const =0
Check if the connection is valid and can be used.
yarp::os::Connection::isConnectionless
virtual bool isConnectionless() const =0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
yarp::os::OutputProtocol::close
virtual void close()=0
Negotiate an end to operations.
yarp::os::Connection::getCarrierParams
virtual void getCarrierParams(yarp::os::Property &params) const =0
Get carrier configuration and deliver it by port administrative commands.
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
yarp::os::impl::PortCoreOutputUnit::~PortCoreOutputUnit
~PortCoreOutputUnit() override
Destructor.
Definition: PortCoreOutputUnit.cpp:47
yarp::os::Connection::modifiesOutgoingData
virtual bool modifiesOutgoingData() const =0
Check if this carrier modifies outgoing data through the Carrier::modifyOutgoingData method.
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::Connection::setCarrierParams
virtual void setCarrierParams(const yarp::os::Property &params)=0
Configure carrier from port administrative commands.
yarp::os::impl::PortCoreOutputUnit::setCarrierParams
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
Definition: PortCoreOutputUnit.cpp:426
yCInfo
#define yCInfo(component,...)
Definition: LogComponent.h:135
yarp::os::InputStream::read
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::impl::PortCore::report
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1189
yarp::os::Route::getToName
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
yarp::os::impl::PortCoreUnit::getOwner
PortCore & getOwner()
Definition: PortCoreUnit.h:279
Time.h
yarp::os::Connection::isBroadcast
virtual bool isBroadcast() const =0
Check if this carrier uses a broadcast mechanism.
yarp::os::impl::PortCoreOutputUnit::isBusy
bool isBusy() override
Definition: PortCoreOutputUnit.cpp:421
yarp::os::Connection::modifyOutgoingData
virtual const PortWriter & modifyOutgoingData(const PortWriter &writer)=0
Modify outgoing payload data, if appropriate.
yarp::os::OutputProtocol::write
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
PortInfo.h
yarp::os::InputStream
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:29
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl::PortCoreUnit::setMode
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
Definition: PortCoreUnit.h:189
yarp::os::impl
The components from which ports and connections are built.
yarp::os::OutputProtocol::beginWrite
virtual void beginWrite()=0
Notify connection that we intend to write to it.
PortReport.h
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:37
yarp::os::Name::isRooted
bool isRooted() const
Check if port name begins with "/".
Definition: Name.cpp:19
yarp::os::impl::PortCoreOutputUnit::PortCoreOutputUnit
PortCoreOutputUnit(PortCore &owner, int index, OutputProtocol *op)
Constructor.
Definition: PortCoreOutputUnit.cpp:28
yarp::os::PortInfo::sourceName
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:63