blocxx
ThreadImpl.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
38 #include "blocxx/BLOCXX_config.h"
39 #include "blocxx/ThreadImpl.hpp"
40 #include "blocxx/Mutex.hpp"
41 #include "blocxx/Assertion.hpp"
42 #include "blocxx/Thread.hpp"
45 #include "blocxx/Condition.hpp"
46 #include "blocxx/Timeout.hpp"
47 #include "blocxx/Format.hpp"
48 #include "blocxx/TimeoutTimer.hpp"
49 #if defined(BLOCXX_WIN32)
50 #include "blocxx/Map.hpp"
51 #include "blocxx/MutexLock.hpp"
52 #endif
53 #include <cassert>
54 #include <cstring>
55 #include <cstddef>
56 
57 extern "C"
58 {
59 #ifdef BLOCXX_HAVE_SYS_TIME_H
60 #include <sys/time.h>
61 #endif
62 
63 #include <sys/types.h>
64 
65 #ifdef BLOCXX_HAVE_UNISTD_H
66 #include <unistd.h>
67 #endif
68 
69 #include <errno.h>
70 #include <signal.h>
71 
72 #ifdef BLOCXX_USE_PTHREAD
73 #include <pthread.h>
74 #endif
75 
76 #ifdef BLOCXX_WIN32
77 #include <process.h>
78 #endif
79 }
80 
81 namespace BLOCXX_NAMESPACE
82 {
83 
84 namespace ThreadImpl {
85 
87 // STATIC
88 void
89 sleep(UInt32 milliSeconds)
90 {
91  sleep(Timeout::relative(milliSeconds / 1000.0));
92 }
93 
94 void
95 sleep(const Timeout& timeout)
96 {
98  NonRecursiveMutexLock lock(mtx);
99  Condition cond;
100  TimeoutTimer timer(timeout);
101  while (!timer.expired())
102  {
103  // if it timed out, no reason to loop again
104  if (!cond.timedWait(lock, timer.asAbsoluteTimeout()))
105  {
106  return;
107  }
108  timer.loop();
109  }
110 }
112 // STATIC
113 void
115 {
116 #if defined(BLOCXX_HAVE_SCHED_YIELD)
117  sched_yield();
118 #elif defined(BLOCXX_WIN32)
120  ::SwitchToThread();
121 #else
123 #endif
124 }
125 
126 #if defined(BLOCXX_USE_PTHREAD)
127 namespace {
128 struct LocalThreadParm
129 {
130  ThreadFunction m_func;
131  void* m_funcParm;
132 };
133 extern "C" {
134 static void*
135 threadStarter(void* arg)
136 {
137  // set our cancellation state
138 #ifdef BLOCXX_NCR
139  pthread_setcancel(CANCEL_ON);
140  pthread_setasynccancel(CANCEL_OFF);
141 #else
142  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
143  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
144 #endif
145 
146  // block all signals except SIGUSR1, which is used to signal termination
147  sigset_t signalSet;
148  int rv = sigfillset(&signalSet);
149  BLOCXX_ASSERT(rv == 0);
150  rv = sigdelset(&signalSet, SIGUSR1);
151  BLOCXX_ASSERT(rv == 0);
152  rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
153  BLOCXX_ASSERT(rv == 0);
154 
155  LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
156  ThreadFunction func = parg->m_func;
157  void* funcParm = parg->m_funcParm;
158  delete parg;
159  Int32 rval = (*func)(funcParm);
160  void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
161  pthread_exit(prval);
162  return prval;
163 }
164 }
165 // The purpose of this class is to retrieve the default stack size only once
166 // at library load time and re-use it thereafter.
167 struct default_stack_size
168 {
169 #if !defined(BLOCXX_NCR)
170  default_stack_size()
171  {
172  // if anything in this function fails, we'll just leave val == 0.
173  val = 0;
174  needsSetting = false;
175 
176 // make sure we have a big enough stack. BloCxx can use quite a bit, so we'll try to make sure we get at least 1 MB.
177 // 1 MB is just an arbitrary number. The default on Linux is 2 MB which has never been a problem. However, on UnixWare
178 // the default is really low (16K IIRC) and that isn't enough. It would be good to do some sort of measurement...
179 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
180  pthread_attr_t stack_size_attr;
181  if (pthread_attr_init(&stack_size_attr) != 0)
182  {
183  return;
184  }
185  if (pthread_attr_getstacksize(&stack_size_attr, &val) != 0)
186  {
187  return;
188  }
189 
190  if (val < 1048576)
191  {
192  val = 1048576; // 1 MB
193  needsSetting = true;
194  }
195 #ifdef PTHREAD_STACK_MIN
196  if (PTHREAD_STACK_MIN > val)
197  {
198  val = PTHREAD_STACK_MIN;
199  needsSetting = true;
200  }
201 #endif
202 
203 #endif //#ifdef _POSIX_THREAD_ATTR_STACKSIZE
204  }
205 
206 #else //#if !defined(BLOCXX_NCR)
207  default_stack_size()
208  {
209  // if anything in this function fails, we'll just leave val == 0.
210  val = 0;
211  needsSetting = false;
212 
213 // make sure we have a big enough stack. BloCxx can use quite a bit, so we'll try to make sure we get at least 1 MB.
214 // 1 MB is just an arbitrary number. The default on Linux is 2 MB which has never been a problem. However, on UnixWare
215 // the default is really low (16K IIRC) and that isn't enough. It would be good to do some sort of measurement...
216 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
217  pthread_attr_t stack_size_attr;
218  if (pthread_attr_create(&stack_size_attr) != 0)
219  {
220  return;
221  }
222 
223  val = pthread_attr_getstacksize(stack_size_attr);
224  if (static_cast<signed>(val) == -1)
225  {
226  return;
227  }
228 
229  //we do not set the minimal stack size in 1 Mb because NCR returns 32K
230  //and if we set 1M or even 256K we get 'Out of Memory'
231 
232 #if defined(PTHREAD_STACK_MIN) && defined(_SC_THREAD_STACK_MIN)
233  if (PTHREAD_STACK_MIN > val)
234  {
235  val = PTHREAD_STACK_MIN;
236  needsSetting = true;
237  }
238 #endif
239 
240 #endif //#ifdef _POSIX_THREAD_ATTR_STACKSIZE
241  }
242 #endif //#if !defined(BLOCXX_NCR)
243 
244  static size_t val;
245  static bool needsSetting;
246 };
247 
248 size_t default_stack_size::val = 0;
249 bool default_stack_size::needsSetting(false);
250 default_stack_size g_theDefaultStackSize;
252 pthread_once_t once_control = PTHREAD_ONCE_INIT;
253 pthread_key_t theKey;
254 extern "C" {
255 
256 #ifdef BLOCXX_NCR
257 static void
258 SIGUSR1Handler()
259 {
260  // do nothing
261 }
262 #else
263 static void
264 SIGUSR1Handler(int sig)
265 {
266  // do nothing
267 }
268 #endif
269 
271 static void doOneTimeThreadInitialization()
272 {
273 #ifdef BLOCXX_NCR
274  pthread_keycreate(&theKey, NULL);
275 #else
276  pthread_key_create(&theKey, NULL);
277 #endif
278  // Handle SIGUSR1 so we can safely send it to threads when we want to cancel them.
279  struct sigaction temp;
280  memset(&temp, '\0', sizeof(temp));
281  sigaction(SIGUSR1, 0, &temp);
282  temp.sa_handler = SIGUSR1Handler;
283  sigemptyset(&temp.sa_mask);
284  temp.sa_flags = 0;
285  sigaction(SIGUSR1, &temp, NULL);
286 }
287 
288 } // end extern "C"
289 } // end unnamed namespace
291 
292 #if !defined(BLOCXX_NCR)
293 // STATIC
294 int
295 createThread(Thread_t& handle, ThreadFunction func,
296  void* funcParm, UInt32 threadFlags)
297 {
298  int cc = 0;
299  pthread_attr_t attr;
300  pthread_attr_init(&attr);
301  if (!(threadFlags & BLOCXX_THREAD_FLG_JOINABLE))
302  {
303  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
304  }
305 
306 #if !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
307  // Won't be set to true unless _POSIX_THREAD_ATTR_STACKSIZE is defined
308  if (default_stack_size::needsSetting)
309  {
310  pthread_attr_setstacksize(&attr, default_stack_size::val);
311  }
312 #endif
313 
314  LocalThreadParm* parg = new LocalThreadParm;
315  parg->m_func = func;
316  parg->m_funcParm = funcParm;
317  cc = pthread_create(&handle, &attr, threadStarter, parg);
318  pthread_attr_destroy(&attr);
319  return cc;
320 }
321 
322 #else //#if !defined(BLOCXX_NCR)
323 // STATIC
324 int
325 createThread(Thread_t& handle, ThreadFunction func,
326  void* funcParm, UInt32 threadFlags)
327 {
328  int cc = 0;
329  pthread_attr_t attr;
330  pthread_attr_create(&attr);
331 
332 #if !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
333  // Won't be set to true unless _POSIX_THREAD_ATTR_STACKSIZE is defined
334  if (default_stack_size::needsSetting)
335  {
336  pthread_attr_setstacksize(&attr, default_stack_size::val);
337  }
338 #endif
339 
340  LocalThreadParm* parg = new LocalThreadParm;
341  parg->m_func = func;
342  parg->m_funcParm = funcParm;
343  if (pthread_create(&handle, attr, threadStarter, parg) != 0)
344  {
345  cc = -1;
346  }
347 
348  if (cc != -1 && !(threadFlags & BLOCXX_THREAD_FLG_JOINABLE))
349  {
350  pthread_detach(&handle);
351  }
352 
353  pthread_attr_delete(&attr);
354  return cc;
355 }
356 #endif //#if !defined(BLOCXX_NCR)
357 
358 // STATIC
359 void
360 exitThread(Thread_t&, Int32 rval)
361 {
362  void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
363  pthread_exit(prval);
364 }
365 
366 
367 #if defined(BLOCXX_SIZEOF_PTHREAD_T)
368 #if BLOCXX_SIZEOF_PTHREAD_T == 2
369 #define BLOCXX_THREAD_CONVERTER UInt16
370 #elif BLOCXX_SIZEOF_PTHREAD_T == 4
371 #define BLOCXX_THREAD_CONVERTER UInt32
372 #elif BLOCXX_SIZEOF_PTHREAD_T == 8
373 #define BLOCXX_THREAD_CONVERTER UInt64
374 #else
375 #ifdef BLOCXX_NCR //BLOCXX_SIZEOF_PTHREAD_T=0 for this OS
376 #define BLOCXX_THREAD_CONVERTER UInt16
377 #else /* BLOCXX_SIZEOF_PTHREAD_T */
378 #error Unexpected size for pthread_t
379 #endif /* BLOCXX_NCR */
380 #endif /* BLOCXX_SIZEOF_PTHREAD_T */
381 #else
382 #error No pthread_t size was found!
383 #endif /* defined(BLOCXX_SIZEOF_PTHREAD_T) */
384 
385 UInt64 thread_t_ToUInt64(Thread_t thr)
386 {
387 #ifdef BLOCXX_NCR
388  return UInt64(BLOCXX_THREAD_CONVERTER(cma_thread_get_unique(&thr)));
389 #else
390  return UInt64(BLOCXX_THREAD_CONVERTER(thr));
391 #endif
392 }
393 #undef BLOCXX_THREAD_CONVERTER
394 
396 // STATIC
397 void
398 destroyThread(Thread_t& )
399 {
400 }
402 // STATIC
403 int
404 setThreadDetached(Thread_t& handle)
405 {
406 #ifdef BLOCXX_NCR
407  int cc = pthread_detach(&handle);
408 #else
409  int cc = pthread_detach(handle);
410 #endif
411  if (cc != 0)
412  {
413  if (cc != EINVAL)
414  {
415  cc = -1;
416  }
417  }
418  return cc;
419 }
421 // STATIC
422 int
423 joinThread(Thread_t& handle, Int32& rval)
424 {
425  void* prval(0);
426  if ((errno = pthread_join(handle, &prval)) == 0)
427  {
428  rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
429  return 0;
430  }
431  else
432  {
433  return 1;
434  }
435 }
437 void
438 testCancel()
439 {
440  // set up our TLS which will be used to store the Thread* in.
441  pthread_once(&once_control, &doOneTimeThreadInitialization);
442  Thread* theThread = NULL;
443 #ifdef BLOCXX_NCR
444  pthread_addr_t addr_ptr = NULL;
445  int ret = pthread_getspecific(theKey, &addr_ptr);
446  if (ret == 0)
447  {
448  theThread = reinterpret_cast<Thread*>(addr_ptr);
449  }
450 #else
451  theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
452 #endif
453  if (theThread == 0)
454  {
455  return;
456  }
457  if (AtomicGet(theThread->m_cancelRequested) == 1)
458  {
459  // We don't use BLOCXX_THROW here because
460  // ThreadCancelledException is special. It's not derived
461  // from Exception on purpose so it can be propagated up
462  // the stack easier. This exception shouldn't be caught and not
463  // re-thrown anywhere except in Thread::threadRunner()
464  throw ThreadCancelledException();
465  }
466 }
468 void saveThreadInTLS(void* pTheThread)
469 {
470  // set up our TLS which will be used to store the Thread* in.
471  pthread_once(&once_control, &doOneTimeThreadInitialization);
472  int rc;
473  if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
474  {
475  BLOCXX_THROW(ThreadException, Format("pthread_setspecific failed. error = %1(%2)", rc, strerror(rc)).c_str());
476  }
477 }
479 void sendSignalToThread(Thread_t threadID, int signo)
480 {
481  int rc;
482  if ((rc = pthread_kill(threadID, signo)) != 0)
483  {
484  BLOCXX_THROW(ThreadException, Format("pthread_kill failed. error = %1(%2)", rc, strerror(rc)).c_str());
485  }
486 }
488 void cancel(Thread_t threadID)
489 {
490  int rc;
491  if ((rc = pthread_cancel(threadID)) != 0)
492  {
493  BLOCXX_THROW(ThreadException, Format("pthread_cancel failed. error = %1(%2)", rc, strerror(rc)).c_str());
494  }
495 }
496 #endif // #ifdef BLOCXX_USE_PTHREAD
497 
498 #if defined(BLOCXX_WIN32)
499 
500 namespace {
501 
502 struct WThreadInfo
503 {
504  HANDLE handle;
505  BLOCXX_NAMESPACE::Thread* pTheThread;
506 };
507 
508 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
509 Win32ThreadMap g_threads;
510 Mutex g_threadsGuard;
511 
512 struct LocalThreadParm
513 {
514  ThreadFunction m_func;
515  void* m_funcParm;
516 };
517 
519 extern "C" {
520 unsigned __stdcall threadStarter(void* arg)
521 {
522  LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
523  ThreadFunction func = parg->m_func;
524  void* funcParm = parg->m_funcParm;
525  delete parg;
526  Int32 rval = (*func)(funcParm);
527  ::_endthreadex(static_cast<unsigned>(rval));
528  return rval;
529 }
530 } // End extern "C"
531 
533 void
534 addThreadToMap(DWORD threadId, HANDLE threadHandle)
535 {
536  MutexLock ml(g_threadsGuard);
537  WThreadInfo wi;
538  wi.handle = threadHandle;
539  wi.pTheThread = 0;
540  g_threads[threadId] = wi;
541 }
542 
544 HANDLE
545 getThreadHandle(DWORD threadId)
546 {
547  MutexLock ml(g_threadsGuard);
548  HANDLE chdl = 0;
549  Win32ThreadMap::iterator it = g_threads.find(threadId);
550  if (it != g_threads.end())
551  {
552  chdl = it->second.handle;
553  }
554  return chdl;
555 }
556 
558 void
559 setThreadPointer(DWORD threadId, Thread* pTheThread)
560 {
561  MutexLock ml(g_threadsGuard);
562  Win32ThreadMap::iterator it = g_threads.find(threadId);
563  if (it != g_threads.end())
564  {
565  it->second.pTheThread = pTheThread;
566  }
567 }
568 
570 HANDLE
571 removeThreadFromMap(DWORD threadId)
572 {
573  MutexLock ml(g_threadsGuard);
574  HANDLE chdl = 0;
575  Win32ThreadMap::iterator it = g_threads.find(threadId);
576  if (it != g_threads.end())
577  {
578  chdl = it->second.handle;
579  g_threads.erase(it);
580  }
581  return chdl;
582 }
583 
585 Thread*
586 getThreadObject(DWORD threadId)
587 {
588  Thread* pTheThread = 0;
589  MutexLock ml(g_threadsGuard);
590  Win32ThreadMap::iterator it = g_threads.find(threadId);
591  if (it != g_threads.end())
592  {
593  pTheThread = it->second.pTheThread;
594  }
595  return pTheThread;
596 }
597 
598 } // End unnamed namespace
599 
601 // STATIC
602 int
603 createThread(Thread_t& handle, ThreadFunction func,
604  void* funcParm, UInt32 threadFlags)
605 {
606  int cc = -1;
607  HANDLE hThread;
608  unsigned threadId;
609 
610  LocalThreadParm* parg = new LocalThreadParm;
611  parg->m_func = func;
612  parg->m_funcParm = funcParm;
613  hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
614  parg, 0, &threadId));
615  if (hThread != 0)
616  {
617  addThreadToMap(threadId, hThread);
618  handle = threadId;
619  cc = 0;
620  }
621  else
622  {
623  cc = errno;
624  }
625 
626  return cc;
627 }
629 // STATIC
630 void
631 exitThread(Thread_t&, Int32 rval)
632 {
633  ::_endthreadex(static_cast<unsigned>(rval));
634 }
635 
637 // STATIC
638 UInt64 thread_t_ToUInt64(Thread_t thr)
639 {
640  // This should really be a compile time assert.
641  BLOCXX_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t)," Thread_t truncated!");
642  return static_cast<UInt64>(thr);
643 }
644 
646 // STATIC
647 void
648 destroyThread(Thread_t& threadId)
649 {
650  HANDLE thdl = removeThreadFromMap(threadId);
651  if (thdl != 0)
652  {
653  ::CloseHandle(thdl);
654  }
655 }
657 // STATIC
658 int
659 setThreadDetached(Thread_t& handle)
660 {
661  // No need for this on Win32
662  return 0;
663 }
665 // STATIC
666 int
667 joinThread(Thread_t& threadId, Int32& rvalArg)
668 {
669  int cc = -1;
670  DWORD rval;
671  HANDLE thdl = getThreadHandle(threadId);
672  if (thdl != 0)
673  {
674  if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
675  {
676  if (::GetExitCodeThread(thdl, &rval) != 0)
677  {
678  rvalArg = static_cast<Int32>(rval);
679  cc = 0;
680  }
681  }
682  }
683  return cc;
684 }
685 
687 void
688 testCancel()
689 {
690  DWORD threadId = ThreadImpl::currentThread();
691  Thread* pTheThread = getThreadObject(threadId);
692  if (pTheThread)
693  {
694  if (AtomicGet(pTheThread->m_cancelRequested) == 1)
695  {
696  // We don't use BLOCXX_THROW here because
697  // ThreadCancelledException is special. It's not derived
698  // from Exception on purpose so it can be propagated up
699  // the stack easier. This exception shouldn't be caught and not
700  // re-thrown anywhere except in Thread::threadRunner()
701  throw ThreadCancelledException();
702  }
703  }
704 }
706 void saveThreadInTLS(void* pThreadArg)
707 {
708  Thread* pThread = static_cast<Thread*>(pThreadArg);
709  DWORD threadId = pThread->getId();
710  setThreadPointer(threadId, pThread);
711 }
713 void sendSignalToThread(Thread_t threadID, int signo)
714 {
715 }
717 void cancel(Thread_t threadId)
718 {
719  HANDLE thdl = getThreadHandle(threadId);
720  if (thdl != 0)
721  {
722  ::TerminateThread(thdl, -1);
723  }
724 }
725 
726 #endif // #ifdef BLOCXX_WIN32
727 } // end namespace BLOCXX_ThreadImpl
728 
729 } // end namespace BLOCXX_NAMESPACE
730