YARP
Yet Another Robot Platform
PortCore.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #ifndef YARP_OS_IMPL_PORTCORE_H
11 #define YARP_OS_IMPL_PORTCORE_H
12 
13 #include <yarp/os/Carriers.h>
14 #include <yarp/os/Contact.h>
15 #include <yarp/os/Contactable.h>
17 #include <yarp/os/PortReader.h>
19 #include <yarp/os/PortReport.h>
20 #include <yarp/os/PortWriter.h>
21 #include <yarp/os/Property.h>
22 #include <yarp/os/Type.h>
23 #include <yarp/os/Vocab.h>
27 
28 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
29 #define YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
30 #include <yarp/os/Mutex.h>
31 #undef YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
32 #endif
33 
34 #include <atomic>
35 #include <condition_variable>
36 #include <mutex>
37 #include <vector>
38 
39 namespace yarp {
40 namespace os {
41 namespace impl {
42 
43 class PortCoreUnit;
44 
45 #define PORTCORE_SEND_NORMAL (1)
46 #define PORTCORE_SEND_LOG (2)
47 
48 // some flags for restricting port behavior
49 #define PORTCORE_IS_NULL (0)
50 #define PORTCORE_IS_RPC (1)
51 #define PORTCORE_IS_INPUT (2)
52 #define PORTCORE_IS_OUTPUT (4)
53 
113 {
114 public:
116  outputModifier(nullptr),
117  inputModifier(nullptr)
118  {
119  }
120 
122  {
123  releaseOutModifier();
124  releaseInModifier();
125  }
126 
128  {
129  if (outputModifier != nullptr) {
130  outputModifier->close();
131  delete outputModifier;
132  outputModifier = nullptr;
133  }
134  }
135 
137  {
138  if (inputModifier != nullptr) {
139  inputModifier->close();
140  delete inputModifier;
141  inputModifier = nullptr;
142  }
143  }
144 
145 public:
148  std::mutex outputMutex;
149  std::mutex inputMutex;
150 };
151 
153  public ThreadImpl,
154  public yarp::os::PortReader
155 {
156 public:
161 
165  ~PortCore();
166 
176  bool addOutput(const std::string& dest,
177  void* id,
179  bool onlyIfNeeded = false);
180 
184  void addOutput(OutputProtocol* op);
185 
192  void removeInput(const std::string& src,
193  void* id,
195 
202  void removeOutput(const std::string& dest,
203  void* id,
205 
212  bool removeIO(const Route& route, bool synch = false);
213 
219  void describe(void* id, yarp::os::OutputStream* os);
220 
225  void describe(yarp::os::PortReport& reporter);
226 
233  bool readBlock(ConnectionReader& reader,
234  void* id,
236 
243  bool adminBlock(ConnectionReader& reader,
244  void* id);
245 
250  void setName(const std::string& name);
251 
255  std::string getName();
256 
262  void setEnvelope(const std::string& envelope);
263 
267  bool setEnvelope(yarp::os::PortWriter& envelope);
268 
269  std::string getEnvelope();
270 
274  bool getEnvelope(yarp::os::PortReader& envelope);
275 
282  void report(const yarp::os::PortInfo& info);
283 
291  void reportUnit(PortCoreUnit* unit, bool active);
292 
// Configure the port to meet certain restrictions in behavior.
// Stores `flags` in m_flags. The PORTCORE_IS_* macros defined earlier in this
// header are the "flags for restricting port behavior"; m_flags is initialised
// in-class to PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT.
296  void setFlags(unsigned int flags)
297  {
298  this->m_flags = flags;
299  }
300 
// Remember the given Contactable pointer in m_contactable.
// This inline body only stores the pointer; it neither copies nor deletes the
// object. NOTE(review): ownership/lifetime expectations are not visible in
// this header — confirm against the implementation file.
301  void setContactable(Contactable* contactable)
302  {
303  this->m_contactable = contactable;
304  }
305 
// Check current configuration of port.
// Returns the current value of m_flags (the member written by setFlags()).
309  unsigned int getFlags()
310  {
311  return m_flags;
312  }
313 
317  bool listen(const Contact& address, bool shouldAnnounce = true);
318 
322  bool isWriting();
323 
327  int getInputCount();
328 
332  int getOutputCount();
333 
337  void setReadHandler(yarp::os::PortReader& reader);
338 
342  void setAdminReadHandler(yarp::os::PortReader& reader);
343 
347  void setReadCreator(yarp::os::PortReaderCreator& creator);
348 
// Upon being asked to send a message, should we wait for any existing message
// to be sent to all destinations?
// Stores the choice in m_waitBeforeSend, which is initialised in-class to true.
353  void setWaitBeforeSend(bool waitBeforeSend)
354  {
355  this->m_waitBeforeSend = waitBeforeSend;
356  }
357 
// After sending a message, should we wait for it to be sent to all
// destinations before returning?
// Stores the choice in m_waitAfterSend, which is initialised in-class to true.
362  void setWaitAfterSend(bool waitAfterSend)
363  {
364  this->m_waitAfterSend = waitAfterSend;
365  }
366 
// Callback for data. Declared `override`, so it replaces a base-class virtual.
// This default implementation does not touch `reader` and always returns true.
370  bool read(yarp::os::ConnectionReader& reader) override
371  {
372  // does nothing by default
373  YARP_UNUSED(reader); // presumably only marks the parameter as intentionally unused
374  return true;
375  }
376 
380  bool start() override;
381 
385  bool manualStart(const char* sourceName);
386 
393  bool send(const yarp::os::PortWriter& writer,
394  yarp::os::PortReader* reader = nullptr,
395  const yarp::os::PortWriter* callback = nullptr);
396 
403  bool sendHelper(const yarp::os::PortWriter& writer,
404  int mode,
405  yarp::os::PortReader* reader = nullptr,
406  const yarp::os::PortWriter* callback = nullptr);
407 
411  void close() override;
412 
416  void run() override;
417 
421  int getEventCount();
422 
426  const Contact& getAddress() const;
427 
428  void resetPortName(const std::string& str);
429 
433  yarp::os::PortReaderCreator* getReadCreator();
434 
438  void notifyCompletion(void* tracker);
439 
444  void setControlRegistration(bool flag);
445 
449  void interrupt();
450 
454  void resume();
455 
459  void setReportCallback(yarp::os::PortReport* reporter);
460 
464  void resetReportCallback();
465 
470  bool isListening() const;
471 
476  bool isManual() const;
477 
481  bool isInterrupted() const;
482 
483  void setTimeout(float timeout);
484 
488  Property* acquireProperties(bool readOnly);
489 
493  void releaseProperties(Property* prop);
494 
495 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
499  bool setCallbackLock(yarp::os::Mutex* mutex);
501 #endif // YARP_NO_DEPRECATED
502 
503  bool setCallbackLock(std::mutex* mutex = nullptr);
504 
505  bool removeCallbackLock();
506 
507  bool lockCallback();
508 
509  bool tryLockCallback();
510 
511  void unlockCallback();
512 
513  yarp::os::impl::PortDataModifier& getPortModifier();
514 
515  void checkType(PortReader& reader);
516 
517  yarp::os::Type getType();
518 
519  void promiseType(const Type& typ);
520 
521 private:
522  // main internal PortCore state and operations
523  std::vector<PortCoreUnit *> m_units;
524  std::mutex m_stateMutex;
525  std::condition_variable m_stateCv;
526  std::mutex m_packetMutex;
527  std::condition_variable m_connectionChangeCv;
528  Face* m_face {nullptr};
529  std::string m_name;
530  yarp::os::Contact m_address;
531  yarp::os::PortReader *m_reader {nullptr};
532  yarp::os::PortReader *m_adminReader {nullptr};
533  yarp::os::PortReaderCreator *m_readableCreator {nullptr};
534  yarp::os::PortReport *m_eventReporter {nullptr};
535  std::atomic<bool> m_listening {false};
536  std::atomic<bool> m_running {false};
537  std::atomic<bool> m_starting {false};
538  std::atomic<bool> m_closing {false};
539  std::atomic<bool> m_finished {false};
540  bool m_finishing {false};
541  bool m_waitBeforeSend {true};
542  bool m_waitAfterSend {true};
543  bool m_controlRegistration {true};
544  bool m_interruptable {true};
545  bool m_interrupted {false};
546  bool m_manual {false};
547  int m_events {0};
548  int m_connectionListeners {0};
549  int m_inputCount {0};
550  int m_outputCount {0};
551  int m_dataOutputCount {0};
552  unsigned int m_flags {PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT};
553  bool m_logNeeded {false};
554  PortCorePackets m_packets {};
555  std::string m_envelope;
556  float m_timeout {-1};
557  int m_counter {1};
558  yarp::os::Property *m_prop {nullptr};
559  yarp::os::Contactable *m_contactable {nullptr};
560  std::mutex* m_mutex {nullptr};
561 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
564  yarp::os::Mutex* m_old_mutex {nullptr};
566 #endif // YARP_NO_DEPRECATED
567  bool m_mutexOwned {false};
568  BufferedConnectionWriter m_envelopeWriter {true};
569 
570  std::mutex m_typeMutex;
571  bool m_checkedType {false};
572  Type m_type;
573 
574  // port data modifier
576 
577  // set IP packet TOS
578  bool setTypeOfService(PortCoreUnit* unit, int tos);
579 
580  // get IP packet TOS
581  int getTypeOfService(PortCoreUnit* unit);
582 
583  // set the scheduling properties of all threads
584  // within the process scope.
585  bool setProcessSchedulingParam(int priority = -1, int policy = -1);
586 
587  // attach a portmonitor plugin to the port or to a specific connection
588  bool attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg);
589 
590  // detach the portmonitor from the port or specific connection
591  bool detachPortMonitor(bool isOutput);
592 
593  // set the parameter for the portmonitor of the port (if any)
594  bool setParamPortMonitor(const yarp::os::Property& param, bool isOutput, std::string& errMsg);
595 
596  // get the parameters from the portmonitor of the port (if any)
597  bool getParamPortMonitor(yarp::os::Property& param, bool isOutput, std::string& errMsg);
598 
599  void closeMain();
600 
601  bool isUnit(const Route& route, int index);
602 
603  // only called in "finished" phase
604  void closeUnits();
605 
606  // called anytime, garbage collects terminated units
607  void cleanUnits(bool blocking = true);
608 
609  // only called by the manager
610  void reapUnits();
611 
612  // only called in "running" phase
613  void addInput(InputProtocol* ip);
614 
615  bool removeUnit(const Route& route, bool synch = false, bool* except = nullptr);
616 
617  int getNextIndex();
618 };
619 
620 } // namespace impl
621 } // namespace os
622 } // namespace yarp
623 
624 #endif // YARP_OS_IMPL_PORTCORE_H
YARP_WARNING_PUSH
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:334
PortCorePackets.h
yarp::os::impl::PortCore::setContactable
void setContactable(Contactable *contactable)
Definition: PortCore.h:301
yarp::os::impl::PortCoreUnit
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:30
yarp::os::impl::PortCore::read
bool read(yarp::os::ConnectionReader &reader) override
Callback for data.
Definition: PortCore.h:370
yarp::os::Carrier
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
yarp::os::OutputProtocol
The output side of an active connection between two ports.
Definition: OutputProtocol.h:33
PORTCORE_IS_INPUT
#define PORTCORE_IS_INPUT
Definition: PortCore.h:51
yarp::os::Type
Definition: Type.h:24
ThreadImpl.h
yarp::os::impl::PortDataModifier::outputModifier
yarp::os::Carrier * outputModifier
Definition: PortCore.h:146
yarp::os::impl::PortDataModifier
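Holds the optional output and input modifier carriers of a port, together with one mutex for each. (The brief "This is the heart of a yarp port." that Doxygen attached here describes yarp::os::impl::PortCore.)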
Definition: PortCore.h:113
yarp::os::impl::PortDataModifier::outputMutex
std::mutex outputMutex
Definition: PortCore.h:148
yarp::os::OutputStream
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
yarp::os::impl::PortCore::getFlags
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:309
YARP_DEPRECATED
#define YARP_DEPRECATED
Expands to either the standard [[deprecated]] attribute or a compiler-specific decorator such as __attribute__((deprecated)).
Definition: compiler.h:2882
yarp::os::impl::PortCore::setWaitBeforeSend
void setWaitBeforeSend(bool waitBeforeSend)
Upon being asked to send a message, should we wait for any existing message to be sent to all destinations?
Definition: PortCore.h:353
YARP_UNUSED
#define YARP_UNUSED(var)
Definition: api.h:159
Carriers.h
yarp::os::impl::PortDataModifier::releaseOutModifier
void releaseOutModifier()
Definition: PortCore.h:127
yarp::os::PortReport
A base class for objects that want information about port status changes.
Definition: PortReport.h:31
yarp::os::impl::PortCore
This is the heart of a yarp port.
Definition: PortCore.h:155
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:32
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
yarp::os::PortInfo
Information about a port connection or event.
Definition: PortInfo.h:29
yarp::os::impl::PortCore::setWaitAfterSend
void setWaitAfterSend(bool waitAfterSend)
After sending a message, should we wait for it to be sent to all destinations before returning?
Definition: PortCore.h:362
yarp::os::impl::PortCore::PortCore
PortCore()
Constructor.
PortWriter.h
PortReaderCreator.h
Type.h
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition: PortReader.h:28
yarp::os::impl::PortDataModifier::releaseInModifier
void releaseInModifier()
Definition: PortCore.h:136
Property.h
BufferedConnectionWriter.h
yarp::os::PortReaderCreator
A creator for readers.
Definition: PortReaderCreator.h:34
yarp::os::impl::PortDataModifier::~PortDataModifier
virtual ~PortDataModifier()
Definition: PortCore.h:121
yarp::os::impl::PortDataModifier::inputModifier
yarp::os::Carrier * inputModifier
Definition: PortCore.h:147
yarp::os::impl::PortCore::setFlags
void setFlags(unsigned int flags)
Configure the port to meet certain restrictions in behavior.
Definition: PortCore.h:296
ModifyingCarrier.h
yarp::os::impl::PortDataModifier::inputMutex
std::mutex inputMutex
Definition: PortCore.h:149
YARP_WARNING_POP
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:335
yarp::os::ConnectionReader
An interface for reading from a network connection.
Definition: ConnectionReader.h:40
Mutex.h
yarp::os::impl::ThreadImpl
An abstraction for a thread of execution.
Definition: ThreadImpl.h:26
PORTCORE_IS_OUTPUT
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:52
yarp
The main, catch-all namespace for YARP.
Definition: environment.h:18
Vocab.h
yarp::os::impl::PortDataModifier::PortDataModifier
PortDataModifier()
Definition: PortCore.h:115
YARP_os_impl_API
#define YARP_os_impl_API
Definition: api.h:45
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
PortReader.h
yarp::os::Mutex
Basic wrapper for mutual exclusion.
Definition: Mutex.h:35
Contact.h
YARP_DISABLE_DEPRECATED_WARNING
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:336
yarp::os::Face
The initial point-of-contact with a port.
Definition: Face.h:24
PortReport.h
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:37
Contactable.h
yarp::os::Contactable
An abstract port.
Definition: Contactable.h:38