YARP
Yet Another Robot Platform
localbroker.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
11 
12 #include <csignal>
13 #include <cstring>
14 
15 #define RUN_TIMEOUT 10.0 //seconds
16 #define STOP_TIMEOUT 15.0
17 #define KILL_TIMEOUT 10.0
18 #define CONNECTION_TIMEOUT 2.0
19 
20 #define WRITE_TO_PIPE 1
21 #define READ_FROM_PIPE 0
22 
23 #if defined(_WIN32)
24  #include<Windows.h>
25  #define SIGKILL 9
26 #else
27  #include <cstdlib>
28  #include <sys/types.h>
29  #include <sys/stat.h>
30  #include <fcntl.h>
31  #include <cerrno>
32  #include <unistd.h>
33  #include <cstring>
34 
35  #define PIPE_TIMEOUT 0
36  #define PIPE_EVENT 1
37  #define PIPE_SIGNALED 2
38  #define C_MAXARGS 128 // max number of the command parameters
39 #endif
40 
41 using namespace yarp::os;
42 using namespace yarp::manager;
43 using namespace std;
44 
45 
46 #if defined(_WIN32)
47 class LocalTerminateParams
48 {
49 public:
50  LocalTerminateParams(DWORD id) {
51  nWin = 0;
52  dwID = id;
53  }
54 
55  ~LocalTerminateParams(){}
56  int nWin;
57  DWORD dwID;
58 };
59 
60 BOOL CALLBACK LocalTerminateAppEnum(HWND hwnd, LPARAM lParam)
61 {
62  LocalTerminateParams* params=(LocalTerminateParams*)lParam;
63  DWORD dwID;
64  GetWindowThreadProcessId(hwnd, &dwID);
65  if (dwID==params->dwID)
66  {
67  params->nWin++;
68  PostMessage(hwnd,WM_CLOSE,0,0);
69  }
70  return TRUE ;
71 }
72 #if defined(_WIN64)
73 volatile LONGLONG uniquePipeNumber = 0;
74 #else
75 volatile LONG uniquePipeNumber = 0;
76 #endif
77 
78 /*
79 * TODO: check deeply for asyn PIPE
80 */
81 BOOL CreatePipeAsync(
82  OUT LPHANDLE lpReadPipe,
83  OUT LPHANDLE lpWritePipe,
84  IN LPSECURITY_ATTRIBUTES lpPipeAttributes,
85  IN DWORD nSize)
86 {
87  HANDLE ReadPipeHandle, WritePipeHandle;
88  DWORD dwError;
89  char PipeNameBuffer[MAX_PATH];
90  nSize = (nSize ==0) ? 100*8096: nSize;
91 
92 #if defined(_WIN64)
93  InterlockedIncrement64(&uniquePipeNumber);
94 #else
95  InterlockedIncrement(&uniquePipeNumber);
96 #endif
97 
98  sprintf( PipeNameBuffer,
99  "\\\\.\\Pipe\\RemoteExeAnon.%08x.%08x",
100  GetCurrentProcessId(),
101  uniquePipeNumber
102  );
103 
104  ReadPipeHandle = CreateNamedPipeA(
105  (LPSTR)PipeNameBuffer,
106  PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
107  PIPE_TYPE_BYTE | PIPE_WAIT, //PIPE_NOWAIT,
108  1, // Number of pipes
109  nSize, // Out buffer size
110  nSize, // In buffer size
111  120 * 1000, // Timeout in ms
112  lpPipeAttributes
113  );
114 
115  if (! ReadPipeHandle) {
116  return FALSE;
117  }
118 
119  WritePipeHandle = CreateFileA(
120  (LPSTR)PipeNameBuffer,
121  GENERIC_WRITE,
122  0, // No sharing
123  lpPipeAttributes,
124  OPEN_EXISTING,
125  FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
126  nullptr // Template file
127  );
128 
129  if (INVALID_HANDLE_VALUE == WritePipeHandle)
130  {
131  dwError = GetLastError();
132  CloseHandle( ReadPipeHandle );
133  SetLastError(dwError);
134  return FALSE;
135  }
136 
137  *lpReadPipe = ReadPipeHandle;
138  *lpWritePipe = WritePipeHandle;
139  return( TRUE );
140 }
141 
142 #endif
143 
144 LocalBroker::LocalBroker()
145 {
146  bOnlyConnector = bInitialized = false;
147  ID = 0;
148  fd_stdout = nullptr;
149  setWindowMode(WINDOW_HIDDEN);
150 }
151 
152 
153 LocalBroker::~LocalBroker()
154 {
155  fini();
156 }
157 
158 void LocalBroker::fini()
159 {
160  if(Thread::isRunning())
161  Thread::stop();
162 }
163 
164 bool LocalBroker::init()
165 {
166  /*
167  if(!NetworkBase::checkNetwork(5.0))
168  {
169  strError = "Yarp network server is not up.";
170  return false;
171  }
172  */
173  bInitialized = true;
174  bOnlyConnector = true;
175  return true;
176 }
177 
178 bool LocalBroker::init(const char* szcmd, const char* szparam,
179  const char* szhost, const char* szstdio,
180  const char* szworkdir, const char* szenv )
181 {
182 
183  strCmd.clear();
184  strParam.clear();
185  strHost.clear();
186  strStdio.clear();
187  strWorkdir.clear();
188  strTag.clear();
189  strEnv.clear();
190 
191  if(!szcmd)
192  {
193  strError = "command is not specified.";
194  return false;
195  }
196  strCmd = szcmd;
197  if(szparam && strlen(szparam))
198  strParam = szparam;
199 
200  if(szhost && strlen(szhost))
201  strHost = szhost;
202  if(szworkdir && strlen(szworkdir))
203  strWorkdir = szworkdir;
204 
205  if(szstdio && strlen(szstdio))
206  {
207  if(szstdio[0] != '/')
208  strStdio = string("/") + string(szstdio);
209  else
210  strStdio = szstdio;
211  }
212 
213  if(szenv && strlen(szenv))
214  strEnv = szenv;
215 
216  /*
217  OSTRINGSTREAM sstrID;
218  sstrID<<ID;
219  strTag = strHost + strCmd + sstrID.str();
220 
221  if(!NetworkBase::checkNetwork(5.0))
222  {
223  strError = "Yarp network server is not up.";
224  semParam.post();
225  return false;
226  }
227  */
228 
229 #if defined(_WIN32)
230  // do nothing
231  bInitialized = true;
232  return true;
233 #else
234  /* avoiding zombie */
235  struct sigaction new_action;
236  new_action.sa_handler = SIG_IGN;
237  sigemptyset (&new_action.sa_mask);
238  new_action.sa_flags = 0;
239  sigaction (SIGCHLD, &new_action, nullptr);
240  bInitialized = true;
241  return true;
242 #endif
243 
244 }
245 
246 
247 bool LocalBroker::start()
248 {
249  if(!bInitialized) return false;
250  if(bOnlyConnector) return false;
251 
252  if(running())
253  return true;
254 
255  strError.clear();
256  ID = ExecuteCmd();
257  if(!ID)
258  return false;
259 
260  if(running())
261  {
262  return true;
263  }
264  return false;
265 }
266 
267 bool LocalBroker::stop()
268 {
269  if(!bInitialized) return true;
270  if(bOnlyConnector) return false;
271 
272  strError.clear();
273 #if defined(_WIN32)
274  stopCmd(ID);
275  stopStdout();
276 #else
277  stopStdout();
278  stopCmd(ID);
279 #endif
280 
281  double base = SystemClock::nowSystem();
282  while(!timeout(base, STOP_TIMEOUT))
283  {
284  if(!running())
285  return true;
286  }
287 
288  strError = "Timeout! cannot stop ";
289  strError += strCmd;
290  strError += " on ";
291  strError += strHost;
292  return false;
293 }
294 
295 bool LocalBroker::kill()
296 {
297  if(!bInitialized) return true;
298  if(bOnlyConnector) return false;
299 
300  strError.clear();
301 
302 #if defined(_WIN32)
303  killCmd(ID);
304  stopStdout();
305 #else
306  stopStdout();
307  stopCmd(ID);
308 #endif
309 
310  double base = SystemClock::nowSystem();
311  while(!timeout(base, KILL_TIMEOUT))
312  {
313  if(!running())
314  return true;
315  }
316 
317  strError = "Timeout! cannot kill ";
318  strError += strCmd;
319  strError += " on ";
320  strError += strHost;
321  return false;
322 }
323 
324 
325 int LocalBroker::running()
326 {
327  if(!bInitialized) return 0;
328  if(bOnlyConnector) return 0;
329  return (psCmd(ID))?1:0;
330 }
331 
332 
336 bool LocalBroker::connect(const char* from, const char* to,
337  const char* carrier, bool persist)
338 {
339 
340  if(!from)
341  {
342  strError = "no source port is introduced.";
343  return false;
344  }
345 
346  if(!to)
347  {
348  strError = "no destination port is introduced.";
349  return false;
350  }
351 
352  if(!exists(from))
353  {
354  strError = from;
355  strError += " does not exist.";
356  return false;
357  }
358 
359  if(!exists(to))
360  {
361  strError = to;
362  strError += " does not exist.";
363  return false;
364  }
365 
366  if(!NetworkBase::connect(from, to, carrier) || !connected(from, to, carrier))
367  {
368  strError = "cannot connect ";
369  strError +=from;
370  strError += " to " + string(to);
371  return false;
372  }
373  return true;
374 }
375 
376 bool LocalBroker::disconnect(const char* from, const char* to, const char *carrier)
377 {
378 
379  if(!from)
380  {
381  strError = "no source port is introduced.";
382  return false;
383  }
384 
385  if(!to)
386  {
387  strError = "no destination port is introduced.";
388  return false;
389  }
390 
391  if(!exists(from))
392  {
393  strError = from;
394  strError += " does not exist.";
395  return true;
396  }
397 
398  if(!exists(to))
399  {
400  strError = to;
401  strError += " does not exist.";
402  return true;
403  }
404 
405  if(!connected(from, to, carrier))
406  return true;
407 
408  if(!NetworkBase::disconnect(from, to))
409  {
410  strError = "cannot disconnect ";
411  strError +=from;
412  strError += " from " + string(to);
413  return false;
414  }
415  return true;
416 
417 }
418 
419 bool LocalBroker::exists(const char* port)
420 {
421  return NetworkBase::exists(port);
422 }
423 
424 
425 const char* LocalBroker::requestRpc(const char* szport, const char* request, double timeout)
426 {
427  if((szport==nullptr) || (request==nullptr))
428  return nullptr;
429 
430  if(!exists(szport))
431  return nullptr;
432 
433  // opening the port
434  yarp::os::Port port;
435  port.setTimeout((float)((timeout>0.0) ? timeout : CONNECTION_TIMEOUT));
436  if(!port.open("..."))
437  return nullptr;
438 
439  ContactStyle style;
440  style.quiet = true;
441  style.timeout = (timeout>0.0) ? timeout : CONNECTION_TIMEOUT;
442  bool ret;
443  for(int i=0; i<10; i++) {
444  ret = NetworkBase::connect(port.getName(), szport, style);
445  if(ret) break;
447  }
448 
449  if(!ret) {
450  port.close();
451  return nullptr;
452  }
453 
454  Bottle msg, response;
455  msg.fromString(request);
456  ret = port.write(msg, response);
457  NetworkBase::disconnect(port.getName(), szport);
458  if(!response.size() || !ret) {
459  port.close();
460  return nullptr;
461  }
462 
463  port.close();
464  return response.toString().c_str();
465 }
466 
467 bool LocalBroker::connected(const char* from, const char* to, const char* carrier)
468 {
469  if(!exists(from) || !exists(to))
470  return false;
471  return NetworkBase::isConnected(from, to);
472 }
473 
474 
475 const char* LocalBroker::error()
476 {
477  return strError.c_str();
478 }
479 
480 bool LocalBroker::attachStdout()
481 {
482  if(Thread::isRunning())
483  return true;
484  if(!running())
485  {
486  strError = "Module is not running";
487  return false;
488  }
489  return startStdout();
490 }
491 
492 void LocalBroker::detachStdout()
493 {
494  stopStdout();
495 }
496 
497 
498 bool LocalBroker::timeout(double base, double timeout)
499 {
501  if((SystemClock::nowSystem()-base) > timeout)
502  return true;
503  return false;
504 }
505 
506 bool LocalBroker::threadInit()
507 {
508  return true;
509 }
510 
511 
512 void LocalBroker::run()
513 {
514 
515 #if defined(_WIN32)
516  //windows implementation
517  DWORD dwRead;
518  CHAR buff[1024];
519  while(!Thread::isStopping())
520  {
521  BOOL bRet = ReadFile(read_from_pipe_cmd_to_stdout,
522  buff, 1023, &dwRead, nullptr);
523  if(!bRet)
524  break;
525  buff[dwRead] = (CHAR)0;
526  if(eventSink && strlen(buff))
527  eventSink->onBrokerStdout(buff);
528  yarp::os::SystemClock::delaySystem(0.5); // this prevents event flooding
529  }
530 #else
531  while(!Thread::isStopping())
532  {
533  if(waitPipeSignal(pipe_to_stdout[READ_FROM_PIPE]) == PIPE_EVENT)
534  {
535  if(fd_stdout)
536  {
537  string strmsg;
538  char buff[1024];
539  while(fgets(buff, 1024, fd_stdout))
540  strmsg += string(buff);
541  if(eventSink && strmsg.size())
542  eventSink->onBrokerStdout(strmsg.c_str());
543  yarp::os::SystemClock::delaySystem(0.5); // this prevents event flooding
544  }
545  }
546  }
547 #endif
548 }
549 
550 
551 void LocalBroker::threadRelease()
552 {
553 }
554 
555 
556 void LocalBroker::setWindowMode(WindowMode m)
557 {
558  windowMode=m;
559 }
560 
561 
562 #if defined(_WIN32)
563 
564 string LocalBroker::lastError2String()
565 {
566  int error=GetLastError();
567  char buff[1024];
568  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,nullptr,error,0,buff,1024,nullptr);
569  return string(buff);
570 }
571 
572 bool LocalBroker::startStdout()
573 {
574  if (!CloseHandle(write_to_pipe_cmd_to_stdout))
575  return false;
576  Thread::start();
577  return true;
578 }
579 
580 void LocalBroker::stopStdout()
581 {
582  Thread::stop();
583 }
584 
585 int LocalBroker::ExecuteCmd()
586 {
587  string strCmdLine = strCmd + string(" ") + strParam;
588 
589  PROCESS_INFORMATION cmd_process_info;
590  STARTUPINFO cmd_startup_info;
591  ZeroMemory(&cmd_process_info,sizeof(PROCESS_INFORMATION));
592  ZeroMemory(&cmd_startup_info,sizeof(STARTUPINFO));
593  cmd_startup_info.cb = sizeof(STARTUPINFO);
594 
595 
596  string strDisplay=getDisplay();
597 
598  DWORD dwCreationFlags;
599 
600  //these come from xml
601  /*
602  // These are not supported until we find a way to send break signals to
603  // consoles that are not inherited
604  if (strDisplay=="--visible_na")
605  windowMode=WINDOW_VISIBLE;
606  if (strDisplay=="--hidden")
607  windowMode=WINDOW_HIDDEN;
608  */
609 
610  // this is for "attach to stoud only"
611  if (windowMode==WINDOW_VISIBLE)
612  {
613  //this is common to all processes
614  cmd_startup_info.dwFlags |= STARTF_USESHOWWINDOW;
615  cmd_startup_info.wShowWindow = SW_SHOWNA;
616  dwCreationFlags=CREATE_NEW_CONSOLE;
617  }
618  if (windowMode==WINDOW_HIDDEN)
619  {
620  // Setting up child process and pipe for stdout (useful for attaching stdout)
621  SECURITY_ATTRIBUTES pipe_sec_attr;
622  pipe_sec_attr.nLength = sizeof(SECURITY_ATTRIBUTES);
623  pipe_sec_attr.bInheritHandle = TRUE;
624  pipe_sec_attr.lpSecurityDescriptor = nullptr;
625  CreatePipeAsync(&read_from_pipe_cmd_to_stdout,
626  &write_to_pipe_cmd_to_stdout,
627  &pipe_sec_attr, 0);
628 
629  cmd_startup_info.hStdError = write_to_pipe_cmd_to_stdout;
630  cmd_startup_info.hStdOutput = write_to_pipe_cmd_to_stdout;
631 
632  cmd_startup_info.dwFlags |= STARTF_USESTDHANDLES;
633 
634  dwCreationFlags=CREATE_NEW_PROCESS_GROUP; //CREATE_NEW_CONSOLE|CREATE_NEW_PROCESS_GROUP,
635  }
636 
637 
638 
639  /*
640  * setting environment variable for child process
641  */
642  TCHAR chNewEnv[32767];
643 
644  // Get a pointer to the env block.
645  LPTCH chOldEnv = GetEnvironmentStrings();
646 
647  // copying parent env variables
648  LPTSTR lpOld = (LPTSTR) chOldEnv;
649  LPTSTR lpNew = (LPTSTR) chNewEnv;
650  while (*lpOld)
651  {
652  lstrcpy(lpNew, lpOld);
653  lpOld += lstrlen(lpOld) + 1;
654  lpNew += lstrlen(lpNew) + 1;
655  }
656 
657  // adding new env variables
658  std::string cstrEnvName;
659  if(strEnv.size())
660  {
661  yarp::os::impl::SplitString ss(strEnv.c_str(), ';');
662  for(int i=0; i<ss.size(); i++) {
663  lstrcpy(lpNew, (LPTCH) ss.get(i));
664  lpNew += lstrlen(lpNew) + 1;
665  }
666  }
667 
668  // closing env block
669  *lpNew = (TCHAR)0;
670 
671  bool bWorkdir=(strWorkdir.size()) ? true : false;
672  string strWorkdirOk = bWorkdir ? strWorkdir+string("\\") : "";
673 
674  BOOL bSuccess=CreateProcess(nullptr, // command name
675  (char*)(strWorkdirOk+strCmdLine).c_str(), // command line
676  nullptr, // process security attributes
677  nullptr, // primary thread security attributes
678  TRUE, // handles are inherited
679  dwCreationFlags,
680  (LPVOID) chNewEnv, // use new environment
681  bWorkdir?strWorkdirOk.c_str():nullptr, // working directory
682  &cmd_startup_info, // STARTUPINFO pointer
683  &cmd_process_info); // receives PROCESS_INFORMATION
684 
685  if (!bSuccess && bWorkdir)
686  {
687  bSuccess=CreateProcess(nullptr, // command name
688  (char*)(strCmdLine.c_str()), // command line
689  nullptr, // process security attributes
690  nullptr, // primary thread security attributes
691  TRUE, // handles are inherited
692  dwCreationFlags,
693  (LPVOID) chNewEnv, // use new environment
694  strWorkdirOk.c_str(), // working directory
695  &cmd_startup_info, // STARTUPINFO pointer
696  &cmd_process_info); // receives PROCESS_INFORMATION
697  }
698 
699  // deleting old environment variable
700  FreeEnvironmentStrings(chOldEnv);
701 
702  CloseHandle(cmd_process_info.hProcess);
703  CloseHandle(cmd_process_info.hThread);
704 
705  if (!bSuccess)
706  {
707  strError = string("Can't execute command because ") + lastError2String();
708  return 0;
709  }
710 
711  return cmd_process_info.dwProcessId;
712 }
713 
714 bool LocalBroker::psCmd(int pid)
715 {
716  HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_QUERY_INFORMATION, FALSE, pid);
717  if (hProc==nullptr)
718  return false;
719 
720  DWORD status;
721  GetExitCodeProcess(hProc , &status);
722  CloseHandle(hProc);
723  return (status==STILL_ACTIVE);
724 }
725 
726 bool LocalBroker::killCmd(int pid)
727 {
728  HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
729  if (hProc==nullptr)
730  return false;
731 
732  BOOL bRet = TerminateProcess(hProc, 0);
733  CloseHandle(hProc);
734  return bRet ? true : false;
735 }
736 
737 bool LocalBroker::stopCmd(int pid)
738 {
739  HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
740  if (hProc==nullptr)
741  return false;
742 
743  LocalTerminateParams params(pid);
744  EnumWindows((WNDENUMPROC)LocalTerminateAppEnum,(LPARAM)&params);
745 
746  // I believe we do not need this. It is ignored by console applications created with CREATE_NEW_PROCESS_GROUP
747  GenerateConsoleCtrlEvent(CTRL_C_EVENT, pid);
748 
749  //send BREAK_EVENT because we created the process with CREATE_NEW_PROCESS_GROUP
750  GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid);
751 
752  CloseHandle(hProc);
753  return true;
754 }
755 
756 #else //for UNIX
757 
758 bool LocalBroker::psCmd(int pid)
759 {
760  if(!pid)
761  return false;
762  return !::kill(pid, 0);
763 }
764 
765 
766 bool LocalBroker::killCmd(int pid)
767 {
768  if(!pid)
769  return false;
770  return !::kill(pid, SIGKILL);
771 }
772 
773 
774 bool LocalBroker::stopCmd(int pid)
775 {
776  if(!pid)
777  return false;
778  return !::kill(pid, SIGTERM);
779 }
780 
781 int LocalBroker::waitPipe(int pipe_fd)
782 {
783  struct timeval timeout;
784  int rc;
785  fd_set fd;
786 
787  timeout.tv_sec = 0;
788  timeout.tv_usec = 500000;
789 
790  FD_ZERO(&fd);
791  FD_SET(pipe_fd, &fd);
792  rc = select(pipe_fd + 1, &fd, nullptr, nullptr, &timeout);
793  return rc;
794 }
795 
796 
797 int LocalBroker::waitPipeSignal(int pipe_fd)
798 {
799  struct timespec timeout;
800  fd_set fd;
801 
802  timeout.tv_sec = 2;
803  timeout.tv_nsec = 0;
804  FD_ZERO(&fd);
805  FD_SET(pipe_fd, &fd);
806 
807  /*
808 #if (_POSIX_C_SOURCE >= 200112L) || (_XOPEN_SOURCE >= 600)
809  struct sigaction new_action;
810  new_action.sa_handler = SIG_IGN;
811  sigemptyset (&new_action.sa_mask);
812  new_action.sa_flags = 0;
813  sigaction (SIGUSR1, &new_action, nullptr);
814  sigset_t sset, orgmask;
815  sigemptyset(&sset);
816  sigaddset(&sset, SIGUSR1);
817  pthread_sigmask(SIG_BLOCK, &sset, &orgmask);
818  if(pselect(pipe_fd + 1, &fd, nullptr, nullptr, &timeout, &orgmask))
819  return PIPE_EVENT;
820 #endif
821 */
822  if(pselect(pipe_fd + 1, &fd, nullptr, nullptr, &timeout, nullptr))
823  return PIPE_EVENT;
824  return PIPE_TIMEOUT;
825 }
826 
827 
828 bool LocalBroker::startStdout()
829 {
830  fd_stdout = fdopen(pipe_to_stdout[READ_FROM_PIPE], "r");
831  if(!fd_stdout)
832  {
833  strError = "cannot open pipe. " + string(strerror(errno));
834  //close(pipe_to_stdout[READ_FROM_PIPE]);
835  return false;
836  }
837 
838  int oflags = fcntl(pipe_to_stdout[READ_FROM_PIPE], F_GETFL);
839  if(fcntl(pipe_to_stdout[READ_FROM_PIPE], F_SETFL, oflags|O_NONBLOCK) == -1)
840  {
841  strError = "cannot set flag on pipe: " + string(strerror(errno));
842  //close(pipe_to_stdout[READ_FROM_PIPE]);
843  return false;
844  }
845  Thread::start();
846  return true;
847 }
848 
849 void LocalBroker::stopStdout()
850 {
851  Thread::stop();
852  if(fd_stdout)
853  fclose(fd_stdout);
854  fd_stdout = nullptr;
855 }
856 
857 
858 
859 int LocalBroker::ExecuteCmd()
860 {
861  int pipe_child_to_parent[2];
862  int ret = pipe(pipe_child_to_parent);
863  if (ret!=0)
864  {
865  strError = string("Can't create child pipe because") + string(strerror(errno));
866  return 0;
867  }
868 
869  ret = pipe(pipe_to_stdout);
870  if (ret!=0)
871  {
872  strError = string("Can't create stdout pipe because") + string(strerror(errno));
873  return 0;
874  }
875 
876  int pid_cmd = fork();
877 
878  if(IS_INVALID(pid_cmd))
879  {
880  strError = string("Can't fork command because ") + string(strerror(errno));
881  return 0;
882  }
883 
884  if (IS_NEW_PROCESS(pid_cmd)) // RUN COMMAND HERE
885  {
886  close(pipe_child_to_parent[READ_FROM_PIPE]);
887  //int saved_stderr = dup(STDERR_FILENO);
888  dup2(pipe_to_stdout[WRITE_TO_PIPE], STDOUT_FILENO);
889  dup2(pipe_to_stdout[WRITE_TO_PIPE], STDERR_FILENO);
890  if (fcntl(STDOUT_FILENO, F_SETFL, fcntl(STDOUT_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
891  strError = string("Can't set flag on stdout: ") + string(strerror(errno));
892  return 0;
893  }
894  if (fcntl(STDERR_FILENO, F_SETFL, fcntl(STDERR_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
895  strError = string("Can't set flag on stderr: ") + string(strerror(errno));
896  return 0;
897  }
898 
899  close(pipe_to_stdout[WRITE_TO_PIPE]);
900  close(pipe_to_stdout[READ_FROM_PIPE]);
901 
902  strCmd = strCmd + string(" ") + strParam;
903  char *szcmd = new char[strCmd.size()+1];
904  strcpy(szcmd,strCmd.c_str());
905  int nargs = 0;
906  char **szarg = new char*[C_MAXARGS + 1];
907  parseArguments(szcmd, &nargs, szarg);
908  szarg[nargs]=nullptr;
909  if(strEnv.size())
910  {
911  yarp::os::impl::SplitString ss(strEnv.c_str(), ';');
912  for(int i=0; i<ss.size(); i++) {
913  char* szenv = new char[strlen(ss.get(i))+1];
914  strcpy(szenv,ss.get(i));
915  putenv(szenv);
916  }
917  //delete szenv;
918  }
919 
920  if(strWorkdir.size())
921  {
922  int ret = chdir(strWorkdir.c_str());
923  if (ret!=0)
924  {
925  strError = string("Can't set working directory because ") + string(strerror(errno));
926  FILE* out_to_parent = fdopen(pipe_child_to_parent[WRITE_TO_PIPE],"w");
927  fprintf(out_to_parent,"%s", strError.c_str());
928  fflush(out_to_parent);
929  fclose(out_to_parent);
930  close(pipe_child_to_parent[WRITE_TO_PIPE]);
931  delete [] szcmd;
932  delete [] szarg;
933  std::exit(ret);
934  }
935  }
936 
937  char currWorkDirBuff[1024];
938  char *currWorkDir = getcwd(currWorkDirBuff,1024);
939 
940  ret = 0;
941  if (currWorkDir)
942  {
943  char **cwd_szarg=new char*[nargs+1];
944  for (int i=1; i<nargs; ++i) cwd_szarg[i]=szarg[i];
945  cwd_szarg[nargs]=nullptr;
946  cwd_szarg[0]=new char[strlen(currWorkDir)+strlen(szarg[0])+16];
947 
948  strcpy(cwd_szarg[0],currWorkDir);
949  strcat(cwd_szarg[0],"/");
950  strcat(cwd_szarg[0],szarg[0]);
951  ret=execvp(cwd_szarg[0],cwd_szarg);
952  delete [] cwd_szarg[0];
953  delete [] cwd_szarg;
954  }
955 
956  if (ret==-1)
957  {
958  ret=execvp(szarg[0],szarg);
959  }
960 
961  if (ret==-1)
962  {
963  strError = string("Can't execute command because ") + string(strerror(errno));
964  FILE* out_to_parent = fdopen(pipe_child_to_parent[WRITE_TO_PIPE],"w");
965  fprintf(out_to_parent,"%s", strError.c_str());
966  fflush(out_to_parent);
967  fclose(out_to_parent);
968  }
969  close(pipe_child_to_parent[WRITE_TO_PIPE]);
970  delete [] szcmd;
971  delete [] szarg;
972  ::exit(ret);
973  }
974 
975  if (IS_PARENT_OF(pid_cmd))
976  {
977  close(pipe_child_to_parent[WRITE_TO_PIPE]);
978  FILE* in_from_child = fdopen(pipe_child_to_parent[READ_FROM_PIPE],"r");
979  int flags=fcntl(pipe_child_to_parent[READ_FROM_PIPE],F_GETFL,0);
980  if (fcntl(pipe_child_to_parent[READ_FROM_PIPE],F_SETFL,flags|O_NONBLOCK) == -1)
981  {
982  strError = string("Can't set flag on pipe: ") + string(strerror(errno));
983  fclose(in_from_child);
984  return 0;
985  }
986 
987  string retError;
988  waitPipe(pipe_child_to_parent[READ_FROM_PIPE]);
989 
990  for (char buff[1024]; fgets(buff,1024,in_from_child);)
991  retError += string(buff);
992  fclose(in_from_child);
993 
994  if(retError.size())
995  {
996  strError = retError;
997  close(pipe_child_to_parent[READ_FROM_PIPE]);
998  return 0;
999  }
1000 
1001  close(pipe_to_stdout[WRITE_TO_PIPE]);
1002  close(pipe_child_to_parent[READ_FROM_PIPE]);
1003  return pid_cmd;
1004  }
1005 
1006  return 0;
1007 }
1008 
1012 void LocalBroker::splitLine(char *pLine, char **pArgs)
1013 {
1014  char *pTmp = strchr(pLine, ' ');
1015 
1016  if (pTmp) {
1017  *pTmp = '\0';
1018  pTmp++;
1019  while ((*pTmp) && (*pTmp == ' ')) {
1020  pTmp++;
1021  }
1022  if (*pTmp == '\0') {
1023  pTmp = nullptr;
1024  }
1025  }
1026  *pArgs = pTmp;
1027 }
1028 
1029 
1030 
1034 void LocalBroker::parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
1035 {
1036  char *pNext = io_pLine;
1037  size_t i;
1038  int j;
1039  int quoted = 0;
1040  size_t len = strlen(io_pLine);
1041 
1042  // Protect spaces inside quotes, but lose the quotes
1043  for(i = 0; i < len; i++) {
1044  if ((!quoted) && ('"' == io_pLine[i])) {
1045  quoted = 1;
1046  io_pLine[i] = ' ';
1047  } else if ((quoted) && ('"' == io_pLine[i])) {
1048  quoted = 0;
1049  io_pLine[i] = ' ';
1050  } else if ((quoted) && (' ' == io_pLine[i])) {
1051  io_pLine[i] = '\1';
1052  }
1053  }
1054 
1055  // init
1056  memset(o_pArgv, 0x00, sizeof(char*) * C_MAXARGS);
1057  *o_pArgc = 1;
1058  o_pArgv[0] = io_pLine;
1059 
1060  while ((nullptr != pNext) && (*o_pArgc < C_MAXARGS)) {
1061  splitLine(pNext, &(o_pArgv[*o_pArgc]));
1062  pNext = o_pArgv[*o_pArgc];
1063 
1064  if (nullptr != o_pArgv[*o_pArgc]) {
1065  *o_pArgc += 1;
1066  }
1067  }
1068 
1069  for(j = 0; j < *o_pArgc; j++) {
1070  len = strlen(o_pArgv[j]);
1071  for(i = 0; i < len; i++) {
1072  if('\1' == o_pArgv[j][i]) {
1073  o_pArgv[j][i] = ' ';
1074  }
1075  }
1076  }
1077 }
1078 #endif
yarp::os::Port::close
void close() override
Stop port activity.
Definition: Port.cpp:357
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
yarp::os::Bottle::toString
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
yarp::os::ContactStyle
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:27
yarp::os::Bottle::size
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
splitLine
void splitLine(char *pLine, char **pArgs)
Split a line into separate words.
Definition: Run.cpp:2344
yarp::os::impl::SplitString
Split a string into pieces.
Definition: SplitString.h:27
yarp::manager::LocalBroker::WindowMode
WindowMode
Definition: localbroker.h:41
yarp::manager
Definition: application.h:24
PIPE_EVENT
#define PIPE_EVENT
Definition: localbroker.cpp:36
yarp::os::Bottle::fromString
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:207
yarp::os::Port::open
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: Port.cpp:82
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
KILL_TIMEOUT
#define KILL_TIMEOUT
Definition: localbroker.cpp:17
localbroker.h
yarp::os::fork
int fork()
Portable wrapper for the fork() function.
Definition: Os.cpp:162
yarp::os::SystemClock::nowSystem
static double nowSystem()
Definition: SystemClock.cpp:37
yarp::os::Port
A mini-server for network communication.
Definition: Port.h:50
yarp::os::Thread::isRunning
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:108
STOP_TIMEOUT
#define STOP_TIMEOUT
Definition: localbroker.cpp:16
CONNECTION_TIMEOUT
#define CONNECTION_TIMEOUT
Definition: localbroker.cpp:18
yarp::os::ContactStyle::quiet
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:39
yarp::os::SystemClock::delaySystem
static void delaySystem(double seconds)
Definition: SystemClock.cpp:32
HANDLE
void * HANDLE
Definition: RunProcManager.h:42
parseArguments
void parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
Breaks up a line into multiple arguments.
Definition: Run.cpp:2364
yarp::os::NetworkBase::connect
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:685
yarp::os::Port::setTimeout
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition: Port.cpp:628
yarp::os::Contactable::getName
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:17
yarp::os::getcwd
char * getcwd(char *buf, size_t size)
Portable wrapper for the getcwd() function.
Definition: Os.cpp:115
yarp::os::Port::write
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:430
yarp::os::Thread::isStopping
bool isStopping()
Returns true if the thread is stopping (Thread::stop has been called).
Definition: Thread.cpp:102
READ_FROM_PIPE
#define READ_FROM_PIPE
Definition: localbroker.cpp:21
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
PIPE_TIMEOUT
#define PIPE_TIMEOUT
Definition: localbroker.cpp:35
yarp::os::NetworkBase::isConnected
static bool isConnected(const std::string &src, const std::string &dest, bool quiet)
Check if a connection exists between two ports.
Definition: Network.cpp:730
yarp::os::Thread::start
bool start()
Start the new thread running.
Definition: Thread.cpp:96
yarp::os::ContactStyle::timeout
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:50
SplitString.h
WRITE_TO_PIPE
#define WRITE_TO_PIPE
Definition: localbroker.cpp:20
yarp::os::Thread::stop
bool stop()
Stop the thread.
Definition: Thread.cpp:84
C_MAXARGS
#define C_MAXARGS
Definition: localbroker.cpp:38
yarp::os::NetworkBase::disconnect
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition: Network.cpp:703
yarp::os::NetworkBase::exists
static bool exists(const std::string &port, bool quiet=true, bool checkVer=true)
Check for a port to be ready and responsive.
Definition: Network.cpp:749