38 #include "blocxx/BLOCXX_config.h"
59 namespace BLOCXX_NAMESPACE
// Pool-aware logging helpers. Each macro forwards to the BLOCXX_LOG_* macro of
// the same severity, prefixing the message with m_poolName followed by ": ".
//  - m_poolName is referenced unqualified, so it must be visible wherever the
//    macro is expanded (presumably a member of the pool classes -- confirm).
//  - The do { ... } while (0) wrapper makes every use a single statement that
//    takes a trailing semicolon.
//  - NOTE(review): arg is appended with '+' and is not parenthesized, so an
//    argument containing lower-precedence operators (for example ?:) would be
//    grouped together with the prefix. Pass a single string-like expression.
65 #define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
66 #define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0)
67 #define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0)
68 #define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0)
69 #define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
// Forward declaration: the FixedSizePoolWorkerThread constructor takes a
// FixedSizePoolImpl* before the FixedSizePoolImpl class is defined further down.
87 class FixedSizePoolImpl;
89 class FixedSizePoolWorkerThread :
public Thread
92 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
99 virtual void doShutdown()
107 virtual void doCooperativeCancel()
115 virtual void doDefinitiveCancel()
130 FixedSizePoolWorkerThread(
const FixedSizePoolWorkerThread&);
131 FixedSizePoolWorkerThread& operator=(
const FixedSizePoolWorkerThread&);
134 class CommonPoolImpl :
public ThreadPoolImpl
137 CommonPoolImpl(UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
146 virtual ~CommonPoolImpl()
151 virtual bool queueIsFull()
const
157 bool queueClosed()
const
174 if (finishWorkInQueue)
176 TimeoutTimer timer(timeout);
179 if (timer.infinite())
187 if (!
m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
199 virtual void waitForEmptyQueue()
212 TimeoutTimer shutdownTimer(shutdownTimeout);
213 TimeoutTimer dTimer(definitiveCancelTimeout);
214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
223 if (!shutdownTimer.infinite())
233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
237 m_threads[
i]->timedWait(absoluteShutdownTimeout);
247 if (!dTimer.infinite())
250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
256 if (!
m_threads[
i]->definitiveCancel(absoluteDefinitiveTimeout))
261 catch (CancellationDeniedException& e)
311 incrementWorkerCount();
328 virtual void incrementWorkerCount()
332 virtual void decrementWorkerCount()
351 class FixedSizePoolImpl :
public CommonPoolImpl
354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
355 : CommonPoolImpl(maxQueueSize, logger, poolName)
359 for (UInt32
i = 0;
i < numThreads; ++
i)
363 for (UInt32
i = 0;
i < numThreads; ++
i)
369 catch (ThreadException& e)
381 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
391 TimeoutTimer timer(timeout);
392 while ( queueIsFull() && !queueClosed() )
426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
428 virtual ~FixedSizePoolImpl()
446 friend class FixedSizePoolWorkerThread;
455 catch (ThreadCancelledException&)
459 catch (Exception& ex)
462 std::clog <<
"!!! Exception: " << ex.type() <<
" caught in ThreadPool worker: " << ex << std::endl;
464 Logger logger(COMPONENT_NAME);
465 BLOCXX_LOG_ERROR(logger, Format(
"!!! Exception caught in ThreadPool worker: %1", ex));
467 catch(std::exception& ex)
470 std::clog <<
"!!! std::exception what = \"" << ex.what() <<
"\" caught in ThreadPool worker" << std::endl;
472 Logger logger(COMPONENT_NAME);
473 BLOCXX_LOG_ERROR(logger, Format(
"!!! std::exception caught in ThreadPool worker: %1", ex.what()));
478 std::clog <<
"!!! Unknown Exception caught in ThreadPool worker" << std::endl;
480 Logger logger(COMPONENT_NAME);
481 BLOCXX_LOG_ERROR(logger,
"!!! Unknown Exception caught in ThreadPool worker.");
484 Int32 FixedSizePoolWorkerThread::run()
// Forward declaration: the DynamicSizePoolWorkerThread constructor takes a
// DynamicSizePoolImpl* before the DynamicSizePoolImpl class is defined further down.
507 class DynamicSizePoolImpl;
509 class DynamicSizePoolWorkerThread :
public Thread
512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
519 virtual void doShutdown()
527 virtual void doCooperativeCancel()
535 virtual void doDefinitiveCancel()
550 DynamicSizePoolWorkerThread(
const DynamicSizePoolWorkerThread&);
551 DynamicSizePoolWorkerThread& operator=(
const DynamicSizePoolWorkerThread&);
554 class DynamicSizePoolImpl :
public CommonPoolImpl
557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
558 : CommonPoolImpl(maxQueueSize, logger, poolName)
563 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
598 TimeoutTimer timer(timeout);
599 while ( queueIsFull() && !queueClosed() )
635 ThreadRef theThread(
new DynamicSizePoolWorkerThread(
this));
642 catch (ThreadException& e)
656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
658 virtual ~DynamicSizePoolImpl()
677 UInt32 getMaxThreads()
const
685 friend class DynamicSizePoolWorkerThread;
687 Int32 DynamicSizePoolWorkerThread::run()
713 class DynamicSizeNoQueuePoolImpl :
public DynamicSizePoolImpl
716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads,
const Logger& logger,
const String& poolName)
717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
722 virtual ~DynamicSizeNoQueuePoolImpl()
726 virtual void incrementWorkerCount()
731 virtual void decrementWorkerCount()
740 virtual bool queueIsFull()
const
745 return (freeThreads <=
m_queue.size());
762 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
765 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
768 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
778 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
781 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
784 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
816 m_impl->
shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);