Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #if defined(_MSC_VER) && _MSC_VER < 1300
     29 #pragma warning(disable:4786)
     30 #endif
     31 
     32 #include <cassert>
     33 
     34 #ifdef POSIX
     35 #include <string.h>
     36 #include <errno.h>
     37 #include <fcntl.h>
     38 #include <sys/time.h>
     39 #include <unistd.h>
     40 #include <signal.h>
     41 #endif
     42 
     43 #ifdef WIN32
     44 #define WIN32_LEAN_AND_MEAN
     45 #include <windows.h>
     46 #include <winsock2.h>
     47 #include <ws2tcpip.h>
     48 #undef SetPort
     49 #endif
     50 
     51 #include <algorithm>
     52 #include <map>
     53 
     54 #include "talk/base/basictypes.h"
     55 #include "talk/base/byteorder.h"
     56 #include "talk/base/common.h"
     57 #include "talk/base/logging.h"
     58 #include "talk/base/nethelpers.h"
     59 #include "talk/base/physicalsocketserver.h"
     60 #include "talk/base/time.h"
     61 #include "talk/base/winping.h"
     62 #include "talk/base/win32socketinit.h"
     63 
     64 // stm: this will tell us if we are on OSX
     65 #ifdef HAVE_CONFIG_H
     66 #include "config.h"
     67 #endif
     68 
     69 #ifdef POSIX
     70 #include <netinet/tcp.h>  // for TCP_NODELAY
     71 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
     72 typedef void* SockOptArg;
     73 #endif  // POSIX
     74 
     75 #ifdef WIN32
     76 typedef char* SockOptArg;
     77 #endif
     78 
     79 namespace talk_base {
     80 
     81 // Standard MTUs, from RFC 1191
     82 const uint16 PACKET_MAXIMUMS[] = {
     83   65535,    // Theoretical maximum, Hyperchannel
     84   32000,    // Nothing
     85   17914,    // 16Mb IBM Token Ring
     86   8166,     // IEEE 802.4
     87   //4464,   // IEEE 802.5 (4Mb max)
     88   4352,     // FDDI
     89   //2048,   // Wideband Network
     90   2002,     // IEEE 802.5 (4Mb recommended)
     91   //1536,   // Expermental Ethernet Networks
     92   //1500,   // Ethernet, Point-to-Point (default)
     93   1492,     // IEEE 802.3
     94   1006,     // SLIP, ARPANET
     95   //576,    // X.25 Networks
     96   //544,    // DEC IP Portal
     97   //512,    // NETBIOS
     98   508,      // IEEE 802/Source-Rt Bridge, ARCNET
     99   296,      // Point-to-Point (low delay)
    100   68,       // Official minimum
    101   0,        // End of list marker
    102 };
    103 
    104 const uint32 IP_HEADER_SIZE = 20;
    105 const uint32 ICMP_HEADER_SIZE = 8;
    106 
    107 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
    108  public:
    109   PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
    110     : ss_(ss), s_(s), enabled_events_(0), error_(0),
    111       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
    112       resolver_(NULL) {
    113 #ifdef WIN32
    114     // EnsureWinsockInit() ensures that winsock is initialized. The default
    115     // version of this function doesn't do anything because winsock is
    116     // initialized by constructor of a static object. If neccessary libjingle
    117     // users can link it with a different version of this function by replacing
    118     // win32socketinit.cc. See win32socketinit.cc for more details.
    119     EnsureWinsockInit();
    120 #endif
    121     if (s_ != INVALID_SOCKET) {
    122       enabled_events_ = DE_READ | DE_WRITE;
    123 
    124       int type = SOCK_STREAM;
    125       socklen_t len = sizeof(type);
    126       VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
    127       udp_ = (SOCK_DGRAM == type);
    128     }
    129   }
    130 
    131   virtual ~PhysicalSocket() {
    132     Close();
    133   }
    134 
    135   // Creates the underlying OS socket (same as the "socket" function).
    136   virtual bool Create(int type) {
    137     Close();
    138     s_ = ::socket(AF_INET, type, 0);
    139     udp_ = (SOCK_DGRAM == type);
    140     UpdateLastError();
    141     if (udp_)
    142       enabled_events_ = DE_READ | DE_WRITE;
    143     return s_ != INVALID_SOCKET;
    144   }
    145 
    146   SocketAddress GetLocalAddress() const {
    147     sockaddr_in addr;
    148     socklen_t addrlen = sizeof(addr);
    149     int result = ::getsockname(s_, (sockaddr*)&addr, &addrlen);
    150     SocketAddress address;
    151     if (result >= 0) {
    152       ASSERT(addrlen == sizeof(addr));
    153       address.FromSockAddr(addr);
    154     } else {
    155       LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
    156                       << s_;
    157     }
    158     return address;
    159   }
    160 
    161   SocketAddress GetRemoteAddress() const {
    162     sockaddr_in addr;
    163     socklen_t addrlen = sizeof(addr);
    164     int result = ::getpeername(s_, (sockaddr*)&addr, &addrlen);
    165     SocketAddress address;
    166     if (result >= 0) {
    167       ASSERT(addrlen == sizeof(addr));
    168       address.FromSockAddr(addr);
    169     } else {
    170       LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
    171                       << s_;
    172     }
    173     return address;
    174   }
    175 
    176   int Bind(const SocketAddress& addr) {
    177     sockaddr_in saddr;
    178     addr.ToSockAddr(&saddr);
    179     int err = ::bind(s_, (sockaddr*)&saddr, sizeof(saddr));
    180     UpdateLastError();
    181 #ifdef _DEBUG
    182     if (0 == err) {
    183       dbg_addr_ = "Bound @ ";
    184       dbg_addr_.append(GetLocalAddress().ToString());
    185     }
    186 #endif  // _DEBUG
    187     return err;
    188   }
    189 
    190   int Connect(const SocketAddress& addr) {
    191     // TODO: Implicit creation is required to reconnect...
    192     // ...but should we make it more explicit?
    193     if ((s_ == INVALID_SOCKET) && !Create(SOCK_STREAM))
    194       return SOCKET_ERROR;
    195     if (addr.IsUnresolved()) {
    196       if (state_ != CS_CLOSED) {
    197         SetError(EALREADY);
    198         return SOCKET_ERROR;
    199       }
    200 
    201       LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
    202       resolver_ = new AsyncResolver();
    203       resolver_->set_address(addr);
    204       resolver_->SignalWorkDone.connect(this, &PhysicalSocket::OnResolveResult);
    205       resolver_->Start();
    206       state_ = CS_CONNECTING;
    207       return 0;
    208     }
    209 
    210     return DoConnect(addr);
    211   }
    212 
    213   int DoConnect(const SocketAddress& addr) {
    214     sockaddr_in saddr;
    215     addr.ToSockAddr(&saddr);
    216     int err = ::connect(s_, (sockaddr*)&saddr, sizeof(saddr));
    217     UpdateLastError();
    218     if (err == 0) {
    219       state_ = CS_CONNECTED;
    220     } else if (IsBlockingError(error_)) {
    221       state_ = CS_CONNECTING;
    222       enabled_events_ |= DE_CONNECT;
    223     } else {
    224       return SOCKET_ERROR;
    225     }
    226 
    227     enabled_events_ |= DE_READ | DE_WRITE;
    228     return 0;
    229   }
    230 
    231   int GetError() const {
    232     return error_;
    233   }
    234 
    235   void SetError(int error) {
    236     error_ = error;
    237   }
    238 
    239   ConnState GetState() const {
    240     return state_;
    241   }
    242 
    243   int GetOption(Option opt, int* value) {
    244     int slevel;
    245     int sopt;
    246     if (TranslateOption(opt, &slevel, &sopt) == -1)
    247       return -1;
    248     socklen_t optlen = sizeof(*value);
    249     int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
    250     if (ret != -1 && opt == OPT_DONTFRAGMENT) {
    251 #ifdef LINUX
    252       *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
    253 #endif
    254     }
    255     return ret;
    256   }
    257 
    258   int SetOption(Option opt, int value) {
    259     int slevel;
    260     int sopt;
    261     if (TranslateOption(opt, &slevel, &sopt) == -1)
    262       return -1;
    263     if (opt == OPT_DONTFRAGMENT) {
    264 #ifdef LINUX
    265       value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
    266 #endif
    267     }
    268     return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
    269   }
    270 
    271   int Send(const void *pv, size_t cb) {
    272     int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
    273 #ifdef LINUX
    274         // Suppress SIGPIPE. Without this, attempting to send on a socket whose
    275         // other end is closed will result in a SIGPIPE signal being raised to
    276         // our process, which by default will terminate the process, which we
    277         // don't want. By specifying this flag, we'll just get the error EPIPE
    278         // instead and can handle the error gracefully.
    279         MSG_NOSIGNAL
    280 #else
    281         0
    282 #endif
    283         );
    284     UpdateLastError();
    285     // We have seen minidumps where this may be false.
    286     ASSERT(sent <= static_cast<int>(cb));
    287     if ((sent < 0) && IsBlockingError(error_)) {
    288       enabled_events_ |= DE_WRITE;
    289     }
    290     return sent;
    291   }
    292 
    293   int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
    294     sockaddr_in saddr;
    295     addr.ToSockAddr(&saddr);
    296     int sent = ::sendto(
    297         s_, (const char *)pv, (int)cb,
    298 #ifdef LINUX
    299         // Suppress SIGPIPE. See above for explanation.
    300         MSG_NOSIGNAL,
    301 #else
    302         0,
    303 #endif
    304         (sockaddr*)&saddr, sizeof(saddr));
    305     UpdateLastError();
    306     // We have seen minidumps where this may be false.
    307     ASSERT(sent <= static_cast<int>(cb));
    308     if ((sent < 0) && IsBlockingError(error_)) {
    309       enabled_events_ |= DE_WRITE;
    310     }
    311     return sent;
    312   }
    313 
    314   int Recv(void *pv, size_t cb) {
    315     int received = ::recv(s_, (char *)pv, (int)cb, 0);
    316     if ((received == 0) && (cb != 0)) {
    317       // Note: on graceful shutdown, recv can return 0.  In this case, we
    318       // pretend it is blocking, and then signal close, so that simplifying
    319       // assumptions can be made about Recv.
    320       LOG(LS_WARNING) << "EOF from socket; deferring close event";
    321       // Must turn this back on so that the select() loop will notice the close
    322       // event.
    323       enabled_events_ |= DE_READ;
    324       error_ = EWOULDBLOCK;
    325       return SOCKET_ERROR;
    326     }
    327     UpdateLastError();
    328     bool success = (received >= 0) || IsBlockingError(error_);
    329     if (udp_ || success) {
    330       enabled_events_ |= DE_READ;
    331     }
    332     if (!success) {
    333       LOG_F(LS_VERBOSE) << "Error = " << error_;
    334     }
    335     return received;
    336   }
    337 
    338   int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
    339     sockaddr_in saddr;
    340     socklen_t cbAddr = sizeof(saddr);
    341     int received = ::recvfrom(s_, (char *)pv, (int)cb, 0, (sockaddr*)&saddr,
    342                               &cbAddr);
    343     UpdateLastError();
    344     if ((received >= 0) && (paddr != NULL))
    345       paddr->FromSockAddr(saddr);
    346     bool success = (received >= 0) || IsBlockingError(error_);
    347     if (udp_ || success) {
    348       enabled_events_ |= DE_READ;
    349     }
    350     if (!success) {
    351       LOG_F(LS_VERBOSE) << "Error = " << error_;
    352     }
    353     return received;
    354   }
    355 
    356   int Listen(int backlog) {
    357     int err = ::listen(s_, backlog);
    358     UpdateLastError();
    359     if (err == 0) {
    360       state_ = CS_CONNECTING;
    361       enabled_events_ |= DE_ACCEPT;
    362 #ifdef _DEBUG
    363       dbg_addr_ = "Listening @ ";
    364       dbg_addr_.append(GetLocalAddress().ToString());
    365 #endif  // _DEBUG
    366     }
    367     return err;
    368   }
    369 
    370   AsyncSocket* Accept(SocketAddress *paddr) {
    371     sockaddr_in saddr;
    372     socklen_t cbAddr = sizeof(saddr);
    373     SOCKET s = ::accept(s_, (sockaddr*)&saddr, &cbAddr);
    374     UpdateLastError();
    375     if (s == INVALID_SOCKET)
    376       return NULL;
    377     enabled_events_ |= DE_ACCEPT;
    378     if (paddr != NULL)
    379       paddr->FromSockAddr(saddr);
    380     return ss_->WrapSocket(s);
    381   }
    382 
    383   int Close() {
    384     if (s_ == INVALID_SOCKET)
    385       return 0;
    386     int err = ::closesocket(s_);
    387     UpdateLastError();
    388     s_ = INVALID_SOCKET;
    389     state_ = CS_CLOSED;
    390     enabled_events_ = 0;
    391     if (resolver_) {
    392       resolver_->Destroy(false);
    393       resolver_ = NULL;
    394     }
    395     return err;
    396   }
    397 
    398   int EstimateMTU(uint16* mtu) {
    399     SocketAddress addr = GetRemoteAddress();
    400     if (addr.IsAny()) {
    401       error_ = ENOTCONN;
    402       return -1;
    403     }
    404 
    405 #if defined(WIN32)
    406     // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
    407     WinPing ping;
    408     if (!ping.IsValid()) {
    409       error_ = EINVAL; // can't think of a better error ID
    410       return -1;
    411     }
    412 
    413     for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
    414       int32 size = PACKET_MAXIMUMS[level] - IP_HEADER_SIZE - ICMP_HEADER_SIZE;
    415       WinPing::PingResult result = ping.Ping(addr.ip(), size, 0, 1, false);
    416       if (result == WinPing::PING_FAIL) {
    417         error_ = EINVAL; // can't think of a better error ID
    418         return -1;
    419       } else if (result != WinPing::PING_TOO_LARGE) {
    420         *mtu = PACKET_MAXIMUMS[level];
    421         return 0;
    422       }
    423     }
    424 
    425     ASSERT(false);
    426     return -1;
    427 #elif defined(IOS) || defined(OSX)
    428     // No simple way to do this on Mac OS X.
    429     // SIOCGIFMTU would work if we knew which interface would be used, but
    430     // figuring that out is pretty complicated. For now we'll return an error
    431     // and let the caller pick a default MTU.
    432     error_ = EINVAL;
    433     return -1;
    434 #elif defined(LINUX) || defined(ANDROID)
    435     // Gets the path MTU.
    436     int value;
    437     socklen_t vlen = sizeof(value);
    438     int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
    439     if (err < 0) {
    440       UpdateLastError();
    441       return err;
    442     }
    443 
    444     ASSERT((0 <= value) && (value <= 65536));
    445     *mtu = value;
    446     return 0;
    447 #endif
    448   }
    449 
    450   SocketServer* socketserver() { return ss_; }
    451 
    452  protected:
    453   void OnResolveResult(SignalThread* thread) {
    454     if (thread != resolver_) {
    455       return;
    456     }
    457 
    458     int error = resolver_->error();
    459     if (error == 0) {
    460       error = DoConnect(resolver_->address());
    461     } else {
    462       Close();
    463     }
    464 
    465     if (error) {
    466       error_ = error;
    467       SignalCloseEvent(this, error_);
    468     }
    469   }
    470 
    471   void UpdateLastError() {
    472     error_ = LAST_SYSTEM_ERROR;
    473   }
    474 
    475   static int TranslateOption(Option opt, int* slevel, int* sopt) {
    476     switch (opt) {
    477       case OPT_DONTFRAGMENT:
    478 #ifdef WIN32
    479         *slevel = IPPROTO_IP;
    480         *sopt = IP_DONTFRAGMENT;
    481         break;
    482 #elif defined(IOS) || defined(OSX) || defined(BSD)
    483         LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
    484         return -1;
    485 #elif defined(POSIX)
    486         *slevel = IPPROTO_IP;
    487         *sopt = IP_MTU_DISCOVER;
    488         break;
    489 #endif
    490       case OPT_RCVBUF:
    491         *slevel = SOL_SOCKET;
    492         *sopt = SO_RCVBUF;
    493         break;
    494       case OPT_SNDBUF:
    495         *slevel = SOL_SOCKET;
    496         *sopt = SO_SNDBUF;
    497         break;
    498       case OPT_NODELAY:
    499         *slevel = IPPROTO_TCP;
    500         *sopt = TCP_NODELAY;
    501         break;
    502       default:
    503         ASSERT(false);
    504         return -1;
    505     }
    506     return 0;
    507   }
    508 
    509   PhysicalSocketServer* ss_;
    510   SOCKET s_;
    511   uint8 enabled_events_;
    512   bool udp_;
    513   int error_;
    514   ConnState state_;
    515   AsyncResolver* resolver_;
    516 
    517 #ifdef _DEBUG
    518   std::string dbg_addr_;
    519 #endif  // _DEBUG;
    520 };
    521 
    522 #ifdef POSIX
    523 class EventDispatcher : public Dispatcher {
    524  public:
    525   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
    526     if (pipe(afd_) < 0)
    527       LOG(LERROR) << "pipe failed";
    528     ss_->Add(this);
    529   }
    530 
    531   virtual ~EventDispatcher() {
    532     ss_->Remove(this);
    533     close(afd_[0]);
    534     close(afd_[1]);
    535   }
    536 
    537   virtual void Signal() {
    538     CritScope cs(&crit_);
    539     if (!fSignaled_) {
    540       const uint8 b[1] = { 0 };
    541       if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
    542         fSignaled_ = true;
    543       }
    544     }
    545   }
    546 
    547   virtual uint32 GetRequestedEvents() {
    548     return DE_READ;
    549   }
    550 
    551   virtual void OnPreEvent(uint32 ff) {
    552     // It is not possible to perfectly emulate an auto-resetting event with
    553     // pipes.  This simulates it by resetting before the event is handled.
    554 
    555     CritScope cs(&crit_);
    556     if (fSignaled_) {
    557       uint8 b[4];  // Allow for reading more than 1 byte, but expect 1.
    558       VERIFY(1 == read(afd_[0], b, sizeof(b)));
    559       fSignaled_ = false;
    560     }
    561   }
    562 
    563   virtual void OnEvent(uint32 ff, int err) {
    564     ASSERT(false);
    565   }
    566 
    567   virtual int GetDescriptor() {
    568     return afd_[0];
    569   }
    570 
    571   virtual bool IsDescriptorClosed() {
    572     return false;
    573   }
    574 
    575  private:
    576   PhysicalSocketServer *ss_;
    577   int afd_[2];
    578   bool fSignaled_;
    579   CriticalSection crit_;
    580 };
    581 
    582 // These two classes use the self-pipe trick to deliver POSIX signals to our
    583 // select loop. This is the only safe, reliable, cross-platform way to do
    584 // non-trivial things with a POSIX signal in an event-driven program (until
    585 // proper pselect() implementations become ubiquitous).
    586 
    587 class PosixSignalHandler {
    588  public:
    589   // POSIX only specifies 32 signals, but in principle the system might have
    590   // more and the programmer might choose to use them, so we size our array
    591   // for 128.
    592   static const int kNumPosixSignals = 128;
    593 
    594   static PosixSignalHandler *Instance() { return &instance_; }
    595 
    596   // Returns true if the given signal number is set.
    597   bool IsSignalSet(int signum) const {
    598     ASSERT(signum < ARRAY_SIZE(received_signal_));
    599     if (signum < ARRAY_SIZE(received_signal_)) {
    600       return received_signal_[signum];
    601     } else {
    602       return false;
    603     }
    604   }
    605 
    606   // Clears the given signal number.
    607   void ClearSignal(int signum) {
    608     ASSERT(signum < ARRAY_SIZE(received_signal_));
    609     if (signum < ARRAY_SIZE(received_signal_)) {
    610       received_signal_[signum] = false;
    611     }
    612   }
    613 
    614   // Returns the file descriptor to monitor for signal events.
    615   int GetDescriptor() const {
    616     return afd_[0];
    617   }
    618 
    619   // This is called directly from our real signal handler, so it must be
    620   // signal-handler-safe. That means it cannot assume anything about the
    621   // user-level state of the process, since the handler could be executed at any
    622   // time on any thread.
    623   void OnPosixSignalReceived(int signum) {
    624     if (signum >= ARRAY_SIZE(received_signal_)) {
    625       // We don't have space in our array for this.
    626       return;
    627     }
    628     // Set a flag saying we've seen this signal.
    629     received_signal_[signum] = true;
    630     // Notify application code that we got a signal.
    631     const uint8 b[1] = { 0 };
    632     if (-1 == write(afd_[1], b, sizeof(b))) {
    633       // Nothing we can do here. If there's an error somehow then there's
    634       // nothing we can safely do from a signal handler.
    635       // No, we can't even safely log it.
    636       // But, we still have to check the return value here. Otherwise,
    637       // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
    638       return;
    639     }
    640   }
    641 
    642  private:
    643   PosixSignalHandler() {
    644     if (pipe(afd_) < 0) {
    645       LOG_ERR(LS_ERROR) << "pipe failed";
    646       return;
    647     }
    648     if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
    649       LOG_ERR(LS_WARNING) << "fcntl #1 failed";
    650     }
    651     if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
    652       LOG_ERR(LS_WARNING) << "fcntl #2 failed";
    653     }
    654     memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
    655            0,
    656            sizeof(received_signal_));
    657   }
    658 
    659   ~PosixSignalHandler() {
    660     int fd1 = afd_[0];
    661     int fd2 = afd_[1];
    662     // We clobber the stored file descriptor numbers here or else in principle
    663     // a signal that happens to be delivered during application termination
    664     // could erroneously write a zero byte to an unrelated file handle in
    665     // OnPosixSignalReceived() if some other file happens to be opened later
    666     // during shutdown and happens to be given the same file descriptor number
    667     // as our pipe had. Unfortunately even with this precaution there is still a
    668     // race where that could occur if said signal happens to be handled
    669     // concurrently with this code and happens to have already read the value of
    670     // afd_[1] from memory before we clobber it, but that's unlikely.
    671     afd_[0] = -1;
    672     afd_[1] = -1;
    673     close(fd1);
    674     close(fd2);
    675   }
    676 
    677   // There is just a single global instance. (Signal handlers do not get any
    678   // sort of user-defined void * parameter, so they can't access anything that
    679   // isn't global.)
    680   static PosixSignalHandler instance_;
    681 
    682   int afd_[2];
    683   // These are boolean flags that will be set in our signal handler and read
    684   // and cleared from Wait(). There is a race involved in this, but it is
    685   // benign. The signal handler sets the flag before signaling the pipe, so
    686   // we'll never end up blocking in select() while a flag is still true.
    687   // However, if two of the same signal arrive close to each other then it's
    688   // possible that the second time the handler may set the flag while it's still
    689   // true, meaning that signal will be missed. But the first occurrence of it
    690   // will still be handled, so this isn't a problem.
    691   // Volatile is not necessary here for correctness, but this data _is_ volatile
    692   // so I've marked it as such.
    693   volatile uint8 received_signal_[kNumPosixSignals];
    694 };
    695 
    696 PosixSignalHandler PosixSignalHandler::instance_;
    697 
    698 class PosixSignalDispatcher : public Dispatcher {
    699  public:
    700   PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
    701     owner_->Add(this);
    702   }
    703 
    704   virtual ~PosixSignalDispatcher() {
    705     owner_->Remove(this);
    706   }
    707 
    708   virtual uint32 GetRequestedEvents() {
    709     return DE_READ;
    710   }
    711 
    712   virtual void OnPreEvent(uint32 ff) {
    713     // Events might get grouped if signals come very fast, so we read out up to
    714     // 16 bytes to make sure we keep the pipe empty.
    715     uint8 b[16];
    716     ssize_t ret = read(GetDescriptor(), b, sizeof(b));
    717     if (ret < 0) {
    718       LOG_ERR(LS_WARNING) << "Error in read()";
    719     } else if (ret == 0) {
    720       LOG(LS_WARNING) << "Should have read at least one byte";
    721     }
    722   }
    723 
    724   virtual void OnEvent(uint32 ff, int err) {
    725     for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
    726          ++signum) {
    727       if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
    728         PosixSignalHandler::Instance()->ClearSignal(signum);
    729         HandlerMap::iterator i = handlers_.find(signum);
    730         if (i == handlers_.end()) {
    731           // This can happen if a signal is delivered to our process at around
    732           // the same time as we unset our handler for it. It is not an error
    733           // condition, but it's unusual enough to be worth logging.
    734           LOG(LS_INFO) << "Received signal with no handler: " << signum;
    735         } else {
    736           // Otherwise, execute our handler.
    737           (*i->second)(signum);
    738         }
    739       }
    740     }
    741   }
    742 
    743   virtual int GetDescriptor() {
    744     return PosixSignalHandler::Instance()->GetDescriptor();
    745   }
    746 
    747   virtual bool IsDescriptorClosed() {
    748     return false;
    749   }
    750 
    751   void SetHandler(int signum, void (*handler)(int)) {
    752     handlers_[signum] = handler;
    753   }
    754 
    755   void ClearHandler(int signum) {
    756     handlers_.erase(signum);
    757   }
    758 
    759   bool HasHandlers() {
    760     return !handlers_.empty();
    761   }
    762 
    763  private:
    764   typedef std::map<int, void (*)(int)> HandlerMap;
    765 
    766   HandlerMap handlers_;
    767   // Our owner.
    768   PhysicalSocketServer *owner_;
    769 };
    770 
    771 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
    772  public:
    773   explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
    774   }
    775   SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
    776   }
    777 
    778   virtual ~SocketDispatcher() {
    779     Close();
    780   }
    781 
    782   bool Initialize() {
    783     ss_->Add(this);
    784     fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
    785     return true;
    786   }
    787 
    788   virtual bool Create(int type) {
    789     // Change the socket to be non-blocking.
    790     if (!PhysicalSocket::Create(type))
    791       return false;
    792 
    793     return Initialize();
    794   }
    795 
    796   virtual int GetDescriptor() {
    797     return s_;
    798   }
    799 
    800   virtual bool IsDescriptorClosed() {
    801     // We don't have a reliable way of distinguishing end-of-stream
    802     // from readability.  So test on each readable call.  Is this
    803     // inefficient?  Probably.
    804     char ch;
    805     ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
    806     if (res > 0) {
    807       // Data available, so not closed.
    808       return false;
    809     } else if (res == 0) {
    810       // EOF, so closed.
    811       return true;
    812     } else {  // error
    813       switch (errno) {
    814         // Returned if we've already closed s_.
    815         case EBADF:
    816         // Returned during ungraceful peer shutdown.
    817         case ECONNRESET:
    818           return true;
    819         default:
    820           // Assume that all other errors are just blocking errors, meaning the
    821           // connection is still good but we just can't read from it right now.
    822           // This should only happen when connecting (and at most once), because
    823           // in all other cases this function is only called if the file
    824           // descriptor is already known to be in the readable state. However,
    825           // it's not necessary a problem if we spuriously interpret a
    826           // "connection lost"-type error as a blocking error, because typically
    827           // the next recv() will get EOF, so we'll still eventually notice that
    828           // the socket is closed.
    829           LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
    830           return false;
    831       }
    832     }
    833   }
    834 
    835   virtual uint32 GetRequestedEvents() {
    836     return enabled_events_;
    837   }
    838 
    839   virtual void OnPreEvent(uint32 ff) {
    840     if ((ff & DE_CONNECT) != 0)
    841       state_ = CS_CONNECTED;
    842     if ((ff & DE_CLOSE) != 0)
    843       state_ = CS_CLOSED;
    844   }
    845 
    846   virtual void OnEvent(uint32 ff, int err) {
    847     if ((ff & DE_READ) != 0) {
    848       enabled_events_ &= ~DE_READ;
    849       SignalReadEvent(this);
    850     }
    851     if ((ff & DE_WRITE) != 0) {
    852       enabled_events_ &= ~DE_WRITE;
    853       SignalWriteEvent(this);
    854     }
    855     if ((ff & DE_CONNECT) != 0) {
    856       enabled_events_ &= ~DE_CONNECT;
    857       SignalConnectEvent(this);
    858     }
    859     if ((ff & DE_ACCEPT) != 0) {
    860       enabled_events_ &= ~DE_ACCEPT;
    861       SignalReadEvent(this);
    862     }
    863     if ((ff & DE_CLOSE) != 0) {
    864       // The socket is now dead to us, so stop checking it.
    865       enabled_events_ = 0;
    866       SignalCloseEvent(this, err);
    867     }
    868   }
    869 
    870   virtual int Close() {
    871     if (s_ == INVALID_SOCKET)
    872       return 0;
    873 
    874     ss_->Remove(this);
    875     return PhysicalSocket::Close();
    876   }
    877 };
    878 
    879 class FileDispatcher: public Dispatcher, public AsyncFile {
    880  public:
    881   FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
    882     set_readable(true);
    883 
    884     ss_->Add(this);
    885 
    886     fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
    887   }
    888 
    889   virtual ~FileDispatcher() {
    890     ss_->Remove(this);
    891   }
    892 
    893   SocketServer* socketserver() { return ss_; }
    894 
    895   virtual int GetDescriptor() {
    896     return fd_;
    897   }
    898 
    899   virtual bool IsDescriptorClosed() {
    900     return false;
    901   }
    902 
    903   virtual uint32 GetRequestedEvents() {
    904     return flags_;
    905   }
    906 
    907   virtual void OnPreEvent(uint32 ff) {
    908   }
    909 
    910   virtual void OnEvent(uint32 ff, int err) {
    911     if ((ff & DE_READ) != 0)
    912       SignalReadEvent(this);
    913     if ((ff & DE_WRITE) != 0)
    914       SignalWriteEvent(this);
    915     if ((ff & DE_CLOSE) != 0)
    916       SignalCloseEvent(this, err);
    917   }
    918 
    919   virtual bool readable() {
    920     return (flags_ & DE_READ) != 0;
    921   }
    922 
    923   virtual void set_readable(bool value) {
    924     flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
    925   }
    926 
    927   virtual bool writable() {
    928     return (flags_ & DE_WRITE) != 0;
    929   }
    930 
    931   virtual void set_writable(bool value) {
    932     flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
    933   }
    934 
    935  private:
    936   PhysicalSocketServer* ss_;
    937   int fd_;
    938   int flags_;
    939 };
    940 
    941 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
    942   return new FileDispatcher(fd, this);
    943 }
    944 
    945 #endif // POSIX
    946 
    947 #ifdef WIN32
    948 static uint32 FlagsToEvents(uint32 events) {
    949   uint32 ffFD = FD_CLOSE;
    950   if (events & DE_READ)
    951     ffFD |= FD_READ;
    952   if (events & DE_WRITE)
    953     ffFD |= FD_WRITE;
    954   if (events & DE_CONNECT)
    955     ffFD |= FD_CONNECT;
    956   if (events & DE_ACCEPT)
    957     ffFD |= FD_ACCEPT;
    958   return ffFD;
    959 }
    960 
    961 class EventDispatcher : public Dispatcher {
    962  public:
    963   EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
    964     hev_ = WSACreateEvent();
    965     if (hev_) {
    966       ss_->Add(this);
    967     }
    968   }
    969 
    970   ~EventDispatcher() {
    971     if (hev_ != NULL) {
    972       ss_->Remove(this);
    973       WSACloseEvent(hev_);
    974       hev_ = NULL;
    975     }
    976   }
    977 
    978   virtual void Signal() {
    979     if (hev_ != NULL)
    980       WSASetEvent(hev_);
    981   }
    982 
    983   virtual uint32 GetRequestedEvents() {
    984     return 0;
    985   }
    986 
    987   virtual void OnPreEvent(uint32 ff) {
    988     WSAResetEvent(hev_);
    989   }
    990 
    991   virtual void OnEvent(uint32 ff, int err) {
    992   }
    993 
    994   virtual WSAEVENT GetWSAEvent() {
    995     return hev_;
    996   }
    997 
    998   virtual SOCKET GetSocket() {
    999     return INVALID_SOCKET;
   1000   }
   1001 
   1002   virtual bool CheckSignalClose() { return false; }
   1003 
   1004 private:
   1005   PhysicalSocketServer* ss_;
   1006   WSAEVENT hev_;
   1007 };
   1008 
   1009 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
   1010  public:
   1011   static int next_id_;
   1012   int id_;
   1013   bool signal_close_;
   1014   int signal_err_;
   1015 
   1016   SocketDispatcher(PhysicalSocketServer* ss)
   1017       : PhysicalSocket(ss),
   1018         id_(0),
   1019         signal_close_(false) {
   1020   }
   1021 
   1022   SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
   1023       : PhysicalSocket(ss, s),
   1024         id_(0),
   1025         signal_close_(false) {
   1026   }
   1027 
   1028   virtual ~SocketDispatcher() {
   1029     Close();
   1030   }
   1031 
   1032   bool Initialize() {
   1033     ASSERT(s_ != INVALID_SOCKET);
   1034     // Must be a non-blocking
   1035     u_long argp = 1;
   1036     ioctlsocket(s_, FIONBIO, &argp);
   1037     ss_->Add(this);
   1038     return true;
   1039   }
   1040 
   1041   virtual bool Create(int type) {
   1042     // Create socket
   1043     if (!PhysicalSocket::Create(type))
   1044       return false;
   1045 
   1046     if (!Initialize())
   1047       return false;
   1048 
   1049     do { id_ = ++next_id_; } while (id_ == 0);
   1050     return true;
   1051   }
   1052 
   1053   virtual int Close() {
   1054     if (s_ == INVALID_SOCKET)
   1055       return 0;
   1056 
   1057     id_ = 0;
   1058     signal_close_ = false;
   1059     ss_->Remove(this);
   1060     return PhysicalSocket::Close();
   1061   }
   1062 
   1063   virtual uint32 GetRequestedEvents() {
   1064     return enabled_events_;
   1065   }
   1066 
   1067   virtual void OnPreEvent(uint32 ff) {
   1068     if ((ff & DE_CONNECT) != 0)
   1069       state_ = CS_CONNECTED;
   1070     // We set CS_CLOSED from CheckSignalClose.
   1071   }
   1072 
   1073   virtual void OnEvent(uint32 ff, int err) {
   1074     int cache_id = id_;
   1075     if ((ff & DE_READ) != 0) {
   1076       enabled_events_ &= ~DE_READ;
   1077       SignalReadEvent(this);
   1078     }
   1079     if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
   1080       enabled_events_ &= ~DE_WRITE;
   1081       SignalWriteEvent(this);
   1082     }
   1083     if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
   1084       if (ff != DE_CONNECT)
   1085         LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
   1086       enabled_events_ &= ~DE_CONNECT;
   1087 #ifdef _DEBUG
   1088       dbg_addr_ = "Connected @ ";
   1089       dbg_addr_.append(GetRemoteAddress().ToString());
   1090 #endif  // _DEBUG
   1091       SignalConnectEvent(this);
   1092     }
   1093     if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
   1094       enabled_events_ &= ~DE_ACCEPT;
   1095       SignalReadEvent(this);
   1096     }
   1097     if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
   1098       signal_close_ = true;
   1099       signal_err_ = err;
   1100     }
   1101   }
   1102 
   1103   virtual WSAEVENT GetWSAEvent() {
   1104     return WSA_INVALID_EVENT;
   1105   }
   1106 
   1107   virtual SOCKET GetSocket() {
   1108     return s_;
   1109   }
   1110 
   1111   virtual bool CheckSignalClose() {
   1112     if (!signal_close_)
   1113       return false;
   1114 
   1115     char ch;
   1116     if (recv(s_, &ch, 1, MSG_PEEK) > 0)
   1117       return false;
   1118 
   1119     state_ = CS_CLOSED;
   1120     signal_close_ = false;
   1121     SignalCloseEvent(this, signal_err_);
   1122     return true;
   1123   }
   1124 };
   1125 
   1126 int SocketDispatcher::next_id_ = 0;
   1127 
   1128 #endif  // WIN32
   1129 
// Dispatcher that sets a caller-supplied boolean to false when signaled.
   1131 class Signaler : public EventDispatcher {
   1132  public:
   1133   Signaler(PhysicalSocketServer* ss, bool* pf)
   1134       : EventDispatcher(ss), pf_(pf) {
   1135   }
   1136   virtual ~Signaler() { }
   1137 
   1138   void OnEvent(uint32 ff, int err) {
   1139     if (pf_)
   1140       *pf_ = false;
   1141   }
   1142 
   1143  private:
   1144   bool *pf_;
   1145 };
   1146 
   1147 PhysicalSocketServer::PhysicalSocketServer()
   1148     : fWait_(false),
   1149       last_tick_tracked_(0),
   1150       last_tick_dispatch_count_(0) {
   1151   signal_wakeup_ = new Signaler(this, &fWait_);
   1152 #ifdef WIN32
   1153   socket_ev_ = WSACreateEvent();
   1154 #endif
   1155 }
   1156 
   1157 PhysicalSocketServer::~PhysicalSocketServer() {
   1158 #ifdef WIN32
   1159   WSACloseEvent(socket_ev_);
   1160 #endif
   1161 #ifdef POSIX
   1162   signal_dispatcher_.reset();
   1163 #endif
   1164   delete signal_wakeup_;
   1165   ASSERT(dispatchers_.empty());
   1166 }
   1167 
// Signals the wakeup dispatcher created in the constructor. When Wait()
// dispatches it, Signaler::OnEvent clears fWait_, so the Wait() loop stops.
void PhysicalSocketServer::WakeUp() {
  signal_wakeup_->Signal();
}
   1171 
   1172 Socket* PhysicalSocketServer::CreateSocket(int type) {
   1173   PhysicalSocket* socket = new PhysicalSocket(this);
   1174   if (socket->Create(type)) {
   1175     return socket;
   1176   } else {
   1177     delete socket;
   1178     return 0;
   1179   }
   1180 }
   1181 
   1182 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
   1183   SocketDispatcher* dispatcher = new SocketDispatcher(this);
   1184   if (dispatcher->Create(type)) {
   1185     return dispatcher;
   1186   } else {
   1187     delete dispatcher;
   1188     return 0;
   1189   }
   1190 }
   1191 
   1192 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
   1193   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
   1194   if (dispatcher->Initialize()) {
   1195     return dispatcher;
   1196   } else {
   1197     delete dispatcher;
   1198     return 0;
   1199   }
   1200 }
   1201 
   1202 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
   1203   CritScope cs(&crit_);
   1204   // Prevent duplicates. This can cause dead dispatchers to stick around.
   1205   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
   1206                                            dispatchers_.end(),
   1207                                            pdispatcher);
   1208   if (pos != dispatchers_.end())
   1209     return;
   1210   dispatchers_.push_back(pdispatcher);
   1211 }
   1212 
   1213 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
   1214   CritScope cs(&crit_);
   1215   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
   1216                                            dispatchers_.end(),
   1217                                            pdispatcher);
   1218   ASSERT(pos != dispatchers_.end());
   1219   size_t index = pos - dispatchers_.begin();
   1220   dispatchers_.erase(pos);
   1221   for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
   1222        ++it) {
   1223     if (index < **it) {
   1224       --**it;
   1225     }
   1226   }
   1227 }
   1228 
   1229 #ifdef POSIX
   1230 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
   1231   // Calculate timing information
   1232 
   1233   struct timeval *ptvWait = NULL;
   1234   struct timeval tvWait;
   1235   struct timeval tvStop;
   1236   if (cmsWait != kForever) {
   1237     // Calculate wait timeval
   1238     tvWait.tv_sec = cmsWait / 1000;
   1239     tvWait.tv_usec = (cmsWait % 1000) * 1000;
   1240     ptvWait = &tvWait;
   1241 
   1242     // Calculate when to return in a timeval
   1243     gettimeofday(&tvStop, NULL);
   1244     tvStop.tv_sec += tvWait.tv_sec;
   1245     tvStop.tv_usec += tvWait.tv_usec;
   1246     if (tvStop.tv_usec >= 1000000) {
   1247       tvStop.tv_usec -= 1000000;
   1248       tvStop.tv_sec += 1;
   1249     }
   1250   }
   1251 
   1252   // Zero all fd_sets. Don't need to do this inside the loop since
   1253   // select() zeros the descriptors not signaled
   1254 
   1255   fd_set fdsRead;
   1256   FD_ZERO(&fdsRead);
   1257   fd_set fdsWrite;
   1258   FD_ZERO(&fdsWrite);
   1259 
   1260   fWait_ = true;
   1261 
   1262   while (fWait_) {
   1263     int fdmax = -1;
   1264     {
   1265       CritScope cr(&crit_);
   1266       for (size_t i = 0; i < dispatchers_.size(); ++i) {
   1267         // Query dispatchers for read and write wait state
   1268         Dispatcher *pdispatcher = dispatchers_[i];
   1269         ASSERT(pdispatcher);
   1270         if (!process_io && (pdispatcher != signal_wakeup_))
   1271           continue;
   1272         int fd = pdispatcher->GetDescriptor();
   1273         if (fd > fdmax)
   1274           fdmax = fd;
   1275 
   1276         uint32 ff = pdispatcher->GetRequestedEvents();
   1277         if (ff & (DE_READ | DE_ACCEPT))
   1278           FD_SET(fd, &fdsRead);
   1279         if (ff & (DE_WRITE | DE_CONNECT))
   1280           FD_SET(fd, &fdsWrite);
   1281       }
   1282     }
   1283 
   1284     // Wait then call handlers as appropriate
   1285     // < 0 means error
   1286     // 0 means timeout
   1287     // > 0 means count of descriptors ready
   1288     int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
   1289 
   1290     // If error, return error.
   1291     if (n < 0) {
   1292       if (errno != EINTR) {
   1293         LOG_E(LS_ERROR, EN, errno) << "select";
   1294         return false;
   1295       }
   1296       // Else ignore the error and keep going. If this EINTR was for one of the
   1297       // signals managed by this PhysicalSocketServer, the
   1298       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
   1299       // iteration.
   1300     } else if (n == 0) {
   1301       // If timeout, return success
   1302       return true;
   1303     } else {
   1304       // We have signaled descriptors
   1305       CritScope cr(&crit_);
   1306       for (size_t i = 0; i < dispatchers_.size(); ++i) {
   1307         Dispatcher *pdispatcher = dispatchers_[i];
   1308         int fd = pdispatcher->GetDescriptor();
   1309         uint32 ff = 0;
   1310         int errcode = 0;
   1311 
   1312         // Reap any error code, which can be signaled through reads or writes.
   1313         // TODO: Should we set errcode if getsockopt fails?
   1314         if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
   1315           socklen_t len = sizeof(errcode);
   1316           ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
   1317         }
   1318 
   1319         // Check readable descriptors. If we're waiting on an accept, signal
   1320         // that. Otherwise we're waiting for data, check to see if we're
   1321         // readable or really closed.
   1322         // TODO: Only peek at TCP descriptors.
   1323         if (FD_ISSET(fd, &fdsRead)) {
   1324           FD_CLR(fd, &fdsRead);
   1325           if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
   1326             ff |= DE_ACCEPT;
   1327           } else if (errcode || pdispatcher->IsDescriptorClosed()) {
   1328             ff |= DE_CLOSE;
   1329           } else {
   1330             ff |= DE_READ;
   1331           }
   1332         }
   1333 
   1334         // Check writable descriptors. If we're waiting on a connect, detect
   1335         // success versus failure by the reaped error code.
   1336         if (FD_ISSET(fd, &fdsWrite)) {
   1337           FD_CLR(fd, &fdsWrite);
   1338           if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
   1339             if (!errcode) {
   1340               ff |= DE_CONNECT;
   1341             } else {
   1342               ff |= DE_CLOSE;
   1343             }
   1344           } else {
   1345             ff |= DE_WRITE;
   1346           }
   1347         }
   1348 
   1349         // Tell the descriptor about the event.
   1350         if (ff != 0) {
   1351           pdispatcher->OnPreEvent(ff);
   1352           pdispatcher->OnEvent(ff, errcode);
   1353         }
   1354       }
   1355     }
   1356 
   1357     // Recalc the time remaining to wait. Doing it here means it doesn't get
   1358     // calced twice the first time through the loop
   1359 
   1360     if (cmsWait != kForever) {
   1361       ptvWait->tv_sec = 0;
   1362       ptvWait->tv_usec = 0;
   1363       struct timeval tvT;
   1364       gettimeofday(&tvT, NULL);
   1365       if ((tvStop.tv_sec > tvT.tv_sec)
   1366           || ((tvStop.tv_sec == tvT.tv_sec)
   1367               && (tvStop.tv_usec > tvT.tv_usec))) {
   1368         ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
   1369         ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
   1370         if (ptvWait->tv_usec < 0) {
   1371           ASSERT(ptvWait->tv_sec > 0);
   1372           ptvWait->tv_usec += 1000000;
   1373           ptvWait->tv_sec -= 1;
   1374         }
   1375       }
   1376     }
   1377   }
   1378 
   1379   return true;
   1380 }
   1381 
// OS-level handler that SetPosixSignalHandler() installs for every signal
// with a user-level handler. It only forwards the signal number to
// PosixSignalHandler::Instance(). Presumably the user handler itself runs
// later from the socket server's dispatch loop; confirm against
// PosixSignalHandler / PosixSignalDispatcher.
static void GlobalSignalHandler(int signum) {
  PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
}
   1385 
   1386 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
   1387                                                  void (*handler)(int)) {
   1388   // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
   1389   // otherwise set one.
   1390   if (handler == SIG_IGN || handler == SIG_DFL) {
   1391     if (!InstallSignal(signum, handler)) {
   1392       return false;
   1393     }
   1394     if (signal_dispatcher_.get()) {
   1395       signal_dispatcher_->ClearHandler(signum);
   1396       if (!signal_dispatcher_->HasHandlers()) {
   1397         signal_dispatcher_.reset();
   1398       }
   1399     }
   1400   } else {
   1401     if (!signal_dispatcher_.get()) {
   1402       signal_dispatcher_.reset(new PosixSignalDispatcher(this));
   1403     }
   1404     signal_dispatcher_->SetHandler(signum, handler);
   1405     if (!InstallSignal(signum, &GlobalSignalHandler)) {
   1406       return false;
   1407     }
   1408   }
   1409   return true;
   1410 }
   1411 
   1412 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
   1413   struct sigaction act;
   1414   // It doesn't really matter what we set this mask to.
   1415   if (sigemptyset(&act.sa_mask) != 0) {
   1416     LOG_ERR(LS_ERROR) << "Couldn't set mask";
   1417     return false;
   1418   }
   1419   act.sa_handler = handler;
   1420   // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
   1421   // and it's a nuisance. Though some syscalls still return EINTR and there's no
   1422   // real standard for which ones. :(
   1423   act.sa_flags = SA_RESTART;
   1424   if (sigaction(signum, &act, NULL) != 0) {
   1425     LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
   1426     return false;
   1427   }
   1428   return true;
   1429 }
   1430 #endif  // POSIX
   1431 
   1432 #ifdef WIN32
   1433 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
   1434   int cmsTotal = cmsWait;
   1435   int cmsElapsed = 0;
   1436   uint32 msStart = Time();
   1437 
   1438 #if LOGGING
   1439   if (last_tick_dispatch_count_ == 0) {
   1440     last_tick_tracked_ = msStart;
   1441   }
   1442 #endif
   1443 
   1444   fWait_ = true;
   1445   while (fWait_) {
   1446     std::vector<WSAEVENT> events;
   1447     std::vector<Dispatcher *> event_owners;
   1448 
   1449     events.push_back(socket_ev_);
   1450 
   1451     {
   1452       CritScope cr(&crit_);
   1453       size_t i = 0;
   1454       iterators_.push_back(&i);
   1455       // Don't track dispatchers_.size(), because we want to pick up any new
   1456       // dispatchers that were added while processing the loop.
   1457       while (i < dispatchers_.size()) {
   1458         Dispatcher* disp = dispatchers_[i++];
   1459         if (!process_io && (disp != signal_wakeup_))
   1460           continue;
   1461         SOCKET s = disp->GetSocket();
   1462         if (disp->CheckSignalClose()) {
   1463           // We just signalled close, don't poll this socket
   1464         } else if (s != INVALID_SOCKET) {
   1465           WSAEventSelect(s,
   1466                          events[0],
   1467                          FlagsToEvents(disp->GetRequestedEvents()));
   1468         } else {
   1469           events.push_back(disp->GetWSAEvent());
   1470           event_owners.push_back(disp);
   1471         }
   1472       }
   1473       ASSERT(iterators_.back() == &i);
   1474       iterators_.pop_back();
   1475     }
   1476 
   1477     // Which is shorter, the delay wait or the asked wait?
   1478 
   1479     int cmsNext;
   1480     if (cmsWait == kForever) {
   1481       cmsNext = cmsWait;
   1482     } else {
   1483       cmsNext = _max(0, cmsTotal - cmsElapsed);
   1484     }
   1485 
   1486     // Wait for one of the events to signal
   1487     DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
   1488                                         &events[0],
   1489                                         false,
   1490                                         cmsNext,
   1491                                         false);
   1492 
   1493 #if 0  // LOGGING
   1494     // we track this information purely for logging purposes.
   1495     last_tick_dispatch_count_++;
   1496     if (last_tick_dispatch_count_ >= 1000) {
   1497       int32 elapsed = TimeSince(last_tick_tracked_);
   1498       LOG(INFO) << "PhysicalSocketServer took " << elapsed
   1499                 << "ms for 1000 events";
   1500 
   1501       // If we get more than 1000 events in a second, we are spinning badly
   1502       // (normally it should take about 8-20 seconds).
   1503       ASSERT(elapsed > 1000);
   1504 
   1505       last_tick_tracked_ = Time();
   1506       last_tick_dispatch_count_ = 0;
   1507     }
   1508 #endif
   1509 
   1510     if (dw == WSA_WAIT_FAILED) {
   1511       // Failed?
   1512       // TODO: need a better strategy than this!
   1513       int error = WSAGetLastError();
   1514       ASSERT(false);
   1515       return false;
   1516     } else if (dw == WSA_WAIT_TIMEOUT) {
   1517       // Timeout?
   1518       return true;
   1519     } else {
   1520       // Figure out which one it is and call it
   1521       CritScope cr(&crit_);
   1522       int index = dw - WSA_WAIT_EVENT_0;
   1523       if (index > 0) {
   1524         --index; // The first event is the socket event
   1525         event_owners[index]->OnPreEvent(0);
   1526         event_owners[index]->OnEvent(0, 0);
   1527       } else if (process_io) {
   1528         size_t i = 0, end = dispatchers_.size();
   1529         iterators_.push_back(&i);
   1530         iterators_.push_back(&end);  // Don't iterate over new dispatchers.
   1531         while (i < end) {
   1532           Dispatcher* disp = dispatchers_[i++];
   1533           SOCKET s = disp->GetSocket();
   1534           if (s == INVALID_SOCKET)
   1535             continue;
   1536 
   1537           WSANETWORKEVENTS wsaEvents;
   1538           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
   1539           if (err == 0) {
   1540 
   1541 #if LOGGING
   1542             {
   1543               if ((wsaEvents.lNetworkEvents & FD_READ) &&
   1544                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
   1545                 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
   1546                              << wsaEvents.iErrorCode[FD_READ_BIT];
   1547               }
   1548               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
   1549                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
   1550                 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
   1551                              << wsaEvents.iErrorCode[FD_WRITE_BIT];
   1552               }
   1553               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
   1554                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
   1555                 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
   1556                              << wsaEvents.iErrorCode[FD_CONNECT_BIT];
   1557               }
   1558               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
   1559                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
   1560                 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
   1561                              << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
   1562               }
   1563               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
   1564                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
   1565                 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
   1566                              << wsaEvents.iErrorCode[FD_CLOSE_BIT];
   1567               }
   1568             }
   1569 #endif
   1570             uint32 ff = 0;
   1571             int errcode = 0;
   1572             if (wsaEvents.lNetworkEvents & FD_READ)
   1573               ff |= DE_READ;
   1574             if (wsaEvents.lNetworkEvents & FD_WRITE)
   1575               ff |= DE_WRITE;
   1576             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
   1577               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
   1578                 ff |= DE_CONNECT;
   1579               } else {
   1580                 ff |= DE_CLOSE;
   1581                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
   1582               }
   1583             }
   1584             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
   1585               ff |= DE_ACCEPT;
   1586             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
   1587               ff |= DE_CLOSE;
   1588               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
   1589             }
   1590             if (ff != 0) {
   1591               disp->OnPreEvent(ff);
   1592               disp->OnEvent(ff, errcode);
   1593             }
   1594           }
   1595         }
   1596         ASSERT(iterators_.back() == &end);
   1597         iterators_.pop_back();
   1598         ASSERT(iterators_.back() == &i);
   1599         iterators_.pop_back();
   1600       }
   1601 
   1602       // Reset the network event until new activity occurs
   1603       WSAResetEvent(socket_ev_);
   1604     }
   1605 
   1606     // Break?
   1607     if (!fWait_)
   1608       break;
   1609     cmsElapsed = TimeSince(msStart);
   1610     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
   1611        break;
   1612     }
   1613   }
   1614 
   1615   // Done
   1616   return true;
   1617 }
   1618 #endif  // WIN32
   1619 
   1620 }  // namespace talk_base
   1621