YARP
Yet Another Robot Platform
MessageStack.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include <yarp/os/MessageStack.h>
10 
11 #include <yarp/conf/numeric.h>
12 
13 #include <yarp/os/Bottle.h>
14 #include <yarp/os/DummyConnector.h>
15 #include <yarp/os/Log.h>
16 #include <yarp/os/Semaphore.h>
17 #include <yarp/os/Thread.h>
18 
19 #include <condition_variable>
20 #include <deque>
21 #include <list>
22 #include <mutex>
23 
24 using namespace yarp::os;
25 
26 namespace {
27 
28 class MessageStackHelper;
29 
30 class MessageStackThread : public Thread
31 {
32 public:
33  MessageStackHelper& helper;
34 
35  explicit MessageStackThread(MessageStackHelper& helper) :
36  helper(helper)
37  {
38  }
39 
40  void run() override;
41 };
42 
43 class MessageStackHelper
44 {
45 private:
46  std::list<MessageStackThread*> threads;
47  std::deque<Bottle> msgs;
48  std::mutex mutex;
49  std::condition_variable cv;
50  size_t max_threads;
51  int available_threads;
52  PortReader& owner;
53  bool active;
54 
55 public:
56  MessageStackHelper(size_t max_threads, PortReader& owner) :
57  owner(owner)
58  {
59  this->max_threads = max_threads;
60  available_threads = 0;
61  active = true;
62  }
63 
64  void clear()
65  {
66  active = false;
67  cv.notify_all();
68  for (auto& thread : threads) {
69  thread->stop();
70  delete thread;
71  thread = nullptr;
72  }
73  threads.clear();
74  msgs.clear();
75  active = true;
76  }
77 
78  void stack(PortWriter& msg, const std::string& tag)
79  {
80  std::unique_lock<std::mutex> lock(mutex);
81  msgs.emplace_back();
82  if (!tag.empty()) {
83  Bottle b;
84  b.read(msg);
85  Bottle& back = msgs.back();
86  back.clear();
87  back.addString(tag);
88  back.append(b);
89  } else {
90  msgs.back().read(msg);
91  }
92  if (available_threads == 0) {
93  if (threads.size() < max_threads || max_threads == 0) {
94  available_threads++;
95  threads.push_back(new MessageStackThread(*this));
96  threads.back()->start();
97  }
98  }
99  available_threads--;
100  cv.notify_one();
101  }
102 
103  bool process()
104  {
105  std::unique_lock<std::mutex> lock(mutex);
106  cv.wait(lock, [&]{return !msgs.empty() || !active;});
107  if (!active) {
108  return false;
109  }
110  Bottle b = msgs.front();
111  msgs.pop_front();
112  lock.unlock();
113  DummyConnector con;
114  b.write(con.getWriter());
115  owner.read(con.getReader());
116  lock.lock();
117  available_threads++;
118  lock.unlock();
119  return active;
120  }
121 
122  bool isOwner(PortReader& owner)
123  {
124  return &(this->owner) == &owner;
125  }
126 };
127 
128 
129 void MessageStackThread::run()
130 {
131  while (helper.process()) {
132  // forever
133  }
134 }
135 
136 
137 } // namespace
138 
139 
140 
142 {
143 public:
144  size_t max_threads{0};
145  MessageStackHelper* helper = nullptr;
146 
147  explicit Private(size_t max_threads) :
149  {
150  }
151 
153  {
154  if (helper == nullptr) {
155  return;
156  }
157  helper->clear();
158  delete helper;
159  }
160 
161  void attach(PortReader& owner) {
162  if (helper != nullptr) {
163  if (helper->isOwner(owner)) {
164  return;
165  }
166  delete helper;
167  helper = nullptr;
168  }
169  helper = new MessageStackHelper(max_threads, owner);
170  }
171 
172  void stack(PortWriter& msg, const std::string& tag)
173  {
174  if (helper == nullptr) {
175  return;
176  }
177  helper->stack(msg, tag);
178  }
179 };
180 
181 
182 
183 MessageStack::MessageStack(size_t max_threads) :
184  mPriv(new Private(max_threads))
185 {
186 }
187 
189 {
190  delete mPriv;
191 }
192 
194 {
195  mPriv->attach(owner);
196 }
197 
198 void MessageStack::stack(PortWriter& msg, const std::string& tag)
199 {
200  mPriv->stack(msg, tag);
201 }
yarp::os::DummyConnector
A dummy connection to test yarp::os::Portable implementations.
Definition: DummyConnector.h:35
yarp::os::MessageStack::Private::attach
void attach(PortReader &owner)
Definition: MessageStack.cpp:161
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
yarp::os::Bottle::clear
void clear()
Empties the bottle of any objects it contains.
Definition: Bottle.cpp:124
MessageStack.h
yarp::os::PortReader::read
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
yarp::os::MessageStack::~MessageStack
virtual ~MessageStack()
Destructor.
Definition: MessageStack.cpp:188
yarp::os::MessageStack::Private
Definition: MessageStack.cpp:142
numeric.h
yarp::os::MessageStack::MessageStack
MessageStack(size_t max_threads=0)
Constructor.
Definition: MessageStack.cpp:183
yarp::os::Thread
An abstraction for a thread of execution.
Definition: Thread.h:25
yarp::os::MessageStack::attach
void attach(PortReader &owner)
Definition: MessageStack.cpp:193
yarp::os::DummyConnector::getReader
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
Definition: DummyConnector.cpp:117
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
Log.h
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
yarp::os::Bottle::write
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:233
yarp::os::MessageStack::Private::helper
MessageStackHelper * helper
Definition: MessageStack.cpp:145
DummyConnector.h
yarp::os::DummyConnector::getWriter
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
Definition: DummyConnector.cpp:112
Thread.h
Semaphore.h
yarp::os::Bottle::addString
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
yarp::os::MessageStack::Private::max_threads
size_t max_threads
Definition: MessageStack.cpp:144
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yarp::os::Bottle::read
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
yarp::os::MessageStack::Private::~Private
~Private()
Definition: MessageStack.cpp:152
yarp::os::MessageStack::Private::Private
Private(size_t max_threads)
Definition: MessageStack.cpp:147
yarp::os::MessageStack::Private::stack
void stack(PortWriter &msg, const std::string &tag)
Definition: MessageStack.cpp:172
Bottle.h
yarp::os::MessageStack::stack
void stack(PortWriter &msg, const std::string &tag="")
Add a message to the message stack, to be sent whenever the gods see fit.
Definition: MessageStack.cpp:198
yarp::os::Bottle::append
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
Definition: Bottle.cpp:383