YARP
Yet Another Robot Platform
PortWriterBufferBase.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/Port.h>
13 #include <yarp/os/Semaphore.h>
16 
17 using namespace yarp::os::impl;
18 using namespace yarp::os;
19 
// Log component declared for this translation unit (anonymous namespace, so it
// is not visible outside this file). PORTWRITERBUFFERBASE is the first argument
// of every yCDebug / yCAssert call below.
20 namespace {
21 YARP_OS_LOG_COMPONENT(PORTWRITERBUFFERBASE, "yarp.os.PortWriterBufferBase")
22 } // namespace
23 
// Out-of-line, compiler-generated destructor for PortWriterBufferManager, the
// base class that PortWriterBufferBase::Private derives from below.
24 PortWriterBufferManager::~PortWriterBufferManager() = default;
25 
26 
27 class PortWriterBufferBase::Private : public PortWriterBufferManager
28 {
29 public:
31  owner(owner),
32  stateSema(1),
33  completionSema(0),
34  port(nullptr),
35  current(nullptr),
36  callback(nullptr),
37  finishing(false),
38  outCt(0)
39  {
40  }
41 
42  ~Private() override
43  {
44  release();
45  finishWrites();
46  stateSema.wait();
47  }
48 
49  int getCount()
50  {
51  stateSema.wait();
52  int ct = packets.getCount();
53  stateSema.post();
54  return ct;
55  }
56 
57  void finishWrites()
58  {
59  yCDebug(PORTWRITERBUFFERBASE, "finishing writes");
60  bool done = false;
61  while (!done) {
62  stateSema.wait();
63  if (port != nullptr) {
64  if (!port->isOpen()) {
65  outCt = 0;
66  }
67  }
68  done = (outCt == 0);
69  if (!done) {
70  finishing = true;
71  }
72  stateSema.post();
73  if (!done) {
74  completionSema.wait();
75  }
76  }
77  yCDebug(PORTWRITERBUFFERBASE, "finished writes");
78  }
79 
80  const void* get()
81  {
82  if (callback != nullptr) {
83  // (Safe to check outside mutex)
84  // oops, there is already a prepared and unwritten
85  // object. best remove it.
86  yCDebug(PORTWRITERBUFFERBASE, "releasing unused buffer");
87  release();
88  }
89  stateSema.wait();
90  PortCorePacket* packet = packets.getFreePacket();
91  yCAssert(PORTWRITERBUFFERBASE, packet != nullptr);
92  if (packet->getContent() == nullptr) {
93  yCDebug(PORTWRITERBUFFERBASE, "creating a writer buffer");
94  //packet->setContent(owner.create(*this, packet), true);
95  yarp::os::PortWriterWrapper* wrapper = owner.create(*this, packet);
96  //packet->setContent(wrapper, true);
97  packet->setContent(wrapper->getInternal(), false, wrapper, true);
98  }
99  stateSema.post();
100 
101  current = packet->getContent();
102  callback = packet->getCallback();
103  return callback;
104  }
105 
106  bool release()
107  {
108  stateSema.wait();
109  const PortWriter* cback = callback;
110  current = nullptr;
111  callback = nullptr;
112  stateSema.post();
113  if (cback != nullptr) {
114  stateSema.wait();
115  outCt++;
116  stateSema.post();
117  cback->onCompletion();
118  }
119  return cback != nullptr;
120  }
121 
122  void onCompletion(void* tracker) override
123  {
124  stateSema.wait();
125  yCDebug(PORTWRITERBUFFERBASE, "freeing up a writer buffer");
126  packets.freePacket((PortCorePacket*)tracker, false);
127  outCt--;
128  bool sig = finishing;
129  finishing = false;
130  stateSema.post();
131  if (sig) {
132  completionSema.post();
133  }
134  }
135 
136 
137  void attach(Port& port)
138  {
139  stateSema.wait();
140  this->port = &port;
141  port.enableBackgroundWrite(true);
142  stateSema.post();
143  }
144 
145  void detach()
146  {
147  // nothing to do
148  }
149 
150  void write(bool strict)
151  {
152  if (strict) {
153  finishWrites();
154  }
155  stateSema.wait();
156  const PortWriter* active = current;
157  const PortWriter* cback = callback;
158  current = nullptr;
159  callback = nullptr;
160  stateSema.post();
161  if (active != nullptr && port != nullptr) {
162  stateSema.wait();
163  outCt++;
164  stateSema.post();
165  port->write(*active, cback);
166  }
167  }
168 
169 private:
170  PortWriterBufferBase& owner;
171  PortCorePackets packets;
172  yarp::os::Semaphore stateSema;
173  yarp::os::Semaphore completionSema;
174  Port* port;
175  const PortWriter* current;
176  const PortWriter* callback;
177  bool finishing;
178  int outCt;
179 };
180 
181 
182 
183 
185  mPriv(new Private(*this))
186 {
187 }
188 
190 {
191  delete mPriv;
192 }
193 
195 {
196  return mPriv->get();
197 }
198 
200 {
201  return mPriv->release();
202 }
203 
204 
206 {
207  return mPriv->getCount();
208 }
209 
211 {
212  mPriv->attach(port);
213 }
214 
216 {
217  mPriv->detach();
218 }
219 
221 {
222  mPriv->write(strict);
223 }
224 
226 {
227  mPriv->finishWrites();
228 }
yarp::os::PortWriterBufferBase::Private::getCount
int getCount()
Definition: PortWriterBufferBase.cpp:49
yarp::os::PortWriterBufferBase::Private::~Private
~Private() override
Definition: PortWriterBufferBase.cpp:42
PortCorePackets.h
yarp::os::PortWriterBufferBase::getContent
const void * getContent() const
Definition: PortWriterBufferBase.cpp:194
yarp::os::Semaphore
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:29
yarp::os::PortWriterBufferBase::Private
Definition: PortWriterBufferBase.cpp:28
yarp::os::PortWriterBufferBase::Private::get
const void * get()
Definition: PortWriterBufferBase.cpp:80
yarp::os::impl::PortCorePacket::getContent
const yarp::os::PortWriter * getContent()
Definition: PortCorePacket.h:88
Port.h
yarp::os::PortWriterBufferBase::~PortWriterBufferBase
virtual ~PortWriterBufferBase()
Definition: PortWriterBufferBase.cpp:189
yarp::os::impl::PortCorePacket::getCallback
const yarp::os::PortWriter * getCallback()
Definition: PortCorePacket.h:96
yarp::os::PortWriterBufferBase::write
void write(bool strict)
Definition: PortWriterBufferBase.cpp:220
yarp::os::PortWriterBufferBase::Private::attach
void attach(Port &port)
Definition: PortWriterBufferBase.cpp:137
yarp::os::PortWriterBufferBase::Private::finishWrites
void finishWrites()
Definition: PortWriterBufferBase.cpp:57
yarp::os::PortWriterBufferBase::getCount
int getCount()
Definition: PortWriterBufferBase.cpp:205
PortWriterBuffer.h
yarp::os::PortWriter::onCompletion
virtual void onCompletion() const
This is called when the port has finished all writing operations.
Definition: PortWriter.cpp:16
LogComponent.h
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
yarp::os::PortWriterBufferBase::detach
void detach()
Definition: PortWriterBufferBase.cpp:215
yarp::os::PortWriterBufferBase::attach
void attach(Port &port)
Definition: PortWriterBufferBase.cpp:210
yarp::os::Port
A mini-server for network communication.
Definition: Port.h:50
yarp::os::impl::PortCorePacket
A single message, potentially being transmitted on multiple connections.
Definition: PortCorePacket.h:25
yarp::os::Port::enableBackgroundWrite
void enableBackgroundWrite(bool backgroundFlag)
control whether writing from this port is done in the background.
Definition: Port.cpp:527
yarp::os::PortWriterBufferBase::Private::release
bool release()
Definition: PortWriterBufferBase.cpp:106
yarp::os::PortWriterBufferBase::Private::Private
Private(PortWriterBufferBase &owner)
Definition: PortWriterBufferBase.cpp:30
Semaphore.h
yarp::os::PortWriter::write
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
yarp::os::PortWriterBufferBase::waitForWrite
void waitForWrite()
Definition: PortWriterBufferBase.cpp:225
yarp::os::PortWriterBufferBase::Private::onCompletion
void onCompletion(void *tracker) override
Definition: PortWriterBufferBase.cpp:122
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::PortWriterBufferBase::releaseContent
bool releaseContent()
Definition: PortWriterBufferBase.cpp:199
yarp::os::PortWriterBufferBase::Private::detach
void detach()
Definition: PortWriterBufferBase.cpp:145
yarp::os::PortWriterBufferBase::PortWriterBufferBase
PortWriterBufferBase()
Definition: PortWriterBufferBase.cpp:184
yarp::os::PortWriterBufferBase
Definition: PortWriterBufferBase.h:40
yarp::os::impl::PortCorePacket::setContent
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
Definition: PortCorePacket.h:109
yarp::os::impl::PortCorePackets
A collection of messages being transmitted over connections.
Definition: PortCorePackets.h:29
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl
The components from which ports and connections are built.
yarp::os::PortWriterBufferBase::Private::write
void write(bool strict)
Definition: PortWriterBufferBase.cpp:150