YARP
Yet Another Robot Platform
yarpbroker.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
10 
11 #include <csignal>
12 #include <cstring>
13 
// Status codes returned by the yarprun request helpers in this file.
// Their numeric values are used as indices into yarprun_err_msg[], so the
// two must be kept in the same order.
14 #define YARPRUN_OK 0
15 #define YARPRUN_NORESPONSE 1
16 #define YARPRUN_NOCONNECTION 2
// NOTE(review): "TIMOUT" is misspelled, but the name is part of the existing
// interface of this file and is therefore left untouched.
17 #define YARPRUN_CONNECTION_TIMOUT 3
18 #define YARPRUN_SEMAPHORE_PARAM 4
19 #define YARPRUN_UNDEF 5
20 
// Timeouts. RUN/STOP/KILL_TIMEOUT are all passed to YarpBroker::timeout(),
// which compares them with a SystemClock::nowSystem() difference, so they
// share the unit documented on RUN_TIMEOUT.
21 #define CONNECTION_TIMEOUT 2.0 //seconds
22 #define RUN_TIMEOUT 10.0 //seconds
23 #define STOP_TIMEOUT 15.0 //same unit as RUN_TIMEOUT
24 #define KILL_TIMEOUT 10.0 //same unit as RUN_TIMEOUT
25 #define EVENT_THREAD_PERIOD 0.5 //seconds
26 
// SIGKILL is sent as the signal number of the "kill" request. On _WIN32
// builds it is defined here as 9 (presumably because <csignal> does not
// provide it there -- TODO confirm).
27 #if defined(_WIN32)
28  #define SIGKILL 9
29 #endif
30 
/**
 * Human-readable suffixes appended to the broker's error strings.
 *
 * The table is indexed by the YARPRUN_* status codes defined above
 * (YARPRUN_OK == 0 ... YARPRUN_UNDEF == 5), so the order of the entries
 * must stay in sync with those values.
 */
const char* yarprun_err_msg[] = { " (Ok) ",
                                  " (Remote host does not respond) ",
                                  " (Remote host does not exist) ",
                                  " (Timeout while connecting to the remote host) ",
                                  " (Blocked in broker semaphore) ",
                                  " (Undefined message) " };
37 
38 using namespace yarp::os;
39 using namespace yarp::os::impl;
40 using namespace std;
41 using namespace yarp::manager;
42 
43 
44 YarpBroker::YarpBroker() : PeriodicThread(EVENT_THREAD_PERIOD)
45 {
46  bOnlyConnector = bInitialized = false;
47  ID = generateID();
48  strStdioUUID.clear();
49 }
50 
51 
53 {
54  fini();
55 }
56 
58 {
59  if(PeriodicThread::isRunning())
60  PeriodicThread::stop();
61  //port.close();
62 }
63 
65 {
66  //if(bInitialized)
67  // return true;
68 
69  if(!NetworkBase::checkNetwork(CONNECTION_TIMEOUT))
70  {
71  strError = "Yarp network server is not up.";
72  return false;
73  }
74  bInitialized = true;
75  bOnlyConnector = true;
76 
77  /*
78  semParam.wait();
79  __trace_message = "(init) opening port ...";
80  port.setTimeout(CONNECTION_TIMEOUT);
81  port.open("...");
82  __trace_message.clear();
83  semParam.post();
84  */
85  return true;
86 }
87 
88 bool YarpBroker::init(const char* szcmd, const char* szparam,
89  const char* szhost, const char* szstdio,
90  const char* szworkdir, const char* szenv )
91 {
92  //if(bInitialized)
93  // return true;
94 
95  semParam.wait();
96 
97  strCmd.clear();
98  strParam.clear();
99  strHost.clear();
100  strStdio.clear();
101  strWorkdir.clear();
102  strTag.clear();
103  strEnv.clear();
104 
105  if(!szcmd)
106  {
107  strError = "command is not specified.";
108  semParam.post();
109  return false;
110  }
111 
112  if(!szhost)
113  {
114  strError = "remote host port is not specified.";
115  semParam.post();
116  return false;
117  }
118 
119  if(szhost[0] != '/')
120  strHost = string("/") + string(szhost);
121  else
122  strHost = szhost;
123 
124  strCmd = szcmd;
125  if(strlen(szparam))
126  strParam = szparam;
127  if(strlen(szworkdir))
128  strWorkdir = szworkdir;
129 
130  if(strlen(szstdio))
131  {
132  if(szstdio[0] != '/')
133  strStdio = string("/") + string(szstdio);
134  else
135  strStdio = szstdio;
136  }
137 
138  if(strlen(szenv))
139  strEnv = szenv;
140 
141  OSTRINGSTREAM sstrID;
142  sstrID<<(int)ID;
143  strTag = strHost + strCmd + strParam + strEnv + sstrID.str();
144  string::iterator itr;
145  for(itr=strTag.begin(); itr!=strTag.end(); itr++)
146  if(((*itr) == ' ') || ((*itr) == '/') )
147  (*itr) = ':';
148 
149  __trace_message = "(init) checking yarp network";
150  if(!NetworkBase::checkNetwork(5.0))
151  {
152  strError = "Yarp network server is not up.";
153  __trace_message.clear();
154  semParam.post();
155  return false;
156  }
157  __trace_message = string("(init) checking existence of ") + strHost;
158  if(!exists(strHost.c_str()))
159  {
160  strError = szhost;
161  strError += " does not exist. check yarprun is running as server.";
162  __trace_message.clear();
163  semParam.post();
164  return false;
165  }
166 
167  /*
168  port.setTimeout(CONNECTION_TIMEOUT);
169  __trace_message = "(init) opening port ...";
170  port.open("...");
171  __trace_message.clear();
172  */
173 
174  bInitialized = true;
175  semParam.post();
176 
177  return true;
178 }
179 
180 
182 {
183  if(!bInitialized) return false;
184  if(bOnlyConnector) return false;
185 
186  strError.clear();
187  int ret = requestServer(runProperty());
188  if(ret != YARPRUN_OK)
189  {
190  strError = "cannot ask ";
191  strError += strHost;
192  strError += " to run ";
193  strError += strCmd;
194  strError += yarprun_err_msg[ret];
196  strError += string(" due to " + __trace_message);
197  return false;
198  }
199 
200  double base = SystemClock::nowSystem();
201  while(!timeout(base, RUN_TIMEOUT))
202  {
203  if(running() == 1)
204  {
205  if(strStdioUUID.size())
206  {
207  if(PeriodicThread::isRunning())
208  PeriodicThread::stop();
209  PeriodicThread::start();
210  }
211  return true;
212  }
213  }
214 
215  strError = "cannot run ";
216  strError += strCmd;
217  strError += " on ";
218  strError += strHost;
219  return false;
220 }
221 
223 {
224  if(!bInitialized) return true;
225  if(bOnlyConnector) return false;
226 
227  strError.clear();
228  yarp::os::Bottle msg,grp,response;
229 
230  grp.clear();
231  grp.addString("on");
232  grp.addString(strHost.c_str());
233  msg.addList()=grp;
234  grp.clear();
235  grp.addString("sigterm");
236  grp.addString(strTag.c_str());
237  msg.addList()=grp;
238  int ret = SendMsg(msg, strHost, response, CONNECTION_TIMEOUT);
239  if(ret != YARPRUN_OK)
240  {
241  strError = "cannot ask ";
242  strError += strHost;
243  strError += " to stop ";
244  strError += strCmd;
245  strError += yarprun_err_msg[ret];
247  strError += string(" due to " + __trace_message);
248  return false;
249  }
250 
251  double base = SystemClock::nowSystem();
252  while(!timeout(base, STOP_TIMEOUT))
253  {
254  if(running() == 0)
255  {
256  PeriodicThread::stop();
257  return true;
258  }
259  }
260 
261  strError = "Timeout! Cannot stop ";
262  strError += strCmd;
263  strError += " on ";
264  strError += strHost;
265  PeriodicThread::stop();
266  return false;
267 }
268 
270 {
271  if(!bInitialized) return true;
272  if(bOnlyConnector) return false;
273 
274  strError.clear();
275 
276  yarp::os::Bottle msg,grp,response;
277  grp.clear();
278  grp.addString("on");
279  grp.addString(strHost.c_str());
280  msg.addList() = grp;
281  grp.clear();
282  grp.addString("kill");
283  grp.addString(strTag.c_str());
284  grp.addInt32(SIGKILL);
285  msg.addList() = grp;
286  int ret = SendMsg(msg, strHost, response, CONNECTION_TIMEOUT);
287  if(ret != YARPRUN_OK)
288  {
289  strError = "cannot ask ";
290  strError += strHost;
291  strError += " to kill ";
292  strError += strCmd;
293  strError += yarprun_err_msg[ret];
295  strError += string(" due to " + __trace_message);
296  return false;
297  }
298 
299  double base = SystemClock::nowSystem();
300  while(!timeout(base, KILL_TIMEOUT))
301  {
302  if(running() == 0)
303  {
304  PeriodicThread::stop();
305  return true;
306  }
307  }
308 
309  strError = "cannot kill ";
310  strError += strCmd;
311  strError += " on ";
312  strError += strHost;
313  PeriodicThread::stop();
314  return false;
315 }
316 
317 
319 {
320  if(!bInitialized) return -1;
321  if(bOnlyConnector) return -1;
322 
323  strError.clear();
324  yarp::os::Bottle msg,grp,response;
325 
326  grp.clear();
327  grp.addString("on");
328  grp.addString(strHost.c_str());
329  msg.addList()=grp;
330 
331  grp.clear();
332  grp.addString("isrunning");
333  grp.addString(strTag.c_str());
334  msg.addList()=grp;
335 
336  int ret = SendMsg(msg, strHost, response, 3.0);
337  if(ret != YARPRUN_OK)
338  {
339  strError = "cannot ask ";
340  strError += strHost;
341  strError += " to check for status of ";
342  strError += strCmd;
343  strError += yarprun_err_msg[ret];
345  strError += string(" due to " + __trace_message);
346  return -1;
347  }
348  return ((response.get(0).asString() == "running")?1:0);
349 }
350 
351 
353 {
354  return true;
355 }
356 
358 {
359 }
360 
361 
362 Property& YarpBroker::runProperty()
363 {
364  command.clear();
365  string cmd = strCmd + string(" ") + strParam;
366  command.put("cmd", cmd);
367  command.put("on", strHost);
368  command.put("as", strTag);
369  if(!strWorkdir.empty())
370  command.put("workdir", strWorkdir);
371  if(!strStdio.empty())
372  command.put("stdio", strStdio);
373  if(!strEnv.empty())
374  command.put("env", strEnv);
375  //command.put("hold", "hold");
376  return command;
377 }
378 
379 
383 bool YarpBroker::connect(const char* from, const char* to,
384  const char* carrier, bool persist)
385 {
386  if(!from)
387  {
388  strError = "no source port is introduced.";
389  return false;
390  }
391 
392  if(!to)
393  {
394  strError = "no destination port is introduced.";
395  return false;
396  }
397 
398  ContactStyle style;
399  style.quiet = true;
400  style.timeout = CONNECTION_TIMEOUT;
401  style.carrier = carrier;
402 
403  if(!persist)
404  {
405  /*
406  * TODO: this check should be removed and
407  * the necessary modification should be done inside NetworkBase::isConnected!!!
408  */
409  string strCarrier = carrier;
410  bool needDisconnect = strCarrier.find("udp") == (size_t)0;
411  needDisconnect |= strCarrier.find("mcast") == (size_t)0;
412  if(needDisconnect == false) {
413  if(NetworkBase::isConnected(from, to, style))
414  return true;
415  }
416 
417  NetworkBase::connect(from, to, style);
418  if(!connected(from, to, carrier))
419  {
420  strError = "cannot connect ";
421  strError +=from;
422  strError += " to " + string(to);
423  return false;
424  }
425  }
426  else
427  {
428  string topic = string("topic:/") + string(from) + string(to);
429  NetworkBase::connect(from, topic, style);
430  NetworkBase::connect(topic, to, style);
431  if(!connected(from, to, carrier))
432  {
433  strError = "a persistent connection from ";
434  strError +=from;
435  strError += " to " + string(to);
436  strError += " is created but not connected.";
437  return false;
438  }
439 
440  }
441 
442  return true;
443 }
444 
445 bool YarpBroker::disconnect(const char* from, const char* to, const char* carrier)
446 {
447 
448  if(!from)
449  {
450  strError = "no source port is introduced.";
451  return false;
452  }
453 
454  if(!to)
455  {
456  strError = "no destination port is introduced.";
457  return false;
458  }
459 
460  /*
461  if(!exists(from))
462  {
463  strError = from;
464  strError += " does not exist.";
465  return true;
466  }
467 
468  if(!exists(to))
469  {
470  strError = to;
471  strError += " does not exist.";
472  return true;
473  }
474  */
475 
476  if(!connected(from, to, carrier))
477  return true;
478 
479  ContactStyle style;
480  style.quiet = true;
481  style.timeout = CONNECTION_TIMEOUT;
482  style.carrier = carrier;
483  if(!NetworkBase::disconnect(from, to, style))
484  {
485  strError = "cannot disconnect ";
486  strError +=from;
487  strError += " from " + string(to);
488  return false;
489  }
490  return true;
491 
492 }
493 
494 bool YarpBroker::exists(const char* szport)
495 {
496  ContactStyle style;
497  style.quiet = true;
498  style.timeout = CONNECTION_TIMEOUT;
499  return NetworkBase::exists(szport, style);
500 }
501 
502 const char* YarpBroker::requestRpc(const char* szport, const char* request, double timeout)
503 {
504  if((szport==nullptr) || (request==nullptr))
505  return nullptr;
506 
507  if(!exists(szport))
508  return nullptr;
509 
510  // opening the port
511  yarp::os::Port port;
512  port.setTimeout((float)((timeout>0.0) ? timeout : CONNECTION_TIMEOUT));
513  if(!port.open("..."))
514  return nullptr;
515 
516  ContactStyle style;
517  style.quiet = true;
518  style.timeout = (timeout>0.0) ? timeout : CONNECTION_TIMEOUT;
519  bool ret;
520  for(int i=0; i<10; i++) {
521  ret = NetworkBase::connect(port.getName(), szport, style);
522  if(ret) break;
523  SystemClock::delaySystem(1.0);
524  }
525 
526  if(!ret) {
527  port.close();
528  return nullptr;
529  }
530 
531  Bottle msg, response;
532  msg.fromString(request);
533  ret = port.write(msg, response);
534  NetworkBase::disconnect(port.getName(), szport);
535  if(!response.size() || !ret) {
536  port.close();
537  return nullptr;
538  }
539 
540  port.close();
541  return response.toString().c_str();
542 }
543 
544 bool YarpBroker::connected(const char* from, const char* to, const char* carrier)
545 {
546  if(!exists(from) || !exists(to))
547  return false;
548  ContactStyle style;
549  style.quiet = true;
550  style.timeout = CONNECTION_TIMEOUT;
551  style.carrier = carrier;
552  return NetworkBase::isConnected(from, to, style);
553 }
554 
// Queries `server` for its system information.
//
// A temporary port is opened, connected to `server`, used to send a
// one-element request ("sysinfo") whose reply is read into `info`, and then
// disconnected and closed again.
//
// Returns true when the write/reply exchange succeeded. Returns false when
// `server` is empty, when semParam cannot be taken without waiting, when the
// temporary port cannot be opened, when the connection fails (strError set)
// or when the server does not answer (strError set).
//
// NOTE(review): `server` is passed to strlen() without a null check.
// NOTE(review): the embedded listing numbers jump from 563 to 565 below, so
// one source line appears to be missing from this extract.
555 bool YarpBroker::getSystemInfo(const char* server, SystemInfoSerializer& info)
556 {
557  if(!strlen(server))
558  return false;
    // Non-blocking acquire: give up immediately if the semaphore is busy.
    // strError is not updated on this path.
559  if(!semParam.check())
560  return false;
561 
562  yarp::os::Port port;
563  // opening the port
565  if(!port.open("...")) {
566  __trace_message.clear();
567  semParam.post();
568  return false;
569  }
570 
    // Request: a bottle holding the single nested list ("sysinfo").
571  yarp::os::Bottle msg, grp;
572  grp.clear();
573  grp.addString("sysinfo");
574  msg.addList() = grp;
575 
576  ContactStyle style;
577  style.quiet = true;
578  style.timeout = CONNECTION_TIMEOUT;
579  //style.carrier = carrier;
580 
581 
    // The trace message names the temporary local port, not `server`.
582  __trace_message = "(getSystemInfo) connecting to " + string(port.getName());
583  bool connected = yarp::os::NetworkBase::connect(port.getName(), server, style);
584  if(!connected)
585  {
586  port.close();
587  strError = string("Cannot connect to ") + string(server);
588  __trace_message.clear();
589  semParam.post();
590  return false;
591  }
592 
593  __trace_message = "(getSystemInfo) writing to " + string(port.getName());
594  bool ret = port.write(msg, info);
595  __trace_message = "(getSystemInfo) disconnecting from " + string(port.getName());
    // Disconnect regardless of whether the write succeeded.
596  NetworkBase::disconnect(port.getName(), server);
597 
598  if(!ret)
599  {
600  port.close();
601  strError = string(server) + string(" does not respond");
602  __trace_message.clear();
603  semParam.post();
604  return false;
605  }
606 
607  port.close();
608  __trace_message.clear();
609  semParam.post();
610  return true;
611 }
612 
613 bool YarpBroker::getAllPorts(vector<string> &ports)
614 {
615  ContactStyle style;
616  style.quiet = true;
617  style.timeout = CONNECTION_TIMEOUT;
618  Bottle cmd, reply;
619  cmd.addString("list");
620 
621  bool ret = NetworkBase::writeToNameServer(cmd, reply, style);
622  if (!ret)
623  {
624  strError = "Failed to reach name server\n";
625  return false;
626  }
627 
628  if((reply.size()!=1) || (!reply.get(0).isString()))
629  return false;
630 
631  std::string str = reply.get(0).asString();
632  const char* delm = "registration name ";
633  size_t pos1, pos2;
634  while((pos1 = str.find(delm)) != std::string::npos)
635  {
636  str = str.substr(pos1+strlen(delm));
637  if((pos2 = str.find(' ')) != std::string::npos)
638  ports.push_back(str.substr(0, pos2));
639  }
640 
641  return true;
642 }
643 
// Asks `server` for its process list and stores the result in `processes`.
//
// The request is a bottle holding the nested list ("ps"), sent through
// SendMsg() with a timeout of 3.0. Every element of the reply becomes one
// Process entry: `pid` is taken from the "pid" key and `command` from the
// "cmd" key, followed by "; " and the "env" value when that value is
// non-empty.
//
// Returns true when SendMsg() reports YARPRUN_OK or YARPRUN_NORESPONSE;
// otherwise returns false with strError describing the failure.
//
// NOTE(review): `server` is passed to strlen() without a null check.
// NOTE(review): the embedded listing numbers jump from 681 to 683 near the
// end, so one source line appears to be missing from this extract.
644 bool YarpBroker::getAllProcesses(const char* server,
645  ProcessContainer& processes)
646 {
647  if(!strlen(server))
648  return false;
649 
650  processes.clear();
651  strError.clear();
652  yarp::os::Bottle msg,grp,response;
653 
654  grp.clear();
655  grp.addString("ps");
656  msg.addList()=grp;
657 
658  int ret = SendMsg(msg, server, response, 3.0);
    // YARPRUN_NORESPONSE is accepted as well; whatever `response` holds is
    // still converted into process entries.
659  if((ret == YARPRUN_OK) || (ret == YARPRUN_NORESPONSE))
660  {
661  for(size_t i=0; i<response.size(); i++)
662  {
663  Process proc;
664  std::string sprc;
    // proc.pid is only assigned when the entry has a "pid" key.
665  if(response.get(i).check("pid"))
666  proc.pid = response.get(i).find("pid").asInt32();
667  if(response.get(i).check("cmd"))
668  sprc = response.get(i).find("cmd").asString();
669  if(response.get(i).check("env") &&
670  response.get(i).find("env").asString().length())
671  sprc.append("; ").append(response.get(i).find("env").asString());
672  proc.command = sprc;
673  processes.push_back(proc);
674  }
675  return true;
676  }
677 
678  strError = "cannot ask ";
679  strError += server;
680  strError += " to give the list of running processes.";
681  strError += yarprun_err_msg[ret];
683  strError += string(" due to " + __trace_message);
684  return false;
685 }
686 
687 
// Sends an "untopic" command for the topic named `from` + `to` to the name
// server and returns the result of NetworkBase::write().
//
// NOTE(review): the topic name built here has no "topic:/" prefix, whereas
// YarpBroker::connect() creates persistent connections through
// "topic:/" + from + to -- confirm that the two are meant to match.
// NOTE(review): the embedded listing numbers jump from 698 to 700, so the
// end of the NetworkBase::write() call (remaining argument(s) and the
// closing ");") is missing from this extract.
688 bool YarpBroker::rmconnect(const char* from, const char* to)
689 {
690  string topic = string(from) + string(to);
691  Bottle cmd, reply;
692  cmd.addString("untopic");
693  cmd.addString(topic.c_str());
694  return NetworkBase::write(NetworkBase::getNameServerContact(),
695  cmd,
696  reply,
697  false,
698  true,
700 }
701 
702 bool YarpBroker::setQos(const char* from, const char *to,
703  const char *qosFrom, const char *qosTo) {
704  strError.clear();
705 
706  if(qosFrom && qosTo && !strlen(qosFrom) && !strlen(qosTo))
707  return true;
708 
709  QosStyle styleFrom;
710  QosStyle styleTo;
711  if(qosFrom != nullptr && strlen(qosFrom)) {
712  if(!getQosFromString(qosFrom, styleFrom)) {
713  strError = "Error in parsing Qos properties of " + string(from);
714  return false;
715  }
716  }
717  if(qosTo != nullptr && strlen(qosTo))
718  if(!getQosFromString(qosTo, styleTo)) {
719  strError = "Error in parsing Qos properties of " + string(to);
720  return false;
721  }
722  return NetworkBase::setConnectionQos(from, to, styleFrom, styleTo, true);
723 }
724 
725 bool YarpBroker::getQosFromString(const char* qos, yarp::os::QosStyle& style) {
726  string strQos(qos);
727  transform(strQos.begin(), strQos.end(), strQos.begin(),
728  (int(*)(int))toupper);
729  strQos.erase( std::remove_if( strQos.begin(), strQos.end(), ::isspace ), strQos.end() );
730 
731  //level:high; priority:10; policy:1
732  stringstream ss(strQos); // Turn the string into a stream.
733  string prop;
734  while(getline(ss, prop, ';')) {
735  size_t p = prop.find(':');
736  if (p != prop.npos) {
737  string key = prop.substr(0, p);
738  string value = prop.substr(p+1);
739  if (key.length() > 0 && value.length() > 0) {
740  if (key == "LEVEL" || key=="DSCP" || key == "TOS") {
741  if(!style.setPacketPriority(prop))
742  return false;
743  }
744  else if (key == "PRIORITY") {
745  char* p;
746  int prio = strtol(value.c_str(), &p, 10);
747  style.setThreadPriority(prio);
748  }
749  else if (key == "POLICY") {
750  char* p;
751  int policy = strtol(value.c_str(), &p, 10);
752  style.setThreadPolicy(policy);
753  }
754  }
755  }
756  }
757  return true;
758 }
759 
760 const char* YarpBroker::error()
761 {
762  return strError.c_str();
763 }
764 
765 
766 bool YarpBroker::timeout(double base, double timeout)
767 {
768  SystemClock::delaySystem(1.0);
769  if((SystemClock::nowSystem()-base) > timeout)
770  return true;
771  return false;
772 }
773 
775 {
776  if(!strStdioUUID.size())
777  return false;
778 
779  string strStdioPort = strStdioUUID + "/stdout";
780  stdioPort.open("...");
781 
782  double base = SystemClock::nowSystem();
783  ContactStyle style;
784  style.quiet = true;
785  style.timeout = CONNECTION_TIMEOUT;
786  while(!timeout(base, 5.0))
787  {
788  if(NetworkBase::connect(strStdioPort, stdioPort.getName(), style))
789  return true;
790  }
791 
792  strError = "Cannot connect to stdio port ";
793  strError += strStdioPort;
794  stdioPort.close();
795  return false;
796 }
797 
798 
800 {
801  Bottle *input;
802  if( (input=stdioPort.read(false)) && eventSink)
803  {
804  for (size_t i=0; i<input->size(); i++)
805  eventSink->onBrokerStdout(input->get(i).asString().c_str());
806  }
807 }
808 
809 
811 {
812  NetworkBase::disconnect(stdioPort.getName(), strStdioUUID);
813  stdioPort.close();
814 }
815 
816 
// Sends `msg` to the port `target` through a temporary port and stores the
// reply in `response`.
//
// Returns YARPRUN_NOCONNECTION when `target` does not exist,
// YARPRUN_NORESPONSE when the write fails or the reply is empty, and
// YARPRUN_OK otherwise. `fTimeout` is applied to the temporary port; the
// connection attempts themselves use CONNECTION_TIMEOUT.
//
// NOTE(review): the embedded listing numbers skip 823, 832 and 853, so
// three source lines are missing from this extract. As shown, the
// `if(!semParam.check())` test and the two failure branches below have no
// visible return statement; the intended return values cannot be read from
// here.
817 int YarpBroker::SendMsg(Bottle& msg, std::string target, Bottle& response, float fTimeout)
818 {
819  if(!exists(target.c_str()))
820  return YARPRUN_NOCONNECTION;
821 
    // Non-blocking attempt to take the semaphore.
822  if(!semParam.check())
824 
825  // opening the port
826  yarp::os::Port port;
827  port.setTimeout(fTimeout);
828  if(!port.open("..."))
829  {
830  __trace_message.clear();
831  semParam.post();
833  }
834 
835  ContactStyle style;
836  style.quiet = true;
837  style.timeout = CONNECTION_TIMEOUT;
838 
839  bool ret;
840  __trace_message = "(SendMsg) connecting to " + string(target);
    // Up to 10 connection attempts with a delaySystem(1.0) pause after each
    // failed one.
841  for(int i=0; i<10; i++)
842  {
843  ret = NetworkBase::connect(port.getName(), target, style);
844  if(ret) break;
845  SystemClock::delaySystem(1.0);
846  }
847 
848  if(!ret)
849  {
850  port.close();
851  __trace_message.clear();
852  semParam.post();
854  }
855 
856  __trace_message = "(SendMsg) writing to " + string(target);
857  ret = port.write(msg, response);
858  __trace_message = "(SendMsg) disconnecting from " + string(target);
859  NetworkBase::disconnect(port.getName(),target);
860  __trace_message.clear();
    // The semaphore is released before the reply is inspected.
861  semParam.post();
862 
863  if(!response.size() || !ret) {
864  port.close();
865  return YARPRUN_NORESPONSE;
866  }
867 
868  port.close();
869 
870  return YARPRUN_OK;
871 }
872 
873 
874 int YarpBroker::requestServer(Property& config)
875 {
876  yarp::os::Bottle msg;
877 
878  // USE A YARP RUN SERVER TO MANAGE STDIO
879  //
880  // client -> stdio server -> cmd server
881  //
882  if (config.check("cmd") && config.check("stdio"))
883  {
884  if (config.find("stdio").asString()=="") {return YARPRUN_UNDEF; }
885  if (config.find("cmd").asString()=="") {return YARPRUN_UNDEF; }
886  if (!config.check("as") || config.find("as").asString()=="") { return YARPRUN_UNDEF; }
887  if (!config.check("on") || config.find("on").asString()=="") { return YARPRUN_UNDEF; }
888 
889  msg.addList()=config.findGroup("stdio");
890  msg.addList()=config.findGroup("cmd");
891  msg.addList()=config.findGroup("as");
892  msg.addList()=config.findGroup("on");
893 
894  if (config.check("workdir")) msg.addList()=config.findGroup("workdir");
895  if (config.check("geometry")) msg.addList()=config.findGroup("geometry");
896  if (config.check("hold")) msg.addList()=config.findGroup("hold");
897  if (config.check("env")) msg.addList()=config.findGroup("env");
898 
899  Bottle response;
900  int ret = SendMsg(msg, config.find("stdio").asString(),
901  response, CONNECTION_TIMEOUT);
902  if (ret != YARPRUN_OK)
903  return ret;
904 
905  if(response.size() > 2)
906  strStdioUUID = response.get(2).asString();
907 
908  return ((response.get(0).asInt32()>0)?YARPRUN_OK:YARPRUN_UNDEF);
909  }
910 
911  // DON'T USE A RUN SERVER TO MANAGE STDIO
912  //
913  // client -> cmd server
914  //
915  if (config.check("cmd"))
916  {
917  if (config.find("cmd").asString()=="") { return YARPRUN_UNDEF; }
918  if (!config.check("as") || config.find("as").asString()=="") {return YARPRUN_UNDEF; }
919  if (!config.check("on") || config.find("on").asString()=="") {return YARPRUN_UNDEF; }
920 
921  msg.addList()=config.findGroup("cmd");
922  msg.addList()=config.findGroup("as");
923 
924  if (config.check("workdir")) msg.addList()=config.findGroup("workdir");
925  if (config.check("env")) msg.addList()=config.findGroup("env");
926 
927  Bottle response;
928  int ret = SendMsg(msg, config.find("on").asString(),
929  response, CONNECTION_TIMEOUT);
930  if (ret != YARPRUN_OK)
931  return ret;
932 
933  return ((response.get(0).asInt32()>0)?YARPRUN_OK:YARPRUN_UNDEF);
934  }
935 
936  return YARPRUN_UNDEF;
937 }
yarp::os::Port::close
void close() override
Stop port activity.
Definition: Port.cpp:357
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
yarp::os::Value::find
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Value.cpp:330
yarp::manager::YarpBroker::start
bool start() override
Definition: yarpbroker.cpp:181
yarp::os::Bottle::toString
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
yarp::os::Property::put
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:998
yarp::os::Bottle::clear
void clear()
Empties the bottle of any objects it contains.
Definition: Bottle.cpp:124
CONNECTION_TIMEOUT
#define CONNECTION_TIMEOUT
Definition: yarpbroker.cpp:21
RUN_TIMEOUT
#define RUN_TIMEOUT
Definition: yarpbroker.cpp:22
yarp::os::ContactStyle
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:27
yarp::os::QosStyle
Preferences for the port's Quality of Service.
Definition: QosStyle.h:26
yarp::os::Bottle::size
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
yarp::manager::YarpBroker::running
int running() override
Definition: yarpbroker.cpp:318
yarp::manager::YarpBroker::rmconnect
bool rmconnect(const char *from, const char *to)
Definition: yarpbroker.cpp:688
yarp::manager::YarpBroker::stop
bool stop() override
Definition: yarpbroker.cpp:222
yarp::os::BufferedPort::read
T * read(bool shouldWait=true) override
Read an available object from the port.
Definition: BufferedPort-inl.h:154
STOP_TIMEOUT
#define STOP_TIMEOUT
Definition: yarpbroker.cpp:23
yarp::manager::YarpBroker::getSystemInfo
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
Definition: yarpbroker.cpp:555
YARPRUN_SEMAPHORE_PARAM
#define YARPRUN_SEMAPHORE_PARAM
Definition: yarpbroker.cpp:18
yarp::manager::OSTRINGSTREAM
std::stringstream OSTRINGSTREAM
Definition: utility.h:52
yarp::manager::YarpBroker::threadRelease
void threadRelease() override
Release method.
Definition: yarpbroker.cpp:810
yarp::manager
Definition: application.h:24
YARPRUN_NOCONNECTION
#define YARPRUN_NOCONNECTION
Definition: yarpbroker.cpp:16
yarp::manager::BrokerEventSink::onBrokerStdout
virtual void onBrokerStdout(const char *msg)
Definition: broker.h:26
yarp::manager::YarpBroker::run
void run() override
Loop function.
Definition: yarpbroker.cpp:799
yarp::os::Bottle::fromString
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:207
yarp::os::Property::find
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1034
yarp::os::Port::open
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: Port.cpp:82
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
yarp::manager::YarpBroker::disconnect
bool disconnect(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:445
yarp::os::Semaphore::wait
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:99
yarp::manager::YarpBroker::kill
bool kill() override
Definition: yarpbroker.cpp:269
yarp::os::QosStyle::setPacketPriority
bool setPacketPriority(const std::string &priority)
sets the packet priority from a string.
Definition: QosStyle.cpp:42
yarprun_err_msg
const char * yarprun_err_msg[]
Definition: yarpbroker.cpp:31
EVENT_THREAD_PERIOD
#define EVENT_THREAD_PERIOD
Definition: yarpbroker.cpp:25
YARPRUN_OK
#define YARPRUN_OK
Definition: yarpbroker.cpp:14
yarp::os::Port
A mini-server for network communication.
Definition: Port.h:50
yarp::os::Value::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Value.cpp:324
yarp::manager::YarpBroker::setQos
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
Definition: yarpbroker.cpp:702
yarp::manager::YarpBroker::detachStdout
void detachStdout() override
Definition: yarpbroker.cpp:357
yarp::os::Value::isString
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:159
yarp::manager::YarpBroker::fini
void fini() override
Definition: yarpbroker.cpp:57
yarp::os::Semaphore::post
void post()
Increment the counter.
Definition: Semaphore.cpp:114
yarp::os::Semaphore::check
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:109
yarp::manager::YarpBroker::getAllPorts
bool getAllPorts(std::vector< std::string > &stingList)
Definition: yarpbroker.cpp:613
yarp::os::Bottle::get
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
yarp::os::ContactStyle::quiet
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:39
yarp::os::Bottle::addList
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:185
yarp::manager::ProcessContainer
std::vector< Process > ProcessContainer
Definition: primresource.h:152
yarp::manager::YarpBroker::threadInit
bool threadInit() override
Initialization method.
Definition: yarpbroker.cpp:774
yarp::manager::YarpBroker::connected
bool connected(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:544
yarp::os::Value::asString
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
yarp::os::NetworkBase::connect
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:685
yarp::os::BufferedPort::open
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: BufferedPort-inl.h:41
yarp::os::Bottle::addInt32
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:143
yarp::os::QosStyle::setThreadPriority
void setThreadPriority(int priority)
sets the communication thread priority level
Definition: QosStyle.h:130
yarp::manager::Broker::generateID
unsigned int generateID()
Definition: broker.cpp:29
yarp::os::Port::setTimeout
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition: Port.cpp:628
yarp::manager::Broker::eventSink
BrokerEventSink * eventSink
Definition: broker.h:72
yarp::os::Property::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
yarp::os::Contactable::getName
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:17
yarp::os::BufferedPort::getName
std::string getName() const override
Get name of port.
Definition: BufferedPort-inl.h:108
yarp::os::Bottle::addString
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
yarp::manager::YarpBroker::attachStdout
bool attachStdout() override
Definition: yarpbroker.cpp:352
yarp::manager::YarpBroker::connect
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
Definition: yarpbroker.cpp:383
yarp::os::PeriodicThread
An abstraction for a periodic thread.
Definition: PeriodicThread.h:25
yarpbroker.h
yarp::manager::_Process
Class Computer.
Definition: primresource.h:147
yarp::os::Property::clear
void clear()
Remove all associations.
Definition: Property.cpp:1040
yarp::os::QosStyle::setThreadPolicy
void setThreadPolicy(int policy)
sets the communication thread scheduling policy
Definition: QosStyle.h:140
yarp::os::Value::asInt32
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:207
yarp::os::Port::write
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:430
yarp::os::SystemInfoSerializer
A helper class to pass the SystemInfo object around the YARP network.
Definition: SystemInfoSerializer.h:24
yarp::manager::YarpBroker::error
const char * error() override
Definition: yarpbroker.cpp:760
yarp::os::ContactStyle::carrier
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:56
yarp::manager::YarpBroker::getAllProcesses
bool getAllProcesses(const char *server, ProcessContainer &processes)
Definition: yarpbroker.cpp:644
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yarp::manager::YarpBroker::init
bool init() override
Definition: yarpbroker.cpp:64
YARPRUN_NORESPONSE
#define YARPRUN_NORESPONSE
Definition: yarpbroker.cpp:15
yarp::os::BufferedPort::close
void close() override
Stop port activity.
Definition: BufferedPort-inl.h:73
YARPRUN_CONNECTION_TIMOUT
#define YARPRUN_CONNECTION_TIMOUT
Definition: yarpbroker.cpp:17
yarp::os::Property::findGroup
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Property.cpp:1125
yarp::os::ContactStyle::timeout
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:50
yarp::manager::YarpBroker::requestRpc
const char * requestRpc(const char *szport, const char *request, double timeout) override
Definition: yarpbroker.cpp:502
yarp::manager::YarpBroker::~YarpBroker
~YarpBroker() override
Definition: yarpbroker.cpp:52
yarp::sig::file::write
bool write(const ImageOf< PixelRgb > &src, const std::string &dest, image_fileformat format=FORMAT_PPM)
Definition: ImageFile.cpp:971
yarp::manager::_Process::pid
int pid
Definition: primresource.h:149
yarp::manager::_Process::command
std::string command
Definition: primresource.h:148
yarp::os::impl
The components from which ports and connections are built.
YARPRUN_UNDEF
#define YARPRUN_UNDEF
Definition: yarpbroker.cpp:19
yarp::manager::YarpBroker::exists
bool exists(const char *port) override
Definition: yarpbroker.cpp:494
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:37
KILL_TIMEOUT
#define KILL_TIMEOUT
Definition: yarpbroker.cpp:24