Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #if defined(_MSC_VER) && _MSC_VER < 1300
     29 #pragma warning(disable:4786)
     30 #endif
     31 
     32 #include <cassert>
     33 
     34 #ifdef POSIX
     35 #include <string.h>
     36 #include <errno.h>
     37 #include <fcntl.h>
     38 #include <sys/time.h>
     39 #include <unistd.h>
     40 #include <signal.h>
     41 #endif
     42 
     43 #ifdef WIN32
     44 #define WIN32_LEAN_AND_MEAN
     45 #include <windows.h>
     46 #include <winsock2.h>
     47 #include <ws2tcpip.h>
     48 #undef SetPort
     49 #endif
     50 
     51 #include <algorithm>
     52 #include <map>
     53 
     54 #include "talk/base/basictypes.h"
     55 #include "talk/base/byteorder.h"
     56 #include "talk/base/common.h"
     57 #include "talk/base/logging.h"
     58 #include "talk/base/nethelpers.h"
     59 #include "talk/base/physicalsocketserver.h"
     60 #include "talk/base/timeutils.h"
     61 #include "talk/base/winping.h"
     62 #include "talk/base/win32socketinit.h"
     63 
     64 // stm: this will tell us if we are on OSX
     65 #ifdef HAVE_CONFIG_H
     66 #include "config.h"
     67 #endif
     68 
     69 #ifdef POSIX
     70 #include <netinet/tcp.h>  // for TCP_NODELAY
     71 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
     72 typedef void* SockOptArg;
     73 #endif  // POSIX
     74 
     75 #ifdef WIN32
     76 typedef char* SockOptArg;
     77 #endif
     78 
     79 namespace talk_base {
     80 
     81 // Standard MTUs, from RFC 1191
     82 const uint16 PACKET_MAXIMUMS[] = {
     83   65535,    // Theoretical maximum, Hyperchannel
     84   32000,    // Nothing
     85   17914,    // 16Mb IBM Token Ring
     86   8166,     // IEEE 802.4
     87   //4464,   // IEEE 802.5 (4Mb max)
     88   4352,     // FDDI
     89   //2048,   // Wideband Network
     90   2002,     // IEEE 802.5 (4Mb recommended)
     91   //1536,   // Expermental Ethernet Networks
     92   //1500,   // Ethernet, Point-to-Point (default)
     93   1492,     // IEEE 802.3
     94   1006,     // SLIP, ARPANET
     95   //576,    // X.25 Networks
     96   //544,    // DEC IP Portal
     97   //512,    // NETBIOS
     98   508,      // IEEE 802/Source-Rt Bridge, ARCNET
     99   296,      // Point-to-Point (low delay)
    100   68,       // Official minimum
    101   0,        // End of list marker
    102 };
    103 
    104 static const int IP_HEADER_SIZE = 20u;
    105 static const int IPV6_HEADER_SIZE = 40u;
    106 static const int ICMP_HEADER_SIZE = 8u;
    107 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u;
    108 
    109 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
    110  public:
    111   PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
    112     : ss_(ss), s_(s), enabled_events_(0), error_(0),
    113       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
    114       resolver_(NULL) {
    115 #ifdef WIN32
    116     // EnsureWinsockInit() ensures that winsock is initialized. The default
    117     // version of this function doesn't do anything because winsock is
    118     // initialized by constructor of a static object. If neccessary libjingle
    119     // users can link it with a different version of this function by replacing
    120     // win32socketinit.cc. See win32socketinit.cc for more details.
    121     EnsureWinsockInit();
    122 #endif
    123     if (s_ != INVALID_SOCKET) {
    124       enabled_events_ = DE_READ | DE_WRITE;
    125 
    126       int type = SOCK_STREAM;
    127       socklen_t len = sizeof(type);
    128       VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
    129       udp_ = (SOCK_DGRAM == type);
    130     }
    131   }
    132 
    133   virtual ~PhysicalSocket() {
    134     Close();
    135   }
    136 
    137   // Creates the underlying OS socket (same as the "socket" function).
    138   virtual bool Create(int family, int type) {
    139     Close();
    140     s_ = ::socket(family, type, 0);
    141     udp_ = (SOCK_DGRAM == type);
    142     UpdateLastError();
    143     if (udp_)
    144       enabled_events_ = DE_READ | DE_WRITE;
    145     return s_ != INVALID_SOCKET;
    146   }
    147 
    148   SocketAddress GetLocalAddress() const {
    149     sockaddr_storage addr_storage = {0};
    150     socklen_t addrlen = sizeof(addr_storage);
    151     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    152     int result = ::getsockname(s_, addr, &addrlen);
    153     SocketAddress address;
    154     if (result >= 0) {
    155       SocketAddressFromSockAddrStorage(addr_storage, &address);
    156     } else {
    157       LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
    158                       << s_;
    159     }
    160     return address;
    161   }
    162 
    163   SocketAddress GetRemoteAddress() const {
    164     sockaddr_storage addr_storage = {0};
    165     socklen_t addrlen = sizeof(addr_storage);
    166     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    167     int result = ::getpeername(s_, addr, &addrlen);
    168     SocketAddress address;
    169     if (result >= 0) {
    170       SocketAddressFromSockAddrStorage(addr_storage, &address);
    171     } else {
    172       LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
    173                       << s_;
    174     }
    175     return address;
    176   }
    177 
    178   int Bind(const SocketAddress& bind_addr) {
    179     sockaddr_storage addr_storage;
    180     size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
    181     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    182     int err = ::bind(s_, addr, static_cast<int>(len));
    183     UpdateLastError();
    184 #ifdef _DEBUG
    185     if (0 == err) {
    186       dbg_addr_ = "Bound @ ";
    187       dbg_addr_.append(GetLocalAddress().ToString());
    188     }
    189 #endif  // _DEBUG
    190     return err;
    191   }
    192 
    193   int Connect(const SocketAddress& addr) {
    194     // TODO: Implicit creation is required to reconnect...
    195     // ...but should we make it more explicit?
    196     if (state_ != CS_CLOSED) {
    197       SetError(EALREADY);
    198       return SOCKET_ERROR;
    199     }
    200     if (addr.IsUnresolved()) {
    201       LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
    202       resolver_ = new AsyncResolver();
    203       resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
    204       resolver_->Start(addr);
    205       state_ = CS_CONNECTING;
    206       return 0;
    207     }
    208 
    209     return DoConnect(addr);
    210   }
    211 
    212   int DoConnect(const SocketAddress& connect_addr) {
    213     if ((s_ == INVALID_SOCKET) &&
    214         !Create(connect_addr.family(), SOCK_STREAM)) {
    215       return SOCKET_ERROR;
    216     }
    217     sockaddr_storage addr_storage;
    218     size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
    219     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    220     int err = ::connect(s_, addr, static_cast<int>(len));
    221     UpdateLastError();
    222     if (err == 0) {
    223       state_ = CS_CONNECTED;
    224     } else if (IsBlockingError(GetError())) {
    225       state_ = CS_CONNECTING;
    226       enabled_events_ |= DE_CONNECT;
    227     } else {
    228       return SOCKET_ERROR;
    229     }
    230 
    231     enabled_events_ |= DE_READ | DE_WRITE;
    232     return 0;
    233   }
    234 
    235   int GetError() const {
    236     CritScope cs(&crit_);
    237     return error_;
    238   }
    239 
    240   void SetError(int error) {
    241     CritScope cs(&crit_);
    242     error_ = error;
    243   }
    244 
    245   ConnState GetState() const {
    246     return state_;
    247   }
    248 
    249   int GetOption(Option opt, int* value) {
    250     int slevel;
    251     int sopt;
    252     if (TranslateOption(opt, &slevel, &sopt) == -1)
    253       return -1;
    254     socklen_t optlen = sizeof(*value);
    255     int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
    256     if (ret != -1 && opt == OPT_DONTFRAGMENT) {
    257 #ifdef LINUX
    258       *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
    259 #endif
    260     }
    261     return ret;
    262   }
    263 
    264   int SetOption(Option opt, int value) {
    265     int slevel;
    266     int sopt;
    267     if (TranslateOption(opt, &slevel, &sopt) == -1)
    268       return -1;
    269     if (opt == OPT_DONTFRAGMENT) {
    270 #ifdef LINUX
    271       value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
    272 #endif
    273     }
    274     return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
    275   }
    276 
    277   int Send(const void *pv, size_t cb) {
    278     int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
    279 #ifdef LINUX
    280         // Suppress SIGPIPE. Without this, attempting to send on a socket whose
    281         // other end is closed will result in a SIGPIPE signal being raised to
    282         // our process, which by default will terminate the process, which we
    283         // don't want. By specifying this flag, we'll just get the error EPIPE
    284         // instead and can handle the error gracefully.
    285         MSG_NOSIGNAL
    286 #else
    287         0
    288 #endif
    289         );
    290     UpdateLastError();
    291     MaybeRemapSendError();
    292     // We have seen minidumps where this may be false.
    293     ASSERT(sent <= static_cast<int>(cb));
    294     if ((sent < 0) && IsBlockingError(GetError())) {
    295       enabled_events_ |= DE_WRITE;
    296     }
    297     return sent;
    298   }
    299 
    300   int SendTo(const void* buffer, size_t length, const SocketAddress& addr) {
    301     sockaddr_storage saddr;
    302     size_t len = addr.ToSockAddrStorage(&saddr);
    303     int sent = ::sendto(
    304         s_, static_cast<const char *>(buffer), static_cast<int>(length),
    305 #ifdef LINUX
    306         // Suppress SIGPIPE. See above for explanation.
    307         MSG_NOSIGNAL,
    308 #else
    309         0,
    310 #endif
    311         reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
    312     UpdateLastError();
    313     MaybeRemapSendError();
    314     // We have seen minidumps where this may be false.
    315     ASSERT(sent <= static_cast<int>(length));
    316     if ((sent < 0) && IsBlockingError(GetError())) {
    317       enabled_events_ |= DE_WRITE;
    318     }
    319     return sent;
    320   }
    321 
    322   int Recv(void* buffer, size_t length) {
    323     int received = ::recv(s_, static_cast<char*>(buffer),
    324                           static_cast<int>(length), 0);
    325     if ((received == 0) && (length != 0)) {
    326       // Note: on graceful shutdown, recv can return 0.  In this case, we
    327       // pretend it is blocking, and then signal close, so that simplifying
    328       // assumptions can be made about Recv.
    329       LOG(LS_WARNING) << "EOF from socket; deferring close event";
    330       // Must turn this back on so that the select() loop will notice the close
    331       // event.
    332       enabled_events_ |= DE_READ;
    333       SetError(EWOULDBLOCK);
    334       return SOCKET_ERROR;
    335     }
    336     UpdateLastError();
    337     int error = GetError();
    338     bool success = (received >= 0) || IsBlockingError(error);
    339     if (udp_ || success) {
    340       enabled_events_ |= DE_READ;
    341     }
    342     if (!success) {
    343       LOG_F(LS_VERBOSE) << "Error = " << error;
    344     }
    345     return received;
    346   }
    347 
    348   int RecvFrom(void* buffer, size_t length, SocketAddress *out_addr) {
    349     sockaddr_storage addr_storage;
    350     socklen_t addr_len = sizeof(addr_storage);
    351     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    352     int received = ::recvfrom(s_, static_cast<char*>(buffer),
    353                               static_cast<int>(length), 0, addr, &addr_len);
    354     UpdateLastError();
    355     if ((received >= 0) && (out_addr != NULL))
    356       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
    357     int error = GetError();
    358     bool success = (received >= 0) || IsBlockingError(error);
    359     if (udp_ || success) {
    360       enabled_events_ |= DE_READ;
    361     }
    362     if (!success) {
    363       LOG_F(LS_VERBOSE) << "Error = " << error;
    364     }
    365     return received;
    366   }
    367 
    368   int Listen(int backlog) {
    369     int err = ::listen(s_, backlog);
    370     UpdateLastError();
    371     if (err == 0) {
    372       state_ = CS_CONNECTING;
    373       enabled_events_ |= DE_ACCEPT;
    374 #ifdef _DEBUG
    375       dbg_addr_ = "Listening @ ";
    376       dbg_addr_.append(GetLocalAddress().ToString());
    377 #endif  // _DEBUG
    378     }
    379     return err;
    380   }
    381 
    382   AsyncSocket* Accept(SocketAddress *out_addr) {
    383     sockaddr_storage addr_storage;
    384     socklen_t addr_len = sizeof(addr_storage);
    385     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    386     SOCKET s = ::accept(s_, addr, &addr_len);
    387     UpdateLastError();
    388     if (s == INVALID_SOCKET)
    389       return NULL;
    390     enabled_events_ |= DE_ACCEPT;
    391     if (out_addr != NULL)
    392       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
    393     return ss_->WrapSocket(s);
    394   }
    395 
    396   int Close() {
    397     if (s_ == INVALID_SOCKET)
    398       return 0;
    399     int err = ::closesocket(s_);
    400     UpdateLastError();
    401     s_ = INVALID_SOCKET;
    402     state_ = CS_CLOSED;
    403     enabled_events_ = 0;
    404     if (resolver_) {
    405       resolver_->Destroy(false);
    406       resolver_ = NULL;
    407     }
    408     return err;
    409   }
    410 
    411   int EstimateMTU(uint16* mtu) {
    412     SocketAddress addr = GetRemoteAddress();
    413     if (addr.IsAny()) {
    414       SetError(ENOTCONN);
    415       return -1;
    416     }
    417 
    418 #if defined(WIN32)
    419     // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
    420     WinPing ping;
    421     if (!ping.IsValid()) {
    422       SetError(EINVAL);  // can't think of a better error ID
    423       return -1;
    424     }
    425     int header_size = ICMP_HEADER_SIZE;
    426     if (addr.family() == AF_INET6) {
    427       header_size += IPV6_HEADER_SIZE;
    428     } else if (addr.family() == AF_INET) {
    429       header_size += IP_HEADER_SIZE;
    430     }
    431 
    432     for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
    433       int32 size = PACKET_MAXIMUMS[level] - header_size;
    434       WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
    435                                              ICMP_PING_TIMEOUT_MILLIS,
    436                                              1, false);
    437       if (result == WinPing::PING_FAIL) {
    438         SetError(EINVAL);  // can't think of a better error ID
    439         return -1;
    440       } else if (result != WinPing::PING_TOO_LARGE) {
    441         *mtu = PACKET_MAXIMUMS[level];
    442         return 0;
    443       }
    444     }
    445 
    446     ASSERT(false);
    447     return -1;
    448 #elif defined(IOS) || defined(OSX)
    449     // No simple way to do this on Mac OS X.
    450     // SIOCGIFMTU would work if we knew which interface would be used, but
    451     // figuring that out is pretty complicated. For now we'll return an error
    452     // and let the caller pick a default MTU.
    453     SetError(EINVAL);
    454     return -1;
    455 #elif defined(LINUX) || defined(ANDROID)
    456     // Gets the path MTU.
    457     int value;
    458     socklen_t vlen = sizeof(value);
    459     int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
    460     if (err < 0) {
    461       UpdateLastError();
    462       return err;
    463     }
    464 
    465     ASSERT((0 <= value) && (value <= 65536));
    466     *mtu = value;
    467     return 0;
    468 #elif defined(__native_client__)
    469     // Most socket operations, including this, will fail in NaCl's sandbox.
    470     error_ = EACCES;
    471     return -1;
    472 #endif
    473   }
    474 
    475   SocketServer* socketserver() { return ss_; }
    476 
    477  protected:
    478   void OnResolveResult(AsyncResolverInterface* resolver) {
    479     if (resolver != resolver_) {
    480       return;
    481     }
    482 
    483     int error = resolver_->GetError();
    484     if (error == 0) {
    485       error = DoConnect(resolver_->address());
    486     } else {
    487       Close();
    488     }
    489 
    490     if (error) {
    491       SetError(error);
    492       SignalCloseEvent(this, error);
    493     }
    494   }
    495 
    496   void UpdateLastError() {
    497     SetError(LAST_SYSTEM_ERROR);
    498   }
    499 
    500   void MaybeRemapSendError() {
    501 #if defined(OSX)
    502     // https://developer.apple.com/library/mac/documentation/Darwin/
    503     // Reference/ManPages/man2/sendto.2.html
    504     // ENOBUFS - The output queue for a network interface is full.
    505     // This generally indicates that the interface has stopped sending,
    506     // but may be caused by transient congestion.
    507     if (GetError() == ENOBUFS) {
    508       SetError(EWOULDBLOCK);
    509     }
    510 #endif
    511   }
    512 
    513   static int TranslateOption(Option opt, int* slevel, int* sopt) {
    514     switch (opt) {
    515       case OPT_DONTFRAGMENT:
    516 #ifdef WIN32
    517         *slevel = IPPROTO_IP;
    518         *sopt = IP_DONTFRAGMENT;
    519         break;
    520 #elif defined(IOS) || defined(OSX) || defined(BSD)
    521         LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
    522         return -1;
    523 #elif defined(POSIX)
    524         *slevel = IPPROTO_IP;
    525         *sopt = IP_MTU_DISCOVER;
    526         break;
    527 #endif
    528       case OPT_RCVBUF:
    529         *slevel = SOL_SOCKET;
    530         *sopt = SO_RCVBUF;
    531         break;
    532       case OPT_SNDBUF:
    533         *slevel = SOL_SOCKET;
    534         *sopt = SO_SNDBUF;
    535         break;
    536       case OPT_NODELAY:
    537         *slevel = IPPROTO_TCP;
    538         *sopt = TCP_NODELAY;
    539         break;
    540       case OPT_DSCP:
    541         LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
    542         return -1;
    543       default:
    544         ASSERT(false);
    545         return -1;
    546     }
    547     return 0;
    548   }
    549 
    550   PhysicalSocketServer* ss_;
    551   SOCKET s_;
    552   uint8 enabled_events_;
    553   bool udp_;
    554   int error_;
    555   // Protects |error_| that is accessed from different threads.
    556   mutable CriticalSection crit_;
    557   ConnState state_;
    558   AsyncResolver* resolver_;
    559 
    560 #ifdef _DEBUG
    561   std::string dbg_addr_;
    562 #endif  // _DEBUG;
    563 };
    564 
    565 #ifdef POSIX
    566 class EventDispatcher : public Dispatcher {
    567  public:
    568   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
    569     if (pipe(afd_) < 0)
    570       LOG(LERROR) << "pipe failed";
    571     ss_->Add(this);
    572   }
    573 
    574   virtual ~EventDispatcher() {
    575     ss_->Remove(this);
    576     close(afd_[0]);
    577     close(afd_[1]);
    578   }
    579 
    580   virtual void Signal() {
    581     CritScope cs(&crit_);
    582     if (!fSignaled_) {
    583       const uint8 b[1] = { 0 };
    584       if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
    585         fSignaled_ = true;
    586       }
    587     }
    588   }
    589 
    590   virtual uint32 GetRequestedEvents() {
    591     return DE_READ;
    592   }
    593 
    594   virtual void OnPreEvent(uint32 ff) {
    595     // It is not possible to perfectly emulate an auto-resetting event with
    596     // pipes.  This simulates it by resetting before the event is handled.
    597 
    598     CritScope cs(&crit_);
    599     if (fSignaled_) {
    600       uint8 b[4];  // Allow for reading more than 1 byte, but expect 1.
    601       VERIFY(1 == read(afd_[0], b, sizeof(b)));
    602       fSignaled_ = false;
    603     }
    604   }
    605 
    606   virtual void OnEvent(uint32 ff, int err) {
    607     ASSERT(false);
    608   }
    609 
    610   virtual int GetDescriptor() {
    611     return afd_[0];
    612   }
    613 
    614   virtual bool IsDescriptorClosed() {
    615     return false;
    616   }
    617 
    618  private:
    619   PhysicalSocketServer *ss_;
    620   int afd_[2];
    621   bool fSignaled_;
    622   CriticalSection crit_;
    623 };
    624 
    625 // These two classes use the self-pipe trick to deliver POSIX signals to our
    626 // select loop. This is the only safe, reliable, cross-platform way to do
    627 // non-trivial things with a POSIX signal in an event-driven program (until
    628 // proper pselect() implementations become ubiquitous).
    629 
    630 class PosixSignalHandler {
    631  public:
    632   // POSIX only specifies 32 signals, but in principle the system might have
    633   // more and the programmer might choose to use them, so we size our array
    634   // for 128.
    635   static const int kNumPosixSignals = 128;
    636 
    637   // There is just a single global instance. (Signal handlers do not get any
    638   // sort of user-defined void * parameter, so they can't access anything that
    639   // isn't global.)
    640   static PosixSignalHandler* Instance() {
    641     LIBJINGLE_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ());
    642     return &instance;
    643   }
    644 
    645   // Returns true if the given signal number is set.
    646   bool IsSignalSet(int signum) const {
    647     ASSERT(signum < ARRAY_SIZE(received_signal_));
    648     if (signum < ARRAY_SIZE(received_signal_)) {
    649       return received_signal_[signum];
    650     } else {
    651       return false;
    652     }
    653   }
    654 
    655   // Clears the given signal number.
    656   void ClearSignal(int signum) {
    657     ASSERT(signum < ARRAY_SIZE(received_signal_));
    658     if (signum < ARRAY_SIZE(received_signal_)) {
    659       received_signal_[signum] = false;
    660     }
    661   }
    662 
    663   // Returns the file descriptor to monitor for signal events.
    664   int GetDescriptor() const {
    665     return afd_[0];
    666   }
    667 
    668   // This is called directly from our real signal handler, so it must be
    669   // signal-handler-safe. That means it cannot assume anything about the
    670   // user-level state of the process, since the handler could be executed at any
    671   // time on any thread.
    672   void OnPosixSignalReceived(int signum) {
    673     if (signum >= ARRAY_SIZE(received_signal_)) {
    674       // We don't have space in our array for this.
    675       return;
    676     }
    677     // Set a flag saying we've seen this signal.
    678     received_signal_[signum] = true;
    679     // Notify application code that we got a signal.
    680     const uint8 b[1] = { 0 };
    681     if (-1 == write(afd_[1], b, sizeof(b))) {
    682       // Nothing we can do here. If there's an error somehow then there's
    683       // nothing we can safely do from a signal handler.
    684       // No, we can't even safely log it.
    685       // But, we still have to check the return value here. Otherwise,
    686       // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
    687       return;
    688     }
    689   }
    690 
    691  private:
    692   PosixSignalHandler() {
    693     if (pipe(afd_) < 0) {
    694       LOG_ERR(LS_ERROR) << "pipe failed";
    695       return;
    696     }
    697     if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
    698       LOG_ERR(LS_WARNING) << "fcntl #1 failed";
    699     }
    700     if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
    701       LOG_ERR(LS_WARNING) << "fcntl #2 failed";
    702     }
    703     memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
    704            0,
    705            sizeof(received_signal_));
    706   }
    707 
    708   ~PosixSignalHandler() {
    709     int fd1 = afd_[0];
    710     int fd2 = afd_[1];
    711     // We clobber the stored file descriptor numbers here or else in principle
    712     // a signal that happens to be delivered during application termination
    713     // could erroneously write a zero byte to an unrelated file handle in
    714     // OnPosixSignalReceived() if some other file happens to be opened later
    715     // during shutdown and happens to be given the same file descriptor number
    716     // as our pipe had. Unfortunately even with this precaution there is still a
    717     // race where that could occur if said signal happens to be handled
    718     // concurrently with this code and happens to have already read the value of
    719     // afd_[1] from memory before we clobber it, but that's unlikely.
    720     afd_[0] = -1;
    721     afd_[1] = -1;
    722     close(fd1);
    723     close(fd2);
    724   }
    725 
    726   int afd_[2];
    727   // These are boolean flags that will be set in our signal handler and read
    728   // and cleared from Wait(). There is a race involved in this, but it is
    729   // benign. The signal handler sets the flag before signaling the pipe, so
    730   // we'll never end up blocking in select() while a flag is still true.
    731   // However, if two of the same signal arrive close to each other then it's
    732   // possible that the second time the handler may set the flag while it's still
    733   // true, meaning that signal will be missed. But the first occurrence of it
    734   // will still be handled, so this isn't a problem.
    735   // Volatile is not necessary here for correctness, but this data _is_ volatile
    736   // so I've marked it as such.
    737   volatile uint8 received_signal_[kNumPosixSignals];
    738 };
    739 
    740 class PosixSignalDispatcher : public Dispatcher {
    741  public:
    742   PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
    743     owner_->Add(this);
    744   }
    745 
    746   virtual ~PosixSignalDispatcher() {
    747     owner_->Remove(this);
    748   }
    749 
    750   virtual uint32 GetRequestedEvents() {
    751     return DE_READ;
    752   }
    753 
    754   virtual void OnPreEvent(uint32 ff) {
    755     // Events might get grouped if signals come very fast, so we read out up to
    756     // 16 bytes to make sure we keep the pipe empty.
    757     uint8 b[16];
    758     ssize_t ret = read(GetDescriptor(), b, sizeof(b));
    759     if (ret < 0) {
    760       LOG_ERR(LS_WARNING) << "Error in read()";
    761     } else if (ret == 0) {
    762       LOG(LS_WARNING) << "Should have read at least one byte";
    763     }
    764   }
    765 
    766   virtual void OnEvent(uint32 ff, int err) {
    767     for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
    768          ++signum) {
    769       if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
    770         PosixSignalHandler::Instance()->ClearSignal(signum);
    771         HandlerMap::iterator i = handlers_.find(signum);
    772         if (i == handlers_.end()) {
    773           // This can happen if a signal is delivered to our process at around
    774           // the same time as we unset our handler for it. It is not an error
    775           // condition, but it's unusual enough to be worth logging.
    776           LOG(LS_INFO) << "Received signal with no handler: " << signum;
    777         } else {
    778           // Otherwise, execute our handler.
    779           (*i->second)(signum);
    780         }
    781       }
    782     }
    783   }
    784 
    785   virtual int GetDescriptor() {
    786     return PosixSignalHandler::Instance()->GetDescriptor();
    787   }
    788 
    789   virtual bool IsDescriptorClosed() {
    790     return false;
    791   }
    792 
    793   void SetHandler(int signum, void (*handler)(int)) {
    794     handlers_[signum] = handler;
    795   }
    796 
    797   void ClearHandler(int signum) {
    798     handlers_.erase(signum);
    799   }
    800 
    801   bool HasHandlers() {
    802     return !handlers_.empty();
    803   }
    804 
    805  private:
    806   typedef std::map<int, void (*)(int)> HandlerMap;
    807 
    808   HandlerMap handlers_;
    809   // Our owner.
    810   PhysicalSocketServer *owner_;
    811 };
    812 
    813 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
    814  public:
    815   explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
    816   }
    817   SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
    818   }
    819 
    820   virtual ~SocketDispatcher() {
    821     Close();
    822   }
    823 
    824   bool Initialize() {
    825     ss_->Add(this);
    826     fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
    827     return true;
    828   }
    829 
    830   virtual bool Create(int type) {
    831     return Create(AF_INET, type);
    832   }
    833 
    834   virtual bool Create(int family, int type) {
    835     // Change the socket to be non-blocking.
    836     if (!PhysicalSocket::Create(family, type))
    837       return false;
    838 
    839     return Initialize();
    840   }
    841 
    842   virtual int GetDescriptor() {
    843     return s_;
    844   }
    845 
    846   virtual bool IsDescriptorClosed() {
    847     // We don't have a reliable way of distinguishing end-of-stream
    848     // from readability.  So test on each readable call.  Is this
    849     // inefficient?  Probably.
    850     char ch;
    851     ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
    852     if (res > 0) {
    853       // Data available, so not closed.
    854       return false;
    855     } else if (res == 0) {
    856       // EOF, so closed.
    857       return true;
    858     } else {  // error
    859       switch (errno) {
    860         // Returned if we've already closed s_.
    861         case EBADF:
    862         // Returned during ungraceful peer shutdown.
    863         case ECONNRESET:
    864           return true;
    865         default:
    866           // Assume that all other errors are just blocking errors, meaning the
    867           // connection is still good but we just can't read from it right now.
    868           // This should only happen when connecting (and at most once), because
    869           // in all other cases this function is only called if the file
    870           // descriptor is already known to be in the readable state. However,
    871           // it's not necessary a problem if we spuriously interpret a
    872           // "connection lost"-type error as a blocking error, because typically
    873           // the next recv() will get EOF, so we'll still eventually notice that
    874           // the socket is closed.
    875           LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
    876           return false;
    877       }
    878     }
    879   }
    880 
    881   virtual uint32 GetRequestedEvents() {
    882     return enabled_events_;
    883   }
    884 
    885   virtual void OnPreEvent(uint32 ff) {
    886     if ((ff & DE_CONNECT) != 0)
    887       state_ = CS_CONNECTED;
    888     if ((ff & DE_CLOSE) != 0)
    889       state_ = CS_CLOSED;
    890   }
    891 
    892   virtual void OnEvent(uint32 ff, int err) {
    893     // Make sure we deliver connect/accept first. Otherwise, consumers may see
    894     // something like a READ followed by a CONNECT, which would be odd.
    895     if ((ff & DE_CONNECT) != 0) {
    896       enabled_events_ &= ~DE_CONNECT;
    897       SignalConnectEvent(this);
    898     }
    899     if ((ff & DE_ACCEPT) != 0) {
    900       enabled_events_ &= ~DE_ACCEPT;
    901       SignalReadEvent(this);
    902     }
    903     if ((ff & DE_READ) != 0) {
    904       enabled_events_ &= ~DE_READ;
    905       SignalReadEvent(this);
    906     }
    907     if ((ff & DE_WRITE) != 0) {
    908       enabled_events_ &= ~DE_WRITE;
    909       SignalWriteEvent(this);
    910     }
    911     if ((ff & DE_CLOSE) != 0) {
    912       // The socket is now dead to us, so stop checking it.
    913       enabled_events_ = 0;
    914       SignalCloseEvent(this, err);
    915     }
    916   }
    917 
    918   virtual int Close() {
    919     if (s_ == INVALID_SOCKET)
    920       return 0;
    921 
    922     ss_->Remove(this);
    923     return PhysicalSocket::Close();
    924   }
    925 };
    926 
    927 class FileDispatcher: public Dispatcher, public AsyncFile {
    928  public:
    929   FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
    930     set_readable(true);
    931 
    932     ss_->Add(this);
    933 
    934     fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
    935   }
    936 
    937   virtual ~FileDispatcher() {
    938     ss_->Remove(this);
    939   }
    940 
    941   SocketServer* socketserver() { return ss_; }
    942 
    943   virtual int GetDescriptor() {
    944     return fd_;
    945   }
    946 
    947   virtual bool IsDescriptorClosed() {
    948     return false;
    949   }
    950 
    951   virtual uint32 GetRequestedEvents() {
    952     return flags_;
    953   }
    954 
    955   virtual void OnPreEvent(uint32 ff) {
    956   }
    957 
    958   virtual void OnEvent(uint32 ff, int err) {
    959     if ((ff & DE_READ) != 0)
    960       SignalReadEvent(this);
    961     if ((ff & DE_WRITE) != 0)
    962       SignalWriteEvent(this);
    963     if ((ff & DE_CLOSE) != 0)
    964       SignalCloseEvent(this, err);
    965   }
    966 
    967   virtual bool readable() {
    968     return (flags_ & DE_READ) != 0;
    969   }
    970 
    971   virtual void set_readable(bool value) {
    972     flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
    973   }
    974 
    975   virtual bool writable() {
    976     return (flags_ & DE_WRITE) != 0;
    977   }
    978 
    979   virtual void set_writable(bool value) {
    980     flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
    981   }
    982 
    983  private:
    984   PhysicalSocketServer* ss_;
    985   int fd_;
    986   int flags_;
    987 };
    988 
    989 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
    990   return new FileDispatcher(fd, this);
    991 }
    992 
    993 #endif // POSIX
    994 
    995 #ifdef WIN32
    996 static uint32 FlagsToEvents(uint32 events) {
    997   uint32 ffFD = FD_CLOSE;
    998   if (events & DE_READ)
    999     ffFD |= FD_READ;
   1000   if (events & DE_WRITE)
   1001     ffFD |= FD_WRITE;
   1002   if (events & DE_CONNECT)
   1003     ffFD |= FD_CONNECT;
   1004   if (events & DE_ACCEPT)
   1005     ffFD |= FD_ACCEPT;
   1006   return ffFD;
   1007 }
   1008 
   1009 class EventDispatcher : public Dispatcher {
   1010  public:
   1011   EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
   1012     hev_ = WSACreateEvent();
   1013     if (hev_) {
   1014       ss_->Add(this);
   1015     }
   1016   }
   1017 
   1018   ~EventDispatcher() {
   1019     if (hev_ != NULL) {
   1020       ss_->Remove(this);
   1021       WSACloseEvent(hev_);
   1022       hev_ = NULL;
   1023     }
   1024   }
   1025 
   1026   virtual void Signal() {
   1027     if (hev_ != NULL)
   1028       WSASetEvent(hev_);
   1029   }
   1030 
   1031   virtual uint32 GetRequestedEvents() {
   1032     return 0;
   1033   }
   1034 
   1035   virtual void OnPreEvent(uint32 ff) {
   1036     WSAResetEvent(hev_);
   1037   }
   1038 
   1039   virtual void OnEvent(uint32 ff, int err) {
   1040   }
   1041 
   1042   virtual WSAEVENT GetWSAEvent() {
   1043     return hev_;
   1044   }
   1045 
   1046   virtual SOCKET GetSocket() {
   1047     return INVALID_SOCKET;
   1048   }
   1049 
   1050   virtual bool CheckSignalClose() { return false; }
   1051 
   1052 private:
   1053   PhysicalSocketServer* ss_;
   1054   WSAEVENT hev_;
   1055 };
   1056 
   1057 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
   1058  public:
   1059   static int next_id_;
   1060   int id_;
   1061   bool signal_close_;
   1062   int signal_err_;
   1063 
   1064   SocketDispatcher(PhysicalSocketServer* ss)
   1065       : PhysicalSocket(ss),
   1066         id_(0),
   1067         signal_close_(false) {
   1068   }
   1069 
   1070   SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
   1071       : PhysicalSocket(ss, s),
   1072         id_(0),
   1073         signal_close_(false) {
   1074   }
   1075 
   1076   virtual ~SocketDispatcher() {
   1077     Close();
   1078   }
   1079 
   1080   bool Initialize() {
   1081     ASSERT(s_ != INVALID_SOCKET);
   1082     // Must be a non-blocking
   1083     u_long argp = 1;
   1084     ioctlsocket(s_, FIONBIO, &argp);
   1085     ss_->Add(this);
   1086     return true;
   1087   }
   1088 
   1089   virtual bool Create(int type) {
   1090     return Create(AF_INET, type);
   1091   }
   1092 
   1093   virtual bool Create(int family, int type) {
   1094     // Create socket
   1095     if (!PhysicalSocket::Create(family, type))
   1096       return false;
   1097 
   1098     if (!Initialize())
   1099       return false;
   1100 
   1101     do { id_ = ++next_id_; } while (id_ == 0);
   1102     return true;
   1103   }
   1104 
   1105   virtual int Close() {
   1106     if (s_ == INVALID_SOCKET)
   1107       return 0;
   1108 
   1109     id_ = 0;
   1110     signal_close_ = false;
   1111     ss_->Remove(this);
   1112     return PhysicalSocket::Close();
   1113   }
   1114 
   1115   virtual uint32 GetRequestedEvents() {
   1116     return enabled_events_;
   1117   }
   1118 
   1119   virtual void OnPreEvent(uint32 ff) {
   1120     if ((ff & DE_CONNECT) != 0)
   1121       state_ = CS_CONNECTED;
   1122     // We set CS_CLOSED from CheckSignalClose.
   1123   }
   1124 
   1125   virtual void OnEvent(uint32 ff, int err) {
   1126     int cache_id = id_;
   1127     // Make sure we deliver connect/accept first. Otherwise, consumers may see
   1128     // something like a READ followed by a CONNECT, which would be odd.
   1129     if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
   1130       if (ff != DE_CONNECT)
   1131         LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
   1132       enabled_events_ &= ~DE_CONNECT;
   1133 #ifdef _DEBUG
   1134       dbg_addr_ = "Connected @ ";
   1135       dbg_addr_.append(GetRemoteAddress().ToString());
   1136 #endif  // _DEBUG
   1137       SignalConnectEvent(this);
   1138     }
   1139     if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
   1140       enabled_events_ &= ~DE_ACCEPT;
   1141       SignalReadEvent(this);
   1142     }
   1143     if ((ff & DE_READ) != 0) {
   1144       enabled_events_ &= ~DE_READ;
   1145       SignalReadEvent(this);
   1146     }
   1147     if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
   1148       enabled_events_ &= ~DE_WRITE;
   1149       SignalWriteEvent(this);
   1150     }
   1151     if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
   1152       signal_close_ = true;
   1153       signal_err_ = err;
   1154     }
   1155   }
   1156 
   1157   virtual WSAEVENT GetWSAEvent() {
   1158     return WSA_INVALID_EVENT;
   1159   }
   1160 
   1161   virtual SOCKET GetSocket() {
   1162     return s_;
   1163   }
   1164 
   1165   virtual bool CheckSignalClose() {
   1166     if (!signal_close_)
   1167       return false;
   1168 
   1169     char ch;
   1170     if (recv(s_, &ch, 1, MSG_PEEK) > 0)
   1171       return false;
   1172 
   1173     state_ = CS_CLOSED;
   1174     signal_close_ = false;
   1175     SignalCloseEvent(this, signal_err_);
   1176     return true;
   1177   }
   1178 };
   1179 
   1180 int SocketDispatcher::next_id_ = 0;
   1181 
   1182 #endif  // WIN32
   1183 
   1184 // Sets the value of a boolean value to false when signaled.
   1185 class Signaler : public EventDispatcher {
   1186  public:
   1187   Signaler(PhysicalSocketServer* ss, bool* pf)
   1188       : EventDispatcher(ss), pf_(pf) {
   1189   }
   1190   virtual ~Signaler() { }
   1191 
   1192   void OnEvent(uint32 ff, int err) {
   1193     if (pf_)
   1194       *pf_ = false;
   1195   }
   1196 
   1197  private:
   1198   bool *pf_;
   1199 };
   1200 
   1201 PhysicalSocketServer::PhysicalSocketServer()
   1202     : fWait_(false),
   1203       last_tick_tracked_(0),
   1204       last_tick_dispatch_count_(0) {
   1205   signal_wakeup_ = new Signaler(this, &fWait_);
   1206 #ifdef WIN32
   1207   socket_ev_ = WSACreateEvent();
   1208 #endif
   1209 }
   1210 
   1211 PhysicalSocketServer::~PhysicalSocketServer() {
   1212 #ifdef WIN32
   1213   WSACloseEvent(socket_ev_);
   1214 #endif
   1215 #ifdef POSIX
   1216   signal_dispatcher_.reset();
   1217 #endif
   1218   delete signal_wakeup_;
   1219   ASSERT(dispatchers_.empty());
   1220 }
   1221 
   1222 void PhysicalSocketServer::WakeUp() {
   1223   signal_wakeup_->Signal();
   1224 }
   1225 
   1226 Socket* PhysicalSocketServer::CreateSocket(int type) {
   1227   return CreateSocket(AF_INET, type);
   1228 }
   1229 
   1230 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
   1231   PhysicalSocket* socket = new PhysicalSocket(this);
   1232   if (socket->Create(family, type)) {
   1233     return socket;
   1234   } else {
   1235     delete socket;
   1236     return 0;
   1237   }
   1238 }
   1239 
   1240 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
   1241   return CreateAsyncSocket(AF_INET, type);
   1242 }
   1243 
   1244 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
   1245   SocketDispatcher* dispatcher = new SocketDispatcher(this);
   1246   if (dispatcher->Create(family, type)) {
   1247     return dispatcher;
   1248   } else {
   1249     delete dispatcher;
   1250     return 0;
   1251   }
   1252 }
   1253 
   1254 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
   1255   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
   1256   if (dispatcher->Initialize()) {
   1257     return dispatcher;
   1258   } else {
   1259     delete dispatcher;
   1260     return 0;
   1261   }
   1262 }
   1263 
   1264 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
   1265   CritScope cs(&crit_);
   1266   // Prevent duplicates. This can cause dead dispatchers to stick around.
   1267   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
   1268                                            dispatchers_.end(),
   1269                                            pdispatcher);
   1270   if (pos != dispatchers_.end())
   1271     return;
   1272   dispatchers_.push_back(pdispatcher);
   1273 }
   1274 
   1275 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
   1276   CritScope cs(&crit_);
   1277   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
   1278                                            dispatchers_.end(),
   1279                                            pdispatcher);
   1280   // We silently ignore duplicate calls to Add, so we should silently ignore
   1281   // the (expected) symmetric calls to Remove. Note that this may still hide
   1282   // a real issue, so we at least log a warning about it.
   1283   if (pos == dispatchers_.end()) {
   1284     LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
   1285                     << "dispatcher, potentially from a duplicate call to Add.";
   1286     return;
   1287   }
   1288   size_t index = pos - dispatchers_.begin();
   1289   dispatchers_.erase(pos);
   1290   for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
   1291        ++it) {
   1292     if (index < **it) {
   1293       --**it;
   1294     }
   1295   }
   1296 }
   1297 
   1298 #ifdef POSIX
   1299 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
   1300   // Calculate timing information
   1301 
   1302   struct timeval *ptvWait = NULL;
   1303   struct timeval tvWait;
   1304   struct timeval tvStop;
   1305   if (cmsWait != kForever) {
   1306     // Calculate wait timeval
   1307     tvWait.tv_sec = cmsWait / 1000;
   1308     tvWait.tv_usec = (cmsWait % 1000) * 1000;
   1309     ptvWait = &tvWait;
   1310 
   1311     // Calculate when to return in a timeval
   1312     gettimeofday(&tvStop, NULL);
   1313     tvStop.tv_sec += tvWait.tv_sec;
   1314     tvStop.tv_usec += tvWait.tv_usec;
   1315     if (tvStop.tv_usec >= 1000000) {
   1316       tvStop.tv_usec -= 1000000;
   1317       tvStop.tv_sec += 1;
   1318     }
   1319   }
   1320 
   1321   // Zero all fd_sets. Don't need to do this inside the loop since
   1322   // select() zeros the descriptors not signaled
   1323 
   1324   fd_set fdsRead;
   1325   FD_ZERO(&fdsRead);
   1326   fd_set fdsWrite;
   1327   FD_ZERO(&fdsWrite);
   1328 
   1329   fWait_ = true;
   1330 
   1331   while (fWait_) {
   1332     int fdmax = -1;
   1333     {
   1334       CritScope cr(&crit_);
   1335       for (size_t i = 0; i < dispatchers_.size(); ++i) {
   1336         // Query dispatchers for read and write wait state
   1337         Dispatcher *pdispatcher = dispatchers_[i];
   1338         ASSERT(pdispatcher);
   1339         if (!process_io && (pdispatcher != signal_wakeup_))
   1340           continue;
   1341         int fd = pdispatcher->GetDescriptor();
   1342         if (fd > fdmax)
   1343           fdmax = fd;
   1344 
   1345         uint32 ff = pdispatcher->GetRequestedEvents();
   1346         if (ff & (DE_READ | DE_ACCEPT))
   1347           FD_SET(fd, &fdsRead);
   1348         if (ff & (DE_WRITE | DE_CONNECT))
   1349           FD_SET(fd, &fdsWrite);
   1350       }
   1351     }
   1352 
   1353     // Wait then call handlers as appropriate
   1354     // < 0 means error
   1355     // 0 means timeout
   1356     // > 0 means count of descriptors ready
   1357     int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
   1358 
   1359     // If error, return error.
   1360     if (n < 0) {
   1361       if (errno != EINTR) {
   1362         LOG_E(LS_ERROR, EN, errno) << "select";
   1363         return false;
   1364       }
   1365       // Else ignore the error and keep going. If this EINTR was for one of the
   1366       // signals managed by this PhysicalSocketServer, the
   1367       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
   1368       // iteration.
   1369     } else if (n == 0) {
   1370       // If timeout, return success
   1371       return true;
   1372     } else {
   1373       // We have signaled descriptors
   1374       CritScope cr(&crit_);
   1375       for (size_t i = 0; i < dispatchers_.size(); ++i) {
   1376         Dispatcher *pdispatcher = dispatchers_[i];
   1377         int fd = pdispatcher->GetDescriptor();
   1378         uint32 ff = 0;
   1379         int errcode = 0;
   1380 
   1381         // Reap any error code, which can be signaled through reads or writes.
   1382         // TODO: Should we set errcode if getsockopt fails?
   1383         if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
   1384           socklen_t len = sizeof(errcode);
   1385           ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
   1386         }
   1387 
   1388         // Check readable descriptors. If we're waiting on an accept, signal
   1389         // that. Otherwise we're waiting for data, check to see if we're
   1390         // readable or really closed.
   1391         // TODO: Only peek at TCP descriptors.
   1392         if (FD_ISSET(fd, &fdsRead)) {
   1393           FD_CLR(fd, &fdsRead);
   1394           if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
   1395             ff |= DE_ACCEPT;
   1396           } else if (errcode || pdispatcher->IsDescriptorClosed()) {
   1397             ff |= DE_CLOSE;
   1398           } else {
   1399             ff |= DE_READ;
   1400           }
   1401         }
   1402 
   1403         // Check writable descriptors. If we're waiting on a connect, detect
   1404         // success versus failure by the reaped error code.
   1405         if (FD_ISSET(fd, &fdsWrite)) {
   1406           FD_CLR(fd, &fdsWrite);
   1407           if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
   1408             if (!errcode) {
   1409               ff |= DE_CONNECT;
   1410             } else {
   1411               ff |= DE_CLOSE;
   1412             }
   1413           } else {
   1414             ff |= DE_WRITE;
   1415           }
   1416         }
   1417 
   1418         // Tell the descriptor about the event.
   1419         if (ff != 0) {
   1420           pdispatcher->OnPreEvent(ff);
   1421           pdispatcher->OnEvent(ff, errcode);
   1422         }
   1423       }
   1424     }
   1425 
   1426     // Recalc the time remaining to wait. Doing it here means it doesn't get
   1427     // calced twice the first time through the loop
   1428     if (ptvWait) {
   1429       ptvWait->tv_sec = 0;
   1430       ptvWait->tv_usec = 0;
   1431       struct timeval tvT;
   1432       gettimeofday(&tvT, NULL);
   1433       if ((tvStop.tv_sec > tvT.tv_sec)
   1434           || ((tvStop.tv_sec == tvT.tv_sec)
   1435               && (tvStop.tv_usec > tvT.tv_usec))) {
   1436         ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
   1437         ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
   1438         if (ptvWait->tv_usec < 0) {
   1439           ASSERT(ptvWait->tv_sec > 0);
   1440           ptvWait->tv_usec += 1000000;
   1441           ptvWait->tv_sec -= 1;
   1442         }
   1443       }
   1444     }
   1445   }
   1446 
   1447   return true;
   1448 }
   1449 
   1450 static void GlobalSignalHandler(int signum) {
   1451   PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
   1452 }
   1453 
   1454 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
   1455                                                  void (*handler)(int)) {
   1456   // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
   1457   // otherwise set one.
   1458   if (handler == SIG_IGN || handler == SIG_DFL) {
   1459     if (!InstallSignal(signum, handler)) {
   1460       return false;
   1461     }
   1462     if (signal_dispatcher_) {
   1463       signal_dispatcher_->ClearHandler(signum);
   1464       if (!signal_dispatcher_->HasHandlers()) {
   1465         signal_dispatcher_.reset();
   1466       }
   1467     }
   1468   } else {
   1469     if (!signal_dispatcher_) {
   1470       signal_dispatcher_.reset(new PosixSignalDispatcher(this));
   1471     }
   1472     signal_dispatcher_->SetHandler(signum, handler);
   1473     if (!InstallSignal(signum, &GlobalSignalHandler)) {
   1474       return false;
   1475     }
   1476   }
   1477   return true;
   1478 }
   1479 
   1480 Dispatcher* PhysicalSocketServer::signal_dispatcher() {
   1481   return signal_dispatcher_.get();
   1482 }
   1483 
   1484 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
   1485   struct sigaction act;
   1486   // It doesn't really matter what we set this mask to.
   1487   if (sigemptyset(&act.sa_mask) != 0) {
   1488     LOG_ERR(LS_ERROR) << "Couldn't set mask";
   1489     return false;
   1490   }
   1491   act.sa_handler = handler;
   1492   // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
   1493   // and it's a nuisance. Though some syscalls still return EINTR and there's no
   1494   // real standard for which ones. :(
   1495   act.sa_flags = SA_RESTART;
   1496   if (sigaction(signum, &act, NULL) != 0) {
   1497     LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
   1498     return false;
   1499   }
   1500   return true;
   1501 }
   1502 #endif  // POSIX
   1503 
   1504 #ifdef WIN32
   1505 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
   1506   int cmsTotal = cmsWait;
   1507   int cmsElapsed = 0;
   1508   uint32 msStart = Time();
   1509 
   1510 #if LOGGING
   1511   if (last_tick_dispatch_count_ == 0) {
   1512     last_tick_tracked_ = msStart;
   1513   }
   1514 #endif
   1515 
   1516   fWait_ = true;
   1517   while (fWait_) {
   1518     std::vector<WSAEVENT> events;
   1519     std::vector<Dispatcher *> event_owners;
   1520 
   1521     events.push_back(socket_ev_);
   1522 
   1523     {
   1524       CritScope cr(&crit_);
   1525       size_t i = 0;
   1526       iterators_.push_back(&i);
   1527       // Don't track dispatchers_.size(), because we want to pick up any new
   1528       // dispatchers that were added while processing the loop.
   1529       while (i < dispatchers_.size()) {
   1530         Dispatcher* disp = dispatchers_[i++];
   1531         if (!process_io && (disp != signal_wakeup_))
   1532           continue;
   1533         SOCKET s = disp->GetSocket();
   1534         if (disp->CheckSignalClose()) {
   1535           // We just signalled close, don't poll this socket
   1536         } else if (s != INVALID_SOCKET) {
   1537           WSAEventSelect(s,
   1538                          events[0],
   1539                          FlagsToEvents(disp->GetRequestedEvents()));
   1540         } else {
   1541           events.push_back(disp->GetWSAEvent());
   1542           event_owners.push_back(disp);
   1543         }
   1544       }
   1545       ASSERT(iterators_.back() == &i);
   1546       iterators_.pop_back();
   1547     }
   1548 
   1549     // Which is shorter, the delay wait or the asked wait?
   1550 
   1551     int cmsNext;
   1552     if (cmsWait == kForever) {
   1553       cmsNext = cmsWait;
   1554     } else {
   1555       cmsNext = _max(0, cmsTotal - cmsElapsed);
   1556     }
   1557 
   1558     // Wait for one of the events to signal
   1559     DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
   1560                                         &events[0],
   1561                                         false,
   1562                                         cmsNext,
   1563                                         false);
   1564 
   1565 #if 0  // LOGGING
   1566     // we track this information purely for logging purposes.
   1567     last_tick_dispatch_count_++;
   1568     if (last_tick_dispatch_count_ >= 1000) {
   1569       int32 elapsed = TimeSince(last_tick_tracked_);
   1570       LOG(INFO) << "PhysicalSocketServer took " << elapsed
   1571                 << "ms for 1000 events";
   1572 
   1573       // If we get more than 1000 events in a second, we are spinning badly
   1574       // (normally it should take about 8-20 seconds).
   1575       ASSERT(elapsed > 1000);
   1576 
   1577       last_tick_tracked_ = Time();
   1578       last_tick_dispatch_count_ = 0;
   1579     }
   1580 #endif
   1581 
   1582     if (dw == WSA_WAIT_FAILED) {
   1583       // Failed?
   1584       // TODO: need a better strategy than this!
   1585       int error = WSAGetLastError();
   1586       ASSERT(false);
   1587       return false;
   1588     } else if (dw == WSA_WAIT_TIMEOUT) {
   1589       // Timeout?
   1590       return true;
   1591     } else {
   1592       // Figure out which one it is and call it
   1593       CritScope cr(&crit_);
   1594       int index = dw - WSA_WAIT_EVENT_0;
   1595       if (index > 0) {
   1596         --index; // The first event is the socket event
   1597         event_owners[index]->OnPreEvent(0);
   1598         event_owners[index]->OnEvent(0, 0);
   1599       } else if (process_io) {
   1600         size_t i = 0, end = dispatchers_.size();
   1601         iterators_.push_back(&i);
   1602         iterators_.push_back(&end);  // Don't iterate over new dispatchers.
   1603         while (i < end) {
   1604           Dispatcher* disp = dispatchers_[i++];
   1605           SOCKET s = disp->GetSocket();
   1606           if (s == INVALID_SOCKET)
   1607             continue;
   1608 
   1609           WSANETWORKEVENTS wsaEvents;
   1610           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
   1611           if (err == 0) {
   1612 
   1613 #if LOGGING
   1614             {
   1615               if ((wsaEvents.lNetworkEvents & FD_READ) &&
   1616                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
   1617                 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
   1618                              << wsaEvents.iErrorCode[FD_READ_BIT];
   1619               }
   1620               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
   1621                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
   1622                 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
   1623                              << wsaEvents.iErrorCode[FD_WRITE_BIT];
   1624               }
   1625               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
   1626                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
   1627                 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
   1628                              << wsaEvents.iErrorCode[FD_CONNECT_BIT];
   1629               }
   1630               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
   1631                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
   1632                 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
   1633                              << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
   1634               }
   1635               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
   1636                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
   1637                 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
   1638                              << wsaEvents.iErrorCode[FD_CLOSE_BIT];
   1639               }
   1640             }
   1641 #endif
   1642             uint32 ff = 0;
   1643             int errcode = 0;
   1644             if (wsaEvents.lNetworkEvents & FD_READ)
   1645               ff |= DE_READ;
   1646             if (wsaEvents.lNetworkEvents & FD_WRITE)
   1647               ff |= DE_WRITE;
   1648             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
   1649               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
   1650                 ff |= DE_CONNECT;
   1651               } else {
   1652                 ff |= DE_CLOSE;
   1653                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
   1654               }
   1655             }
   1656             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
   1657               ff |= DE_ACCEPT;
   1658             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
   1659               ff |= DE_CLOSE;
   1660               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
   1661             }
   1662             if (ff != 0) {
   1663               disp->OnPreEvent(ff);
   1664               disp->OnEvent(ff, errcode);
   1665             }
   1666           }
   1667         }
   1668         ASSERT(iterators_.back() == &end);
   1669         iterators_.pop_back();
   1670         ASSERT(iterators_.back() == &i);
   1671         iterators_.pop_back();
   1672       }
   1673 
   1674       // Reset the network event until new activity occurs
   1675       WSAResetEvent(socket_ev_);
   1676     }
   1677 
   1678     // Break?
   1679     if (!fWait_)
   1680       break;
   1681     cmsElapsed = TimeSince(msStart);
   1682     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
   1683        break;
   1684     }
   1685   }
   1686 
   1687   // Done
   1688   return true;
   1689 }
   1690 #endif  // WIN32
   1691 
   1692 }  // namespace talk_base
   1693