blocxx
Select.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
39 #include "blocxx/BLOCXX_config.h"
40 #include "blocxx/Select.hpp"
41 #include "blocxx/AutoPtr.hpp"
42 #include "blocxx/Assertion.hpp"
43 #include "blocxx/Thread.hpp" // for testCancel()
44 #include "blocxx/TimeoutTimer.hpp"
46 
47 #if defined(BLOCXX_WIN32)
48 #include <cassert>
49 #endif
50 
51 extern "C"
52 {
53 
54 #ifndef BLOCXX_WIN32
55  #ifdef BLOCXX_HAVE_SYS_EPOLL_H
56  #include <sys/epoll.h>
57  #endif
58  #if defined (BLOCXX_HAVE_SYS_POLL_H)
59  #include <sys/poll.h>
60  #endif
61  #if defined (BLOCXX_HAVE_SYS_SELECT_H)
62  #include <sys/select.h>
63  #endif
64 #endif
65 
66 #ifdef BLOCXX_HAVE_SYS_TIME_H
67  #include <sys/time.h>
68 #endif
69 
70 #include <sys/types.h>
71 
72 #ifdef BLOCXX_HAVE_UNISTD_H
73  #include <unistd.h>
74 #endif
75 
76 #include <errno.h>
77 }
78 
79 namespace BLOCXX_NAMESPACE
80 {
81 
82 namespace Select
83 {
84 
namespace
{
	// Upper bound, in seconds, handed to the timer as "maxWaitSec" for each
	// individual epoll_wait()/poll()/select() call made by the loops in this
	// file, so that a long overall timeout is served in bounded slices.
	const float LOOP_TIMEOUT = 10.0f;
}
90 // deprecated in 4.0.0
91 int
92 selectRW(SelectObjectArray& selarray, UInt32 ms)
93 {
94  return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000));
95 }
96 
97 #if defined(BLOCXX_WIN32)
98 
99 int
100 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
101 {
102  int rc;
103  size_t hcount = static_cast<DWORD>(selarray.size());
104  AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
105 
106  size_t handleidx = 0;
107  for (size_t i = 0; i < selarray.size(); i++, handleidx++)
108  {
109  if(selarray[i].s.isSocket && selarray[i].s.networkevents)
110  {
111  ::WSAEventSelect(selarray[i].s.sockfd,
112  selarray[i].s.event, selarray[i].s.networkevents);
113  }
114 
115  hdls[handleidx] = selarray[i].s.event;
116  }
117 
118  TimeoutTimer timer(timeout);
119  timer.start();
120  DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs());
121 
122  assert(cc != WAIT_ABANDONED);
123 
124  switch (cc)
125  {
126  case WAIT_FAILED:
128  break;
129  case WAIT_TIMEOUT:
131  break;
132  default:
133  rc = cc - WAIT_OBJECT_0;
134 
135  // If this is a socket, set it back to
136  // blocking mode
137  if(selarray[rc].s.isSocket)
138  {
139  if(selarray[rc].s.networkevents
140  && selarray[rc].s.doreset == false)
141  {
142  ::WSAEventSelect(selarray[rc].s.sockfd,
143  selarray[rc].s.event, selarray[rc].s.networkevents);
144  }
145  else
146  {
147  // Set socket back to blocking
148  ::WSAEventSelect(selarray[rc].s.sockfd,
149  selarray[rc].s.event, 0);
150  u_long ioctlarg = 0;
151  ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
152  }
153  }
154  break;
155  }
156 
157  if( rc < 0 )
158  return rc;
159 
160  int availableCount = 0;
161  for (size_t i = 0; i < selarray.size(); i++)
162  {
163  if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
164  {
165  if( selarray[i].waitForRead )
166  selarray[i].readAvailable = true;
167  if( selarray[i].waitForWrite )
168  selarray[i].writeAvailable = true;
169  ++availableCount;
170  }
171  else
172  {
173  selarray[i].readAvailable = false;
174  selarray[i].writeAvailable = false;
175  }
176  }
177  return availableCount;
178 }
179 
180 
181 #else
182 
184 // epoll version
185 int
186 selectRWEpoll(SelectObjectArray& selarray, const Timeout& timeout)
187 {
188 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
189  int ecc = 0;
190  AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
191  AutoDescriptor epfd(epoll_create(selarray.size()));
192  if(epfd.get() == -1)
193  {
194  if (errno == ENOSYS) // kernel doesn't support it
195  {
196  return SELECT_NOT_IMPLEMENTED;
197  }
198  // Need to return something else?
199  return Select::SELECT_ERROR;
200  }
201 
202  UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
203  UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
204  for (size_t i = 0; i < selarray.size(); i++)
205  {
206  BLOCXX_ASSERT(selarray[i].s >= 0);
207  selarray[i].readAvailable = false;
208  selarray[i].writeAvailable = false;
209  selarray[i].wasError = false;
210  events[i].data = epoll_data_t(); // zero-init to make valgrind happy
211  events[i].data.u32 = i;
212  events[i].events = 0;
213  if(selarray[i].waitForRead)
214  {
215  events[i].events |= read_events;
216  }
217  if(selarray[i].waitForWrite)
218  {
219  events[i].events |= write_events;
220  }
221 
222  if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
223  {
224  return errno == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
225  }
226  }
227 
228  // here we spin checking for thread cancellation every so often.
229 
230  TimeoutTimer timer(timeout);
231  timer.start();
232  int savedErrno;
233  do
234  {
236  const float maxWaitSec = LOOP_TIMEOUT;
237  ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec));
238  savedErrno = errno;
239  if (ecc < 0 && errno == EINTR)
240  {
241  ecc = 0;
242  errno = 0;
244  }
245  timer.loop();
246  } while ((ecc == 0) && !timer.expired());
247 
248  if (ecc < 0)
249  {
250  errno = savedErrno;
251  return Select::SELECT_ERROR;
252  }
253  if (ecc == 0)
254  {
255  return Select::SELECT_TIMEOUT;
256  }
257 
258  for(int i = 0; i < ecc; i++)
259  {
260  SelectObject & so = selarray[events[i].data.u32];
261  so.readAvailable = so.waitForRead && (events[i].events & read_events);
262  so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
263  }
264 
265  return ecc;
266 #else
267  return SELECT_NOT_IMPLEMENTED;
268 #endif
269 }
270 
272 // poll() version
273 int
274 selectRWPoll(SelectObjectArray& selarray, const Timeout& timeout)
275 {
276 #if defined (BLOCXX_HAVE_SYS_POLL_H)
277  int rc = 0;
278 
279  AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
280 
281  // here we spin checking for thread cancellation every so often.
282  TimeoutTimer timer(timeout);
283  timer.start();
284 
285  int savedErrno;
286  do
287  {
288  for (size_t i = 0; i < selarray.size(); i++)
289  {
290  BLOCXX_ASSERT(selarray[i].s >= 0);
291  selarray[i].readAvailable = false;
292  selarray[i].writeAvailable = false;
293  selarray[i].wasError = false;
294  pfds[i].revents = 0;
295  pfds[i].fd = selarray[i].s;
296  pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
297  if(selarray[i].waitForWrite)
298  pfds[i].events |= POLLOUT;
299  }
300 
302  const float maxWaitSec = LOOP_TIMEOUT;
303  rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec));
304  savedErrno = errno;
305  if (rc < 0 && errno == EINTR)
306  {
307  rc = 0;
308  errno = 0;
310 #ifdef BLOCXX_NETWARE
311  // When the NetWare server is shutting down, select will
312  // set errno to EINTR on return. If this thread does not
313  // yield control (cooperative multitasking) then we end
314  // up in a very tight loop and get a CPUHog server abbend.
315  pthread_yield();
316 #endif
317  }
318 
319  timer.loop();
320  } while ((rc == 0) && !timer.expired());
321 
322  if (rc < 0)
323  {
324  errno = savedErrno;
325  return Select::SELECT_ERROR;
326  }
327  if (rc == 0)
328  {
329  return Select::SELECT_TIMEOUT;
330  }
331  for (size_t i = 0; i < selarray.size(); i++)
332  {
333  if (pfds[i].revents & (POLLERR | POLLNVAL))
334  {
335  selarray[i].wasError = true;
336  }
337 
338  if(selarray[i].waitForRead)
339  {
340  selarray[i].readAvailable = (pfds[i].revents &
341  (POLLIN | POLLPRI | POLLHUP));
342  }
343 
344  if(selarray[i].waitForWrite)
345  {
346  selarray[i].writeAvailable = (pfds[i].revents &
347  (POLLOUT | POLLHUP));
348  }
349  }
350 
351  return rc;
352 #else
353  return SELECT_NOT_IMPLEMENTED;
354 #endif
355 }
357 // ::select() version
358 int
359 selectRWSelect(SelectObjectArray& selarray, const Timeout& timeout)
360 {
361 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
362  int rc = 0;
363  fd_set ifds;
364  fd_set ofds;
365 
366  // here we spin checking for thread cancellation every so often.
367  TimeoutTimer timer(timeout);
368  timer.start();
369 
370  int savedErrno;
371  do
372  {
373  int maxfd = 0;
374  FD_ZERO(&ifds);
375  FD_ZERO(&ofds);
376  for (size_t i = 0; i < selarray.size(); ++i)
377  {
378  int fd = selarray[i].s;
379  BLOCXX_ASSERT(fd >= 0);
380  if (maxfd < fd)
381  {
382  maxfd = fd;
383  }
384  if (fd < 0 || fd >= FD_SETSIZE)
385  {
386  errno = EINVAL;
387  return Select::SELECT_ERROR;
388  }
389  if (selarray[i].waitForRead)
390  {
391  FD_SET(fd, &ifds);
392  }
393  if (selarray[i].waitForWrite)
394  {
395  FD_SET(fd, &ofds);
396  }
397  }
398 
400  struct timeval tv;
401  const float maxWaitSec = LOOP_TIMEOUT;
402  rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec));
403  savedErrno = errno;
404  if (rc < 0 && errno == EINTR)
405  {
406  rc = 0;
407  errno = 0;
409 #ifdef BLOCXX_NETWARE
410  // When the NetWare server is shutting down, select will
411  // set errno to EINTR on return. If this thread does not
412  // yield control (cooperative multitasking) then we end
413  // up in a very tight loop and get a CPUHog server abbend.
414  pthread_yield();
415 #endif
416  }
417 
418  timer.loop();
419  } while ((rc == 0) && !timer.expired());
420 
421  if (rc < 0)
422  {
423  errno = savedErrno;
424  return Select::SELECT_ERROR;
425  }
426  if (rc == 0)
427  {
428  return Select::SELECT_TIMEOUT;
429  }
430  int availableCount = 0;
431  int cval;
432  for (size_t i = 0; i < selarray.size(); i++)
433  {
434  selarray[i].wasError = false;
435  cval = 0;
436  if (FD_ISSET(selarray[i].s, &ifds))
437  {
438  selarray[i].readAvailable = true;
439  cval = 1;
440  }
441  else
442  {
443  selarray[i].readAvailable = false;
444  }
445 
446  if (FD_ISSET(selarray[i].s, &ofds))
447  {
448  selarray[i].writeAvailable = true;
449  cval = 1;
450  }
451  else
452  {
453  selarray[i].writeAvailable = false;
454  }
455 
456  availableCount += cval;
457 
458  }
459 
460  return availableCount;
461 #else
462  return SELECT_NOT_IMPLEMENTED;
463 #endif
464 }
465 
466 int
467 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
468 {
469  int rv = selectRWEpoll(selarray, timeout);
470  if (rv != SELECT_NOT_IMPLEMENTED)
471  {
472  return rv;
473  }
474 
475  rv = selectRWPoll(selarray, timeout);
476  if (rv != SELECT_NOT_IMPLEMENTED)
477  {
478  return rv;
479  }
480 
481  rv = selectRWSelect(selarray, timeout);
483  return rv;
484 }
485 
487 #endif // #else BLOCXX_WIN32
488 
489 int
490 select(const SelectTypeArray& selarray, UInt32 ms)
491 {
492  return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0));
493 }
494 
496 int
497 select(const SelectTypeArray& selarray, const Timeout& timeout)
498 {
499  SelectObjectArray soa;
500  soa.reserve(selarray.size());
501  for (size_t i = 0; i < selarray.size(); ++i)
502  {
503  SelectObject curObj(selarray[i]);
504  curObj.waitForRead = true;
505  soa.push_back(curObj);
506  }
507  int rv = selectRW(soa, timeout);
508  if (rv < 0)
509  {
510  return rv;
511  }
512 
513  // find the first selected object
514  for (size_t i = 0; i < soa.size(); ++i)
515  {
516  if (soa[i].readAvailable)
517  {
518  return i;
519  }
520  }
521  errno = 0;
522  return SELECT_ERROR;
523 }
524 
525 } // end namespace Select
526 
527 } // end namespace BLOCXX_NAMESPACE
528