YARP
Yet Another Robot Platform
PortReaderBufferBase.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/Os.h>
14 #include <yarp/os/Portable.h>
15 #include <yarp/os/Semaphore.h>
17 #include <yarp/os/Thread.h>
18 #include <yarp/os/Time.h>
22 
23 #include <list>
24 #include <mutex>
25 
26 using namespace yarp::os::impl;
27 using namespace yarp::os;
28 
29 namespace {
30 YARP_OS_LOG_COMPONENT(PORTREADERBUFFERBASE, "yarp.os.PortReaderBufferBase")
31 } // namespace
32 
33 #ifndef DOXYGEN_SHOULD_SKIP_THIS
34 class PortReaderPacket
35 {
36 public:
37  PortReaderPacket *prev_, *next_;
38 
39  // if non-null, contains a buffer that the packet owns
40  PortReader* reader;
41 
42  std::string envelope;
43 
44  // if nun-null, refers to an external buffer
45  // by convention, overrides reader
46  PortReader* external;
47  PortWriter* writer; // if a callback is needed
48 
49  PortReaderPacket()
50  {
51  prev_ = next_ = nullptr;
52  reader = nullptr;
53  external = nullptr;
54  writer = nullptr;
55  reset();
56  }
57 
58  virtual ~PortReaderPacket()
59  {
60  resetExternal();
61  reset();
62  }
63 
64  void reset()
65  {
66  if (reader != nullptr) {
67  delete reader;
68  reader = nullptr;
69  }
70  writer = nullptr;
71  envelope = "";
72  }
73 
74  PortReader* getReader()
75  {
76  return reader;
77  }
78 
79  void setReader(PortReader* reader)
80  {
81  resetExternal();
82  reset();
83  this->reader = reader;
84  }
85 
86  PortReader* getExternal()
87  {
88  return external;
89  }
90 
91  void setExternal(PortReader* reader, PortWriter* writer)
92  {
93  resetExternal();
94  this->external = reader;
95  this->writer = writer;
96  }
97 
98  void setEnvelope(const Bytes& bytes)
99  {
100  envelope = std::string(bytes.get(), bytes.length());
101  //envelope.set(bytes.get(), bytes.length(), 1);
102  }
103 
104  void resetExternal()
105  {
106  if (writer != nullptr) {
107  writer->onCompletion();
108  writer = nullptr;
109  }
110  external = nullptr;
111  }
112 };
113 
114 
115 class PortReaderPool
116 {
117 private:
118  std::list<PortReaderPacket*> inactive;
119  std::list<PortReaderPacket*> active;
120 
121 public:
122  size_t getCount()
123  {
124  return active.size();
125  }
126 
127  size_t getFree()
128  {
129  return inactive.size();
130  }
131 
132  PortReaderPacket* getInactivePacket()
133  {
134  if (inactive.empty()) {
135  PortReaderPacket* obj = nullptr;
136  obj = new PortReaderPacket();
137  inactive.push_back(obj);
138  }
139  PortReaderPacket* next = inactive.front();
140  yCAssert(PORTREADERBUFFERBASE, next != nullptr);
141  inactive.remove(next);
142  return next;
143  }
144 
145  PortReaderPacket* getActivePacket()
146  {
147  PortReaderPacket* next = nullptr;
148  if (getCount() >= 1) {
149  next = active.front();
150  yCAssert(PORTREADERBUFFERBASE, next != nullptr);
151  active.remove(next);
152  }
153  return next;
154  }
155 
156  void addActivePacket(PortReaderPacket* packet)
157  {
158  if (packet != nullptr) {
159  active.push_back(packet);
160  }
161  }
162 
163  void addInactivePacket(PortReaderPacket* packet)
164  {
165  if (packet != nullptr) {
166  inactive.push_back(packet);
167  }
168  }
169 
170  void reset()
171  {
172  while (!active.empty()) {
173  delete active.back();
174  active.pop_back();
175  }
176  while (!inactive.empty()) {
177  delete inactive.back();
178  inactive.pop_back();
179  }
180  }
181 };
182 
183 
184 class PortReaderBufferBase::Private
185 {
186 private:
187  PortReaderBufferBase& owner;
188  PortReaderPacket* prev;
189 
190 public:
192  unsigned int maxBuffer;
193  bool prune;
194  yarp::os::PortReader* replier;
195  double period;
196  double last_recv;
197 
198  PortReaderPool pool;
199 
200  int ct;
201  Port* port;
202  yarp::os::Semaphore contentSema;
203  yarp::os::Semaphore consumeSema;
204  std::mutex stateMutex;
205 
206  Private(PortReaderBufferBase& owner, unsigned int maxBuffer) :
207  owner(owner),
208  prev(nullptr),
209  creator(nullptr),
210  maxBuffer(maxBuffer),
211  prune(false),
212  replier(nullptr),
213  period(-1),
214  last_recv(-1),
215  ct(0),
216  port(nullptr),
217  contentSema(0),
218  consumeSema(0),
219  stateMutex()
220  {
221  }
222 
223  virtual ~Private()
224  {
225  Port* closePort = nullptr;
226  stateMutex.lock();
227  if (port != nullptr) {
228  closePort = port;
229  }
230  stateMutex.unlock();
231  if (closePort != nullptr) {
232  closePort->close();
233  }
234  stateMutex.lock();
235  clear();
236  stateMutex.unlock();
237  }
238 
239  void clear()
240  {
241  if (prev != nullptr) {
242  pool.addInactivePacket(prev);
243  prev = nullptr;
244  }
245  pool.reset();
246  ct = 0;
247  }
248 
249 
250  std::string getName()
251  {
252  if (port != nullptr) {
253  return port->getName();
254  }
255  return {};
256  }
257 
258  PortReaderPacket* get()
259  {
260  PortReaderPacket* result = nullptr;
261  bool grab = true;
262  if (pool.getFree() == 0) {
263  grab = false;
264  if (maxBuffer == 0 || pool.getCount() < maxBuffer) {
265  grab = true;
266  } else {
267  // ok, can't get free, clean space.
268  // here would be a good place to do buffer reuse.
269  }
270  }
271  if (grab) {
272  result = pool.getInactivePacket();
273  }
274 
275  return result;
276  }
277 
278  int checkContent()
279  {
280  return (int)pool.getCount();
281  }
282 
283  PortReaderPacket* getContent()
284  {
285  if (prev != nullptr) {
286  pool.addInactivePacket(prev);
287  prev = nullptr;
288  }
289  if (pool.getCount() >= 1) {
290  prev = pool.getActivePacket();
291  ct--;
292  }
293  return prev;
294  }
295 
296 
297  bool getEnvelope(PortReader& envelope)
298  {
299  if (prev == nullptr) {
300  return false;
301  }
302  StringInputStream sis;
303  sis.add(prev->envelope);
304  sis.add("\r\n");
306  Route route;
307  sbr.reset(sis, nullptr, route, 0, true);
308  return envelope.read(sbr);
309  }
310 
311  PortReaderPacket* dropContent()
312  {
313  // don't affect "prev"
314  PortReaderPacket* drop = nullptr;
315 
316  if (pool.getCount() >= 1) {
317  drop = pool.getActivePacket();
318  if (drop != nullptr) {
319  pool.addInactivePacket(drop);
320  }
321  ct--;
322  }
323  return drop;
324  }
325 
326  void attach(Port& port)
327  {
328  this->port = &port;
329  port.setReader(owner);
330  }
331 
332  void* acquire()
333  {
334  if (prev != nullptr) {
335  void* result = prev;
336  prev = nullptr;
337  return result;
338  }
339  return nullptr;
340  }
341 
342  void release(void* key)
343  {
344  if (key != nullptr) {
345  pool.addInactivePacket((PortReaderPacket*)key);
346  }
347  }
348 };
349 #endif // DOXYGEN_SHOULD_SKIP_THIS
350 
351 
352 
354  mPriv(new Private(*this, maxBuffer))
355 {
356 }
357 
359 {
360  delete mPriv;
361 }
362 
364 {
365  if (mPriv->creator != nullptr) {
366  return mPriv->creator->create();
367  }
368  return nullptr;
369 }
370 
372 {
373  mPriv->stateMutex.lock();
374  int count = mPriv->checkContent();
375  mPriv->stateMutex.unlock();
376  return count;
377 }
378 
379 
381 {
382  // give read a chance
383  mPriv->contentSema.post();
384 }
385 
386 PortReader* PortReaderBufferBase::readBase(bool& missed, bool cleanup)
387 {
388  missed = false;
389  if (mPriv->period < 0 || cleanup) {
390  mPriv->contentSema.wait();
391  } else {
392  bool ok = false;
393  double now = SystemClock::nowSystem();
394  double target = now + mPriv->period;
395  if (mPriv->last_recv > 0) {
396  target = mPriv->last_recv + mPriv->period;
397  }
398  double diff = target - now;
399  if (diff > 0) {
400  ok = mPriv->contentSema.waitWithTimeout(diff);
401  } else {
402  ok = mPriv->contentSema.check();
403  if (ok) {
404  mPriv->contentSema.wait();
405  }
406  }
407  if (!ok) {
408  missed = true;
409  if (mPriv->last_recv > 0) {
410  mPriv->last_recv += mPriv->period;
411  }
412  return nullptr;
413  }
415  if (mPriv->last_recv < 0) {
416  mPriv->last_recv = now;
417  } else {
418  diff = target - now;
419  if (diff > 0) {
421  }
422  mPriv->last_recv = target;
423  }
424  }
425  mPriv->stateMutex.lock();
426  PortReaderPacket* readerPacket = mPriv->getContent();
427  PortReader* reader = nullptr;
428  if (readerPacket != nullptr) {
429  PortReader* external = readerPacket->getExternal();
430  if (external == nullptr) {
431  reader = readerPacket->getReader();
432  } else {
433  reader = external;
434  }
435  }
436  mPriv->stateMutex.unlock();
437  if (reader != nullptr) {
438  mPriv->consumeSema.post();
439  }
440  return reader;
441 }
442 
443 
445 {
446  if (connection.getReference() != nullptr) {
447  //printf("REF %ld %d\n", (long int)connection.getReference(),
448  // connection.isValid());
449  return acceptObjectBase(connection.getReference(), nullptr);
450  }
451 
452  if (mPriv->replier != nullptr) {
453  if (connection.getWriter() != nullptr) {
454  return mPriv->replier->read(connection);
455  }
456  }
457  PortReaderPacket* reader = nullptr;
458  while (reader == nullptr) {
459  mPriv->stateMutex.lock();
460  reader = mPriv->get();
461  if ((reader != nullptr) && reader->getReader() == nullptr) {
462  PortReader* next = create();
463  yCAssert(PORTREADERBUFFERBASE, next != nullptr);
464  reader->setReader(next);
465  }
466 
467  mPriv->stateMutex.unlock();
468  if (reader == nullptr) {
469  mPriv->consumeSema.wait();
470  }
471  }
472  bool ok = false;
473  if (connection.isValid()) {
474  yCAssert(PORTREADERBUFFERBASE, reader->getReader() != nullptr);
475  ok = reader->getReader()->read(connection);
476  reader->setEnvelope(connection.readEnvelope());
477  } else {
478  // this is a disconnection
479  // don't talk to this port ever again
480  mPriv->port = nullptr;
481  }
482  if (ok) {
483  mPriv->stateMutex.lock();
484  bool pruned = false;
485  if (mPriv->ct > 0 && mPriv->prune) {
486  PortReaderPacket* readerPacket = mPriv->dropContent();
487  pruned = (readerPacket != nullptr);
488  }
489  //mPriv->configure(reader, false, true);
490  mPriv->pool.addActivePacket(reader);
491  mPriv->ct++;
492  mPriv->stateMutex.unlock();
493  if (!pruned) {
494  mPriv->contentSema.post();
495  }
496  yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> adding data");
497  } else {
498  mPriv->stateMutex.lock();
499  mPriv->pool.addInactivePacket(reader);
500  mPriv->stateMutex.unlock();
501  yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> skipping data");
502 
503  // important to give reader a shot anyway, allowing proper closing
504  yCDebug(PORTREADERBUFFERBASE, "giving PortReaderBuffer chance to close");
505  mPriv->contentSema.post();
506  }
507  return ok;
508 }
509 
510 
512 {
513  mPriv->creator = creator;
514 }
515 
517 {
518  mPriv->replier = &reader;
519 }
520 
522 {
523  mPriv->prune = flag;
524 }
525 
527 {
528  mPriv->period = period;
529 }
530 
531 std::string PortReaderBufferBase::getName() const
532 {
533  return mPriv->getName();
534 }
535 
537 {
538  return mPriv->maxBuffer;
539 }
540 
542 {
543  return mPriv->port == nullptr;
544 }
545 
547 {
548  mPriv->attach(port);
549 }
550 
551 
556 
558  yarp::os::PortWriter* wrapper)
559 {
560  // getting an object here should be basically the same as
561  // receiving from a Port -- except no need to create/read
562  // the object
563 
564  PortReaderPacket* reader = nullptr;
565  while (reader == nullptr) {
566  mPriv->stateMutex.lock();
567  reader = mPriv->get();
568  mPriv->stateMutex.unlock();
569  if (reader == nullptr) {
570  mPriv->consumeSema.wait();
571  }
572  }
573 
574  reader->setExternal(obj, wrapper);
575 
576  mPriv->stateMutex.lock();
577  bool pruned = false;
578  if (mPriv->ct > 0 && mPriv->prune) {
579  PortReaderPacket* readerPacket = mPriv->dropContent();
580  pruned = (readerPacket != nullptr);
581  }
582  //mPriv->configure(reader, false, true);
583  mPriv->pool.addActivePacket(reader);
584  mPriv->ct++;
585  mPriv->stateMutex.unlock();
586  if (!pruned) {
587  mPriv->contentSema.post();
588  }
589  yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> adding data");
590 
591  return true;
592 }
593 
594 
596  yarp::os::PortWriter* wrapper)
597 {
598  YARP_UNUSED(obj);
599  YARP_UNUSED(wrapper);
600  printf("Sorry, forgetting not implemented yet\n");
601  return false;
602 }
603 
604 
606 {
607  return mPriv->acquire();
608 }
609 
611 {
612  mPriv->stateMutex.lock();
613  mPriv->release(key);
614  mPriv->stateMutex.unlock();
615 }
616 
617 
619 {
620  return mPriv->getEnvelope(envelope);
621 }
622 
624 {
625  mPriv->clear();
626 }
627 
629 {
630  yCError(PORTREADERBUFFERBASE, "Missing or incorrectly typed onRead function");
631 }
yarp::os::Port::close
void close() override
Stop port activity.
Definition: Port.cpp:357
yarp::os::ConnectionReader::readEnvelope
virtual Bytes readEnvelope()
Read a message envelope, if available.
Definition: ConnectionReader.cpp:18
yarp::os::impl::StreamConnectionReader::reset
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
Definition: StreamConnectionReader.cpp:49
yarp::os::PortReaderBufferBase::getMaxBuffer
unsigned int getMaxBuffer()
Definition: PortReaderBufferBase.cpp:536
yarp::os::PortReader::read
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
yarp::os::Semaphore
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:29
yarp::os::PortReaderBufferBase::create
virtual yarp::os::PortReader * create()
Definition: PortReaderBufferBase.cpp:363
typedReaderMissingCallback
void typedReaderMissingCallback()
Definition: PortReaderBufferBase.cpp:628
yarp::os::PortReaderBufferBase::PortReaderBufferBase
PortReaderBufferBase(unsigned int maxBuffer)
Definition: PortReaderBufferBase.cpp:353
Portable.h
yarp::os::PortReaderBufferBase::setPrune
void setPrune(bool flag=true)
Definition: PortReaderBufferBase.cpp:521
yarp::os::PortReaderBufferBase::setReplier
void setReplier(yarp::os::PortReader &reader)
Definition: PortReaderBufferBase.cpp:516
yarp::os::StringInputStream::add
void add(const std::string &txt)
Definition: StringInputStream.h:47
YARP_UNUSED
#define YARP_UNUSED(var)
Definition: api.h:159
StreamConnectionReader.h
yarp::os::ConnectionReader::isValid
virtual bool isValid() const =0
PortReaderBuffer.h
yarp::os::impl::StreamConnectionReader
Lets Readable objects read from the underlying InputStream associated with the connection between two...
Definition: StreamConnectionReader.h:42
yarp::os::PortWriter::onCompletion
virtual void onCompletion() const
This is called when the port has finished all writing operations.
Definition: PortWriter.cpp:16
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:32
LogComponent.h
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
yarp::os::PortReaderBufferBase::getName
std::string getName() const
Definition: PortReaderBufferBase.cpp:531
StringInputStream.h
yarp::os::Time::now
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:124
yarp::os::PortReaderBufferBase::interrupt
void interrupt()
Definition: PortReaderBufferBase.cpp:380
yarp::os::SystemClock::nowSystem
static double nowSystem()
Definition: SystemClock.cpp:37
yarp::os::PortReaderBufferBase::setTargetPeriod
void setTargetPeriod(double period)
Definition: PortReaderBufferBase.cpp:526
yarp::os::PortReaderBufferBase::getEnvelope
virtual bool getEnvelope(PortReader &envelope)
Definition: PortReaderBufferBase.cpp:618
yarp::os::Port
A mini-server for network communication.
Definition: Port.h:50
PortReaderBufferBase.h
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
yarp::os::SystemClock::delaySystem
static void delaySystem(double seconds)
Definition: SystemClock.cpp:32
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
Os.h
yarp::os::PortReaderBufferBase
Definition: PortReaderBufferBase.h:26
Thread.h
yarp::os::StringInputStream
An InputStream that reads from a string.
Definition: StringInputStream.h:25
yarp::os::ConnectionReader::getWriter
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
yarp::os::PortReaderBufferBase::read
bool read(yarp::os::ConnectionReader &connection) override
Read this object from a network connection.
Definition: PortReaderBufferBase.cpp:444
yarp::os::Contactable::getName
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:17
Semaphore.h
yarp::os::PortReaderBufferBase::acceptObjectBase
virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Careful! merge with ::read – very similar code Until merge, don't change one without looking at other...
Definition: PortReaderBufferBase.cpp:557
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
yarp::os::PortReaderBufferBase::readBase
yarp::os::PortReader * readBase(bool &missed, bool cleanup)
Definition: PortReaderBufferBase.cpp:386
yarp::os::Port::setReader
void setReader(PortReader &reader) override
Set an external reader for port data.
Definition: Port.cpp:505
yarp::os::PortReaderBufferBase::setCreator
void setCreator(PortReaderBufferBaseCreator *creator)
Definition: PortReaderBufferBase.cpp:511
yarp::os::PortReaderBufferBase::release
void release(void *key)
Definition: PortReaderBufferBase.cpp:610
yarp::os::PortReaderBufferBase::acquire
void * acquire()
Definition: PortReaderBufferBase.cpp:605
yarp::os::PortReaderBufferBase::attachBase
void attachBase(yarp::os::Port &port)
Definition: PortReaderBufferBase.cpp:546
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
yarp::os::ConnectionReader
An interface for reading from a network connection.
Definition: ConnectionReader.h:40
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
PortCorePacket.h
yarp::os::ConnectionReader::getReference
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::PortReaderBufferBase::forgetObjectBase
virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Definition: PortReaderBufferBase.cpp:595
yarp::os::PortReaderBufferBase::isClosed
bool isClosed()
Definition: PortReaderBufferBase.cpp:541
yarp::os::PortReaderBufferBase::check
int check()
Definition: PortReaderBufferBase.cpp:371
Time.h
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl
The components from which ports and connections are built.
yarp::os::PortReaderBufferBase::~PortReaderBufferBase
virtual ~PortReaderBufferBase()
Definition: PortReaderBufferBase.cpp:358
yarp::os::PortReaderBufferBaseCreator
Definition: PortReaderBufferBaseCreator.h:21
yarp::os::PortReaderBufferBase::clear
void clear()
Definition: PortReaderBufferBase.cpp:623