blocxx
PosixUnnamedPipe.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
39 #include "blocxx/BLOCXX_config.h"
40 
41 #if !defined(BLOCXX_WIN32)
42 
44 #include "blocxx/AutoPtr.hpp"
45 #include "blocxx/IOException.hpp"
46 #include "blocxx/Format.hpp"
47 #include "blocxx/SocketUtils.hpp"
48 #include "blocxx/Assertion.hpp"
50 #include "blocxx/SignalScope.hpp"
51 #include "blocxx/Logger.hpp"
52 #include "blocxx/GlobalString.hpp"
53 
54 
55 #include "blocxx/Thread.hpp"
56 #ifdef BLOCXX_HAVE_UNISTD_H
57  #include <unistd.h>
58 #endif
59 #include <sys/socket.h>
60 #include <sys/types.h>
61 
62 #include <fcntl.h>
63 #include <errno.h>
64 #include <cstring>
65 
66 #if defined(BLOCXX_DARWIN)
67 // Necessary for detecting the kernel version in order to activate the descriptor passing workaround.
68 #include "blocxx/ThreadOnce.hpp"
69 #include "blocxx/PosixRegEx.hpp"
70 #include <sys/utsname.h>
71 #endif
72 
73 
74 namespace BLOCXX_NAMESPACE
75 {
76 
77 namespace
78 {
79  int upclose(int fd)
80  {
81  int rc;
82  do
83  {
84  rc = ::close(fd);
85  } while (rc < 0 && errno == EINTR);
86  if (rc == -1)
87  {
88  int lerrno = errno;
89  Logger lgr("blocxx");
90  BLOCXX_LOG_ERROR(lgr, Format("Closing pipe handle %1 failed: %2", fd, lerrno));
91  }
92  return rc;
93  }
94 
// Read up to count bytes from fd into buf, re-issuing ::read() whenever it
// fails with EINTR. Returns the result of the last ::read() call.
// NOTE(review): the embedded listing numbers jump from 99 to 101 inside the
// loop, so one original statement may be absent from this view.
95  ::ssize_t upread(int fd, void * buf, std::size_t count)
96  {
97  ::ssize_t rv;
98  do
99  {
101  rv = ::read(fd, buf, count);
102  } while (rv < 0 && errno == EINTR);
103  return rv;
104  }
105 
// Write up to count bytes from buf to fd, re-issuing ::write() whenever it
// fails with EINTR. Returns the result of the last ::write() call.
// NOTE(review): the embedded listing numbers jump from 112 to 114 inside the
// loop, so one original statement may be absent from this view.
106  ::ssize_t upwrite(int fd, void const * buf, std::size_t count)
107  {
108  ::ssize_t rv;
109  // block SIGPIPE so we don't kill the process if the pipe is closed.
// NOTE(review): presumably ss applies SIG_IGN to SIGPIPE only for its own
// lifetime - confirm against SignalScope.
110  SignalScope ss(SIGPIPE, SIG_IGN);
111  do
112  {
114  rv = ::write(fd, buf, count);
115  } while (rv < 0 && errno == EINTR);
116  return rv;
117  }
118 
/**
 * Accept a connection on a listening socket, re-issuing ::accept() whenever
 * it fails with EINTR.
 *
 * @param s       The listening socket.
 * @param addr    Passed straight through to ::accept().
 * @param addrlen Passed straight through to ::accept().
 * @return The result of the last ::accept() call.
 */
int upaccept(int s, struct sockaddr * addr, socklen_t * addrlen)
{
	for (;;)
	{
		int const connfd = ::accept(s, addr, addrlen);
		// Anything other than "failed with EINTR" ends the retry loop.
		if (connfd >= 0 || errno != EINTR)
		{
			return connfd;
		}
	}
}
// Selects which socket buffer setKernelBufferSize() works on:
// E_WRITE_PIPE maps to SO_SNDBUF, E_READ_PIPE maps to SO_RCVBUF.
128  enum EDirection
129  {
130  E_WRITE_PIPE, E_READ_PIPE
131  };
132 
133  // bufsz MUST be an int, and not some other integral type (address taken)
134  //
135  void setKernelBufferSize(Descriptor sockfd, int bufsz, EDirection edir)
136  {
137  if (sockfd == BLOCXX_INVALID_HANDLE)
138  {
139  return;
140  }
141 
142  int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
143 
144  int getbufsz;
145  socklen_t getbufsz_len = sizeof(getbufsz);
146 
147 #ifdef BLOCXX_NCR
148  int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (char*)&getbufsz, &getbufsz_len);
149 #else
150  int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
151 #endif
152  if (errc == 0 && getbufsz < bufsz)
153  {
154 #ifdef BLOCXX_NCR
155  ::setsockopt(sockfd, SOL_SOCKET, optname, (char*)&bufsz, sizeof(bufsz));
156 #else
157  ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz, sizeof(bufsz));
158 #endif
159  }
160  }
161 
162  void setDefaultKernelBufsz(Descriptor sockfd_read, Descriptor sockfd_write)
163  {
164  int const BUFSZ = 64 * 1024;
165  setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
166  setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
167  }
168 
// Name string for this component. It is not referenced in the visible part of
// this listing; presumably used as a logger component name - TODO confirm.
169  GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.PosixUnnamedPipe");
170 
171 #if defined(BLOCXX_DARWIN)
172  // Mac OS X < 10.5 has a kernel bug related to passing descriptors. As a workaround, descriptors are passed synchronously.
173  // This variable determines whether the workaround will be used. detectDescriptorPassingBug() is the only place that assigns it; as currently written that function always leaves it true.
174  bool needDescriptorPassingWorkaround = true;
175 
176  // This is the control to ensure that detectDescriptorPassingBug() is only called once.
177  OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
178 
// Decides whether the Darwin descriptor-passing workaround is needed and
// stores the answer in needDescriptorPassingWorkaround. As written it always
// answers "true"; the uname()-based detection below is compiled out.
179  // This function can not have logging statements or they will be sent over the pipe before sending the ACK.
180  void detectDescriptorPassingBug()
181  {
182  // until OS X 10.5 is actually released, assume it will be broken (even though Apple said it is fixed)
183  needDescriptorPassingWorkaround = true;
184  return;
// Disabled detection logic, kept for reference. Re-enabling it also requires
// removing the unconditional return above, otherwise it stays unreachable.
185 #if 0
186  // if uname() reports the version as < 9.0.0 then we'll need the workaround.
187  struct utsname unamerv;
188  if (::uname(&unamerv) == -1)
189  {
190  needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
191  return;
192  }
193  String release(unamerv.release);
// Capture everything before the first '.' of the release string (the major number).
194  PosixRegEx re("([^.]*)\\..*");
195  StringArray releaseCapture = re.capture(release);
196  if (releaseCapture.size() < 2)
197  {
198  needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
199  return;
200  }
201  String majorRelease = releaseCapture[1];
202  try
203  {
204  needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
205  }
206  catch (StringConversionException& e)
207  {
208  needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
209  return;
210  }
211 #endif
212  }
213 #endif
214 
215 }
216 
217 #ifdef BLOCXX_NETWARE
218 namespace
219 {
/**
 * State shared between _pipe() and its helper thread: the listening socket to
 * accept on and, once acceptConnection() has run, the accepted descriptor.
 */
class AcceptThread
{
public:
	/**
	 * @param serversock The listening socket acceptConnection() will accept on.
	 */
	explicit AcceptThread(int serversock)
		: m_serversock(serversock)
		, m_serverconn(-1)
	{
	}

	/// Accept one connection on the listening socket and remember its descriptor.
	void acceptConnection();

	/// @return The accepted descriptor, or -1 if none has been accepted.
	int getConnectFD() const { return m_serverconn; }
private:
	int m_serversock;
	int m_serverconn;
};
235 
236 void
237 AcceptThread::acceptConnection()
238 {
239  struct sockaddr_in sin;
240  size_t val;
241  int tmp = 1;
242 
243  tmp = 1;
244  ::setsockopt(m_serversock, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
245  (char*) &tmp, sizeof(int));
246 
247  val = sizeof(struct sockaddr_in);
248  if ((m_serverconn = upaccept(m_serversock, (struct sockaddr*)&sin, &val))
249  == -1)
250  {
251  return;
252  }
253  tmp = 1;
254  ::setsockopt(m_serverconn, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
255  (char *) &tmp, sizeof(int));
256  tmp = 0;
257  ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
258  (char*) &tmp, sizeof(int));
259 }
260 
261 void*
262 runConnClass(void* arg)
263 {
264  AcceptThread* acceptThread = (AcceptThread*)(arg);
265  acceptThread->acceptConnection();
266  ::pthread_exit(NULL);
267  return 0;
268 }
269 
270 int
271 _pipe(int *fds)
272 {
273  int svrfd, lerrno, connectfd;
274  size_t val;
275  struct sockaddr_in sin;
276 
277  svrfd = socket( AF_INET, SOCK_STREAM, 0 );
278  sin.sin_family = AF_INET;
279  sin.sin_addr.s_addr = htonl( 0x7f000001 ); // loopback
280  sin.sin_port = 0;
281  memset(sin.sin_zero, 0, 8 );
282  if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1)
283  {
284  int lerrno = errno;
285  upclose(svrfd);
286  fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
287  return -1;
288  }
289  if (listen(svrfd, 1) == -1)
290  {
291  int lerrno = errno;
292  upclose(svrfd);
293  return -1;
294  }
295  val = sizeof(struct sockaddr_in);
296  if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1)
297  {
298  int lerrno = errno;
299  fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
300  upclose(svrfd);
301  return -1;
302  }
303 
304  AcceptThread* pat = new AcceptThread(svrfd);
305  pthread_t athread;
306  // Start thread that will accept connection on svrfd.
307  // Once a connection is made the thread will exit.
308  pthread_create(&athread, NULL, runConnClass, pat);
309 
310  int clientfd = socket(AF_INET, SOCK_STREAM, 0);
311  if (clientfd == -1)
312  {
313  delete pat;
314  return -1;
315  }
316 
317  // Connect to server
318  struct sockaddr_in csin;
319  csin.sin_family = AF_INET;
320  csin.sin_addr.s_addr = htonl(0x7f000001); // loopback
321  csin.sin_port = sin.sin_port;
322  if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
323  {
324  delete pat;
325  return -1;
326  }
327 
328 #define TCP_NODELAY 1
329  int tmp = 1;
330  //
331  // Set for Non-blocking writes and disable keepalive
332  //
333  ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
334  tmp = 0;
335  ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
336 
337  void* threadResult;
338  // Wait for accept thread to terminate
339  ::pthread_join(athread, &threadResult);
340 
341  upclose(svrfd);
342  fds[0] = pat->getConnectFD();
343  fds[1] = clientfd;
344  delete pat;
345  return 0;
346 }
347 }
348 #endif // BLOCXX_NETWARE
349 
352 {
354  if (doOpen)
355  {
356  open();
357  }
358  setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen()
359  setBlocking(E_BLOCKING); // necessary to set the pipes up right.
360 }
361 
364 {
365  m_fds[0] = inputfd.get();
366  m_fds[1] = outputfd.get();
367  setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen()
369  setDefaultKernelBufsz(m_fds[0], m_fds[1]);
370  inputfd.release();
371  outputfd.release();
372 }
373 
376 {
377  close();
378 }
380 namespace
381 {
382  typedef UnnamedPipe::EBlockingMode EBlockingMode;
383 
// Switch descriptor d between blocking and non-blocking mode by clearing or
// setting O_NONBLOCK in its file status flags, then record the new mode.
//
// d             - descriptor to change.
// bmflag        - receives blocking_mode, only after both fcntl() calls succeeded.
// blocking_mode - UnnamedPipe::E_BLOCKING clears O_NONBLOCK; any other value sets it.
// Throws IOException (via BLOCXX_THROW_ERRNO_MSG) if either fcntl() call returns -1.
// NOTE(review): the embedded listing numbers jump from 386 to 388, so one
// original statement at the top of the body may be absent from this view.
384  void set_desc_blocking(
385  int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
386  {
388  int fdflags = fcntl(d, F_GETFL, 0);
389  if (fdflags == -1)
390  {
391  BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
392  }
393  if (blocking_mode == UnnamedPipe::E_BLOCKING)
394  {
395  fdflags &= ~O_NONBLOCK;
396  }
397  else
398  {
399  fdflags |= O_NONBLOCK;
400  }
401  if (fcntl(d, F_SETFL, fdflags) == -1)
402  {
403  BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
404  }
405  bmflag = blocking_mode;
406  }
407 }
// Apply blocking_mode to both pipe descriptors, skipping any slot that holds
// -1, and record the mode per slot in m_blocking.
// NOTE(review): the embedded listing numbers jump from 411 to 413, so one
// original statement at the top of the body may be absent from this view.
409 void
410 PosixUnnamedPipe::setBlocking(EBlockingMode blocking_mode)
411 {
413 
414  for (size_t i = 0; i < 2; ++i)
415  {
416  if (m_fds[i] != -1)
417  {
418  set_desc_blocking(m_fds[i], m_blocking[i], blocking_mode);
419  }
420  }
421 }
423 void
424 PosixUnnamedPipe::setWriteBlocking(EBlockingMode blocking_mode)
425 {
426  set_desc_blocking(m_fds[1], m_blocking[1], blocking_mode);
427 }
429 void
430 PosixUnnamedPipe::setReadBlocking(EBlockingMode blocking_mode)
431 {
432  set_desc_blocking(m_fds[0], m_blocking[0], blocking_mode);
433 }
435 void
437 {
438  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
439  {
440  close();
441  }
442 #if defined(BLOCXX_NETWARE)
443  if (_pipe(m_fds) == BLOCXX_INVALID_HANDLE)
444  {
446  BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
447  }
448 
449 #else
450  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, m_fds) == -1)
451  {
452  m_fds[0] = m_fds[1] = -1;
453  BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
454  }
455  ::shutdown(m_fds[0], SHUT_WR);
456  ::shutdown(m_fds[1], SHUT_RD);
457  setDefaultKernelBufsz(m_fds[0], m_fds[1]);
458 #endif
459 }
461 int
463 {
464  int rc = -1;
465 
466  // handle the case where both input and output are the same descriptor. It can't be closed twice.
467  if (m_fds[0] == m_fds[1])
468  {
470  }
471 
472  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
473  {
474 
475  rc = upclose(m_fds[0]);
477  }
478 
479  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
480  {
481  rc = upclose(m_fds[1]);
483  }
484 
485  return rc;
486 }
488 bool
490 {
491  return (m_fds[0] != BLOCXX_INVALID_HANDLE) || (m_fds[1] != BLOCXX_INVALID_HANDLE);
492 }
493 
495 int
497 {
498  int rc = -1;
499  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
500  {
501  if (m_fds[0] != m_fds[1])
502  {
503  rc = upclose(m_fds[0]);
504  }
506  }
507  return rc;
508 }
510 int
512 {
513  int rc = -1;
514  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
515  {
516  if (m_fds[0] != m_fds[1])
517  {
518  rc = upclose(m_fds[1]);
519  }
521  }
522  return rc;
523 }
// Write dataLen bytes from data to the write end of the pipe (m_fds[1]).
//
// Returns the result of upwrite(), or -1 when the write end is closed or the
// pre-write wait failed. With errorAsException == E_THROW_ON_ERROR an
// IOException is thrown instead of returning -1.
// NOTE(review): the embedded listing numbers jump from 532 to 534; the
// statement that assigns rc before the "rc != 0" test is absent from this
// view (the exception text suggests a SocketUtils::waitForIO call - confirm).
525 int
526 PosixUnnamedPipe::write(const void* data, int dataLen, ErrorAction errorAsException)
527 {
528  int rc = -1;
529  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
530  {
531  if (m_blocking[1] == E_BLOCKING)
532  {
534  if (rc != 0)
535  {
// Make errno carry the timeout so the errno-based exception below reports it.
536  if (rc == ETIMEDOUT)
537  {
538  errno = ETIMEDOUT;
539  }
540  if (errorAsException == E_THROW_ON_ERROR)
541  {
542  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
543  }
544  else
545  {
546  return -1;
547  }
548  }
549  }
550  rc = upwrite(m_fds[1], data, dataLen);
551  }
// rc is still -1 here either because the write end is closed or because
// upwrite() failed; the two cases get different exception messages.
552  if (errorAsException == E_THROW_ON_ERROR && rc == -1)
553  {
554  if (m_fds[1] == BLOCXX_INVALID_HANDLE)
555  {
556  BLOCXX_THROW(IOException, "pipe write failed because pipe is closed");
557  }
558  else
559  {
560  BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed");
561  }
562  }
563  return rc;
564 }
// Read up to bufferLen bytes from the read end of the pipe (m_fds[0]) into buffer.
//
// Returns the result of upread(), or -1 when the read end is closed or the
// pre-read wait failed. With errorAsException == E_THROW_ON_ERROR an
// IOException is thrown instead of returning -1.
// NOTE(review): the embedded listing numbers jump from 573 to 575; the
// statement that assigns rc before the "rc != 0" test is absent from this
// view (the exception text suggests a SocketUtils::waitForIO call - confirm).
566 int
567 PosixUnnamedPipe::read(void* buffer, int bufferLen, ErrorAction errorAsException)
568 {
569  int rc = -1;
570  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
571  {
572  if (m_blocking[0] == E_BLOCKING)
573  {
575  if (rc != 0)
576  {
// Make errno carry the timeout so the errno-based exception below reports it.
577  if (rc == ETIMEDOUT)
578  {
579  errno = ETIMEDOUT;
580  }
581  if (errorAsException == E_THROW_ON_ERROR)
582  {
583  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
584  }
585  else
586  {
587  return -1;
588  }
589  }
590  }
591  rc = upread(m_fds[0], buffer, bufferLen);
592  }
593 
// NOTE(review): the listing jumps from 595 to 597, so whatever this branch
// did on a zero-byte read is absent from this view.
594  if (rc == 0)
595  {
597  }
598 
599  if (errorAsException == E_THROW_ON_ERROR && rc == -1)
600  {
601  if (m_fds[0] == BLOCXX_INVALID_HANDLE)
602  {
603  BLOCXX_THROW(IOException, "pipe read failed because pipe is closed");
604  }
605  else
606  {
607  BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed");
608  }
609  }
610  return rc;
611 }
613 Select_t
615 {
616  return m_fds[0];
617 }
618 
620 Select_t
622 {
623  return m_fds[1];
624 }
625 
// Send descriptor over the write end of the pipe (m_fds[1]) using
// blocxx::passDescriptor(). Every failure is reported as an IOException.
//
// descriptor    - the descriptor to send.
// ackPipe       - only used in the BLOCXX_DARWIN build, where a one-byte 'A'
//                 acknowledgement is read from it when the workaround is active.
// targetProcess - not used in the visible code.
// NOTE(review): the embedded listing numbers jump from 634 to 636 and from
// 657 to 659; the statements that assign rc before the following tests are
// absent from this view.
627 void
628 PosixUnnamedPipe::passDescriptor(Descriptor descriptor, const UnnamedPipeRef& ackPipe, const ProcessRef& targetProcess)
629 {
630  int rc = -1;
631  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
632  {
633  if (m_blocking[1] == E_BLOCKING)
634  {
636 
637  if (rc != 0)
638  {
// Make errno carry the timeout so the errno-based exception below reports it.
639  if (rc == ETIMEDOUT)
640  {
641  errno = ETIMEDOUT;
642  }
643  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
644  }
645  }
646 
647  rc = blocxx::passDescriptor(m_fds[1], descriptor);
648  if (rc == -1)
649  {
650  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: passDescriptor()");
651  }
652 
653 #if defined(BLOCXX_DARWIN)
654  callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
655  if (rc != -1 && needDescriptorPassingWorkaround)
656  {
657  // This ignores the blocking and timeouts, because this ACK shouldn't timeout.
659  if (rc != -1)
660  {
// 'Z' is a placeholder; the read must replace it with 'A' to count as an acknowledgement.
661  char ack = 'Z';
662  rc = ackPipe->read(&ack, sizeof(ack), E_RETURN_ON_ERROR);
663  if (rc == -1)
664  {
665  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: ackPipe->read()");
666  }
667  if (ack != 'A')
668  {
669  BLOCXX_THROW(IOException, Format("sendDescriptor() failed: ackPipe->read() didn't get 'A', got %1", static_cast<int>(ack)).c_str());
670  }
671  }
672  else
673  {
674  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: waitForIO()");
675  }
676  }
677 #endif
678  }
// Reached with rc == -1 when the write end was closed on entry, since rc
// keeps its initial value in that case.
679  if (rc == -1)
680  {
681  if (m_fds[1] == BLOCXX_INVALID_HANDLE)
682  {
683  BLOCXX_THROW(IOException, "sendDescriptor() failed because pipe is closed");
684  }
685  else
686  {
687  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed");
688  }
689  }
690 }
691 
695 {
696  int rc = -1;
697  AutoDescriptor descriptor;
698  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
699  {
700  if (m_blocking[0] == E_BLOCKING)
701  {
703 
704  if (rc != 0)
705  {
706  if (rc == ETIMEDOUT)
707  {
708  errno = ETIMEDOUT;
709  }
710  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
711  }
712  }
713  descriptor = blocxx::receiveDescriptor(m_fds[0]);
714 
715 #if defined(BLOCXX_DARWIN)
716  callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
717  if (needDescriptorPassingWorkaround)
718  {
719  // This ignores the blocking and timeouts, because this ACK shouldn't timeout.
721  if (rc != -1)
722  {
723  char ack = 'A';
724  ackPipe->write(&ack, sizeof(ack), E_THROW_ON_ERROR);
725  }
726  }
727 #endif
728  }
729  else
730  {
731  BLOCXX_THROW(IOException, "receiveDescriptor() failed because pipe is closed");
732  }
733  return descriptor;
734 }
735 
739 {
740  return m_fds[0];
741 }
742 
746 {
747  return m_fds[1];
748 }
749 
751 EBlockingMode
753 {
754  return m_blocking[0];
755 }
756 
758 EBlockingMode
760 {
761  return m_blocking[1];
762 }
763 
764 } // end namespace BLOCXX_NAMESPACE
765 
766 #endif