|
YARP
Yet Another Robot Platform
|
|
Go to the documentation of this file.
15 #define YARPRUN_NORESPONSE 1
16 #define YARPRUN_NOCONNECTION 2
17 #define YARPRUN_CONNECTION_TIMOUT 3
18 #define YARPRUN_SEMAPHORE_PARAM 4
19 #define YARPRUN_UNDEF 5
21 #define CONNECTION_TIMEOUT 2.0 //seconds
22 #define RUN_TIMEOUT 10.0 //seconds
23 #define STOP_TIMEOUT 15.0
24 #define KILL_TIMEOUT 10.0
25 #define EVENT_THREAD_PERIOD 0.5 //seconds
32 " (Remote host does not respond) ",
33 " (Remote host does no exist) ",
34 " (Timeout while connecting to the remote host) ",
35 " (Blocked in broker semaphor) ",
36 " (Undefined message) " };
46 bOnlyConnector = bInitialized =
false;
59 if(PeriodicThread::isRunning())
60 PeriodicThread::stop();
71 strError =
"Yarp network server is not up.";
75 bOnlyConnector =
true;
89 const char* szhost,
const char* szstdio,
90 const char* szworkdir,
const char* szenv )
107 strError =
"command is not specified.";
114 strError =
"remote host port is not specified.";
120 strHost = string(
"/") + string(szhost);
127 if(strlen(szworkdir))
128 strWorkdir = szworkdir;
132 if(szstdio[0] !=
'/')
133 strStdio = string(
"/") + string(szstdio);
143 strTag = strHost + strCmd + strParam + strEnv + sstrID.str();
144 string::iterator itr;
145 for(itr=strTag.begin(); itr!=strTag.end(); itr++)
146 if(((*itr) ==
' ') || ((*itr) ==
'/') )
149 __trace_message =
"(init) checking yarp network";
150 if(!NetworkBase::checkNetwork(5.0))
152 strError =
"Yarp network server is not up.";
153 __trace_message.clear();
157 __trace_message = string(
"(init) checking existence of ") + strHost;
158 if(!
exists(strHost.c_str()))
161 strError +=
" does not exist. check yarprun is running as server.";
162 __trace_message.clear();
183 if(!bInitialized)
return false;
184 if(bOnlyConnector)
return false;
187 int ret = requestServer(runProperty());
190 strError =
"cannot ask ";
192 strError +=
" to run ";
196 strError += string(
" due to " + __trace_message);
200 double base = SystemClock::nowSystem();
205 if(strStdioUUID.size())
207 if(PeriodicThread::isRunning())
208 PeriodicThread::stop();
209 PeriodicThread::start();
215 strError =
"cannot run ";
224 if(!bInitialized)
return true;
225 if(bOnlyConnector)
return false;
241 strError =
"cannot ask ";
243 strError +=
" to stop ";
247 strError += string(
" due to " + __trace_message);
251 double base = SystemClock::nowSystem();
256 PeriodicThread::stop();
261 strError =
"Timeout! Cannot stop ";
265 PeriodicThread::stop();
271 if(!bInitialized)
return true;
272 if(bOnlyConnector)
return false;
289 strError =
"cannot ask ";
291 strError +=
" to kill ";
295 strError += string(
" due to " + __trace_message);
299 double base = SystemClock::nowSystem();
304 PeriodicThread::stop();
309 strError =
"cannot kill ";
313 PeriodicThread::stop();
320 if(!bInitialized)
return -1;
321 if(bOnlyConnector)
return -1;
336 int ret = SendMsg(msg, strHost, response, 3.0);
339 strError =
"cannot ask ";
341 strError +=
" to check for status of ";
345 strError += string(
" due to " + __trace_message);
348 return ((response.
get(0).
asString() ==
"running")?1:0);
365 string cmd = strCmd + string(
" ") + strParam;
366 command.
put(
"cmd", cmd);
367 command.
put(
"on", strHost);
368 command.
put(
"as", strTag);
369 if(!strWorkdir.empty())
370 command.
put(
"workdir", strWorkdir);
371 if(!strStdio.empty())
372 command.
put(
"stdio", strStdio);
374 command.
put(
"env", strEnv);
384 const char* carrier,
bool persist)
388 strError =
"no source port is introduced.";
394 strError =
"no destination port is introduced.";
409 string strCarrier = carrier;
410 bool needDisconnect = strCarrier.find(
"udp") == (size_t)0;
411 needDisconnect |= strCarrier.find(
"mcast") == (size_t)0;
412 if(needDisconnect ==
false) {
413 if(NetworkBase::isConnected(from, to, style))
417 NetworkBase::connect(from, to, style);
420 strError =
"cannot connect ";
422 strError +=
" to " + string(to);
428 string topic = string(
"topic:/") + string(from) + string(to);
429 NetworkBase::connect(from, topic, style);
430 NetworkBase::connect(topic, to, style);
433 strError =
"a persistent connection from ";
435 strError +=
" to " + string(to);
436 strError +=
" is created but not connected.";
450 strError =
"no source port is introduced.";
456 strError =
"no destination port is introduced.";
483 if(!NetworkBase::disconnect(from, to, style))
485 strError =
"cannot disconnect ";
487 strError +=
" from " + string(to);
499 return NetworkBase::exists(szport, style);
504 if((szport==
nullptr) || (request==
nullptr))
513 if(!port.
open(
"..."))
520 for(
int i=0; i<10; i++) {
521 ret = NetworkBase::connect(port.
getName(), szport, style);
523 SystemClock::delaySystem(1.0);
534 NetworkBase::disconnect(port.
getName(), szport);
552 return NetworkBase::isConnected(from, to, style);
559 if(!semParam.
check())
565 if(!port.
open(
"...")) {
566 __trace_message.clear();
582 __trace_message =
"(getSystemInfo) connecting to " + string(port.
getName());
587 strError = string(
"Cannot connect to ") + string(server);
588 __trace_message.clear();
593 __trace_message =
"(getSystemInfo) writing to " + string(port.
getName());
595 __trace_message =
"(getSystemInfo) disconnecting from " + string(port.
getName());
596 NetworkBase::disconnect(port.
getName(), server);
601 strError = string(server) + string(
" does not respond");
602 __trace_message.clear();
608 __trace_message.clear();
621 bool ret = NetworkBase::writeToNameServer(cmd, reply, style);
624 strError =
"Failed to reach name server\n";
632 const char* delm =
"registration name ";
634 while((pos1 = str.find(delm)) != std::string::npos)
636 str = str.substr(pos1+strlen(delm));
637 if((pos2 = str.find(
' ')) != std::string::npos)
638 ports.push_back(str.substr(0, pos2));
658 int ret = SendMsg(msg, server, response, 3.0);
661 for(
size_t i=0; i<response.
size(); i++)
673 processes.push_back(proc);
678 strError =
"cannot ask ";
680 strError +=
" to give the list of running processes.";
683 strError += string(
" due to " + __trace_message);
690 string topic = string(from) + string(to);
703 const char *qosFrom,
const char *qosTo) {
706 if(qosFrom && qosTo && !strlen(qosFrom) && !strlen(qosTo))
711 if(qosFrom !=
nullptr && strlen(qosFrom)) {
712 if(!getQosFromString(qosFrom, styleFrom)) {
713 strError =
"Error in parsing Qos properties of " + string(from);
717 if(qosTo !=
nullptr && strlen(qosTo))
718 if(!getQosFromString(qosTo, styleTo)) {
719 strError =
"Error in parsing Qos properties of " + string(to);
722 return NetworkBase::setConnectionQos(from, to, styleFrom, styleTo,
true);
727 transform(strQos.begin(), strQos.end(), strQos.begin(),
728 (
int(*)(
int))toupper);
729 strQos.erase( std::remove_if( strQos.begin(), strQos.end(), ::isspace ), strQos.end() );
732 stringstream ss(strQos);
734 while(getline(ss, prop,
';')) {
735 size_t p = prop.find(
':');
736 if (p != prop.npos) {
737 string key = prop.substr(0, p);
738 string value = prop.substr(p+1);
739 if (key.length() > 0 && value.length() > 0) {
740 if (key ==
"LEVEL" || key==
"DSCP" || key ==
"TOS") {
744 else if (key ==
"PRIORITY") {
746 int prio = strtol(value.c_str(), &p, 10);
749 else if (key ==
"POLICY") {
751 int policy = strtol(value.c_str(), &p, 10);
762 return strError.c_str();
766 bool YarpBroker::timeout(
double base,
double timeout)
768 SystemClock::delaySystem(1.0);
769 if((SystemClock::nowSystem()-base) > timeout)
776 if(!strStdioUUID.size())
779 string strStdioPort = strStdioUUID +
"/stdout";
780 stdioPort.
open(
"...");
782 double base = SystemClock::nowSystem();
786 while(!timeout(base, 5.0))
788 if(NetworkBase::connect(strStdioPort, stdioPort.
getName(), style))
792 strError =
"Cannot connect to stdio port ";
793 strError += strStdioPort;
804 for (
size_t i=0; i<input->
size(); i++)
812 NetworkBase::disconnect(stdioPort.
getName(), strStdioUUID);
817 int YarpBroker::SendMsg(
Bottle& msg, std::string target,
Bottle& response,
float fTimeout)
819 if(!
exists(target.c_str()))
822 if(!semParam.
check())
828 if(!port.
open(
"..."))
830 __trace_message.clear();
840 __trace_message =
"(SendMsg) connecting to " + string(target);
841 for(
int i=0; i<10; i++)
843 ret = NetworkBase::connect(port.
getName(), target, style);
845 SystemClock::delaySystem(1.0);
851 __trace_message.clear();
856 __trace_message =
"(SendMsg) writing to " + string(target);
858 __trace_message =
"(SendMsg) disconnecting from " + string(target);
859 NetworkBase::disconnect(port.
getName(),target);
860 __trace_message.clear();
874 int YarpBroker::requestServer(
Property& config)
882 if (config.
check(
"cmd") && config.
check(
"stdio"))
905 if(response.
size() > 2)
915 if (config.
check(
"cmd"))
void close() override
Stop port activity.
A simple collection of objects that can be described and transmitted in a portable way.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
void clear()
Empties the bottle of any objects it contains.
#define CONNECTION_TIMEOUT
Preferences for the port's Quality of Service.
size_type size() const
Gets the number of elements in the bottle.
bool rmconnect(const char *from, const char *to)
T * read(bool shouldWait=true) override
Read an available object from the port.
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
#define YARPRUN_SEMAPHORE_PARAM
std::stringstream OSTRINGSTREAM
void threadRelease() override
Release method.
#define YARPRUN_NOCONNECTION
virtual void onBrokerStdout(const char *msg)
void run() override
Loop function.
void fromString(const std::string &text)
Initializes bottle from a string.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
bool disconnect(const char *from, const char *to, const char *carrier) override
void wait()
Decrement the counter, even if we must wait to do that.
bool setPacketPriority(const std::string &priority)
sets the packet priority from a string.
const char * yarprun_err_msg[]
#define EVENT_THREAD_PERIOD
A mini-server for network communication.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
void detachStdout() override
virtual bool isString() const
Checks if value is a string.
void post()
Increment the counter.
bool check()
Decrement the counter, unless that would require waiting.
bool getAllPorts(std::vector< std::string > &stingList)
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
std::vector< Process > ProcessContainer
bool threadInit() override
Initialization method.
bool connected(const char *from, const char *to, const char *carrier) override
virtual std::string asString() const
Get string value.
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void setThreadPriority(int priority)
sets the communication thread priority level
unsigned int generateID()
bool setTimeout(float timeout)
Set a timeout on network operations.
BrokerEventSink * eventSink
bool check(const std::string &key) const override
Check if there exists a property of the given name.
std::string getName() const override
Get name of port.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
bool attachStdout() override
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
An abstraction for a periodic thread.
void clear()
Remove all associations.
void setThreadPolicy(int policy)
sets the communication thread scheduling policy
virtual std::int32_t asInt32() const
Get 32-bit integer value.
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
A helper class to pass the SystemInfo object around the YARP network.
const char * error() override
bool getAllProcesses(const char *server, ProcessContainer &processes)
An interface to the operating system, including Port based communication.
#define YARPRUN_NORESPONSE
void close() override
Stop port activity.
#define YARPRUN_CONNECTION_TIMOUT
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
const char * requestRpc(const char *szport, const char *request, double timeout) override
bool write(const ImageOf< PixelRgb > &src, const std::string &dest, image_fileformat format=FORMAT_PPM)
The components from which ports and connections are built.
bool exists(const char *port) override
A class for storing options and configuration information.