blocxx
ThreadPool.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
38 #include "blocxx/BLOCXX_config.h"
39 #include "blocxx/ThreadPool.hpp"
40 #include "blocxx/Array.hpp"
41 #include "blocxx/Thread.hpp"
44 #include "blocxx/Condition.hpp"
45 #include "blocxx/Format.hpp"
46 #include "blocxx/Mutex.hpp"
47 #include "blocxx/MutexLock.hpp"
48 #include "blocxx/NullLogger.hpp"
49 #include "blocxx/Timeout.hpp"
50 #include "blocxx/TimeoutTimer.hpp"
51 #include "blocxx/GlobalString.hpp"
52 
53 #include <deque>
54 
55 #ifdef BLOCXX_DEBUG
56 #include <iostream> // for cerr
57 #endif
58 
59 namespace BLOCXX_NAMESPACE
60 {
61 
63 
64 // logger can be null
65 #define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
66 #define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0)
67 #define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0)
68 #define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0)
69 #define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
70 
73 {
74 public:
75  // returns true if work is placed in the queue to be run and false if not.
76  virtual bool addWork(const RunnableRef& work, const Timeout& timeout) = 0;
77  virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) = 0;
78  virtual void waitForEmptyQueue() = 0;
79  virtual ~ThreadPoolImpl()
80  {
81  }
82 };
83 namespace {
84 
86 
87 class FixedSizePoolImpl;
89 class FixedSizePoolWorkerThread : public Thread
90 {
91 public:
92  FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
93  : Thread()
94  , m_thePool(thePool)
95  {
96  }
97  virtual Int32 run();
98 private:
99  virtual void doShutdown()
100  {
101  MutexLock lock(m_guard);
102  if (m_currentRunnable)
103  {
104  m_currentRunnable->doShutdown();
105  }
106  }
107  virtual void doCooperativeCancel()
108  {
109  MutexLock lock(m_guard);
110  if (m_currentRunnable)
111  {
112  m_currentRunnable->doCooperativeCancel();
113  }
114  }
115  virtual void doDefinitiveCancel()
116  {
117  MutexLock lock(m_guard);
118  if (m_currentRunnable)
119  {
120  m_currentRunnable->doDefinitiveCancel();
121  }
122  }
123 
124  FixedSizePoolImpl* m_thePool;
125 
126  Mutex m_guard;
128 
129  // non-copyable
130  FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
131  FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
132 };
134 class CommonPoolImpl : public ThreadPoolImpl
135 {
136 protected:
137  CommonPoolImpl(UInt32 maxQueueSize, const Logger& logger, const String& poolName)
138  : m_maxQueueSize(maxQueueSize)
139  , m_queueClosed(false)
140  , m_shutdown(false)
141  , m_logger(logger)
142  , m_poolName(poolName)
143  {
144  }
145 
146  virtual ~CommonPoolImpl()
147  {
148  }
149 
150  // assumes that m_queueLock is locked. DynamicSizeNoQueuePoolImpl overrides this.
151  virtual bool queueIsFull() const
152  {
153  return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
154  }
155 
156  // assumes that m_queueLock is locked
157  bool queueClosed() const
158  {
159  return m_shutdown || m_queueClosed;
160  }
161 
162  bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
163  {
164  NonRecursiveMutexLock l(m_queueLock);
165  // the pool is in the process of being destroyed
166  if (queueClosed())
167  {
168  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue is already closed. Why are you trying to shutdown again?");
169  return false;
170  }
171  m_queueClosed = true;
172  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue closed");
173 
174  if (finishWorkInQueue)
175  {
176  TimeoutTimer timer(timeout);
177  while (m_queue.size() != 0)
178  {
179  if (timer.infinite())
180  {
181  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting forever for queue to empty");
182  m_queueEmpty.wait(l);
183  }
184  else
185  {
186  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting w/timout for queue to empty");
187  if (!m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
188  {
189  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Wait timed out. Work in queue will be discarded.");
190  break; // timed out
191  }
192  }
193  }
194  }
195  m_shutdown = true;
196  return true;
197  }
198 
199  virtual void waitForEmptyQueue()
200  {
201  NonRecursiveMutexLock l(m_queueLock);
202  while (m_queue.size() != 0)
203  {
204  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting for empty queue");
205  m_queueEmpty.wait(l);
206  }
207  BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue empty: the wait is over");
208  }
209 
210  void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
211  {
212  TimeoutTimer shutdownTimer(shutdownTimeout);
213  TimeoutTimer dTimer(definitiveCancelTimeout);
214  if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
215  {
216  return;
217  }
218 
219  // Wake up any workers so they recheck shutdown flag
220  m_queueNotEmpty.notifyAll();
221  m_queueNotFull.notifyAll();
222 
223  if (!shutdownTimer.infinite())
224  {
225  // Tell all the threads to shutdown
226  for (UInt32 i = 0; i < m_threads.size(); ++i)
227  {
228  BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling shutdown on thread %1", i));
229  m_threads[i]->shutdown();
230  }
231 
232  // Wait until shutdownTimeout for the threads to finish
233  Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
234  for (UInt32 i = 0; i < m_threads.size(); ++i)
235  {
236  BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Waiting for thread %1 to exit.", i));
237  m_threads[i]->timedWait(absoluteShutdownTimeout);
238  }
239 
240  // Tell all the threads to cooperative cancel
241  for (UInt32 i = 0; i < m_threads.size(); ++i)
242  {
243  BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling cooperativeCancel on thread %1", i));
244  m_threads[i]->cooperativeCancel();
245  }
246 
247  if (!dTimer.infinite())
248  {
249  // If any still haven't shut down, definitiveCancel will kill them.
250  Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
251  for (UInt32 i = 0; i < m_threads.size(); ++i)
252  {
253  BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling definitiveCancel on thread %1", i));
254  try
255  {
256  if (!m_threads[i]->definitiveCancel(absoluteDefinitiveTimeout))
257  {
258  BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
259  }
260  }
261  catch (CancellationDeniedException& e)
262  {
263  BLOCXX_POOL_LOG_ERROR(m_logger, Format("Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e, i));
264  }
265  }
266  }
267 
268  }
269 
270  // Clean up after the threads and/or wait for them to exit.
271  for (UInt32 i = 0; i < m_threads.size(); ++i)
272  {
273  BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("calling join() on thread %1", i));
274  m_threads[i]->join();
275  BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("join() finished for thread %1", i));
276  }
277  }
278 
279  RunnableRef getWorkFromQueue(bool waitForWork)
280  {
281  NonRecursiveMutexLock l(m_queueLock);
282  while ((m_queue.size() == 0) && (!m_shutdown))
283  {
284  if (waitForWork)
285  {
286  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waiting for work");
287  m_queueNotEmpty.wait(l);
288  }
289  else
290  {
291  // wait 1 sec for work, to more efficiently handle a stream
292  // of single requests.
293  if (!m_queueNotEmpty.timedWait(l,Timeout::relative(1)))
294  {
295  BLOCXX_POOL_LOG_DEBUG3(m_logger, "No work after 1 sec. I'm not waiting any longer");
296  return RunnableRef();
297  }
298  }
299  }
300  // check to see if a shutdown started while the thread was sleeping
301  if (m_shutdown)
302  {
303  BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
304  return RunnableRef();
305  }
306 
307  RunnableRef work = m_queue.front();
308  m_queue.pop_front();
309 
310  // This needs to happen before the call to queueIsFull() because the worker count can affect the result of queueIsFull()
311  incrementWorkerCount();
312  // handle threads waiting in addWork().
313  if (!queueIsFull())
314  {
315  m_queueNotFull.notifyAll();
316  }
317 
318  // handle waiting shutdown thread or callers of waitForEmptyQueue()
319  if (m_queue.size() == 0)
320  {
321  m_queueEmpty.notifyAll();
322  }
323  BLOCXX_POOL_LOG_DEBUG3(m_logger, "A thread got some work to do");
324  return work;
325  }
326 
327  // hooks for DynamicSizeNoQueuePoolImpl subclass. Yes this is a horrible design, it just saves code duplication.
328  virtual void incrementWorkerCount()
329  {
330  }
331 
332  virtual void decrementWorkerCount()
333  {
334  }
335 
336  // pool characteristics
338  // pool state
339  Array<ThreadRef> m_threads;
340  std::deque<RunnableRef> m_queue;
343  // pool synchronization
344  NonRecursiveMutex m_queueLock;
345  Condition m_queueNotFull;
346  Condition m_queueEmpty;
347  Condition m_queueNotEmpty;
349  String m_poolName;
350 };
351 class FixedSizePoolImpl : public CommonPoolImpl
352 {
353 public:
354  FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
355  : CommonPoolImpl(maxQueueSize, logger, poolName)
356  {
357  // create the threads and start them up.
358  m_threads.reserve(numThreads);
359  for (UInt32 i = 0; i < numThreads; ++i)
360  {
361  m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
362  }
363  for (UInt32 i = 0; i < numThreads; ++i)
364  {
365  try
366  {
367  m_threads[i]->start();
368  }
369  catch (ThreadException& e)
370  {
371  BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread #%1: %2", i, e));
372  m_threads.resize(i); // remove non-started threads
373  // shutdown the rest
374  this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
375  throw;
376  }
377  }
378  BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
379  }
380  // returns true if work is placed in the queue to be run and false if not.
381  virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
382  {
383  // check precondition: work != NULL
384  if (!work)
385  {
386  BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
387  return false;
388  }
389 
390  NonRecursiveMutexLock l(m_queueLock);
391  TimeoutTimer timer(timeout);
392  while ( queueIsFull() && !queueClosed() )
393  {
394  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
395  if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
396  {
397  // timed out
398  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
399  return false;
400  }
401  }
402 
403  // the pool is in the process of being destroyed
404  if (queueClosed())
405  {
406  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
407  return false;
408  }
409 
410  m_queue.push_back(work);
411 
412  // if the queue was empty, there may be workers just sitting around, so we need to wake them up!
413  if (m_queue.size() == 1)
414  {
415  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waking up sleepy workers");
416  m_queueNotEmpty.notifyAll();
417  }
418 
419  BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
420  return true;
421  }
422 
423  // we keep this around so it can be called in the destructor
424  virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
425  {
426  shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
427  }
428  virtual ~FixedSizePoolImpl()
429  {
430  // can't let exception escape the destructor
431  try
432  {
433  // don't need a lock here, because we're the only thread left.
434  if (!queueClosed())
435  {
436  // Make sure the pool is shutdown.
437  // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
438  this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
439  }
440  }
441  catch (...)
442  {
443  }
444  }
445 private:
446  friend class FixedSizePoolWorkerThread;
447 };
448 void runRunnable(const RunnableRef& work)
449 {
450  // don't let exceptions escape, we need to keep going, except for ThreadCancelledException, in which case we need to stop.
451  try
452  {
453  work->run();
454  }
455  catch (ThreadCancelledException&)
456  {
457  throw;
458  }
459  catch (Exception& ex)
460  {
461 #ifdef BLOCXX_DEBUG
462  std::clog << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
463 #endif
464  Logger logger(COMPONENT_NAME);
465  BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in ThreadPool worker: %1", ex));
466  }
467  catch(std::exception& ex)
468  {
469 #ifdef BLOCXX_DEBUG
470  std::clog << "!!! std::exception what = \"" << ex.what() << "\" caught in ThreadPool worker" << std::endl;
471 #endif
472  Logger logger(COMPONENT_NAME);
473  BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in ThreadPool worker: %1", ex.what()));
474  }
475  catch (...)
476  {
477 #ifdef BLOCXX_DEBUG
478  std::clog << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
479 #endif
480  Logger logger(COMPONENT_NAME);
481  BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in ThreadPool worker.");
482  }
483 }
484 Int32 FixedSizePoolWorkerThread::run()
485 {
486  while (true)
487  {
488  // check queue for work
489  RunnableRef work = m_thePool->getWorkFromQueue(true);
490  if (!work)
491  {
492  return 0;
493  }
494  // save this off so it can be cancelled by another thread.
495  {
496  MutexLock lock(m_guard);
497  m_currentRunnable = work;
498  }
499  runRunnable(work);
500  {
501  MutexLock lock(m_guard);
502  m_currentRunnable = 0;
503  }
504  }
505  return 0;
506 }
507 class DynamicSizePoolImpl;
509 class DynamicSizePoolWorkerThread : public Thread
510 {
511 public:
512  DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
513  : Thread()
514  , m_thePool(thePool)
515  {
516  }
517  virtual Int32 run();
518 private:
519  virtual void doShutdown()
520  {
521  MutexLock lock(m_guard);
522  if (m_currentRunnable)
523  {
524  m_currentRunnable->doShutdown();
525  }
526  }
527  virtual void doCooperativeCancel()
528  {
529  MutexLock lock(m_guard);
530  if (m_currentRunnable)
531  {
532  m_currentRunnable->doCooperativeCancel();
533  }
534  }
535  virtual void doDefinitiveCancel()
536  {
537  MutexLock lock(m_guard);
538  if (m_currentRunnable)
539  {
540  m_currentRunnable->doDefinitiveCancel();
541  }
542  }
543 
544  DynamicSizePoolImpl* m_thePool;
545 
546  Mutex m_guard;
548 
549  // non-copyable
550  DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
551  DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
552 };
554 class DynamicSizePoolImpl : public CommonPoolImpl
555 {
556 public:
557  DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
558  : CommonPoolImpl(maxQueueSize, logger, poolName)
559  , m_maxThreads(maxThreads)
560  {
561  }
562  // returns true if work is placed in the queue to be run and false if not.
563  virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
564  {
565  // check precondition: work != NULL
566  if (!work)
567  {
568  BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
569  return false;
570  }
571  NonRecursiveMutexLock l(m_queueLock);
572 
573  // the pool is in the process of being destroyed
574  if (queueClosed())
575  {
576  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
577  return false;
578  }
579 
580  // Can't touch m_threads until *after* we check for the queue being closed, shutdown
581  // requires that m_threads not change after the queue is closed.
582  // Now clean up dead threads (before we add the new one, so we don't need to check it)
583  size_t i = 0;
584  while (i < m_threads.size())
585  {
586  if (!m_threads[i]->isRunning())
587  {
588  BLOCXX_POOL_LOG_DEBUG3(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
589  m_threads[i]->join();
590  m_threads.remove(i);
591  }
592  else
593  {
594  ++i;
595  }
596  }
597 
598  TimeoutTimer timer(timeout);
599  while ( queueIsFull() && !queueClosed() )
600  {
601  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
602  if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
603  {
604  // timed out
605  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
606  return false;
607  }
608  }
609 
610  // The previous loop could have ended because a spot opened in the queue or it was closed. Check for the close.
611  if (queueClosed())
612  {
613  BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
614  return false;
615  }
616 
617  m_queue.push_back(work);
618 
619  BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
620 
621  // Release the lock and wake up a thread waiting for work in the queue
622  // This bit of code is a race condition with the thread,
623  // but if we acquire the lock again before it does, then we
624  // properly handle that case. The only disadvantage if we win
625  // the "race" is that we'll unnecessarily start a new thread.
626  // In practice it works all the time.
627  l.release();
628  m_queueNotEmpty.notifyOne();
629  Thread::yield(); // give the thread a chance to run
630  l.lock();
631 
632  // Start up a new thread to handle the work in the queue.
633  if (!m_queue.empty() && m_threads.size() < m_maxThreads)
634  {
635  ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
636  m_threads.push_back(theThread);
637  BLOCXX_POOL_LOG_DEBUG3(m_logger, "About to start a new thread");
638  try
639  {
640  theThread->start();
641  }
642  catch (ThreadException& e)
643  {
644  BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread: %1", e));
645  m_threads.pop_back();
646  throw;
647  }
648  BLOCXX_POOL_LOG_DEBUG2(m_logger, "New thread started");
649  }
650  return true;
651  }
652 
653  // we keep this around so it can be called in the destructor
654  virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
655  {
656  shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
657  }
658  virtual ~DynamicSizePoolImpl()
659  {
660  // can't let exception escape the destructor
661  try
662  {
663  // don't need a lock here, because we're the only thread left.
664  if (!queueClosed())
665  {
666  // Make sure the pool is shutdown.
667  // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
668  this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
669  }
670  }
671  catch (...)
672  {
673  }
674  }
675 
676 protected:
677  UInt32 getMaxThreads() const
678  {
679  return m_maxThreads;
680  }
681 
682 private:
683  // pool characteristics
684  UInt32 m_maxThreads;
685  friend class DynamicSizePoolWorkerThread;
686 };
687 Int32 DynamicSizePoolWorkerThread::run()
688 {
689  while (true)
690  {
691  // check queue for work
692  RunnableRef work = m_thePool->getWorkFromQueue(false);
693  if (!work)
694  {
695  return 0;
696  }
697  // save this off so it can be cancelled by another thread.
698  {
699  MutexLock lock(m_guard);
700  m_currentRunnable = work;
701  }
702  runRunnable(work);
703  m_thePool->decrementWorkerCount();
704  {
705  MutexLock lock(m_guard);
706  m_currentRunnable = 0;
707  }
708  }
709  return 0;
710 }
711 
713 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
714 {
715 public:
716  DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const Logger& logger, const String& poolName)
717  : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName) // allow queue in superclass, but prevent it from having any backlog
718  , m_workingThreads(0)
719  {
720  }
721 
722  virtual ~DynamicSizeNoQueuePoolImpl()
723  {
724  }
725 
726  virtual void incrementWorkerCount()
727  {
729  }
730 
731  virtual void decrementWorkerCount()
732  {
733  NonRecursiveMutexLock lock(m_queueLock);
735  // wake up any threads waiting to start some work
736  m_queueNotFull.notifyAll();
737  }
738 
739  // One difference between this class and DynamicSizePoolImpl is that we change the definition of queueIsFull()
740  virtual bool queueIsFull() const
741  {
742  // don't let the queue get bigger than the number of free threads. This effectively prevents work from being
743  // queued up which can't be immediately serviced.
744  size_t freeThreads = getMaxThreads() - AtomicGet(m_workingThreads);
745  return (freeThreads <= m_queue.size());
746  }
747 
748 private:
749  // Keep track of the number of threads doing work. Protected by m_guard
751 
752 };
753 
754 } // end anonymous namespace
756 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const String& poolName)
757 {
758  NullLogger logger;
759  switch (poolType)
760  {
761  case FIXED_SIZE:
762  m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
763  break;
764  case DYNAMIC_SIZE:
765  m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
766  break;
768  m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
769  break;
770  }
771 }
773 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
774 {
775  switch (poolType)
776  {
777  case FIXED_SIZE:
778  m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
779  break;
780  case DYNAMIC_SIZE:
781  m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
782  break;
784  m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
785  break;
786  }
787 }
790 {
791  return m_impl->addWork(work, Timeout::infinite);
792 }
795 {
796  return m_impl->addWork(work, Timeout::relative(0));
797 }
799 bool ThreadPool::tryAddWork(const RunnableRef& work, const Timeout& timeout)
800 {
801  return m_impl->addWork(work, timeout);
802 }
804 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
805 {
806  m_impl->shutdown(finishWorkInQueue, Timeout::relative(shutdownSecs), Timeout::relative(shutdownSecs));
807 }
809 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
810 {
811  m_impl->shutdown(finishWorkInQueue, timeout, timeout);
812 }
814 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
815 {
816  m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
817 }
820 {
822 }
825 {
826 }
830  , m_impl(x.m_impl)
831 {
832 }
835 {
836  m_impl = x.m_impl;
837  return *this;
838 }
839 
840 } // end namespace BLOCXX_NAMESPACE
841