blocxx
WaitpidThreadFix.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Quest Software, Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 *
7 * * Redistributions of source code must retain the above copyright notice,
8 * this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the Network Associates, nor Quest Software, Inc., nor the
13 * names of its contributors or employees may be used to endorse or promote
14 * products derived from this software without specific prior written
15 * permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 *******************************************************************************/
29 
34 #include "blocxx/Thread.hpp"
36 #include "blocxx/Exec.hpp"
38 #include "blocxx/ThreadOnce.hpp"
41 #include "blocxx/Condition.hpp"
42 #include "blocxx/Reference.hpp"
44 #include <queue>
45 #include <sys/types.h>
46 #ifndef BLOCXX_WIN32
47 #include <sys/wait.h>
48 #endif
49 
50 using namespace blocxx;
51 
52 namespace BLOCXX_NAMESPACE
53 {
54 
55 namespace
56 {
57  bool g_useWaitpidThreadFix =
58 #ifdef BLOCXX_WAITPID_THREADING_PROBLEM
59  true;
60 #else
61  false;
62 #endif
63 
64  class ProcessThread;
65 
66  OnceFlag g_initThreadGuard = BLOCXX_ONCE_INIT;
67  ProcessThread* g_processThread = 0;
68 
69  void initThread();
70 
71  Thread_t getWorkerThreadId();
72 
73 }
74 
76 {
77  bool rv = g_useWaitpidThreadFix;
78  g_useWaitpidThreadFix = enabled;
79  return rv;
80 }
81 
83 {
84  if (!g_useWaitpidThreadFix)
85  {
86  return false;
87  }
88  Thread_t currThread = ThreadImpl::currentThread();
89  Thread_t workerThread = getWorkerThreadId();
90 
91  // If we are already in the WaitpidThreadFix worker thread
92  // then we dont want to cause an infinite loop
93  if (ThreadImpl::sameThreads(currThread, workerThread))
94  {
95  return false;
96  }
97  return true;
98 }
99 
100 namespace
101 {
102  typedef Reference<Exception> ExceptionPtr;
103 
104 
105  class WorkSignal
106  {
107  public:
108  WorkSignal()
109  : m_signal(false)
110  {
111  }
112 
113  ~WorkSignal()
114  {
115  }
116 
117  void signal()
118  {
119  NonRecursiveMutexLock lock(m_mutex);
120  m_signal = true;
121  m_cond.notifyAll();
122  }
123 
124  void waitForSignal()
125  {
126  NonRecursiveMutexLock lock(m_mutex);
127 
128  while(!m_signal)
129  {
130  m_cond.wait(lock);
131  }
132  }
133 
134  private:
135  bool m_signal;
136  Condition m_cond;
137  NonRecursiveMutex m_mutex;
138  };
139 
140  //***************************************************************************
141  // - This base class represents the work to be performed by ControlledAccessThread
142  // - This class and all derived classes must be thread safe
143  class WorkItem : public IntrusiveCountableBase
144  {
145  public:
146  virtual ~WorkItem()
147  {
148  }
149 
150  virtual void doWork() = 0;
151 
152  void signalDone()
153  {
154  m_doneSig.signal();
155  }
156 
157  void saveException(Exception* err)
158  {
159  NonRecursiveMutexLock lock(m_errMutex);
160  m_err = err;
161  }
162 
163  Exception* getException()
164  {
165  NonRecursiveMutexLock lock(m_errMutex);
166  return m_err.getPtr();
167  }
168 
169  protected:
170  ExceptionPtr m_err;
171  NonRecursiveMutex m_errMutex;
172  WorkSignal m_doneSig;
173  };
174 
175 
176  //***************************************************************************
177  class SpawnWorkItem : public WorkItem
178  {
179  public:
180  SpawnWorkItem(char const * execPath, char const * const argv[],
181  char const * const envp[], Exec::PreExec & preExec)
182  : m_execPath(execPath)
183  , m_argv(argv)
184  , m_envp(envp)
185  , m_preExec(preExec)
186  {
187  }
188 
189  virtual ~SpawnWorkItem()
190  {
191  }
192 
193  virtual void doWork()
194  {
195  NonRecursiveMutexLock lock(m_resultMutex);
197  }
198 
199  ProcessRef waitTillDone()
200  {
201  m_doneSig.waitForSignal();
202 
203  NonRecursiveMutexLock lock(m_resultMutex);
204  return m_result;
205  }
206 
207  protected:
209  NonRecursiveMutex m_resultMutex;
210 
211  const char * m_execPath;
212  const char * const * m_argv;
213  const char * const * m_envp;
214  Exec::PreExec& m_preExec;
215  };
216 
217 
218  //***************************************************************************
219  class WaitpidWorkItem : public WorkItem
220  {
221  public:
222  WaitpidWorkItem(const ::pid_t& pid)
223  : m_pid(pid)
224  {
225  }
226 
227  virtual ~WaitpidWorkItem()
228  {
229  }
230 
231  virtual void doWork()
232  {
233  NonRecursiveMutexLock lock(m_resultMutex);
235  }
236 
237  Process::Status waitTillDone()
238  {
239  m_doneSig.waitForSignal();
240 
241  NonRecursiveMutexLock lock(m_resultMutex);
242  return m_result;
243  }
244 
245 
246  protected:
247  Process::Status m_result;
248  NonRecursiveMutex m_resultMutex;
249 
250  const ::pid_t& m_pid;
251  };
252 
253  typedef IntrusiveReference<SpawnWorkItem> SpawnWorkItemPtr;
254  typedef IntrusiveReference<WaitpidWorkItem> WaitpidWorkItemPtr;
255 
256  class WorkQueue
257  {
258  public:
259  WorkQueue() {}
260  virtual ~WorkQueue() {}
261 
262  WorkItem* getWork()
263  {
264  NonRecursiveMutexLock lock(m_workMutex);
265 
266  // Wait for some work to show up
267  // by checking the predicate in a loop
268  while(m_work.empty())
269  {
270  m_workNotEmpty.wait(lock);
271  }
272 
273  WorkItem* newWork = m_work.front();
274  m_work.pop();
275 
276  return newWork;
277  }
278 
279  void addWork(WorkItem* newWork)
280  {
281  NonRecursiveMutexLock lock(m_workMutex);
282  m_work.push(newWork);
283  m_workNotEmpty.notifyAll();
284  }
285 
286  private:
287  std::queue<WorkItem*> m_work;
288  Condition m_workNotEmpty;
289  NonRecursiveMutex m_workMutex;
290  };
291 
292  //***************************************************************************
293  // This is the worker thread that launches processes and/or calls
294  // waitpid on them when BLOCXX_WAITPID_THREADING_PROBLEM is defined
295  //***************************************************************************
296  class ProcessThread : public Thread
297  {
298  public:
299  ProcessThread();
300  virtual ~ProcessThread();
301 
302  virtual Int32 run();
303 
305  char const * exec_path,
306  char const * const argv[],
307  char const * const envp[],
308  Exec::PreExec & pre_exec
309  );
310 
311  Process::Status waitPid(const ProcId& pid);
312 
313  protected:
314  WorkQueue m_workQueue;
315 
316  NonRecursiveMutex m_idMutex;
317  };
318 
319  ProcessThread::ProcessThread()
320  {
321  }
322 
323  ProcessThread::~ProcessThread()
324  {
325  }
326 
327  // This function will never exit until the process terminates itself.
328  Int32 ProcessThread::run()
329  {
330  // Infinite loop.
331  while(true)
332  {
333  WorkItem* newWork;
334  newWork = m_workQueue.getWork();
335 
336  try
337  {
338  newWork->doWork();
339  }
340  catch(Exception& e)
341  {
342  newWork->saveException(e.clone());
343  }
344  newWork->signalDone();
345  }
346 
347  // A return (never reached) to make various compilers happy.
348  return 0;
349  }
350 
351  ProcessRef ProcessThread::spawn(char const * exec_path, char const * const argv[],
352  char const * const envp[], Exec::PreExec & pre_exec)
353  {
354  SpawnWorkItemPtr newWork(new SpawnWorkItem(exec_path, argv, envp, pre_exec));
355  m_workQueue.addWork(newWork.getPtr());
356 
357  ProcessRef result = newWork->waitTillDone();
358 
359  Exception* err = newWork->getException();
360  if(err != 0)
361  {
362  err->rethrow();
363  }
364 
365  return result;
366  }
367 
368  Process::Status ProcessThread::waitPid(const ProcId& pid)
369  {
370  WaitpidWorkItemPtr newWork(new WaitpidWorkItem(pid));
371  m_workQueue.addWork(newWork.getPtr());
372 
373  Process::Status result = newWork->waitTillDone();
374 
375  Exception* err = newWork->getException();
376  if(err != 0)
377  {
378  err->rethrow();
379  }
380 
381  return result;
382  }
383 
384 
385  void initThread()
386  {
387  // create the worker thread
388  g_processThread = new ProcessThread();
389  g_processThread->start();
390  }
391 
392  Thread_t getWorkerThreadId()
393  {
394  callOnce(g_initThreadGuard, initThread);
395  return g_processThread->getId();
396  }
397 
398 } // namespace (anon)
399 
400 
402  char const * const argv[], char const * const envp[], Exec::PreExec & pre_exec)
403 {
404  callOnce(g_initThreadGuard, initThread);
405  return g_processThread->spawn(exec_path, argv, envp, pre_exec);
406 }
407 
409 {
410  callOnce(g_initThreadGuard, initThread);
411  return g_processThread->waitPid(pid);
412 }
413 
414 } //namespace BLOCXX_NAMESPACE
415