YARP
Yet Another Robot Platform
PortCore.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include <yarp/os/impl/PortCore.h>
11 
12 #include <yarp/conf/environment.h>
13 
14 #include <yarp/os/Bottle.h>
15 #include <yarp/os/DummyConnector.h>
16 #include <yarp/os/InputProtocol.h>
17 #include <yarp/os/Name.h>
18 #include <yarp/os/Network.h>
19 #include <yarp/os/PortInfo.h>
20 #include <yarp/os/RosNameSpace.h>
22 #include <yarp/os/SystemInfo.h>
23 #include <yarp/os/Time.h>
31 
32 #include <cstdio>
33 #include <functional>
34 #include <random>
35 #include <regex>
36 #include <vector>
37 
38 #ifdef YARP_HAS_ACE
39 # include <ace/INET_Addr.h>
40 # include <ace/Sched_Params.h>
41 // In one the ACE headers there is a definition of "main" for WIN32
42 # ifdef main
43 # undef main
44 # endif
45 #endif
46 
47 #if defined(__linux__) // used for configuring scheduling properties
48 # include <dirent.h>
49 # include <sys/types.h>
50 # include <unistd.h>
51 #endif
52 
53 
54 using namespace yarp::os::impl;
55 using namespace yarp::os;
56 using namespace yarp;
57 
58 namespace {
59 YARP_OS_LOG_COMPONENT(PORTCORE, "yarp.os.impl.PortCore")
60 } // namespace
61 
62 PortCore::PortCore() = default;
63 
65 {
66  close();
67  removeCallbackLock();
68 }
69 
70 
71 bool PortCore::listen(const Contact& address, bool shouldAnnounce)
72 {
73  yCDebug(PORTCORE, "Starting listening on %s", address.toURI().c_str());
74  // If we're using ACE, we really need to have it initialized before
75  // this point.
76  if (!NetworkBase::initialized()) {
77  yCError(PORTCORE, "YARP not initialized; create a yarp::os::Network object before using ports");
78  return false;
79  }
80 
81  yCTrace(PORTCORE, "listen");
82 
83  {
84  // Critical section
85  std::lock_guard<std::mutex> lock(m_stateMutex);
86 
87  // This method assumes we are not already on the network.
88  // We can assume this because it is not a user-facing class,
89  // and we carefully never call this method again without
90  // calling close().
91  yCAssert(PORTCORE, !m_listening);
92  yCAssert(PORTCORE, !m_running);
93  yCAssert(PORTCORE, !m_closing.load());
94  yCAssert(PORTCORE, !m_finished.load());
95  yCAssert(PORTCORE, m_face == nullptr);
96 
97  // Try to put the port on the network, using the user-supplied
98  // address (which may be incomplete). You can think of
99  // this as getting a server socket.
100  m_address = address;
101  setName(address.getRegName());
102  if (m_timeout > 0) {
103  m_address.setTimeout(m_timeout);
104  }
105  m_face = Carriers::listen(m_address);
106 
107  // We failed, abort.
108  if (m_face == nullptr) {
109  return false;
110  }
111 
112  // Update our address if it was incomplete.
113  if (m_address.getPort() <= 0) {
114  m_address = m_face->getLocalAddress();
115  if (m_address.getRegName() == "...") {
116  m_address.setName(std::string("/") + m_address.getHost() + "_" + NetType::toString(m_address.getPort()));
117  setName(m_address.getRegName());
118  }
119  }
120 
121  // Move into listening phase
122  m_listening.store(true);
123  }
124 
125  // Now that we are on the network, we can let the name server know this.
126  if (shouldAnnounce) {
127  if (!(NetworkBase::getLocalMode() && NetworkBase::getQueryBypass() == nullptr)) {
128  std::string portName = address.getRegName();
129  Bottle cmd;
130  Bottle reply;
131  cmd.addString("announce");
132  cmd.addString(portName);
133  ContactStyle style;
134  NetworkBase::writeToNameServer(cmd, reply, style);
135  }
136  }
137 
138  // Success!
139  return true;
140 }
141 
142 
144 {
145  // Don't even try to do this when the port is hot, it'll burn you
146  yCAssert(PORTCORE, !m_running.load());
147  yCAssert(PORTCORE, m_reader == nullptr);
148  m_reader = &reader;
149 }
150 
152 {
153  // Don't even try to do this when the port is hot, it'll burn you
154  yCAssert(PORTCORE, !m_running.load());
155  yCAssert(PORTCORE, m_adminReader == nullptr);
156  m_adminReader = &reader;
157 }
158 
160 {
161  // Don't even try to do this when the port is hot, it'll burn you
162  yCAssert(PORTCORE, !m_running.load());
163  yCAssert(PORTCORE, m_readableCreator == nullptr);
164  m_readableCreator = &creator;
165 }
166 
167 
169 {
170  yCTrace(PORTCORE, "run");
171 
172  // This is the server thread for the port. We listen on
173  // the network and handle any incoming connections.
174  // We don't touch those connections, just shove them
175  // in a list and move on. It is important that this
176  // thread doesn't make a connecting client wait just
177  // because some other client is slow.
178 
179  // We assume that listen() has succeeded and that
180  // start() has been called.
181  yCAssert(PORTCORE, m_listening.load());
182  yCAssert(PORTCORE, !m_running.load());
183  yCAssert(PORTCORE, !m_closing.load());
184  yCAssert(PORTCORE, !m_finished.load());
185  yCAssert(PORTCORE, m_starting.load());
186 
187  // Enter running phase
188  {
189  // Critical section
190  std::lock_guard<std::mutex> lock(m_stateMutex);
191  m_running.store(true);
192  m_starting.store(false);
193  }
194 
195  // Notify the start() thread that the run() thread is running
196  m_stateCv.notify_one();
197 
198  yCTrace(PORTCORE, "run running");
199 
200  // Enter main loop, where we block on incoming connections.
201  // The loop is exited when PortCore#closing is set. One last
202  // connection will be needed to de-block this thread and ensure
203  // that it checks PortCore#closing.
204  bool shouldStop = false;
205  while (!shouldStop) {
206 
207  // Block and wait for a connection
208  InputProtocol* ip = m_face->read();
209 
210  {
211  // Critical section
212  std::lock_guard<std::mutex> lock(m_stateMutex);
213 
214  // Attach the connection to this port and update its timeout setting
215  if (ip != nullptr) {
216  ip->attachPort(m_contactable);
217  yCDebug(PORTCORE, "received something");
218  if (m_timeout > 0) {
219  ip->setTimeout(m_timeout);
220  }
221  }
222 
223  // Check whether we should shut down
224  shouldStop |= m_closing.load();
225 
226  // Increment a global count of connection events
227  m_events++;
228  }
229 
230  // It we are not shutting down, spin off the connection.
231  // It starts life as an input connection (although it
232  // may later morph into an output).
233  if (!shouldStop) {
234  if (ip != nullptr) {
235  addInput(ip);
236  }
237  yCDebug(PORTCORE, "spun off a connection");
238  ip = nullptr;
239  }
240 
241  // If the connection wasn't spun off, just shut it down.
242  if (ip != nullptr) {
243  ip->close();
244  delete ip;
245  ip = nullptr;
246  }
247 
248  // Remove any defunct connections.
249  reapUnits();
250 
251  // Notify anyone listening for connection changes.
252  std::lock_guard<std::mutex> lock(m_stateMutex);
253  m_connectionListeners = 0;
254  m_connectionChangeCv.notify_all();
255  }
256 
257  yCTrace(PORTCORE, "run closing");
258 
259  // The server thread is shutting down.
260  std::lock_guard<std::mutex> lock(m_stateMutex);
261  m_connectionListeners = 0;
262  m_connectionChangeCv.notify_all();
263  m_finished.store(true);
264 }
265 
266 
268 {
269  closeMain();
270 
271  if (m_prop != nullptr) {
272  delete m_prop;
273  m_prop = nullptr;
274  }
275  m_modifier.releaseOutModifier();
276  m_modifier.releaseInModifier();
277 }
278 
279 
281 {
282  yCTrace(PORTCORE, "start");
283 
284  // This wait will, on success, be matched by a post in run()
285  std::unique_lock<std::mutex> lock(m_stateMutex);
286 
287  // We assume that listen() has been called.
288  yCAssert(PORTCORE, m_listening.load());
289  yCAssert(PORTCORE, !m_running.load());
290  yCAssert(PORTCORE, !m_starting.load());
291  yCAssert(PORTCORE, !m_finished.load());
292  yCAssert(PORTCORE, !m_closing.load());
293 
294  m_starting.store(true);
295 
296  // Start the server thread.
297  bool started = ThreadImpl::start();
298  if (!started) {
299  // run() won't be happening
300  yAssert(false);
301 
302  } else {
303  // Wait for the signal from the run thread before returning.
304  m_stateCv.wait(lock, [&]{ return m_running.load(); });
305  yCAssert(PORTCORE, m_running.load());
306  }
307  return started;
308 }
309 
310 
311 bool PortCore::manualStart(const char* sourceName)
312 {
313  yCTrace(PORTCORE, "manualStart");
314 
315  // This variant of start() does not create a server thread.
316  // That is appropriate if we never expect to receive incoming
317  // connections for any reason. No incoming data, no requests
318  // for state information, no requests to change connections,
319  // nothing. We set the port's name to something fake, and
320  // act like nothing is wrong.
321  m_interruptable = false;
322  m_manual = true;
323  setName(sourceName);
324  return true;
325 }
326 
327 
329 {
330  // We are no longer interrupted.
331  m_interrupted = false;
332 }
333 
335 {
336  yCTrace(PORTCORE, "interrupt");
337 
338  // This is a no-op if there is no server thread.
339  if (!m_listening.load()) {
340  return;
341  }
342 
343  // Ignore any future incoming data
344  m_interrupted = true;
345 
346  // What about data that is already coming in?
347  // If interruptable is not currently set, no worries, the user
348  // did not or will not end up blocked on a read.
349  if (!m_interruptable) {
350  return;
351  }
352 
353  // Since interruptable is set, it is possible that the user
354  // may be blocked on a read. We send an empty message,
355  // which is reserved for giving blocking readers a chance to
356  // update their state.
357  {
358  // Critical section
359  std::lock_guard<std::mutex> lock(m_stateMutex);
360  if (m_reader != nullptr) {
361  yCDebug(PORTCORE, "sending update-state message to listener");
363  lockCallback();
364  m_reader->read(sbr);
365  unlockCallback();
366  }
367  }
368 }
369 
370 
371 void PortCore::closeMain()
372 {
373  yCTrace(PORTCORE, "closeMain");
374 
375  {
376  // Critical section
377  std::lock_guard<std::mutex> lock(m_stateMutex);
378 
379  // We may not have anything to do.
380  if (m_finishing || !(m_running.load() || m_manual)) {
381  yCTrace(PORTCORE, "closeMain - nothing to do");
382  return;
383  }
384 
385  yCTrace(PORTCORE, "closeMain - Central");
386 
387  // Move into official "finishing" phase.
388  m_finishing = true;
389  yCDebug(PORTCORE, "now preparing to shut down port");
390  }
391 
392  // Start disconnecting inputs. We ask the other side of the
393  // connection to do this, so it won't come as a surprise.
394  // The details of how disconnection works vary by carrier.
395  // While we are doing this, the server thread may be still running.
396  // This is because other ports may need to talk to the server
397  // to organize details of how a connection should be broken down.
398  bool done = false;
399  std::string prevName;
400  while (!done) {
401  done = true;
402  std::string removeName;
403  {
404  // Critical section
405  std::lock_guard<std::mutex> lock(m_stateMutex);
406  for (auto* unit : m_units) {
407  if ((unit != nullptr) && unit->isInput() && !unit->isDoomed()) {
408  Route r = unit->getRoute();
409  std::string s = r.getFromName();
410  if (s.length() >= 1 && s[0] == '/' && s != getName() && s != prevName) {
411  removeName = s;
412  done = false;
413  break;
414  }
415  }
416  }
417  }
418  if (!done) {
419  yCDebug(PORTCORE, "requesting removal of connection from %s", removeName.c_str());
420  bool result = NetworkBase::disconnect(removeName,
421  getName(),
422  true);
423  if (!result) {
425  removeName,
426  true);
427  }
428  prevName = removeName;
429  }
430  }
431 
432  // Start disconnecting outputs. We don't negotiate with the
433  // other side, we just break them down.
434  done = false;
435  while (!done) {
436  done = true;
437  Route removeRoute;
438  {
439  // Critical section
440  std::lock_guard<std::mutex> lock(m_stateMutex);
441  for (auto* unit : m_units) {
442  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
443  removeRoute = unit->getRoute();
444  if (removeRoute.getFromName() == getName()) {
445  done = false;
446  break;
447  }
448  }
449  }
450  }
451  if (!done) {
452  removeUnit(removeRoute, true);
453  }
454  }
455 
456  bool stopRunning = m_running.load();
457 
458  // If the server thread is still running, we need to bring it down.
459  if (stopRunning) {
460  // Let the server thread know we no longer need its services.
461  m_closing.store(true);
462 
463  // Wake up the server thread the only way we can, by sending
464  // a message to it. Then join it, it is done.
465  if (!m_manual) {
466  OutputProtocol* op = m_face->write(m_address);
467  if (op != nullptr) {
468  op->close();
469  delete op;
470  }
471  join();
472  }
473 
474  // We should be finished now.
475  yCAssert(PORTCORE, m_finished.load());
476 
477  // Clean up our connection list. We couldn't do this earlier,
478  // because the server thread was running.
479  closeUnits();
480 
481  // Reset some state flags.
482  {
483  // Critical section
484  std::lock_guard<std::mutex> lock(m_stateMutex);
485  m_finished.store(false);
486  m_closing.store(false);
487  m_running.store(false);
488  yCInfo(PORTCORE, "Resetting. m_running = %s", m_running.load() ? "true" : "false");
489  }
490  }
491 
492  // There should be no other threads at this point and we
493  // can stop listening on the network.
494  if (m_listening.load()) {
495  yCAssert(PORTCORE, m_face != nullptr);
496  m_face->close();
497  delete m_face;
498  m_face = nullptr;
499  m_listening.store(false);
500  }
501 
502  // Check if the client is waiting for input. If so, wake them up
503  // with the bad news. An empty message signifies a request to
504  // check the port state.
505  if (m_reader != nullptr) {
506  yCDebug(PORTCORE, "sending end-of-port message to listener");
508  m_reader->read(sbr);
509  m_reader = nullptr;
510  }
511 
512  // We may need to unregister the port with the name server.
513  if (stopRunning) {
514  std::string name = getName();
515  if (name != std::string("")) {
516  if (m_controlRegistration) {
518  }
519  }
520  }
521 
522  // We are done with the finishing process.
523  m_finishing = false;
524 
525  // We are fresh as a daisy.
526  yCAssert(PORTCORE, !m_listening.load());
527  yCAssert(PORTCORE, !m_running.load());
528  yCAssert(PORTCORE, !m_starting.load());
529  yCAssert(PORTCORE, !m_closing.load());
530  yCAssert(PORTCORE, !m_finished.load());
531  yCAssert(PORTCORE, !m_finishing);
532  yCAssert(PORTCORE, m_face == nullptr);
533 }
534 
535 
537 {
538  // How many times has the server thread spun off a connection.
539  std::lock_guard<std::mutex> lock(m_stateMutex);
540  int ct = m_events;
541  return ct;
542 }
543 
544 
545 void PortCore::closeUnits()
546 {
547  // Empty the PortCore#units list. This is only possible when
548  // the server thread is finished.
549  yCAssert(PORTCORE, m_finished.load());
550 
551  // In the "finished" phase, nobody else touches the units,
552  // so we can go ahead and shut them down and delete them.
553  for (auto& i : m_units) {
554  PortCoreUnit* unit = i;
555  if (unit != nullptr) {
556  yCDebug(PORTCORE, "closing a unit");
557  unit->close();
558  yCDebug(PORTCORE, "joining a unit");
559  unit->join();
560  delete unit;
561  yCDebug(PORTCORE, "deleting a unit");
562  i = nullptr;
563  }
564  }
565 
566  // Get rid of all our nulls. Done!
567  m_units.clear();
568 }
569 
570 void PortCore::reapUnits()
571 {
572  // Connections that should be shut down get tagged as "doomed"
573  // but aren't otherwise touched until it is safe to do so.
574  if (!m_finished.load()) {
575  std::lock_guard<std::mutex> lock(m_stateMutex);
576  for (auto* unit : m_units) {
577  if ((unit != nullptr) && unit->isDoomed() && !unit->isFinished()) {
578  std::string s = unit->getRoute().toString();
579  yCDebug(PORTCORE, "Informing connection %s that it is doomed", s.c_str());
580  unit->close();
581  yCDebug(PORTCORE, "Closed connection %s", s.c_str());
582  unit->join();
583  yCDebug(PORTCORE, "Joined thread of connection %s", s.c_str());
584  }
585  }
586  }
587  cleanUnits();
588 }
589 
590 void PortCore::cleanUnits(bool blocking)
591 {
592  // We will remove any connections that have finished operating from
593  // the PortCore#units list.
594 
595  // Depending on urgency, either wait for a safe time to do this
596  // or skip if unsafe.
597  std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
598  if (blocking) {
599  lock.lock();
600  } else {
601  bool have_lock = lock.try_lock();
602  if (!have_lock) {
603  return;
604  }
605  }
606  // here we have the lock
607 
608  // We will update our connection counts as a by-product.
609  int updatedInputCount = 0;
610  int updatedOutputCount = 0;
611  int updatedDataOutputCount = 0;
612  yCDebug(PORTCORE, "/ routine check of connections to this port begins");
613  if (!m_finished.load()) {
614 
615  // First, we delete and null out any defunct entries in the list.
616  for (auto& i : m_units) {
617  PortCoreUnit* unit = i;
618  if (unit != nullptr) {
619  yCDebug(PORTCORE, "| checking connection %s %s", unit->getRoute().toString().c_str(), unit->getMode().c_str());
620  if (unit->isFinished()) {
621  std::string con = unit->getRoute().toString();
622  yCDebug(PORTCORE, "| removing connection %s", con.c_str());
623  unit->close();
624  unit->join();
625  delete unit;
626  i = nullptr;
627  yCDebug(PORTCORE, "| removed connection %s", con.c_str());
628  } else {
629  // No work to do except updating connection counts.
630  if (!unit->isDoomed()) {
631  if (unit->isOutput()) {
632  updatedOutputCount++;
633  if (unit->getMode().empty()) {
634  updatedDataOutputCount++;
635  }
636  }
637  if (unit->isInput()) {
638  if (unit->getRoute().getFromName() != "admin") {
639  updatedInputCount++;
640  }
641  }
642  }
643  }
644  }
645  }
646 
647  // Now we do some awkward shuffling (list class may be from ACE
648  // or STL, if ACE it is quite limited). We move the nulls to
649  // the end of the list ...
650  size_t rem = 0;
651  for (size_t i2 = 0; i2 < m_units.size(); i2++) {
652  if (m_units[i2] != nullptr) {
653  if (rem < i2) {
654  m_units[rem] = m_units[i2];
655  m_units[i2] = nullptr;
656  }
657  rem++;
658  }
659  }
660 
661  // ... Now we get rid of the null entries
662  for (size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
663  m_units.pop_back();
664  }
665  }
666 
667  // Finalize the connection counts.
668  m_dataOutputCount = updatedDataOutputCount;
669  lock.unlock();
670 
671  m_packetMutex.lock();
672  m_inputCount = updatedInputCount;
673  m_outputCount = updatedOutputCount;
674  m_packetMutex.unlock();
675  yCDebug(PORTCORE, "\\ routine check of connections to this port ends");
676 }
677 
678 
679 void PortCore::addInput(InputProtocol* ip)
680 {
681  yCTrace(PORTCORE, "addInput");
682 
683  // This method is only called by the server thread in its running phase.
684  // It wraps up a network connection as a unit and adds it to
685  // PortCore#units. The unit will have its own thread associated
686  // with it.
687 
688  yCAssert(PORTCORE, ip != nullptr);
689  std::lock_guard<std::mutex> lock(m_stateMutex);
690  PortCoreUnit* unit = new PortCoreInputUnit(*this,
691  getNextIndex(),
692  ip,
693  false);
694  yCAssert(PORTCORE, unit != nullptr);
695  unit->start();
696  m_units.push_back(unit);
697  yCTrace(PORTCORE, "there are now %zu units", m_units.size());
698 }
699 
700 
702 {
703  yCTrace(PORTCORE, "addOutput");
704 
705  // This method is called from threads associated with input
706  // connections.
707  // It wraps up a network connection as a unit and adds it to
708  // PortCore#units. The unit will start with its own thread
709  // associated with it, but that thread will be very short-lived
710  // if the port is not configured to do background writes.
711 
712  yCAssert(PORTCORE, op != nullptr);
713  if (!m_finished.load()) {
714  std::lock_guard<std::mutex> lock(m_stateMutex);
715  PortCoreUnit* unit = new PortCoreOutputUnit(*this, getNextIndex(), op);
716  yCAssert(PORTCORE, unit != nullptr);
717  unit->start();
718  m_units.push_back(unit);
719  }
720 }
721 
722 
723 bool PortCore::isUnit(const Route& route, int index)
724 {
725  // Check if a connection with a specified route (and optional ID) is present
726  bool needReap = false;
727  if (!m_finished.load()) {
728  for (auto* unit : m_units) {
729  if (unit != nullptr) {
730  Route alt = unit->getRoute();
731  std::string wild = "*";
732  bool ok = true;
733  if (index >= 0) {
734  ok = ok && (unit->getIndex() == index);
735  }
736  if (ok) {
737  if (route.getFromName() != wild) {
738  ok = ok && (route.getFromName() == alt.getFromName());
739  }
740  if (route.getToName() != wild) {
741  ok = ok && (route.getToName() == alt.getToName());
742  }
743  if (route.getCarrierName() != wild) {
744  ok = ok && (route.getCarrierName() == alt.getCarrierName());
745  }
746  }
747  if (ok) {
748  needReap = true;
749  break;
750  }
751  }
752  }
753  }
754  return needReap;
755 }
756 
757 
758 bool PortCore::removeUnit(const Route& route, bool synch, bool* except)
759 {
760  // This is a request to remove a connection. It could arise from any
761  // input thread.
762 
763  if (except != nullptr) {
764  yCDebug(PORTCORE, "asked to remove connection in the way of %s", route.toString().c_str());
765  *except = false;
766  } else {
767  yCDebug(PORTCORE, "asked to remove connection %s", route.toString().c_str());
768  }
769 
770  // Scan for units that match the given route, and collect their IDs.
771  // Mark them as "doomed".
772  std::vector<int> removals;
773 
774  bool needReap = false;
775  if (!m_finished.load()) {
776  std::lock_guard<std::mutex> lock(m_stateMutex);
777  for (auto* unit : m_units) {
778  if (unit != nullptr) {
779  Route alt = unit->getRoute();
780  std::string wild = "*";
781  bool ok = true;
782  if (route.getFromName() != wild) {
783  ok = ok && (route.getFromName() == alt.getFromName());
784  }
785  if (route.getToName() != wild) {
786  ok = ok && (route.getToName() == alt.getToName());
787  }
788  if (route.getCarrierName() != wild) {
789  if (except == nullptr) {
790  ok = ok && (route.getCarrierName() == alt.getCarrierName());
791  } else {
792  if (route.getCarrierName() == alt.getCarrierName()) {
793  *except = true;
794  ok = false;
795  }
796  }
797  }
798 
799  if (ok) {
800  yCDebug(PORTCORE, "removing connection %s", alt.toString().c_str());
801  removals.push_back(unit->getIndex());
802  unit->setDoomed();
803  needReap = true;
804  }
805  }
806  }
807  }
808 
809  // If we find one or more matches, we need to do some work.
810  // We've marked those matches as "doomed" so they'll get cleared
811  // up eventually, but we can speed this up by waking up the
812  // server thread.
813  if (needReap) {
814  yCDebug(PORTCORE, "one or more connections need prodding to die");
815 
816  if (m_manual) {
817  // No server thread, no problems.
818  reapUnits();
819  } else {
820  // Send a blank message to make up server thread.
821  OutputProtocol* op = m_face->write(m_address);
822  if (op != nullptr) {
823  op->close();
824  delete op;
825  }
826  yCDebug(PORTCORE, "sent message to prod connection death");
827 
828  if (synch) {
829  // Wait for connections to be cleaned up.
830  yCDebug(PORTCORE, "synchronizing with connection death");
831  {
832  // Critical section
833  std::unique_lock<std::mutex> lock(m_stateMutex);
834  while (std::any_of(removals.begin(), removals.end(), [&](int removal){ return isUnit(route, removal); })) {
835  m_connectionListeners++;
836  m_connectionChangeCv.wait(lock, [&]{ return m_connectionListeners == 0; });
837  }
838  }
839  }
840  }
841  }
842  return needReap;
843 }
844 
845 
846 bool PortCore::addOutput(const std::string& dest,
847  void* id,
848  OutputStream* os,
849  bool onlyIfNeeded)
850 {
851  YARP_UNUSED(id);
852  yCDebug(PORTCORE, "asked to add output to %s", dest.c_str());
853 
854  // Buffer to store text describing outcome (successful connection,
855  // or a failure).
856  BufferedConnectionWriter bw(true);
857 
858  // Look up the address we'll be connecting to.
859  Contact parts = Name(dest).toAddress();
860  Contact contact = NetworkBase::queryName(parts.getRegName());
861  Contact address = contact;
862 
863  // If we can't find it, say so and abort.
864  if (!address.isValid()) {
865  bw.appendLine(std::string("Do not know how to connect to ") + dest);
866  if (os != nullptr) {
867  bw.write(*os);
868  }
869  return false;
870  }
871 
872  // We clean all existing connections to the desired destination,
873  // optionally stopping if we find one with the right carrier.
874  if (onlyIfNeeded) {
875  // Remove any existing connections between source and destination
876  // with a different carrier. If we find a connection already
877  // present with the correct carrier, then we are done.
878  bool except = false;
879  removeUnit(Route(getName(), address.getRegName(), address.getCarrier()), true, &except);
880  if (except) {
881  // Connection already present.
882  yCDebug(PORTCORE, "output already present to %s", dest.c_str());
883  bw.appendLine(std::string("Desired connection already present from ") + getName() + " to " + dest);
884  if (os != nullptr) {
885  bw.write(*os);
886  }
887  return true;
888  }
889  } else {
890  // Remove any existing connections between source and destination.
891  removeUnit(Route(getName(), address.getRegName(), "*"), true);
892  }
893 
894  // Set up a named route for this connection.
895  std::string aname = address.getRegName();
896  if (aname.empty()) {
897  aname = address.toURI(false);
898  }
899  Route r(getName(),
900  aname,
901  ((!parts.getCarrier().empty()) ? parts.getCarrier() : address.getCarrier()));
902  r.setToContact(contact);
903 
904  // Check for any restrictions on the port. Perhaps it can only
905  // read, or write.
906  bool allowed = true;
907  std::string err;
908  std::string append;
909  unsigned int f = getFlags();
910  bool allow_output = (f & PORTCORE_IS_OUTPUT) != 0;
911  bool rpc = (f & PORTCORE_IS_RPC) != 0;
912  Name name(r.getCarrierName() + std::string("://test"));
913  std::string mode = name.getCarrierModifier("log");
914  bool is_log = (!mode.empty());
915  if (is_log) {
916  if (mode != "in") {
917  err = "Logger configured as log." + mode + ", but only log.in is supported";
918  allowed = false;
919  } else {
920  append = "; " + r.getFromName() + " will forward messages and replies (if any) to " + r.getToName();
921  }
922  }
923  if (!allow_output) {
924  if (!is_log) {
925  bool push = false;
927  if (c != nullptr) {
928  push = c->isPush();
929  }
930  if (push) {
931  err = "Outputs not allowed";
932  allowed = false;
933  }
934  }
935  } else if (rpc) {
936  if (m_dataOutputCount >= 1 && !is_log) {
937  err = "RPC output already connected";
938  allowed = false;
939  }
940  }
941 
942  // If we found a relevant restriction, abort.
943  if (!allowed) {
944  bw.appendLine(err);
945  if (os != nullptr) {
946  bw.write(*os);
947  }
948  return false;
949  }
950 
951  // Ok! We can go ahead and make a connection.
952  OutputProtocol* op = nullptr;
953  if (m_timeout > 0) {
954  address.setTimeout(m_timeout);
955  }
956  op = Carriers::connect(address);
957  if (op != nullptr) {
958  op->attachPort(m_contactable);
959  if (m_timeout > 0) {
960  op->setTimeout(m_timeout);
961  }
962 
963  bool ok = op->open(r);
964  if (!ok) {
965  yCDebug(PORTCORE, "open route error");
966  delete op;
967  op = nullptr;
968  }
969  }
970 
971  // No connection, abort.
972  if (op == nullptr) {
973  bw.appendLine(std::string("Cannot connect to ") + dest);
974  if (os != nullptr) {
975  bw.write(*os);
976  }
977  return false;
978  }
979 
980  // Ok, we have a connection, now add it to PortCore#units
981  if (op->getConnection().isPush()) {
982  // This is the normal case
983  addOutput(op);
984  } else {
985  // This is the case for connections that are initiated
986  // in the opposite direction to native YARP connections.
987  // Native YARP has push connections, initiated by the
988  // sender. HTTP and ROS have pull connections, initiated
989  // by the receiver.
990  // We invert the route, flip the protocol direction, and add.
991  r.swapNames();
992  op->rename(r);
993  InputProtocol* ip = &(op->getInput());
994  if (!m_finished.load()) {
995  std::lock_guard<std::mutex> lock(m_stateMutex);
996  PortCoreUnit* unit = new PortCoreInputUnit(*this,
997  getNextIndex(),
998  ip,
999  true);
1000  yCAssert(PORTCORE, unit != nullptr);
1001  unit->start();
1002  m_units.push_back(unit);
1003  }
1004  }
1005 
1006  // Communicated the good news.
1007  bw.appendLine(std::string("Added connection from ") + getName() + " to " + dest + append);
1008  if (os != nullptr) {
1009  bw.write(*os);
1010  }
1011  cleanUnits();
1012  return true;
1013 }
1014 
1015 
1016 void PortCore::removeOutput(const std::string& dest, void* id, OutputStream* os)
1017 {
1018  YARP_UNUSED(id);
1019  yCDebug(PORTCORE, "asked to remove output to %s", dest.c_str());
1020 
1021  // All the real work done by removeUnit().
1022  BufferedConnectionWriter bw(true);
1023  if (removeUnit(Route("*", dest, "*"), true)) {
1024  bw.appendLine(std::string("Removed connection from ") + getName() + " to " + dest);
1025  } else {
1026  bw.appendLine(std::string("Could not find an outgoing connection to ") + dest);
1027  }
1028  if (os != nullptr) {
1029  bw.write(*os);
1030  }
1031  cleanUnits();
1032 }
1033 
1034 void PortCore::removeInput(const std::string& src, void* id, OutputStream* os)
1035 {
1036  YARP_UNUSED(id);
1037  yCDebug(PORTCORE, "asked to remove input to %s", src.c_str());
1038 
1039  // All the real work done by removeUnit().
1040  BufferedConnectionWriter bw(true);
1041  if (removeUnit(Route(src, "*", "*"), true)) {
1042  bw.appendLine(std::string("Removing connection from ") + src + " to " + getName());
1043  } else {
1044  bw.appendLine(std::string("Could not find an incoming connection from ") + src);
1045  }
1046  if (os != nullptr) {
1047  bw.write(*os);
1048  }
1049  cleanUnits();
1050 }
1051 
1053 {
1054  YARP_UNUSED(id);
1055  cleanUnits();
1056 
1057  // Buffer to store a human-readable description of the port's
1058  // state.
1059  BufferedConnectionWriter bw(true);
1060 
1061  {
1062  // Critical section
1063  std::lock_guard<std::mutex> lock(m_stateMutex);
1064 
1065  // Report name and address.
1066  bw.appendLine(std::string("This is ") + m_address.getRegName() + " at " + m_address.toURI());
1067 
1068  // Report outgoing connections.
1069  int oct = 0;
1070  for (auto* unit : m_units) {
1071  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1072  Route route = unit->getRoute();
1073  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1074  bw.appendLine(msg);
1075  oct++;
1076  }
1077  }
1078  if (oct < 1) {
1079  bw.appendLine("There are no outgoing connections");
1080  }
1081 
1082  // Report incoming connections.
1083  int ict = 0;
1084  for (auto* unit : m_units) {
1085  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1086  Route route = unit->getRoute();
1087  if (!route.getCarrierName().empty()) {
1088  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1089  bw.appendLine(msg);
1090  ict++;
1091  }
1092  }
1093  }
1094  if (ict < 1) {
1095  bw.appendLine("There are no incoming connections");
1096  }
1097  }
1098 
1099  // Send description across network, or print it.
1100  if (os != nullptr) {
1101  bw.write(*os);
1102  } else {
1103  StringOutputStream sos;
1104  bw.write(sos);
1105  printf("%s\n", sos.toString().c_str());
1106  }
1107 }
1108 
1109 
1111 {
1112  cleanUnits();
1113 
1114  std::lock_guard<std::mutex> lock(m_stateMutex);
1115 
1116  // Report name and address of port.
1117  PortInfo baseInfo;
1119  std::string portName = m_address.getRegName();
1120  baseInfo.message = std::string("This is ") + portName + " at " + m_address.toURI();
1121  reporter.report(baseInfo);
1122 
1123  // Report outgoing connections.
1124  int oct = 0;
1125  for (auto* unit : m_units) {
1126  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1127  Route route = unit->getRoute();
1128  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1129  PortInfo info;
1130  info.message = msg;
1132  info.incoming = false;
1133  info.portName = portName;
1134  info.sourceName = route.getFromName();
1135  info.targetName = route.getToName();
1136  info.carrierName = route.getCarrierName();
1137  reporter.report(info);
1138  oct++;
1139  }
1140  }
1141  if (oct < 1) {
1142  PortInfo info;
1144  info.message = "There are no outgoing connections";
1145  reporter.report(info);
1146  }
1147 
1148  // Report incoming connections.
1149  int ict = 0;
1150  for (auto* unit : m_units) {
1151  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1152  Route route = unit->getRoute();
1153  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1154  PortInfo info;
1155  info.message = msg;
1157  info.incoming = true;
1158  info.portName = portName;
1159  info.sourceName = route.getFromName();
1160  info.targetName = route.getToName();
1161  info.carrierName = route.getCarrierName();
1162  reporter.report(info);
1163  ict++;
1164  }
1165  }
1166  if (ict < 1) {
1167  PortInfo info;
1169  info.message = "There are no incoming connections";
1170  reporter.report(info);
1171  }
1172 }
1173 
1174 
1176 {
1177  std::lock_guard<std::mutex> lock(m_stateMutex);
1178  if (reporter != nullptr) {
1179  m_eventReporter = reporter;
1180  }
1181 }
1182 
1184 {
1185  std::lock_guard<std::mutex> lock(m_stateMutex);
1186  m_eventReporter = nullptr;
1187 }
1188 
1189 void PortCore::report(const PortInfo& info)
1190 {
1191  // We are in the context of one of the input or output threads,
1192  // so our contact with the PortCore must be absolutely minimal.
1193  //
1194  // It is safe to pick up the address of the reporter if this is
1195  // kept constant over the lifetime of the input/output threads.
1196 
1197  if (m_eventReporter != nullptr) {
1198  m_eventReporter->report(info);
1199  }
1200 }
1201 
1202 
1204 {
1205  YARP_UNUSED(id);
1206  YARP_UNUSED(os);
1207  bool result = true;
1208 
1209  // We are in the context of one of the input threads,
1210  // so our contact with the PortCore must be absolutely minimal.
1211  //
1212  // It is safe to pick up the address of the reader since this is
1213  // constant over the lifetime of the input threads.
1214 
1215  if (m_reader != nullptr && !m_interrupted) {
1216  m_interruptable = false; // No mutexing; user of interrupt() has to be careful.
1217 
1218  bool haveOutputs = (m_outputCount != 0); // No mutexing, but failure modes are benign.
1219 
1220  if (m_logNeeded && haveOutputs) {
1221  // Normally, yarp doesn't pay attention to the content of
1222  // messages received by the client. Likewise, the content
1223  // of replies are not monitored. However it may sometimes
1224  // be useful to log this traffic.
1225 
1226  ConnectionRecorder recorder;
1227  recorder.init(&reader);
1228  lockCallback();
1229  result = m_reader->read(recorder);
1230  unlockCallback();
1231  recorder.fini();
1232  // send off a log of this transaction to whoever wants it
1233  sendHelper(recorder, PORTCORE_SEND_LOG);
1234  } else {
1235  // YARP is not needed as a middleman
1236  lockCallback();
1237  result = m_reader->read(reader);
1238  unlockCallback();
1239  }
1240 
1241  m_interruptable = true;
1242  } else {
1243  // Read and ignore message, there is no where to send it.
1244  yCDebug(PORTCORE, "data received, no reader for it");
1245  Bottle b;
1246  result = b.read(reader);
1247  }
1248  return result;
1249 }
1250 
1251 
1252 bool PortCore::send(const PortWriter& writer,
1253  PortReader* reader,
1254  const PortWriter* callback)
1255 {
1256  // check if there is any modifier
1257  // we need to protect this part while the modifier
1258  // plugin is loading or unloading!
1259  m_modifier.outputMutex.lock();
1260  if (m_modifier.outputModifier != nullptr) {
1261  if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1262  m_modifier.outputMutex.unlock();
1263  return false;
1264  }
1265  m_modifier.outputModifier->modifyOutgoingData(writer);
1266  }
1267  m_modifier.outputMutex.unlock();
1268  if (!m_logNeeded) {
1269  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1270  }
1271  // logging is desired, so we need to wrap up and log this send
1272  // (and any reply it gets) -- TODO not yet supported
1273  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1274 }
1275 
1277  int mode,
1278  PortReader* reader,
1279  const PortWriter* callback)
1280 {
1281  if (m_interrupted || m_finishing) {
1282  return false;
1283  }
1284 
1285  bool all_ok = true;
1286  bool gotReply = false;
1287  int logCount = 0;
1288  std::string envelopeString = m_envelope;
1289 
1290  // Pass a message to all output units for sending on. We could
1291  // be doing more here to cache the serialization of the message
1292  // and reuse it across output connections. However, one key
1293  // optimization is present: external blocks written by
1294  // yarp::os::ConnectionWriter::appendExternalBlock are never
1295  // copied. So for example the core image array in a yarp::sig::Image
1296  // is untouched by the port communications code.
1297 
1298  yCTrace(PORTCORE, "------- send in real");
1299 
1300  // Give user the chance to know that this object is about to be
1301  // written.
1302  writer.onCommencement();
1303 
1304  // All user-facing parts of this port will be blocked on this
1305  // operation, so we'll want to be snappy. How long the
1306  // operation lasts will depend on these flags:
1307  // * waitAfterSend
1308  // * waitBeforeSend
1309  // set by setWaitAfterSend() and setWaitBeforeSend().
1310  std::lock_guard<std::mutex> lock(m_stateMutex);
1311 
1312  // If the port is shutting down, abort.
1313  if (m_finished.load()) {
1314  return false;
1315  }
1316 
1317  yCTrace(PORTCORE, "------- send in");
1318  // Prepare a "packet" for tracking a single message which
1319  // may travel by multiple outputs.
1320  m_packetMutex.lock();
1321  PortCorePacket* packet = m_packets.getFreePacket();
1322  yCAssert(PORTCORE, packet != nullptr);
1323  packet->setContent(&writer, false, callback);
1324  m_packetMutex.unlock();
1325 
1326  // Scan connections, placing message everywhere we can.
1327  for (auto* unit : m_units) {
1328  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1329  bool log = (!unit->getMode().empty());
1330  if (log) {
1331  // Some connections are for logging only.
1332  logCount++;
1333  }
1334  bool ok = (mode == PORTCORE_SEND_NORMAL) ? (!log) : (log);
1335  if (!ok) {
1336  continue;
1337  }
1338  bool waiter = m_waitAfterSend || (mode == PORTCORE_SEND_LOG);
1339  yCTrace(PORTCORE, "------- -- inc");
1340  m_packetMutex.lock();
1341  packet->inc(); // One more connection carrying message.
1342  m_packetMutex.unlock();
1343  yCTrace(PORTCORE, "------- -- pre-send");
1344  bool gotReplyOne = false;
1345  // Send the message off on this connection.
1346  void* out = unit->send(writer,
1347  reader,
1348  (callback != nullptr) ? callback : (&writer),
1349  reinterpret_cast<void*>(packet),
1350  envelopeString,
1351  waiter,
1352  m_waitBeforeSend,
1353  &gotReplyOne);
1354  gotReply = gotReply || gotReplyOne;
1355  yCTrace(PORTCORE, "------- -- send");
1356  if (out != nullptr) {
1357  // We got back a report of a message already sent.
1358  m_packetMutex.lock();
1359  (static_cast<PortCorePacket*>(out))->dec(); // Message on one fewer connections.
1360  m_packets.checkPacket(static_cast<PortCorePacket*>(out));
1361  m_packetMutex.unlock();
1362  }
1363  if (waiter) {
1364  if (unit->isFinished()) {
1365  all_ok = false;
1366  }
1367  }
1368  yCTrace(PORTCORE, "------- -- dec");
1369  }
1370  }
1371  yCTrace(PORTCORE, "------- pack check");
1372  m_packetMutex.lock();
1373 
1374  // We no longer concern ourselves with the message.
1375  // It may or may not be traveling on some connections.
1376  // But that is not our problem anymore.
1377  packet->dec();
1378 
1379  m_packets.checkPacket(packet);
1380  m_packetMutex.unlock();
1381  yCTrace(PORTCORE, "------- packed");
1382  yCTrace(PORTCORE, "------- send out");
1383  if (mode == PORTCORE_SEND_LOG) {
1384  if (logCount == 0) {
1385  m_logNeeded = false;
1386  }
1387  }
1388 
1389  yCTrace(PORTCORE, "------- send out real");
1390 
1391  if (m_waitAfterSend && reader != nullptr) {
1392  all_ok = all_ok && gotReply;
1393  }
1394 
1395  return all_ok;
1396 }
1397 
1398 
1400 {
1401  bool writing = false;
1402 
1403  // Check if any port is currently writing. TODO optimize
1404  // this query by counting down with notifyCompletion().
1405  if (!m_finished.load()) {
1406  std::lock_guard<std::mutex> lock(m_stateMutex);
1407  for (auto* unit : m_units) {
1408  if ((unit != nullptr) && !unit->isFinished() && unit->isBusy()) {
1409  writing = true;
1410  }
1411  }
1412  }
1413 
1414  return writing;
1415 }
1416 
1417 
1419 {
1420  cleanUnits(false);
1421  m_packetMutex.lock();
1422  int result = m_inputCount;
1423  m_packetMutex.unlock();
1424  return result;
1425 }
1426 
1428 {
1429  cleanUnits(false);
1430  m_packetMutex.lock();
1431  int result = m_outputCount;
1432  m_packetMutex.unlock();
1433  return result;
1434 }
1435 
1436 
1437 void PortCore::notifyCompletion(void* tracker)
1438 {
1439  yCTrace(PORTCORE, "starting notifyCompletion");
1440  m_packetMutex.lock();
1441  if (tracker != nullptr) {
1442  (static_cast<PortCorePacket*>(tracker))->dec();
1443  m_packets.checkPacket(static_cast<PortCorePacket*>(tracker));
1444  }
1445  m_packetMutex.unlock();
1446  yCTrace(PORTCORE, "stopping notifyCompletion");
1447 }
1448 
1449 
1451 {
1452  m_envelopeWriter.restart();
1453  bool ok = envelope.write(m_envelopeWriter);
1454  if (ok) {
1455  setEnvelope(m_envelopeWriter.toString());
1456  }
1457  return ok;
1458 }
1459 
1460 
1461 void PortCore::setEnvelope(const std::string& envelope)
1462 {
1463  m_envelope = envelope;
1464  for (size_t i = 0; i < m_envelope.length(); i++) {
1465  // It looks like envelopes are constrained to be printable ASCII?
1466  // I'm not sure why this would be. TODO check.
1467  if (m_envelope[i] < 32) {
1468  m_envelope = m_envelope.substr(0, i);
1469  break;
1470  }
1471  }
1472  yCDebug(PORTCORE, "set envelope to %s", m_envelope.c_str());
1473 }
1474 
1476 {
1477  return m_envelope;
1478 }
1479 
1481 {
1482  StringInputStream sis;
1483  sis.add(m_envelope);
1484  sis.add("\r\n");
1486  Route route;
1487  sbr.reset(sis, nullptr, route, 0, true);
1488  return envelope.read(sbr);
1489 }
1490 
1491 // Make an RPC connection to talk to a ROS API, send a message, get reply.
1492 // NOTE: ROS support can now be moved out of here, once all documentation
1493 // of older ways to interoperate with it are purged and people stop
1494 // doing it.
1495 static bool __pc_rpc(const Contact& c,
1496  const char* carrier,
1497  Bottle& writer,
1498  Bottle& reader,
1499  bool verbose)
1500 {
1501  ContactStyle style;
1502  style.quiet = !verbose;
1503  style.timeout = 4;
1504  style.carrier = carrier;
1505  bool ok = Network::write(c, writer, reader, style);
1506  return ok;
1507 }
1508 
1509 // ACE is sometimes confused by localhost aliases, in a ROS-incompatible
1510 // way. This method does a quick sanity check if we are using ROS.
1511 static bool __tcp_check(const Contact& c)
1512 {
1513 #ifdef YARP_HAS_ACE
1514  ACE_INET_Addr addr;
1515  int result = addr.set(c.getPort(), c.getHost().c_str());
1516  if (result != 0) {
1517  yCWarning(PORTCORE, "ACE choked on %s:%d\n", c.getHost().c_str(), c.getPort());
1518  }
1519  result = addr.set(c.getPort(), "127.0.0.1");
1520  if (result != 0) {
1521  yCWarning(PORTCORE, "ACE choked on 127.0.0.1:%d\n", c.getPort());
1522  }
1523  result = addr.set(c.getPort(), "127.0.1.1");
1524  if (result != 0) {
1525  yCWarning(PORTCORE, "ACE choked on 127.0.1.1:%d\n", c.getPort());
1526  }
1527 #endif
1528  return true;
1529 }
1530 
1531 namespace {
1532 enum class PortCoreCommand : yarp::conf::vocab32_t
1533 {
1534  Unknown = 0,
1535  Help = yarp::os::createVocab('h', 'e', 'l', 'p'),
1536  Ver = yarp::os::createVocab('v', 'e', 'r'),
1537  Pray = yarp::os::createVocab('p', 'r', 'a', 'y'),
1538  Add = yarp::os::createVocab('a', 'd', 'd'),
1539  Del = yarp::os::createVocab('d', 'e', 'l'),
1540  Atch = yarp::os::createVocab('a', 't', 'c', 'h'),
1541  Dtch = yarp::os::createVocab('d', 't', 'c', 'h'),
1542  List = yarp::os::createVocab('l', 'i', 's', 't'),
1543  Set = yarp::os::createVocab('s', 'e', 't'),
1544  Get = yarp::os::createVocab('g', 'e', 't'),
1545  Prop = yarp::os::createVocab('p', 'r', 'o', 'p'),
1546  RosPublisherUpdate = yarp::os::createVocab('r', 'p', 'u', 'p'),
1547  RosRequestTopic = yarp::os::createVocab('r', 't', 'o', 'p'),
1548  RosGetPid = yarp::os::createVocab('p', 'i', 'd'),
1549  RosGetBusInfo = yarp::os::createVocab('b', 'u', 's'),
1550 };
1551 
1552 enum class PortCoreConnectionDirection : yarp::conf::vocab32_t
1553 {
1554  Error = 0,
1555  Out = yarp::os::createVocab('o', 'u', 't'),
1556  In = yarp::os::createVocab('i', 'n'),
1557 };
1558 
1559 enum class PortCorePropertyAction : yarp::conf::vocab32_t
1560 {
1561  Error = 0,
1562  Get = yarp::os::createVocab('g', 'e', 't'),
1563  Set = yarp::os::createVocab('s', 'e', 't')
1564 };
1565 
1566 PortCoreCommand parseCommand(const yarp::os::Value& v)
1567 {
1568  if (v.isString()) {
1569  // We support ROS client API these days. Here we recode some long ROS
1570  // command names, just for convenience.
1571  std::string cmd = v.asString();
1572  if (cmd == "publisherUpdate") {
1573  return PortCoreCommand::RosPublisherUpdate;
1574  }
1575  if (cmd == "requestTopic") {
1576  return PortCoreCommand::RosRequestTopic;
1577  }
1578  if (cmd == "getPid") {
1579  return PortCoreCommand::RosGetPid;
1580  }
1581  if (cmd == "getBusInfo") {
1582  return PortCoreCommand::RosGetBusInfo;
1583  }
1584  }
1585 
1586  auto cmd = static_cast<PortCoreCommand>(v.asVocab());
1587  switch (cmd) {
1588  case PortCoreCommand::Help:
1589  case PortCoreCommand::Ver:
1590  case PortCoreCommand::Pray:
1591  case PortCoreCommand::Add:
1592  case PortCoreCommand::Del:
1593  case PortCoreCommand::Atch:
1594  case PortCoreCommand::Dtch:
1595  case PortCoreCommand::List:
1596  case PortCoreCommand::Set:
1597  case PortCoreCommand::Get:
1598  case PortCoreCommand::Prop:
1599  case PortCoreCommand::RosPublisherUpdate:
1600  case PortCoreCommand::RosRequestTopic:
1601  case PortCoreCommand::RosGetPid:
1602  case PortCoreCommand::RosGetBusInfo:
1603  return cmd;
1604  default:
1605  return PortCoreCommand::Unknown;
1606  }
1607 }
1608 
1609 PortCoreConnectionDirection parseConnectionDirection(yarp::conf::vocab32_t v, bool errorIsOut = false)
1610 {
1611  auto dir = static_cast<PortCoreConnectionDirection>(v);
1612  switch (dir) {
1613  case PortCoreConnectionDirection::In:
1614  case PortCoreConnectionDirection::Out:
1615  return dir;
1616  default:
1617  return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1618  }
1619 }
1620 
1621 PortCorePropertyAction parsePropertyAction(yarp::conf::vocab32_t v)
1622 {
1623  auto action = static_cast<PortCorePropertyAction>(v);
1624  switch (action) {
1625  case PortCorePropertyAction::Get:
1626  case PortCorePropertyAction::Set:
1627  return action;
1628  default:
1629  return PortCorePropertyAction::Error;
1630  }
1631 }
1632 
1633 void describeRoute(const Route& route, Bottle& result)
1634 {
1635  Bottle& bfrom = result.addList();
1636  bfrom.addString("from");
1637  bfrom.addString(route.getFromName());
1638 
1639  Bottle& bto = result.addList();
1640  bto.addString("to");
1641  bto.addString(route.getToName());
1642 
1643  Bottle& bcarrier = result.addList();
1644  bcarrier.addString("carrier");
1645  bcarrier.addString(route.getCarrierName());
1646 
1647  Carrier* carrier = Carriers::chooseCarrier(route.getCarrierName());
1648  if (carrier->isConnectionless()) {
1649  Bottle& bconnectionless = result.addList();
1650  bconnectionless.addString("connectionless");
1651  bconnectionless.addInt32(1);
1652  }
1653  if (!carrier->isPush()) {
1654  Bottle& breverse = result.addList();
1655  breverse.addString("push");
1656  breverse.addInt32(0);
1657  }
1658  delete carrier;
1659 }
1660 
1661 } // namespace
1662 
1664  void* id)
1665 {
1666  Bottle cmd;
1667  Bottle result;
1668 
1669  // We've received a message to the port that is marked as administrative.
1670  // That means that instead of passing it along as data to the user of the
1671  // port, the port itself is responsible for reading and responding to
1672  // it. So let's read the message and see what we're supposed to do.
1673  cmd.read(reader);
1674 
1675  yCDebug(PORTCORE, "Port %s received command %s", getName().c_str(), cmd.toString().c_str());
1676 
1677  auto handleAdminHelpCmd = []() {
1678  Bottle result;
1679  // We give a list of the most useful administrative commands.
1680  result.addVocab(yarp::os::createVocab('m', 'a', 'n', 'y'));
1681  result.addString("[help] # give this help");
1682  result.addString("[ver] # report protocol version information");
1683  result.addString("[add] $portname # add an output connection");
1684  result.addString("[add] $portname $car # add an output with a given protocol");
1685  result.addString("[del] $portname # remove an input or output connection");
1686  result.addString("[list] [in] # list input connections");
1687  result.addString("[list] [out] # list output connections");
1688  result.addString("[list] [in] $portname # give details for input");
1689  result.addString("[list] [out] $portname # give details for output");
1690  result.addString("[prop] [get] # get all user-defined port properties");
1691  result.addString("[prop] [get] $prop # get a user-defined port property (prop, val)");
1692  result.addString("[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1693  result.addString("[prop] [get] $portname # get Qos properties of a connection to/from a port");
1694  result.addString("[prop] [set] $portname # set Qos properties of a connection to/from a port");
1695  result.addString("[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1696  result.addString("[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1697  result.addString("[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1698  result.addString("[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1699  result.addString("[dtch] [out] # detach portmonitor plug-in from the port's output");
1700  result.addString("[dtch] [in] # detach portmonitor plug-in from the port's input");
1701  //result.addString("[atch] $portname $prop # attach a portmonitor plug-in to the connection to/from $portname");
1702  //result.addString("[dtch] $portname # detach any portmonitor plug-in from the connection to/from $portname");
1703  return result;
1704  };
1705 
1706  auto handleAdminVerCmd = []() {
1707  // Gives a version number for the administrative commands.
1708  // It is distinct from YARP library versioning.
1709  Bottle result;
1710  result.addVocab(Vocab::encode("ver"));
1711  result.addInt32(1);
1712  result.addInt32(2);
1713  result.addInt32(3);
1714  return result;
1715  };
1716 
1717  auto handleAdminPrayCmd = [this]() {
1718  // Strongly inspired by nethack #pray command:
1719  // https://nethackwiki.com/wiki/Prayer
1720  // http://www.steelypips.org/nethack/pray.html
1721 
1722  Bottle result;
1723 
1724  bool found = false;
1725  std::string name = yarp::conf::environment::getEnvironment("YARP_ROBOT_NAME", &found);
1726  if (!found) {
1727  name = getName();
1728  // Remove initial "/"
1729  while (name[0] == '/') {
1730  name = name.substr(1);
1731  }
1732  // Keep only the first part of the port name
1733  auto i = name.find('/');
1734  if (i != std::string::npos) {
1735  name = name.substr(0, i);
1736  }
1737  }
1738 
1739  std::random_device rd;
1740  std::mt19937 mt(rd());
1741  std::uniform_int_distribution<int> dist2(0,1);
1742  auto d2 = std::bind(dist2, mt);
1743 
1744  result.addString("You begin praying to " + name + ".");
1745  result.addString("You finish your prayer.");
1746 
1747  static const char* godvoices[] = {
1748  "booms out",
1749  "thunders",
1750  "rings out",
1751  "booms",
1752  };
1753  std::uniform_int_distribution<int> godvoices_dist(0, (sizeof(godvoices) / sizeof(godvoices[0])) - 1);
1754  auto godvoice = [&]() {
1755  return std::string(godvoices[godvoices_dist(mt)]);
1756  };
1757 
1758  static const char* creatures[] = {
1759  "mortal",
1760  "creature",
1761  "robot",
1762  };
1763  std::uniform_int_distribution<int> creatures_dist(0, (sizeof(creatures) / sizeof(creatures[0])) - 1);
1764  auto creature = [&]() {
1765  return std::string(creatures[creatures_dist(mt)]);
1766  };
1767 
1768  static const char* auras[] = {
1769  "amber",
1770  "light blue",
1771  "golden",
1772  "white",
1773  "orange",
1774  "black",
1775  };
1776  std::uniform_int_distribution<int> auras_dist(0, (sizeof(auras) / sizeof(auras[0])) - 1);
1777  auto aura = [&]() {
1778  return std::string(auras[auras_dist(mt)]);
1779  };
1780 
1781  static const char* items[] = {
1782  "keyboard",
1783  "mouse",
1784  "monitor",
1785  "headphones",
1786  "smartphone",
1787  "wallet",
1788  "eyeglasses",
1789  "shirt",
1790  };
1791  std::uniform_int_distribution<int> items_dist(0, (sizeof(items) / sizeof(items[0])) - 1);
1792  auto item = [&]() {
1793  return std::string(items[items_dist(mt)]);
1794  };
1795 
1796  static const char* blessings[] = {
1797  "You feel more limber.",
1798  "The slime disappears.",
1799  "Your amulet vanishes! You can breathe again.",
1800  "You can breathe again.",
1801  "You are back on solid ground.",
1802  "Your stomach feels content.",
1803  "You feel better.",
1804  "You feel much better.",
1805  "Your surroundings change.",
1806  "Your shape becomes uncertain.",
1807  "Your chain disappears.",
1808  "There's a tiger in your tank.",
1809  "You feel in good health again.",
1810  "Your eye feels better.",
1811  "Your eyes feel better.",
1812  "Looks like you are back in Kansas.",
1813  "Your <ITEM> softly glows <AURA>.",
1814  };
1815  std::uniform_int_distribution<int> blessings_dist(0, (sizeof(blessings) / sizeof(blessings[0])) - 1);
1816  auto blessing = [&](){
1817  auto blessing = std::string(blessings[blessings_dist(mt)]);
1818  blessing = std::regex_replace(blessing, std::regex("<ITEM>"), item());
1819  blessing = std::regex_replace(blessing, std::regex("<AURA>"), aura());
1820  return blessing;
1821  };
1822 
1823  std::uniform_int_distribution<int> dist13(0,12);
1824  switch(dist13(mt)) {
1825  case 0:
1826  case 1:
1827  result.addString("You feel that " + name + " is " + (d2() ? "bummed" : "displeased") + ".");
1828  break;
1829  case 2:
1830  case 3:
1831  result.addString("The voice of " + name + " " + godvoice() +
1832  ": \"Thou " + (d2() ? "hast strayed from the path" : "art arrogant") +
1833  ", " + creature() + ". Thou must relearn thy lessons!\"");
1834  break;
1835  case 4:
1836  case 5:
1837  result.addString("The voice of " + name + " " + godvoice() +
1838  ": \"Thou hast angered me.\"");
1839  result.addString("A black glow surrounds you.");
1840  break;
1841  case 6:
1842  result.addString("The voice of " + name + " " + godvoice() +
1843  ": \"Thou hast angered me.\"");
1844  break;
1845  case 7:
1846  case 8:
1847  result.addString("The voice of " + name + " " + godvoice() +
1848  ": \"Thou durst " + (d2() ? "scorn" : "call upon") +
1849  " me? Then die, " + creature() + "!\"");
1850  break;
1851  case 9:
1852  result.addString("You feel that " + name + " is " + (d2() ? "pleased as punch" : "well-pleased") + ".");
1853  result.addString(blessing());
1854  break;
1855  case 10:
1856  result.addString("You feel that " + name + " is " + (d2() ? "ticklish" : "pleased") + ".");
1857  result.addString(blessing());
1858  break;
1859  case 11:
1860  result.addString("You feel that " + name + " is " + (d2() ? "full" : "satisfied") + ".");
1861  result.addString(blessing());
1862  break;
1863  default:
1864  result.addString("The voice of " + name + " " + godvoice() +
1865  ": \"Thou hast angered me.\"");
1866  result.addString("Suddenly, a bolt of lightning strikes you!");
1867  result.addString("You fry to a crisp!");
1868  break;
1869  }
1870 
1871  return result;
1872  };
1873 
1874  auto handleAdminAddCmd = [this, id](std::string output,
1875  const std::string& carrier) {
1876  // Add an output to the port.
1877  Bottle result;
1878  StringOutputStream cache;
1879  if (!carrier.empty()) {
1880  output = carrier + ":/" + output;
1881  }
1882  addOutput(output, id, &cache, false);
1883  std::string r = cache.toString();
1884  int v = (r[0] == 'A') ? 0 : -1;
1885  result.addInt32(v);
1886  result.addString(r);
1887  return result;
1888  };
1889 
1890  auto handleAdminDelCmd = [this, id](const std::string& dest) {
1891  // Delete any inputs or outputs involving the named port.
1892  Bottle result;
1893  StringOutputStream cache;
1894  removeOutput(dest, id, &cache);
1895  std::string r1 = cache.toString();
1896  cache.reset();
1897  removeInput(dest, id, &cache);
1898  std::string r2 = cache.toString();
1899  int v = (r1[0] == 'R' || r2[0] == 'R') ? 0 : -1;
1900  result.addInt32(v);
1901  if (r1[0] == 'R' && r2[0] != 'R') {
1902  result.addString(r1);
1903  } else if (r1[0] != 'R' && r2[0] == 'R') {
1904  result.addString(r2);
1905  } else {
1906  result.addString(r1 + r2);
1907  }
1908  return result;
1909  };
1910 
1911  auto handleAdminAtchCmd = [this](PortCoreConnectionDirection direction,
1912  Property prop) {
1913  Bottle result;
1914  switch (direction) {
1915  case PortCoreConnectionDirection::Out: {
1916  std::string errMsg;
1917  if (!attachPortMonitor(prop, true, errMsg)) {
1918  result.addVocab(Vocab::encode("fail"));
1919  result.addString(errMsg);
1920  } else {
1921  result.addVocab(Vocab::encode("ok"));
1922  }
1923  } break;
1924  case PortCoreConnectionDirection::In: {
1925  std::string errMsg;
1926  if (!attachPortMonitor(prop, false, errMsg)) {
1927  result.addVocab(Vocab::encode("fail"));
1928  result.addString(errMsg);
1929  } else {
1930  result.addVocab(Vocab::encode("ok"));
1931  }
1932  } break;
1933  case PortCoreConnectionDirection::Error:
1934  result.addVocab(Vocab::encode("fail"));
1935  result.addString("attach command must be followed by [out] or [in]");
1936  }
1937  return result;
1938  };
1939 
1940  auto handleAdminDtchCmd = [this](PortCoreConnectionDirection direction) {
1941  Bottle result;
1942  switch (direction) {
1943  case PortCoreConnectionDirection::Out: {
1944  if (detachPortMonitor(true)) {
1945  result.addVocab(Vocab::encode("ok"));
1946  } else {
1947  result.addVocab(Vocab::encode("fail"));
1948  }
1949  } break;
1950  case PortCoreConnectionDirection::In: {
1951  if (detachPortMonitor(false)) {
1952  result.addVocab(Vocab::encode("ok"));
1953  } else {
1954  result.addVocab(Vocab::encode("fail"));
1955  }
1956  } break;
1957  case PortCoreConnectionDirection::Error:
1958  result.addVocab(Vocab::encode("fail"));
1959  result.addString("detach command must be followed by [out] or [in]");
1960  };
1961  return result;
1962  };
1963 
1964  auto handleAdminListCmd = [this](const PortCoreConnectionDirection direction,
1965  const std::string& target) {
1966  Bottle result;
1967  switch (direction) {
1968  case PortCoreConnectionDirection::In: {
1969  // Return a list of all input connections.
1970  std::lock_guard<std::mutex> lock(m_stateMutex);
1971  for (auto* unit : m_units) {
1972  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1973  Route route = unit->getRoute();
1974  if (target.empty()) {
1975  const std::string& name = route.getFromName();
1976  if (!name.empty()) {
1977  result.addString(name);
1978  }
1979  } else if (route.getFromName() == target) {
1980  describeRoute(route, result);
1981  }
1982  }
1983  }
1984  } break;
1985  case PortCoreConnectionDirection::Out: {
1986  // Return a list of all output connections.
1987  std::lock_guard<std::mutex> lock(m_stateMutex);
1988  for (auto* unit : m_units) {
1989  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1990  Route route = unit->getRoute();
1991  if (target.empty()) {
1992  result.addString(route.getToName());
1993  } else if (route.getToName() == target) {
1994  describeRoute(route, result);
1995  }
1996  }
1997  }
1998  } break;
1999  case PortCoreConnectionDirection::Error:
2000  // Should never happen
2001  yCAssert(PORTCORE, false);
2002  break;
2003  }
2004  return result;
2005  };
2006 
2007  auto handleAdminSetInCmd = [this](const std::string& target,
2008  const Property& property) {
2009  Bottle result;
2010  // Set carrier parameters on a given input connection.
2011  std::lock_guard<std::mutex> lock(m_stateMutex);
2012  if (target.empty()) {
2013  result.addInt32(-1);
2014  result.addString("target port is not specified.\r\n");
2015  } else {
2016  if (target == getName()) {
2017  std::string errMsg;
2018  if (!setParamPortMonitor(property, false, errMsg)) {
2019  result.addVocab(Vocab::encode("fail"));
2020  result.addString(errMsg);
2021  } else {
2022  result.addVocab(Vocab::encode("ok"));
2023  }
2024  } else {
2025  for (auto* unit : m_units) {
2026  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2027  Route route = unit->getRoute();
2028  if (route.getFromName() == target) {
2029  unit->setCarrierParams(property);
2030  result.addInt32(0);
2031  std::string msg = "Configured connection from ";
2032  msg += route.getFromName();
2033  msg += "\r\n";
2034  result.addString(msg);
2035  break;
2036  }
2037  }
2038  }
2039  }
2040  if (result.size() == 0) {
2041  result.addInt32(-1);
2042  std::string msg = "Could not find an incoming connection from ";
2043  msg += target;
2044  msg += "\r\n";
2045  result.addString(msg);
2046  }
2047  }
2048  return result;
2049  };
2050 
2051  auto handleAdminSetOutCmd = [this](const std::string& target,
2052  const Property& property) {
2053  Bottle result;
2054  // Set carrier parameters on a given output connection.
2055  std::lock_guard<std::mutex> lock(m_stateMutex);
2056  if (target.empty()) {
2057  result.addInt32(-1);
2058  result.addString("target port is not specified.\r\n");
2059  } else {
2060  if (target == getName()) {
2061  std::string errMsg;
2062  if (!setParamPortMonitor(property, true, errMsg)) {
2063  result.addVocab(Vocab::encode("fail"));
2064  result.addString(errMsg);
2065  } else {
2066  result.addVocab(Vocab::encode("ok"));
2067  }
2068  } else {
2069  for (auto* unit : m_units) {
2070  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2071  Route route = unit->getRoute();
2072  if (route.getToName() == target) {
2073  unit->setCarrierParams(property);
2074  result.addInt32(0);
2075  std::string msg = "Configured connection to ";
2076  msg += route.getToName();
2077  msg += "\r\n";
2078  result.addString(msg);
2079  break;
2080  }
2081  }
2082  }
2083  }
2084  if (result.size() == 0) {
2085  result.addInt32(-1);
2086  std::string msg = "Could not find an incoming connection to ";
2087  msg += target;
2088  msg += "\r\n";
2089  result.addString(msg);
2090  }
2091  }
2092  return result;
2093  };
2094 
2095  auto handleAdminGetInCmd = [this](const std::string& target) {
2096  Bottle result;
2097  // Get carrier parameters for a given input connection.
2098  std::lock_guard<std::mutex> lock(m_stateMutex);
2099  if (target.empty()) {
2100  result.addInt32(-1);
2101  result.addString("target port is not specified.\r\n");
2102  } else if (target == getName()) {
2103  yarp::os::Property property;
2104  std::string errMsg;
2105  if (!getParamPortMonitor(property, false, errMsg)) {
2106  result.addVocab(Vocab::encode("fail"));
2107  result.addString(errMsg);
2108  } else {
2109  result.addDict() = property;
2110  }
2111  } else {
2112  for (auto* unit : m_units) {
2113  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2114  Route route = unit->getRoute();
2115  if (route.getFromName() == target) {
2116  yarp::os::Property property;
2117  unit->getCarrierParams(property);
2118  result.addDict() = property;
2119  break;
2120  }
2121  }
2122  }
2123  if (result.size() == 0) {
2124  result.addInt32(-1);
2125  std::string msg = "Could not find an incoming connection from ";
2126  msg += target;
2127  msg += "\r\n";
2128  result.addString(msg);
2129  }
2130  }
2131  return result;
2132  };
2133 
2134  auto handleAdminGetOutCmd = [this](const std::string& target) {
2135  Bottle result;
2136  // Get carrier parameters for a given output connection.
2137  std::lock_guard<std::mutex> lock(m_stateMutex);
2138  if (target.empty()) {
2139  result.addInt32(-1);
2140  result.addString("target port is not specified.\r\n");
2141  } else if (target == getName()) {
2142  yarp::os::Property property;
2143  std::string errMsg;
2144  if (!getParamPortMonitor(property, true, errMsg)) {
2145  result.addVocab(Vocab::encode("fail"));
2146  result.addString(errMsg);
2147  } else {
2148  result.addDict() = property;
2149  }
2150  } else {
2151  for (auto* unit : m_units) {
2152  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2153  Route route = unit->getRoute();
2154  if (route.getToName() == target) {
2155  yarp::os::Property property;
2156  unit->getCarrierParams(property);
2157  result.addDict() = property;
2158  break;
2159  }
2160  }
2161  }
2162  if (result.size() == 0) {
2163  result.addInt32(-1);
2164  std::string msg = "Could not find an incoming connection to ";
2165  msg += target;
2166  msg += "\r\n";
2167  result.addString(msg);
2168  }
2169  }
2170  return result;
2171  };
2172 
2173  auto handleAdminPropGetCmd = [this](const std::string& key) {
2174  Bottle result;
2175  Property* p = acquireProperties(false);
2176  if (p != nullptr) {
2177  if (key.empty()) {
2178  result.fromString(p->toString());
2179  } else {
2180  // request: "prop get /portname"
2181  if (key[0] == '/') {
2182  bool bFound = false;
2183  // check for their own name
2184  if (key == getName()) {
2185  bFound = true;
2186  Bottle& sched = result.addList();
2187  sched.addString("sched");
2188  Property& sched_prop = sched.addDict();
2189  sched_prop.put("tid", static_cast<int>(this->getTid()));
2190  sched_prop.put("priority", this->getPriority());
2191  sched_prop.put("policy", this->getPolicy());
2192 
2194  Bottle& proc = result.addList();
2195  proc.addString("process");
2196  Property& proc_prop = proc.addDict();
2197  proc_prop.put("pid", info.pid);
2198  proc_prop.put("name", (info.pid != -1) ? info.name : "unknown");
2199  proc_prop.put("arguments", (info.pid != -1) ? info.arguments : "unknown");
2200  proc_prop.put("priority", info.schedPriority);
2201  proc_prop.put("policy", info.schedPolicy);
2202 
2204  Bottle& platform = result.addList();
2205  platform.addString("platform");
2206  Property& platform_prop = platform.addDict();
2207  platform_prop.put("os", pinfo.name);
2208  platform_prop.put("hostname", m_address.getHost());
2209 
2210  unsigned int f = getFlags();
2211  bool is_input = (f & PORTCORE_IS_INPUT) != 0;
2212  bool is_output = (f & PORTCORE_IS_OUTPUT) != 0;
2213  bool is_rpc = (f & PORTCORE_IS_RPC) != 0;
2214  Bottle& port = result.addList();
2215  port.addString("port");
2216  Property& port_prop = port.addDict();
2217  port_prop.put("is_input", is_input);
2218  port_prop.put("is_output", is_output);
2219  port_prop.put("is_rpc", is_rpc);
2220  port_prop.put("type", getType().getName());
2221  } else {
2222  for (auto* unit : m_units) {
2223  if ((unit != nullptr) && !unit->isFinished()) {
2224  Route route = unit->getRoute();
2225  std::string coreName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2226  if (key == coreName) {
2227  bFound = true;
2228  int priority = unit->getPriority();
2229  int policy = unit->getPolicy();
2230  int tos = getTypeOfService(unit);
2231  int tid = static_cast<int>(unit->getTid());
2232  Bottle& sched = result.addList();
2233  sched.addString("sched");
2234  Property& sched_prop = sched.addDict();
2235  sched_prop.put("tid", tid);
2236  sched_prop.put("priority", priority);
2237  sched_prop.put("policy", policy);
2238  Bottle& qos = result.addList();
2239  qos.addString("qos");
2240  Property& qos_prop = qos.addDict();
2241  qos_prop.put("tos", tos);
2242  }
2243  } // end isFinished()
2244  } // end for loop
2245  } // end portName == getname()
2246 
2247  if (!bFound) { // cannot find any port matches the requested one
2248  result.addVocab(Vocab::encode("fail"));
2249  std::string msg = "cannot find any connection to/from ";
2250  msg = msg + key;
2251  result.addString(msg);
2252  }
2253  // end of (portName[0] == '/')
2254  } else {
2255  result.add(p->find(key));
2256  }
2257  }
2258  }
2259  releaseProperties(p);
2260  return result;
2261  };
2262 
2263  auto handleAdminPropSetCmd = [this](const std::string& key,
2264  const Value& value,
2265  const Bottle& process,
2266  const Bottle& sched,
2267  const Bottle& qos) {
2268  Bottle result;
2269  Property* p = acquireProperties(false);
2270  bool bOk = true;
2271  if (p != nullptr) {
2272  p->put(key, value);
2273  // setting scheduling properties of all threads within the process
2274  // scope through the admin port
2275  // e.g. prop set /current_port (process ((priority 30) (policy 1)))
2276  if (!process.isNull()) {
2277  std::string portName = key;
2278  if ((!portName.empty()) && (portName[0] == '/')) {
2279  // check for their own name
2280  if (portName == getName()) {
2281  bOk = false;
2282  Bottle* process_prop = process.find("process").asList();
2283  if (process_prop != nullptr) {
2284  int prio = -1;
2285  int policy = -1;
2286  if (process_prop->check("priority")) {
2287  prio = process_prop->find("priority").asInt32();
2288  }
2289  if (process_prop->check("policy")) {
2290  policy = process_prop->find("policy").asInt32();
2291  }
2292  bOk = setProcessSchedulingParam(prio, policy);
2293  }
2294  }
2295  }
2296  }
2297  // check if we need to set the PortCoreUnit scheduling policy
2298  // e.g., "prop set /portname (sched ((priority 30) (policy 1)))"
2299  // The priority and policy values on Linux are:
2300  // SCHED_OTHER : policy=0, priority=[0 .. 0]
2301  // SCHED_FIFO : policy=1, priority=[1 .. 99]
2302  // SCHED_RR : policy=2, priority=[1 .. 99]
2303  if (!sched.isNull()) {
2304  if ((!key.empty()) && (key[0] == '/')) {
2305  bOk = false;
2306  for (auto* unit : m_units) {
2307  if ((unit != nullptr) && !unit->isFinished()) {
2308  Route route = unit->getRoute();
2309  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2310 
2311  if (portName == key) {
2312  Bottle* sched_prop = sched.find("sched").asList();
2313  if (sched_prop != nullptr) {
2314  int prio = -1;
2315  int policy = -1;
2316  if (sched_prop->check("priority")) {
2317  prio = sched_prop->find("priority").asInt32();
2318  }
2319  if (sched_prop->check("policy")) {
2320  policy = sched_prop->find("policy").asInt32();
2321  }
2322  bOk = (unit->setPriority(prio, policy) != -1);
2323  } else {
2324  bOk = false;
2325  }
2326  break;
2327  }
2328  }
2329  }
2330  }
2331  }
2332  // check if we need to set the packet QOS policy
2333  // e.g., "prop set /portname (qos ((priority HIGH)))"
2334  // e.g., "prop set /portname (qos ((dscp AF12)))"
2335  // e.g., "prop set /portname (qos ((tos 12)))"
2336  if (!qos.isNull()) {
2337  if ((!key.empty()) && (key[0] == '/')) {
2338  bOk = false;
2339  for (auto* unit : m_units) {
2340  if ((unit != nullptr) && !unit->isFinished()) {
2341  Route route = unit->getRoute();
2342  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2343  if (portName == key) {
2344  Bottle* qos_prop = qos.find("qos").asList();
2345  if (qos_prop != nullptr) {
2346  int tos = -1;
2347  if (qos_prop->check("priority")) {
2348  // set the packet TOS value on the socket based on some predefined
2349  // priority levels.
2350  // the expected levels are: LOW, NORM, HIGH, CRIT
2351  NetInt32 priority = qos_prop->find("priority").asVocab();
2352  int dscp;
2353  switch (priority) {
2354  case yarp::os::createVocab('L', 'O', 'W'):
2355  dscp = 10;
2356  break;
2357  case yarp::os::createVocab('N', 'O', 'R', 'M'):
2358  dscp = 0;
2359  break;
2360  case yarp::os::createVocab('H', 'I', 'G', 'H'):
2361  dscp = 36;
2362  break;
2363  case yarp::os::createVocab('C', 'R', 'I', 'T'):
2364  dscp = 44;
2365  break;
2366  default:
2367  dscp = -1;
2368  }
2369  if (dscp >= 0) {
2370  tos = (dscp << 2);
2371  }
2372  } else if (qos_prop->check("dscp")) {
2373  // Set the packet TOS value on the socket based on the DSCP level
2374  QosStyle::PacketPriorityDSCP dscp_class = QosStyle::getDSCPByVocab(qos_prop->find("dscp").asVocab());
2375  int dscp = -1;
2376  if (dscp_class == QosStyle::DSCP_Invalid) {
2377  auto dscp_val = qos_prop->find("dscp");
2378  if (dscp_val.isInt32()) {
2379  dscp = dscp_val.asInt32();
2380  }
2381  } else {
2382  dscp = static_cast<int>(dscp_class);
2383  }
2384  if ((dscp >= 0) && (dscp < 64)) {
2385  tos = (dscp << 2);
2386  }
2387  } else if (qos_prop->check("tos")) {
2388  // Set the TOS value directly
2389  auto tos_val = qos_prop->find("tos");
2390  if (tos_val.isInt32()) {
2391  tos = tos_val.asInt32();
2392  }
2393  }
2394  if (tos >= 0) {
2395  bOk = setTypeOfService(unit, tos);
2396  }
2397  } else {
2398  bOk = false;
2399  }
2400  break;
2401  }
2402  }
2403  }
2404  }
2405  }
2406  }
2407  releaseProperties(p);
2408  result.addVocab((bOk) ? Vocab::encode("ok") : Vocab::encode("fail"));
2409  return result;
2410  };
2411 
2412  // NOTE: YARP partially supports the ROS Slave API https://wiki.ros.org/ROS/Slave_API
2413 
2414  auto handleAdminRosPublisherUpdateCmd = [this](const std::string& topic, Bottle* pubs) {
2415  // When running against a ROS name server, we need to
2416  // support ROS-style callbacks for connecting publishers
2417  // with subscribers. Note: this should not be necessary
2418  // anymore, now that a dedicated yarp::os::Node class
2419  // has been implemented, but is still needed for older
2420  // ways of interfacing with ROS without using dedicated
2421  // node ports.
2422  Bottle result;
2423  if (pubs != nullptr) {
2424  Property listed;
2425  for (size_t i = 0; i < pubs->size(); i++) {
2426  std::string pub = pubs->get(i).asString();
2427  listed.put(pub, 1);
2428  }
2429  Property present;
2430  {
2431  // Critical section
2432  std::lock_guard<std::mutex> lock(m_stateMutex);
2433  for (auto* unit : m_units) {
2434  if ((unit != nullptr) && unit->isPupped()) {
2435  std::string me = unit->getPupString();
2436  present.put(me, 1);
2437  if (!listed.check(me)) {
2438  unit->setDoomed();
2439  }
2440  }
2441  }
2442  }
2443  for (size_t i = 0; i < pubs->size(); i++) {
2444  std::string pub = pubs->get(i).asString();
2445  if (!present.check(pub)) {
2446  yCDebug(PORTCORE, "ROS ADD %s", pub.c_str());
2447  Bottle req;
2448  Bottle reply;
2449  req.addString("requestTopic");
2450  NestedContact nc(getName());
2451  req.addString(nc.getNodeName());
2452  req.addString(topic);
2453  Bottle& lst = req.addList();
2454  Bottle& sublst = lst.addList();
2455  sublst.addString("TCPROS");
2456  yCDebug(PORTCORE, "Sending [%s] to %s", req.toString().c_str(), pub.c_str());
2457  Contact c = Contact::fromString(pub);
2458  if (!__pc_rpc(c, "xmlrpc", req, reply, false)) {
2459  fprintf(stderr, "Cannot connect to ROS subscriber %s\n", pub.c_str());
2460  // show diagnosics
2461  __pc_rpc(c, "xmlrpc", req, reply, true);
2462  __tcp_check(c);
2463  } else {
2464  Bottle* pref = reply.get(2).asList();
2465  std::string hostname;
2466  std::string carrier;
2467  int portnum = 0;
2468  if (reply.get(0).asInt32() != 1) {
2469  fprintf(stderr, "Failure looking up topic %s: %s\n", topic.c_str(), reply.toString().c_str());
2470  } else if (pref == nullptr) {
2471  fprintf(stderr, "Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2472  } else if (pref->get(0).asString() != "TCPROS") {
2473  fprintf(stderr, "Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->get(0).asString().c_str());
2474  } else {
2475  Value hostname2 = pref->get(1);
2476  Value portnum2 = pref->get(2);
2477  hostname = hostname2.asString();
2478  portnum = portnum2.asInt32();
2479  carrier = "tcpros+role.pub+topic.";
2480  carrier += topic;
2481  yCDebug(PORTCORE, "topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2482  }
2483  if (portnum != 0) {
2484  Contact addr(hostname, portnum);
2485  OutputProtocol* op = nullptr;
2486  Route r = Route(getName(), pub, carrier);
2487  op = Carriers::connect(addr);
2488  if (op == nullptr) {
2489  fprintf(stderr, "NO CONNECTION\n");
2490  std::exit(1);
2491  } else {
2492  op->attachPort(m_contactable);
2493  op->open(r);
2494  }
2495  Route route = op->getRoute();
2496  route.swapNames();
2497  op->rename(route);
2498  InputProtocol* ip = &(op->getInput());
2499  {
2500  // Critical section
2501  std::lock_guard<std::mutex> lock(m_stateMutex);
2502  PortCoreUnit* unit = new PortCoreInputUnit(*this,
2503  getNextIndex(),
2504  ip,
2505  true);
2506  yCAssert(PORTCORE, unit != nullptr);
2507  unit->setPupped(pub);
2508  unit->start();
2509  m_units.push_back(unit);
2510  }
2511  }
2512  }
2513  }
2514  }
2515  }
2516  result.addInt32(1);
2517  result.addString("ok");
2518  return result;
2519  };
2520 
2521  auto handleAdminRosRequestTopicCmd = [this]() {
2522  // ROS-style query for topics.
2523  Bottle result;
2524  result.addInt32(1);
2525  NestedContact nc(getName());
2526  result.addString(nc.getNodeName());
2527  Bottle& lst = result.addList();
2528  Contact addr = getAddress();
2529  lst.addString("TCPROS");
2530  lst.addString(addr.getHost());
2531  lst.addInt32(addr.getPort());
2532  return result;
2533  };
2534 
2535  auto handleAdminRosGetPidCmd = []() {
2536  // ROS-style query for PID.
2537  Bottle result;
2538  result.addInt32(1);
2539  result.addString("");
2540  result.addInt32(yarp::os::impl::getpid());
2541  return result;
2542  };
2543 
2544  auto handleAdminRosGetBusInfoCmd = []() {
2545  // ROS-style query for bus information - we support this
2546  // in yarp::os::Node but not otherwise.
2547  Bottle result;
2548  result.addInt32(1);
2549  result.addString("");
2550  result.addList().addList();
2551  return result;
2552  };
2553 
2554  auto handleAdminUnknownCmd = [this](const Bottle& cmd) {
2555  Bottle result;
2556  bool ok = false;
2557  if (m_adminReader != nullptr) {
2558  DummyConnector con;
2559  cmd.write(con.getWriter());
2560  lockCallback();
2561  ok = m_adminReader->read(con.getReader());
2562  unlockCallback();
2563  if (ok) {
2564  result.read(con.getReader());
2565  }
2566  }
2567  if (!ok) {
2568  result.addVocab(Vocab::encode("fail"));
2569  result.addString("send [help] for list of valid commands");
2570  }
2571  return result;
2572  };
2573 
2574  const PortCoreCommand command = parseCommand(cmd.get(0));
2575  switch (command) {
2576  case PortCoreCommand::Help:
2577  result = handleAdminHelpCmd();
2578  break;
2579  case PortCoreCommand::Ver:
2580  result = handleAdminVerCmd();
2581  break;
2582  case PortCoreCommand::Pray:
2583  result = handleAdminPrayCmd();
2584  break;
2585  case PortCoreCommand::Add: {
2586  std::string output = cmd.get(1).asString();
2587  std::string carrier = cmd.get(2).asString();
2588  result = handleAdminAddCmd(std::move(output), carrier);
2589  } break;
2590  case PortCoreCommand::Del: {
2591  const std::string dest = cmd.get(1).asString();
2592  result = handleAdminDelCmd(dest);
2593  } break;
2594  case PortCoreCommand::Atch: {
2595  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab());
2596  Property prop(cmd.get(2).asString().c_str());
2597  result = handleAdminAtchCmd(direction, std::move(prop));
2598  } break;
2599  case PortCoreCommand::Dtch: {
2600  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab());
2601  result = handleAdminDtchCmd(direction);
2602  } break;
2603  case PortCoreCommand::List: {
2604  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2605  const std::string target = cmd.get(2).asString();
2606  result = handleAdminListCmd(direction, target);
2607  } break;
2608  case PortCoreCommand::Set: {
2609  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2610  const std::string target = cmd.get(2).asString();
2611  yarp::os::Property property;
2612  property.fromString(cmd.toString());
2613  switch (direction) {
2614  case PortCoreConnectionDirection::In:
2615  result = handleAdminSetInCmd(target, property);
2616  break;
2617  case PortCoreConnectionDirection::Out:
2618  result = handleAdminSetOutCmd(target, property);
2619  break;
2620  case PortCoreConnectionDirection::Error:
2621  yCAssert(PORTCORE, false); // Should never happen (error is out)
2622  break;
2623  }
2624  } break;
2625  case PortCoreCommand::Get: {
2626  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2627  const std::string target = cmd.get(2).asString();
2628  switch (direction) {
2629  case PortCoreConnectionDirection::In:
2630  result = handleAdminGetInCmd(target);
2631  break;
2632  case PortCoreConnectionDirection::Out:
2633  result = handleAdminGetOutCmd(target);
2634  break;
2635  case PortCoreConnectionDirection::Error:
2636  yCAssert(PORTCORE, false); // Should never happen (error is out)
2637  break;
2638  }
2639  } break;
2640  case PortCoreCommand::Prop: {
2641  PortCorePropertyAction action = parsePropertyAction(cmd.get(1).asVocab());
2642  const std::string key = cmd.get(2).asString();
2643  // Set/get arbitrary properties on a port.
2644  switch (action) {
2645  case PortCorePropertyAction::Get:
2646  result = handleAdminPropGetCmd(key);
2647  break;
2648  case PortCorePropertyAction::Set: {
2649  const Value& value = cmd.get(3);
2650  const Bottle& process = cmd.findGroup("process");
2651  const Bottle& sched = cmd.findGroup("sched");
2652  const Bottle& qos = cmd.findGroup("qos");
2653  result = handleAdminPropSetCmd(key, value, process, sched, qos);
2654  } break;
2655  case PortCorePropertyAction::Error:
2656  result.addVocab(Vocab::encode("fail"));
2657  result.addString("property action not known");
2658  break;
2659  }
2660  } break;
2661  case PortCoreCommand::RosPublisherUpdate: {
2662  yCDebug(PORTCORE, "publisherUpdate! --> %s", cmd.toString().c_str());
2663  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2664  std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString());
2665  Bottle* pubs = cmd.get(3).asList();
2666  result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2667  reader.requestDrop(); // ROS needs us to close down.
2668  } break;
2669  case PortCoreCommand::RosRequestTopic:
2670  yCDebug(PORTCORE, "requestTopic! --> %s", cmd.toString().c_str());
2671  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2672  // std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString()); // Currently unused
2673  // Bottle protocols = cmd.get(3).asList(); // Currently unused
2674  result = handleAdminRosRequestTopicCmd();
2675  reader.requestDrop(); // ROS likes to close down.
2676  break;
2677  case PortCoreCommand::RosGetPid:
2678  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2679  result = handleAdminRosGetPidCmd();
2680  reader.requestDrop(); // ROS likes to close down.
2681  break;
2682  case PortCoreCommand::RosGetBusInfo:
2683  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2684  result = handleAdminRosGetBusInfoCmd();
2685  reader.requestDrop(); // ROS likes to close down.
2686  break;
2687  case PortCoreCommand::Unknown:
2688  result = handleAdminUnknownCmd(cmd);
2689  break;
2690  }
2691 
2692  ConnectionWriter* writer = reader.getWriter();
2693  if (writer != nullptr) {
2694  result.write(*writer);
2695  }
2696 
2697  return true;
2698 }
2699 
2700 
2701 bool PortCore::setTypeOfService(PortCoreUnit* unit, int tos)
2702 {
2703  if (unit == nullptr) {
2704  return false;
2705  }
2706 
2707  yCDebug(PORTCORE, "Trying to set TOS = %d", tos);
2708 
2709  if (unit->isOutput()) {
2710  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2711  if (outUnit != nullptr) {
2712  OutputProtocol* op = outUnit->getOutPutProtocol();
2713  if (op != nullptr) {
2714  yCDebug(PORTCORE, "Trying to set TOS = %d on output unit", tos);
2715  bool ok = op->getOutputStream().setTypeOfService(tos);
2716  if (!ok) {
2717  yCWarning(PORTCORE, "Setting TOS on output unit failed");
2718  }
2719  return ok;
2720  }
2721  }
2722  }
2723 
2724  // Some of the input units may have output stream object to write back to
2725  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2726  // also configured for them.
2727 
2728 
2729  if (unit->isInput()) {
2730  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2731  if (inUnit != nullptr) {
2732  InputProtocol* ip = inUnit->getInPutProtocol();
2733  if ((ip != nullptr) && ip->getOutput().isOk()) {
2734  yCDebug(PORTCORE, "Trying to set TOS = %d on input unit", tos);
2735  bool ok = ip->getOutput().getOutputStream().setTypeOfService(tos);
2736  if (!ok) {
2737  yCWarning(PORTCORE, "Setting TOS on input unit failed");
2738  }
2739  return ok;
2740  }
2741  }
2742  }
2743  // if there is nothing to be set, returns true
2744  return true;
2745 }
2746 
2747 int PortCore::getTypeOfService(PortCoreUnit* unit)
2748 {
2749  if (unit == nullptr) {
2750  return -1;
2751  }
2752 
2753  if (unit->isOutput()) {
2754  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2755  if (outUnit != nullptr) {
2756  OutputProtocol* op = outUnit->getOutPutProtocol();
2757  if (op != nullptr) {
2758  return op->getOutputStream().getTypeOfService();
2759  }
2760  }
2761  }
2762 
2763  // Some of the input units may have output stream object to write back to
2764  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2765  // also configured for them.
2766 
2767 
2768  if (unit->isInput()) {
2769  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2770  if (inUnit != nullptr) {
2771  InputProtocol* ip = inUnit->getInPutProtocol();
2772  if ((ip != nullptr) && ip->getOutput().isOk()) {
2773  return ip->getOutput().getOutputStream().getTypeOfService();
2774  }
2775  }
2776  }
2777  return -1;
2778 }
2779 
2780 // attach a portmonitor plugin to the port or to a specific connection
2781 bool PortCore::attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg)
2782 {
2783  // attach to the current port
2784  Carrier* portmonitor = Carriers::chooseCarrier("portmonitor");
2785  if (portmonitor == nullptr) {
2786  errMsg = "Portmonitor carrier modifier cannot be find or it is not enabled in Yarp!";
2787  return false;
2788  }
2789 
2790  if (isOutput) {
2791  detachPortMonitor(true);
2792  prop.put("source", getName());
2793  prop.put("destination", "");
2794  prop.put("sender_side", 1);
2795  prop.put("receiver_side", 0);
2796  prop.put("carrier", "");
2797  m_modifier.outputMutex.lock();
2798  m_modifier.outputModifier = portmonitor;
2799  if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2800  m_modifier.releaseOutModifier();
2801  errMsg = "Failed to configure the portmonitor plug-in";
2802  m_modifier.outputMutex.unlock();
2803  return false;
2804  }
2805  m_modifier.outputMutex.unlock();
2806  } else {
2807  detachPortMonitor(false);
2808  prop.put("source", "");
2809  prop.put("destination", getName());
2810  prop.put("sender_side", 0);
2811  prop.put("receiver_side", 1);
2812  prop.put("carrier", "");
2813  m_modifier.inputMutex.lock();
2814  m_modifier.inputModifier = portmonitor;
2815  if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2816  m_modifier.releaseInModifier();
2817  errMsg = "Failed to configure the portmonitor plug-in";
2818  m_modifier.inputMutex.unlock();
2819  return false;
2820  }
2821  m_modifier.inputMutex.unlock();
2822  }
2823  return true;
2824 }
2825 
2826 // detach the portmonitor from the port or specific connection
2827 bool PortCore::detachPortMonitor(bool isOutput)
2828 {
2829  if (isOutput) {
2830  m_modifier.outputMutex.lock();
2831  m_modifier.releaseOutModifier();
2832  m_modifier.outputMutex.unlock();
2833  } else {
2834  m_modifier.inputMutex.lock();
2835  m_modifier.releaseInModifier();
2836  m_modifier.inputMutex.unlock();
2837  }
2838  return true;
2839 }
2840 
2841 bool PortCore::setParamPortMonitor(const yarp::os::Property& param,
2842  bool isOutput,
2843  std::string& errMsg)
2844 {
2845  if (isOutput) {
2846  m_modifier.outputMutex.lock();
2847  if (m_modifier.outputModifier == nullptr) {
2848  errMsg = "No port modifier is attached to the output";
2849  m_modifier.outputMutex.unlock();
2850  return false;
2851  }
2852  m_modifier.outputModifier->setCarrierParams(param);
2853  m_modifier.outputMutex.unlock();
2854  } else {
2855  m_modifier.inputMutex.lock();
2856  if (m_modifier.inputModifier == nullptr) {
2857  errMsg = "No port modifier is attached to the input";
2858  m_modifier.inputMutex.unlock();
2859  return false;
2860  }
2861  m_modifier.inputModifier->setCarrierParams(param);
2862  m_modifier.inputMutex.unlock();
2863  }
2864  return true;
2865 }
2866 
2867 bool PortCore::getParamPortMonitor(yarp::os::Property& param,
2868  bool isOutput,
2869  std::string& errMsg)
2870 {
2871  if (isOutput) {
2872  m_modifier.outputMutex.lock();
2873  if (m_modifier.outputModifier == nullptr) {
2874  errMsg = "No port modifier is attached to the output";
2875  m_modifier.outputMutex.unlock();
2876  return false;
2877  }
2878  m_modifier.outputModifier->getCarrierParams(param);
2879  m_modifier.outputMutex.unlock();
2880  } else {
2881  m_modifier.inputMutex.lock();
2882  if (m_modifier.inputModifier == nullptr) {
2883  errMsg = "No port modifier is attached to the input";
2884  m_modifier.inputMutex.unlock();
2885  return false;
2886  }
2887  m_modifier.inputModifier->getCarrierParams(param);
2888  m_modifier.inputMutex.unlock();
2889  }
2890  return true;
2891 }
2892 
2893 void PortCore::reportUnit(PortCoreUnit* unit, bool active)
2894 {
2895  YARP_UNUSED(active);
2896  if (unit != nullptr) {
2897  bool isLog = (!unit->getMode().empty());
2898  if (isLog) {
2899  m_logNeeded = true;
2900  }
2901  }
2902 }
2903 
2904 bool PortCore::setProcessSchedulingParam(int priority, int policy)
2905 {
2906 #if defined(__linux__)
2907  // set the sched properties of all threads within the process
2908  struct sched_param sch_param;
2909  sch_param.__sched_priority = priority;
2910 
2911  DIR* dir;
2912  char path[PATH_MAX];
2913  sprintf(path, "/proc/%d/task/", yarp::os::impl::getpid());
2914 
2915  dir = opendir(path);
2916  if (dir == nullptr) {
2917  return false;
2918  }
2919 
2920  struct dirent* d;
2921  char* end;
2922  long tid = 0;
2923  bool ret = true;
2924  while ((d = readdir(dir)) != nullptr) {
2925  if (isdigit(static_cast<unsigned char>(*d->d_name)) == 0) {
2926  continue;
2927  }
2928 
2929  tid = strtol(d->d_name, &end, 10);
2930  if (d->d_name == end || ((end != nullptr) && (*end != 0))) {
2931  closedir(dir);
2932  return false;
2933  }
2934  ret &= (sched_setscheduler(static_cast<pid_t>(tid), policy, &sch_param) == 0);
2935  }
2936  closedir(dir);
2937  return ret;
2938 #elif defined(YARP_HAS_ACE) // for other platforms
2939  // TODO: Check how to set the scheduling properties of all process's threads in Windows
2940  ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2941  int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2942  return (ret != -1);
2943 #else
2944  return false;
2945 #endif
2946 }
2947 
2949 {
2950  m_stateMutex.lock();
2951  if (!readOnly) {
2952  if (m_prop == nullptr) {
2953  m_prop = new Property();
2954  }
2955  }
2956  return m_prop;
2957 }
2958 
2960 {
2961  YARP_UNUSED(prop);
2962  m_stateMutex.unlock();
2963 }
2964 
2965 bool PortCore::removeIO(const Route& route, bool synch)
2966 {
2967  return removeUnit(route, synch);
2968 }
2969 
2970 void PortCore::setName(const std::string& name)
2971 {
2972  m_name = name;
2973 }
2974 
2975 std::string PortCore::getName()
2976 {
2977  return m_name;
2978 }
2979 
2980 int PortCore::getNextIndex()
2981 {
2982  int result = m_counter;
2983  m_counter++;
2984  if (m_counter < 0) {
2985  m_counter = 1;
2986  }
2987  return result;
2988 }
2989 
2991 {
2992  return m_address;
2993 }
2994 
2995 void PortCore::resetPortName(const std::string& str)
2996 {
2997  m_address.setName(str);
2998 }
2999 
3001 {
3002  return m_readableCreator;
3003 }
3004 
3006 {
3007  m_controlRegistration = flag;
3008 }
3009 
3011 {
3012  return m_listening.load();
3013 }
3014 
3016 {
3017  return m_manual;
3018 }
3019 
3021 {
3022  return m_interrupted;
3023 }
3024 
3025 void PortCore::setTimeout(float timeout)
3026 {
3027  m_timeout = timeout;
3028 }
3029 
3030 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3034 {
3035  removeCallbackLock();
3036  if (mutex != nullptr) {
3037  m_old_mutex = mutex;
3038  m_mutexOwned = false;
3039  } else {
3040  m_old_mutex = new yarp::os::Mutex();
3041  m_mutexOwned = true;
3042  }
3043  return true;
3044 }
3046 #endif // YARP_NO_DEPRECATED
3047 
3048 bool PortCore::setCallbackLock(std::mutex* mutex)
3049 {
3050  removeCallbackLock();
3051  if (mutex != nullptr) {
3052  m_mutex = mutex;
3053  m_mutexOwned = false;
3054  } else {
3055  m_mutex = new std::mutex;
3056  m_mutexOwned = true;
3057  }
3058  return true;
3059 }
3060 
3062 {
3063  if (m_mutexOwned && (m_mutex != nullptr)) {
3064  delete m_mutex;
3065  }
3066  m_mutex = nullptr;
3067 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3068  m_old_mutex = nullptr;
3069 #endif // YARP_NO_DEPRECATED
3070  m_mutexOwned = false;
3071  return true;
3072 }
3073 
3075 {
3076  if (m_mutex == nullptr) {
3077 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3078  if (m_old_mutex == nullptr) {
3079  return false;
3080  }
3081  m_old_mutex->lock();
3082  return true;
3083 #else // YARP_NO_DEPRECATED
3084  return false;
3085 #endif // YARP_NO_DEPRECATED
3086  }
3087  m_mutex->lock();
3088  return true;
3089 }
3090 
3092 {
3093  if (m_mutex == nullptr) {
3094 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3095  if (m_old_mutex == nullptr) {
3096  return true;
3097  }
3098  return m_old_mutex->try_lock();
3099 #else // YARP_NO_DEPRECATED
3100  return true;
3101 #endif // YARP_NO_DEPRECATED
3102  }
3103  return m_mutex->try_lock();
3104 }
3105 
3107 {
3108  if (m_mutex == nullptr) {
3109 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3110  if (m_old_mutex == nullptr) {
3111  return;
3112  }
3113  return m_old_mutex->unlock();
3114 #else // YARP_NO_DEPRECATED
3115  return;
3116 #endif // YARP_NO_DEPRECATED
3117  }
3118  m_mutex->unlock();
3119 }
3120 
3122 {
3123  return m_modifier;
3124 }
3125 
3127 {
3128  m_typeMutex.lock();
3129  if (!m_checkedType) {
3130  if (!m_type.isValid()) {
3131  m_type = reader.getReadType();
3132  }
3133  m_checkedType = true;
3134  }
3135  m_typeMutex.unlock();
3136 }
3137 
3139 {
3140  m_typeMutex.lock();
3141  Type t = m_type;
3142  m_typeMutex.unlock();
3143  return t;
3144 }
3145 
3146 void PortCore::promiseType(const Type& typ)
3147 {
3148  m_typeMutex.lock();
3149  m_type = typ;
3150  m_typeMutex.unlock();
3151 }
yarp::os::DummyConnector
A dummy connection to test yarp::os::Portable implementations.
Definition: DummyConnector.h:35
yarp::os::Route::toString
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:141
yarp::os::impl::PortCoreUnit::setDoomed
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:100
yarp::os::impl::PortCore::send
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
Definition: PortCore.cpp:1252
yarp::os::impl::BufferedConnectionWriter::appendLine
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
Definition: BufferedConnectionWriter.cpp:304
yarp::os::impl::PortCore::acquireProperties
Property * acquireProperties(bool readOnly)
Definition: PortCore.cpp:2948
YARP_WARNING_PUSH
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:334
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
yarp::os::Route::swapNames
void swapNames()
Swap from and to names.
Definition: Route.cpp:136
yarp::os::Value::asVocab
virtual std::int32_t asVocab() const
Get vocabulary identifier as an integer.
Definition: Value.cpp:231
PORTCORE_SEND_LOG
#define PORTCORE_SEND_LOG
Definition: PortCore.h:46
yarp::os::Bottle::toString
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
RosNameSpace.h
yarp::os::Property::put
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:998
yarp::os::impl::ThreadImpl::getPolicy
int getPolicy()
Definition: ThreadImpl.cpp:307
yarp::os::impl::PortCore::setAdminReadHandler
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
Definition: PortCore.cpp:151
yarp::os::createVocab
constexpr yarp::conf::vocab32_t createVocab(char a, char b=0, char c=0, char d=0)
Definition: Vocab.h:22
yarp::os::impl::StreamConnectionReader::reset
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
Definition: StreamConnectionReader.cpp:49
Network.h
yarp::os::PortReport::report
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
PORTCORE_IS_RPC
#define PORTCORE_IS_RPC
Definition: PortCore.h:50
yarp::os::impl::ConnectionRecorder::init
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
Definition: ConnectionRecorder.cpp:23
yarp::os::PortReader::read
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
yarp::os::Route::getCarrierName
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:126
yarp::os::impl::PortCore::resume
void resume()
Undo an interrupt()
Definition: PortCore.cpp:328
yarp::os::ContactStyle
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:27
yarp::os::NetworkBase::getQueryBypass
static NameStore * getQueryBypass()
Definition: Network.cpp:1415
yarp::os::NestedContact
A placeholder for rich contact information.
Definition: NestedContact.h:27
yarp::os::impl::PortCoreUnit::isFinished
virtual bool isFinished()
Definition: PortCoreUnit.h:74
yarp::os::PortInfo::PORTINFO_CONNECTION
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:43
yarp::os::impl::ThreadImpl::start
virtual bool start()
Definition: ThreadImpl.cpp:187
yarp::os::impl::PortCore::lockCallback
bool lockCallback()
Definition: PortCore.cpp:3074
yarp::os::Bottle::size
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
yarp::os::impl::BufferedConnectionWriter::write
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
Definition: BufferedConnectionWriter.cpp:344
yarp::os::impl::PortCoreUnit::setPupped
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
Definition: PortCoreUnit.h:251
yarp::os::impl::PortCoreUnit
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:30
yarp::os::Carrier
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
yarp::os::OutputProtocol
The output side of an active connection between two ports.
Definition: OutputProtocol.h:33
t
float t
Definition: FfmpegWriter.cpp:74
yarp::os::QosStyle::getDSCPByVocab
static PacketPriorityDSCP getDSCPByVocab(int vocab)
returns the IPV4/6 DSCP value given as DSCP code
Definition: QosStyle.cpp:158
PORTCORE_SEND_NORMAL
#define PORTCORE_SEND_NORMAL
Definition: PortCore.h:45
PORTCORE_IS_INPUT
#define PORTCORE_IS_INPUT
Definition: PortCore.h:51
yarp::os::impl::PortCore::isManual
bool isManual() const
Definition: PortCore.cpp:3015
yarp::os::Carriers::listen
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
Definition: Carriers.cpp:253
yarp::os::InputProtocol::close
virtual void close()=0
Negotiate an end to operations.
yCWarning
#define yCWarning(component,...)
Definition: LogComponent.h:146
yarp::os::OutputProtocol::getOutputStream
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
yarp::os::Type
Definition: Type.h:24
yarp::os::impl::PortCoreUnit::getRoute
virtual Route getRoute()
Definition: PortCoreUnit.h:83
yarp::conf::environment::getEnvironment
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: environment.h:31
yarp::os::SystemInfo::getPlatformInfo
static PlatformInfo getPlatformInfo()
getPlatformInfo
Definition: SystemInfo.cpp:600
yarp::os::NetworkBase::write
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition: Network.cpp:1229
yarp::os::SystemInfo::ProcessInfo::pid
int pid
Definition: SystemInfo.h:121
yarp::os::Property::fromString
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
yarp::os::impl::PortCore::getEnvelope
std::string getEnvelope()
Definition: PortCore.cpp:1475
yarp::os::impl::PortDataModifier
This is the heart of a yarp port.
Definition: PortCore.h:113
yarp::os::StringOutputStream::toString
std::string toString() const
Definition: StringOutputStream.h:33
yarp::os::Name::toAddress
Contact toAddress() const
Create an address from the name.
Definition: Name.cpp:30
yarp::os::impl::PortCore::start
bool start() override
Begin main thread.
Definition: PortCore.cpp:280
yarp::os::OutputStream
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
yarp::os::impl::PortCore::unlockCallback
void unlockCallback()
Definition: PortCore.cpp:3106
yarp::os::NetworkBase::writeToNameServer
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
Definition: Network.cpp:1986
yarp::os::PortInfo::PORTINFO_MISC
@ PORTINFO_MISC
Unspecified information.
Definition: PortInfo.h:46
yarp::os::Carrier::isConnectionless
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
yarp::os::StringInputStream::add
void add(const std::string &txt)
Definition: StringInputStream.h:47
yarp::os::Carriers::chooseCarrier
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition: Carriers.cpp:236
InputProtocol.h
yarp::os::OutputProtocol::getRoute
virtual const Route & getRoute() const =0
yarp::os::SystemInfo::PlatformInfo
The PlatformInfo struct holds the operating system information.
Definition: SystemInfo.h:84
yarp::os::impl::PortCore::removeCallbackLock
bool removeCallbackLock()
Definition: PortCore.cpp:3061
yarp::os::Carriers::getCarrierTemplate
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
Definition: Carriers.cpp:241
yarp::os::impl::BufferedConnectionWriter
A helper for creating cached object descriptions.
Definition: BufferedConnectionWriter.h:50
yarp::os::Bottle::fromString
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:207
yarp::os::Carrier::isPush
bool isPush() const override
Check if carrier is "push" or "pull" style.
Definition: Carrier.cpp:25
__pc_rpc
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
Definition: PortCore.cpp:1495
YARP_UNUSED
#define YARP_UNUSED(var)
Definition: api.h:159
yarp::os::impl::PortCore::isListening
bool isListening() const
Definition: PortCore.cpp:3010
yarp::os::impl::PortCore::getName
std::string getName()
Definition: PortCore.cpp:2975
yarp::os::NestedContact::getNodeName
std::string getNodeName() const
Definition: NestedContact.cpp:184
yarp::os::InputProtocol::setTimeout
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
yarp::os::Property::find
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1034
StreamConnectionReader.h
ConnectionRecorder.h
PortCoreOutputUnit.h
yarp::os::impl::StreamConnectionReader
Lets Readable objects read from the underlying InputStream associated with the connection between two...
Definition: StreamConnectionReader.h:42
yarp::os::getpid
int getpid()
Portable wrapper for the getppid() function.
Definition: Os.cpp:94
yarp::os::impl::PortCorePacket::dec
void dec()
Decrement the usage count for this messagae.
Definition: PortCorePacket.h:80
yarp::os::StringOutputStream::reset
void reset()
Definition: StringOutputStream.h:38
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
yarp::os::OutputProtocol::getInput
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
yarp::os::PortReport
A base class for objects that want information about port status changes.
Definition: PortReport.h:31
yarp::os::Carrier::setCarrierParams
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
Definition: Carrier.cpp:118
yarp::os::impl::PortCore::run
void run() override
The body of the main thread.
Definition: PortCore.cpp:168
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:32
LogComponent.h
rpc
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
Definition: RosLookup.cpp:22
yarp::os::DummyConnector::getReader
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
Definition: DummyConnector.cpp:117
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
yarp::os::impl::PortCore::setCallbackLock
bool setCallbackLock(yarp::os::Mutex *mutex)
Definition: PortCore.cpp:3033
yarp::os::Contact::getRegName
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:220
yarp::os::StringOutputStream
An OutputStream that produces a string.
Definition: StringOutputStream.h:25
yarp::os::impl::PortCore::removeIO
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
Definition: PortCore.cpp:2965
yarp::os::PortInfo
Information about a port connection or event.
Definition: PortInfo.h:29
yarp::os::impl::PortCorePacket::inc
void inc()
Increment the usage count for this messagae.
Definition: PortCorePacket.h:72
yarp::os::Bottle::find
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Bottle.cpp:290
yarp::os::PortInfo::carrierName
std::string carrierName
Name of protocol type, if releveant.
Definition: PortInfo.h:69
Name.h
yarp::os::InputProtocol::getOutput
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
yarp::os::impl::PortCoreUnit::isInput
virtual bool isInput()
Definition: PortCoreUnit.h:57
yarp::os::impl::PortCore::notifyCompletion
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1437
yarp::os::impl::PortCore::PortCore
PortCore()
Constructor.
yarp::os::impl::PortCore::describe
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
Definition: PortCore.cpp:1052
yarp::os::Contact::toURI
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:316
yarp::os::Bottle::addDict
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Definition: Bottle.cpp:191
yarp::os::Bottle::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Bottle.cpp:280
yarp::os::impl::PortCore::setName
void setName(const std::string &name)
Set the name of this port.
Definition: PortCore.cpp:2970
yarp::os::Bottle::findGroup
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Bottle.cpp:305
PortCoreInputUnit.h
yarp::os::Value::isString
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:159
yarp::os::NetworkBase::disconnectInput
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1555
yarp::os::NetworkBase::queryName
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition: Network.cpp:998
yarp::os::impl::PortCore::promiseType
void promiseType(const Type &typ)
Definition: PortCore.cpp:3146
yarp::os::impl::PortCorePacket
A single message, potentially being transmitted on multiple connections.
Definition: PortCorePacket.h:25
yarp::os::OutputProtocol::open
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
yarp::os::PortInfo::targetName
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:66
yarp::os::impl::ThreadImpl::setPriority
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:249
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
yarp::os::Bottle::get
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
yarp::os::ContactStyle::quiet
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:39
yarp::os::Bottle::addList
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:185
yarp::os::Contact::getPort
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
yarp::os::Property::toString
std::string toString() const override
Return a standard text representation of the content of the object.
Definition: Property.cpp:1052
yarp::os::Bottle::write
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:233
yarp::os::NetworkBase::unregisterName
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1026
yarp::os::ConnectionWriter
An interface for writing to a network connection.
Definition: ConnectionWriter.h:40
yarp::os::impl::PortCore::getEventCount
int getEventCount()
A diagnostic for testing purposes.
Definition: PortCore.cpp:536
yarp::os::SystemInfo::ProcessInfo::name
std::string name
Definition: SystemInfo.h:117
yarp::os::PortInfo::tag
int tag
Type of information.
Definition: PortInfo.h:51
yarp::os::Contact::fromString
static Contact fromString(const std::string &txt)
Factory method.
Definition: Contact.cpp:142
DummyConnector.h
yarp::os::impl::ConnectionRecorder::fini
void fini()
Call this when all reading/writing has been done.
Definition: ConnectionRecorder.cpp:32
yarp::os::NetworkBase::getLocalMode
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
Definition: Network.cpp:1057
BufferedConnectionWriter.h
yarp::os::DummyConnector::getWriter
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
Definition: DummyConnector.cpp:112
yarp::os::Contact::getCarrier
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition: Contact.cpp:253
yarp::os::impl::PortCore::readBlock
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
Definition: PortCore.cpp:1203
yarp::os::impl::PortCore::sendHelper
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
Definition: PortCore.cpp:1276
__tcp_check
static bool __tcp_check(const Contact &c)
Definition: PortCore.cpp:1511
yarp::os::impl::PortCore::reportUnit
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2893
yarp::os::Value::asString
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
yarp::os::PortReaderCreator
A creator for readers.
Definition: PortReaderCreator.h:34
yarp::os::SystemInfo::getProcessInfo
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
Definition: SystemInfo.cpp:808
yarp::os::Connection::isPush
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
yarp::os::impl::ThreadImpl::join
int join(double seconds=-1)
Definition: ThreadImpl.cpp:123
yarp::os::PortInfo::incoming
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:54
yarp::os::impl::PortCore::removeOutput
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
Definition: PortCore.cpp:1016
yarp::os::impl::PortCore::getPortModifier
yarp::os::impl::PortDataModifier & getPortModifier()
Definition: PortCore.cpp:3121
yarp::os::Name::getCarrierModifier
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition: Name.cpp:47
yarp::os::OutputProtocol::getConnection
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
yarp::os::OutputProtocol::rename
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
yarp::os::impl::PortCore::setControlRegistration
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
Definition: PortCore.cpp:3005
yarp::os::PortInfo::portName
std::string portName
Name of port.
Definition: PortInfo.h:60
yarp::os::impl::PortCoreUnit::isOutput
virtual bool isOutput()
Definition: PortCoreUnit.h:65
yarp::os::impl::PortCoreUnit::isPupped
bool isPupped() const
Definition: PortCoreUnit.h:225
yarp::os::impl::PortCore::setReadCreator
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
Definition: PortCore.cpp:159
yarp::os::Bottle::addInt32
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:143
yarp::os::Vocab::encode
NetInt32 encode(const std::string &str)
Convert a string into a vocabulary identifier.
Definition: Vocab.cpp:14
yarp::os::NetworkBase::initialized
static bool initialized()
Returns true if YARP has been fully initialized.
Definition: Network.cpp:1392
yarp::os::impl::PortCoreUnit::isDoomed
bool isDoomed()
Definition: PortCoreUnit.h:92
yarp::os::OutputProtocol::attachPort
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
yarp::os::impl::PortCore::getReadCreator
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Definition: PortCore.cpp:3000
yarp::os::QosStyle::PacketPriorityDSCP
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
Definition: QosStyle.h:48
yarp::os::impl::PortCore::~PortCore
~PortCore()
Destructor.
Definition: PortCore.cpp:64
yarp::os::Name
Simple abstraction for a YARP port name.
Definition: Name.h:22
yarp::os::StringInputStream
An InputStream that reads from a string.
Definition: StringInputStream.h:25
yarp::os::PortInfo::message
std::string message
A human-readable description of contents.
Definition: PortInfo.h:72
yarp::os::Carriers::connect
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
Definition: Carriers.cpp:285
yarp::os::impl::PortCore::manualStart
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
Definition: PortCore.cpp:311
yarp::os::Property::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
yarp::os::ConnectionReader::getWriter
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
yarp::os::PortReader::getReadType
virtual Type getReadType() const
Definition: PortReader.cpp:15
yarp::os::impl::PortCoreUnit::getIndex
int getIndex()
Definition: PortCoreUnit.h:198
yarp::os::impl::PortCore::releaseProperties
void releaseProperties(Property *prop)
Definition: PortCore.cpp:2959
yarp::os::PortWriter::write
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
yarp::os::Bottle::addString
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
yarp::os::InputProtocol::attachPort
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
yarp::os::OutputProtocol::isOk
virtual bool isOk() const =0
Check if the connection is valid and can be used.
yarp::os::OutputProtocol::close
virtual void close()=0
Negotiate an end to operations.
yarp::os::Bottle::addVocab
void addVocab(int x)
Places a vocabulary item in the bottle, at the end of the list.
Definition: Bottle.cpp:167
yarp::os::impl::PortCoreUnit::getMode
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
Definition: PortCoreUnit.h:211
yarp::os::impl::PortCore::isWriting
bool isWriting()
Check if a message is currently being sent.
Definition: PortCore.cpp:1399
yarp::os::impl::PortCore::tryLockCallback
bool tryLockCallback()
Definition: PortCore.cpp:3091
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
YARP_WARNING_POP
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:335
yarp::os::OutputProtocol::setTimeout
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
yarp::os::impl::PortCore::getAddress
const Contact & getAddress() const
Get the address associated with the port.
Definition: PortCore.cpp:2990
yarp::os::ConnectionReader
An interface for reading from a network connection.
Definition: ConnectionReader.h:40
yarp::os::impl::PortCore::resetReportCallback
void resetReportCallback()
Reset the callback to be notified of changes in port status.
Definition: PortCore.cpp:1183
StringOutputStream.h
yarp::os::impl::PortCore::addOutput
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition: PortCore.cpp:846
yarp::os::SystemInfo::ProcessInfo::schedPriority
int schedPriority
Definition: SystemInfo.h:120
yarp::os::NetType::toString
static std::string toString(int x)
Definition: NetType.cpp:138
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::PortWriter::onCommencement
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
Definition: PortWriter.cpp:20
yarp::os::Value::asInt32
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:207
yarp::os::impl::PortCoreUnit::send
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
Definition: PortCoreUnit.h:130
yarp::os::impl::PortCoreOutputUnit
Manager for a single output from a port.
Definition: PortCoreOutputUnit.h:30
yarp::os::impl::PortCore::getInputCount
int getInputCount()
Check how many input connections there are.
Definition: PortCore.cpp:1418
yarp::os::ContactStyle::carrier
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:56
yarp::os::SystemInfo::ProcessInfo
The ProcessInfo struct provides the operating system process information.
Definition: SystemInfo.h:116
yCInfo
#define yCInfo(component,...)
Definition: LogComponent.h:135
yarp::os::impl::PortCore::removeInput
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
Definition: PortCore.cpp:1034
yarp::os::impl::PortCore::setEnvelope
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
Definition: PortCore.cpp:1461
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::impl::ThreadImpl::getPriority
int getPriority()
Definition: ThreadImpl.cpp:280
PORTCORE_IS_OUTPUT
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:52
PortCore.h
yarp::os::Contact::isValid
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
yarp::os::impl::PortCoreUnit::setCarrierParams
virtual void setCarrierParams(const yarp::os::Property &params)
Set arbitrary parameters for this connection.
Definition: PortCoreUnit.h:261
yarp::os::SystemInfo::ProcessInfo::schedPolicy
int schedPolicy
Definition: SystemInfo.h:119
yarp
The main, catch-all namespace for YARP.
Definition: environment.h:18
yarp::os::impl::PortCoreInputUnit
Manager for a single input to a port.
Definition: PortCoreInputUnit.h:28
yarp::os::impl::PortCore::report
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1189
PlatformUnistd.h
yarp::conf::vocab32_t
std::int32_t vocab32_t
Definition: numeric.h:52
environment.h
yarp::os::Contact::getHost
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::os::impl::PortCore::setReportCallback
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
Definition: PortCore.cpp:1175
yarp::os::Bottle::read
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
yarp::os::Route::getToName
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
yarp::os::Route::setToContact
void setToContact(const Contact &toContact)
Set the destination contact of the route.
Definition: Route.cpp:121
yarp::os::Value::asList
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:243
yarp::os::impl::PortCore::getOutputCount
int getOutputCount()
Check how many output connections there are.
Definition: PortCore.cpp:1427
yarp::os::ContactStyle::timeout
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:50
Time.h
yarp::os::impl::PortCorePacket::setContent
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
Definition: PortCorePacket.h:109
yarp::os::Mutex
Basic wrapper for mutual exclusion.
Definition: Mutex.h:35
yarp::os::InputProtocol
The input side of an active connection between two ports.
Definition: InputProtocol.h:38
yarp::os::Bottle::add
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
Definition: Bottle.cpp:339
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
yarp::os::impl::PortCore::isInterrupted
bool isInterrupted() const
Definition: PortCore.cpp:3020
yarp::os::impl::PortCore::checkType
void checkType(PortReader &reader)
Definition: PortCore.cpp:3126
yarp::os::impl::PortCore::interrupt
void interrupt()
Prepare the port to be shut down.
Definition: PortCore.cpp:334
yarp::os::impl::PortCore::setReadHandler
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
Definition: PortCore.cpp:143
yarp::os::impl::PortCoreUnit::getCarrierParams
virtual void getCarrierParams(yarp::os::Property &params)
Definition: PortCoreUnit.h:269
yarp::os::impl::ThreadImpl::close
virtual void close()
Definition: ThreadImpl.cpp:158
yarp::os::OutputProtocol::write
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
yarp::os::impl::ConnectionRecorder
A helper for recording entire message/reply transactions.
Definition: ConnectionRecorder.h:31
yarp::os::Value
A single value (typically within a Bottle).
Definition: Value.h:47
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
yarp::os::impl::PortCoreUnit::isBusy
virtual bool isBusy()
Definition: PortCoreUnit.h:167
PortInfo.h
yAssert
#define yAssert(x)
Definition: Log.h:297
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
YARP_DISABLE_DEPRECATED_WARNING
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:336
yarp::os::impl
The components from which ports and connections are built.
yarp::os::impl::PortCore::getType
yarp::os::Type getType()
Definition: PortCore.cpp:3138
yarp::os::ConnectionReader::requestDrop
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
yarp::os::RosNameSpace::fromRosName
static std::string fromRosName(const std::string &name)
Definition: RosNameSpace.cpp:678
yarp::os::impl::PortCoreUnit::getPupString
std::string getPupString() const
Definition: PortCoreUnit.h:237
yarp::os::Contact::setTimeout
void setTimeout(float timeout)
Set timeout for this Contact.
Definition: Contact.cpp:285
yarp::os::impl::ThreadImpl::getTid
long getTid()
Definition: ThreadImpl.cpp:334
yarp::os::impl::PortCore::adminBlock
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
Definition: PortCore.cpp:1663
Bottle.h
yarp::os::SystemInfo::PlatformInfo::name
std::string name
Definition: SystemInfo.h:85
yarp::os::SystemInfo::ProcessInfo::arguments
std::string arguments
Definition: SystemInfo.h:118
yarp::os::QosStyle::DSCP_Invalid
@ DSCP_Invalid
Definition: QosStyle.h:49
yarp::os::OutputStream::setTypeOfService
virtual bool setTypeOfService(int tos)
Definition: OutputStream.cpp:45
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:37
yarp::os::impl::PortCore::close
void close() override
Shut down port.
Definition: PortCore.cpp:267
yarp::os::impl::PortCore::listen
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
Definition: PortCore.cpp:71
SystemInfo.h
yarp::os::NetworkBase::disconnect
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition: Network.cpp:703
yarp::os::impl::PortCore::resetPortName
void resetPortName(const std::string &str)
Definition: PortCore.cpp:2995
yarp::os::NetInt32
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:33
yarp::os::impl::PortCore::setTimeout
void setTimeout(float timeout)
Definition: PortCore.cpp:3025
yarp::os::OutputStream::getTypeOfService
virtual int getTypeOfService()
Definition: OutputStream.cpp:51
yarp::os::PortInfo::sourceName
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:63