YARP
Yet Another Robot Platform
ThreadImpl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/NetType.h>
13 #include <yarp/os/Semaphore.h>
16 
17 #include <cstdlib>
18 #include <sstream>
19 #include <thread>
20 
21 #if defined(YARP_HAS_ACE)
22 # include <ace/Thread.h> // For using ACE_hthread_t as native_handle
23 // In one the ACE headers there is a definition of "main" for WIN32
24 # ifdef main
25 # undef main
26 # endif
27 #endif
28 
29 #if defined(__linux__) // Use the POSIX syscalls for the gettid()
30 # include <sys/syscall.h>
31 # include <unistd.h>
32 #endif
33 
34 
35 using namespace yarp::os::impl;
36 
37 namespace {
38 YARP_OS_LOG_COMPONENT(THREADIMPL, "yarp.os.impl.ThreadImpl")
39 } // namespace
40 
41 static std::atomic<int> threadCount{0};
42 
43 void theExecutiveBranch(void* args)
44 {
45  // just for now -- rather deal with broken pipes through normal procedures
46  yarp::os::impl::signal(SIGPIPE, SIG_IGN);
47 
48 
49  /*
50  sigset_t set;
51  sigemptyset(&set);
52  sigaddset(&set, SIGHUP);
53  sigaddset(&set, SIGINT);
54  sigaddset(&set, SIGQUIT);
55  sigaddset(&set, SIGTERM);
56  sigaddset(&set, SIGUSR1);
57  sigaddset(&set, SIGCHLD);
58  ACE_OS::thr_sigsetmask(SIG_BLOCK, &set, nullptr);
59  fprintf(stderr, "Blocking signals\n");
60  */
61 
62  auto* thread = (ThreadImpl*)args;
63 
64  yCDebug(THREADIMPL, "Thread starting up");
65 
66  bool success = thread->threadInit();
67  thread->notify(success);
68  thread->notifyOpened(success);
69  thread->synchroPost();
70 
71  if (success) {
72  // the thread id must be set before calling run() to avoid a race
73  // condition in case the run() method checks it.
74  thread->id = std::this_thread::get_id();
75 #if defined(__linux__)
76  // Use the POSIX syscalls to get
77  // the real thread ID (gettid) on Linux machine
78  thread->tid = static_cast<long int>(syscall(SYS_gettid));
79 #else
80  thread->tid = static_cast<long int>(std::hash<std::thread::id>()(thread->id));
81 #endif
82 
83  thread->setPriority();
84  thread->run();
85  thread->threadRelease();
86  }
87 
88  --threadCount;
89  yCDebug(THREADIMPL, "Thread shutting down");
90 
91  thread->notify(false);
92  thread->synchroPost();
93 
94  return;
95 }
96 
97 
99 {
100  yCDebug(THREADIMPL, "Thread being deleted");
101  join();
102 }
103 
104 
106 {
107  return tid;
108 }
109 
110 
112 {
113 #if defined(__linux__)
114  // Use the POSIX syscalls to get
115  // the real thread ID (gettid) on Linux machine
116  return static_cast<long int>(syscall(SYS_gettid));
117 #else
118  return static_cast<long int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
119 #endif
120 }
121 
122 
123 int ThreadImpl::join(double seconds)
124 {
125  closing = true;
126  if (needJoin) {
127  if (seconds > 0) {
128  if (!initWasSuccessful) {
129  // join called before start completed
130  yCError(THREADIMPL, "Tried to join a thread before starting it");
131  return -1;
132  }
133  synchro.waitWithTimeout(seconds);
134  if (active) {
135  return -1;
136  }
137  }
138 
139  int result = -1;
140  if (thread.joinable()) {
141  thread.join();
142  result = 0;
143  }
144 
145  needJoin = false;
146  active = false;
147  while (synchro.check()) {
148  }
149  return result;
150  }
151  return 0;
152 }
153 
155 {
156 }
157 
159 {
160  closing = true;
161  join(-1);
162 }
163 
164 // similar to close(), but does not join (does not block)
166 {
167  closing = true;
168 }
169 
171 {
172 }
173 
174 void ThreadImpl::afterStart(bool success)
175 {
176 }
177 
179 {
180  return true;
181 }
182 
184 {
185 }
186 
188 {
189  join();
190  closing = false;
191  initWasSuccessful = false;
192  beforeStart();
193  thread = std::thread(theExecutiveBranch, (void*)this);
194  int result = thread.joinable() ? 0 : 1;
195  if (result == 0) {
196  // we must, at some point in the future, join the thread
197  needJoin = true;
198 
199  // the thread started correctly, wait for the initialization
200  yCDebug(THREADIMPL, "Child thread initializing");
201  synchroWait();
202  initWasSuccessful = true;
203  if (opened) {
204  ++threadCount;
205  yCDebug(THREADIMPL, "Child thread initialized ok");
206  afterStart(true);
207  return true;
208  }
209  yCDebug(THREADIMPL, "Child thread did not initialize ok");
210  //wait for the thread to really exit
211  ThreadImpl::join(-1);
212  }
213  //the thread did not start, call afterStart() to warn the user
214  yCError(THREADIMPL, "A thread failed to start with error code: %d", result);
215  afterStart(false);
216  return false;
217 }
218 
220 {
221  synchro.wait();
222 }
223 
225 {
226  synchro.post();
227 }
228 
229 void ThreadImpl::notify(bool s)
230 {
231  active = s;
232 }
233 
235 {
236  return closing;
237 }
238 
240 {
241  return active;
242 }
243 
245 {
246  return threadCount;
247 }
248 
249 int ThreadImpl::setPriority(int priority, int policy)
250 {
251  if (priority == -1) {
252  priority = defaultPriority;
253  policy = defaultPolicy;
254  } else {
255  defaultPriority = priority;
256  defaultPolicy = policy;
257  }
258  if (active && priority != -1) {
259 #if defined(YARP_HAS_ACE)
260  if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
261  return ACE_Thread::setprio(thread.native_handle(), priority, policy);
262  }
263  yCError(THREADIMPL, "Cannot set priority without ACE");
264 #elif defined(__unix__)
265  if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
266  struct sched_param thread_param;
267  thread_param.sched_priority = priority;
268  int ret = pthread_setschedparam(thread.native_handle(), policy, &thread_param);
269  return (ret != 0) ? -1 : 0;
270  } else {
271  yCError(THREADIMPL, "Cannot set priority without ACE");
272  }
273 #else
274  yCError(THREADIMPL, "Cannot set priority without ACE");
275 #endif
276  }
277  return 0;
278 }
279 
281 {
282  int prio = defaultPriority;
283  if (active) {
284 #if defined(YARP_HAS_ACE)
285  if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
286  ACE_Thread::getprio(thread.native_handle(), prio);
287  } else {
288  yCError(THREADIMPL, "Cannot get priority without ACE");
289  }
290 #elif defined(__unix__)
291  if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
292  struct sched_param thread_param;
293  int policy;
294  if (pthread_getschedparam(thread.native_handle(), &policy, &thread_param) == 0) {
295  prio = thread_param.sched_priority;
296  } else {
297  yCError(THREADIMPL, "Cannot get priority without ACE");
298  }
299  }
300 #else
301  yCError(THREADIMPL, "Cannot get priority without ACE");
302 #endif
303  }
304  return prio;
305 }
306 
308 {
309  int policy = defaultPolicy;
310  if (active) {
311 #if defined(YARP_HAS_ACE)
312  if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
313  int prio;
314  ACE_Thread::getprio(thread.native_handle(), prio, policy);
315  } else {
316  yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
317  }
318 #elif defined(__unix__)
319  if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
320  struct sched_param thread_param;
321  if (pthread_getschedparam(thread.native_handle(), &policy, &thread_param) != 0) {
322  policy = defaultPolicy;
323  }
324  } else {
325  yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
326  }
327 #else
328  yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
329 #endif
330  }
331  return policy;
332 }
333 
335 {
336  return tid;
337 }
338 
340 {
342 }
yarp::os::impl::ThreadImpl::getPolicy
int getPolicy()
Definition: ThreadImpl.cpp:307
yarp::os::impl::ThreadImpl::start
virtual bool start()
Definition: ThreadImpl.cpp:187
yarp::os::impl::ThreadImpl::notify
void notify(bool s)
Definition: ThreadImpl.cpp:229
yarp::os::impl::ThreadImpl::threadRelease
virtual void threadRelease()
Definition: ThreadImpl.cpp:183
yarp::os::impl::ThreadImpl::isRunning
bool isRunning()
Definition: ThreadImpl.cpp:239
yarp::os::impl::ThreadImpl::threadInit
virtual bool threadInit()
Definition: ThreadImpl.cpp:178
ThreadImpl.h
yarp::os::impl::ThreadImpl::synchroPost
void synchroPost()
Definition: ThreadImpl.cpp:224
yarp::os::impl::ThreadImpl::~ThreadImpl
virtual ~ThreadImpl()
Definition: ThreadImpl.cpp:98
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
LogComponent.h
yarp::os::Semaphore::wait
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:99
threadCount
static std::atomic< int > threadCount
Definition: ThreadImpl.cpp:41
NetType.h
yarp::os::impl::ThreadImpl::run
virtual void run()
Definition: ThreadImpl.cpp:154
yarp::os::Semaphore::post
void post()
Increment the counter.
Definition: Semaphore.cpp:114
yarp::os::impl::ThreadImpl::setPriority
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:249
yarp::os::Semaphore::check
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:109
yarp::os::impl::ThreadImpl::yield
static void yield()
Definition: ThreadImpl.cpp:339
theExecutiveBranch
void theExecutiveBranch(void *args)
Definition: ThreadImpl.cpp:43
yarp::os::impl::ThreadImpl::tid
long tid
Definition: ThreadImpl.h:69
yarp::os::impl::ThreadImpl::join
int join(double seconds=-1)
Definition: ThreadImpl.cpp:123
yarp::os::Time::yield
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition: Time.cpp:141
yarp::os::impl::ThreadImpl::beforeStart
virtual void beforeStart()
Definition: ThreadImpl.cpp:170
yarp::os::impl::ThreadImpl::getKey
long int getKey()
Definition: ThreadImpl.cpp:105
Semaphore.h
yarp::os::impl::ThreadImpl::getCount
static int getCount()
Definition: ThreadImpl.cpp:244
yarp::os::impl::ThreadImpl::askToClose
void askToClose()
Definition: ThreadImpl.cpp:165
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::impl::ThreadImpl::synchroWait
void synchroWait()
Definition: ThreadImpl.cpp:219
yarp::os::impl::ThreadImpl
An abstraction for a thread of execution.
Definition: ThreadImpl.h:26
PlatformSignal.h
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::impl::ThreadImpl::getPriority
int getPriority()
Definition: ThreadImpl.cpp:280
yarp::os::impl::ThreadImpl::isClosing
bool isClosing()
Definition: ThreadImpl.cpp:234
yarp::os::impl::ThreadImpl::getKeyOfCaller
static long int getKeyOfCaller()
Definition: ThreadImpl.cpp:111
yarp::os::Semaphore::waitWithTimeout
bool waitWithTimeout(double timeoutInSeconds)
Try to decrement the counter, even if we must wait - but don't wait forever.
Definition: Semaphore.cpp:104
yarp::os::impl::ThreadImpl::close
virtual void close()
Definition: ThreadImpl.cpp:158
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl
The components from which ports and connections are built.
yarp::os::impl::ThreadImpl::getTid
long getTid()
Definition: ThreadImpl.cpp:334
yarp::os::impl::ThreadImpl::afterStart
virtual void afterStart(bool success)
Definition: ThreadImpl.cpp:174