Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #if defined(_MSC_VER) && _MSC_VER < 1300
     12 #pragma warning(disable:4786)
     13 #endif
     14 
     15 #include <assert.h>
     16 
     17 #if defined(WEBRTC_POSIX)
     18 #include <string.h>
     19 #include <errno.h>
     20 #include <fcntl.h>
     21 #include <sys/time.h>
     22 #include <sys/select.h>
     23 #include <unistd.h>
     24 #include <signal.h>
     25 #endif
     26 
     27 #if defined(WEBRTC_WIN)
     28 #define WIN32_LEAN_AND_MEAN
     29 #include <windows.h>
     30 #include <winsock2.h>
     31 #include <ws2tcpip.h>
     32 #undef SetPort
     33 #endif
     34 
     35 #include <algorithm>
     36 #include <map>
     37 
     38 #include "webrtc/base/basictypes.h"
     39 #include "webrtc/base/byteorder.h"
     40 #include "webrtc/base/common.h"
     41 #include "webrtc/base/logging.h"
     42 #include "webrtc/base/nethelpers.h"
     43 #include "webrtc/base/physicalsocketserver.h"
     44 #include "webrtc/base/timeutils.h"
     45 #include "webrtc/base/winping.h"
     46 #include "webrtc/base/win32socketinit.h"
     47 
     48 // stm: this will tell us if we are on OSX
     49 #ifdef HAVE_CONFIG_H
     50 #include "config.h"
     51 #endif
     52 
     53 #if defined(WEBRTC_POSIX)
     54 #include <netinet/tcp.h>  // for TCP_NODELAY
     55 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
     56 typedef void* SockOptArg;
     57 #endif  // WEBRTC_POSIX
     58 
     59 #if defined(WEBRTC_WIN)
     60 typedef char* SockOptArg;
     61 #endif
     62 
     63 namespace rtc {
     64 
     65 #if defined(WEBRTC_WIN)
     66 // Standard MTUs, from RFC 1191
     67 const uint16 PACKET_MAXIMUMS[] = {
     68   65535,    // Theoretical maximum, Hyperchannel
     69   32000,    // Nothing
     70   17914,    // 16Mb IBM Token Ring
     71   8166,     // IEEE 802.4
     72   //4464,   // IEEE 802.5 (4Mb max)
     73   4352,     // FDDI
     74   //2048,   // Wideband Network
     75   2002,     // IEEE 802.5 (4Mb recommended)
     76   //1536,   // Expermental Ethernet Networks
     77   //1500,   // Ethernet, Point-to-Point (default)
     78   1492,     // IEEE 802.3
     79   1006,     // SLIP, ARPANET
     80   //576,    // X.25 Networks
     81   //544,    // DEC IP Portal
     82   //512,    // NETBIOS
     83   508,      // IEEE 802/Source-Rt Bridge, ARCNET
     84   296,      // Point-to-Point (low delay)
     85   68,       // Official minimum
     86   0,        // End of list marker
     87 };
     88 
     89 static const int IP_HEADER_SIZE = 20u;
     90 static const int IPV6_HEADER_SIZE = 40u;
     91 static const int ICMP_HEADER_SIZE = 8u;
     92 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u;
     93 #endif
     94 
     95 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
     96  public:
     97   PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
     98     : ss_(ss), s_(s), enabled_events_(0), error_(0),
     99       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
    100       resolver_(NULL) {
    101 #if defined(WEBRTC_WIN)
    102     // EnsureWinsockInit() ensures that winsock is initialized. The default
    103     // version of this function doesn't do anything because winsock is
    104     // initialized by constructor of a static object. If neccessary libjingle
    105     // users can link it with a different version of this function by replacing
    106     // win32socketinit.cc. See win32socketinit.cc for more details.
    107     EnsureWinsockInit();
    108 #endif
    109     if (s_ != INVALID_SOCKET) {
    110       enabled_events_ = DE_READ | DE_WRITE;
    111 
    112       int type = SOCK_STREAM;
    113       socklen_t len = sizeof(type);
    114       VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
    115       udp_ = (SOCK_DGRAM == type);
    116     }
    117   }
    118 
    119   virtual ~PhysicalSocket() {
    120     Close();
    121   }
    122 
    123   // Creates the underlying OS socket (same as the "socket" function).
    124   virtual bool Create(int family, int type) {
    125     Close();
    126     s_ = ::socket(family, type, 0);
    127     udp_ = (SOCK_DGRAM == type);
    128     UpdateLastError();
    129     if (udp_)
    130       enabled_events_ = DE_READ | DE_WRITE;
    131     return s_ != INVALID_SOCKET;
    132   }
    133 
    134   SocketAddress GetLocalAddress() const {
    135     sockaddr_storage addr_storage = {0};
    136     socklen_t addrlen = sizeof(addr_storage);
    137     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    138     int result = ::getsockname(s_, addr, &addrlen);
    139     SocketAddress address;
    140     if (result >= 0) {
    141       SocketAddressFromSockAddrStorage(addr_storage, &address);
    142     } else {
    143       LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
    144                       << s_;
    145     }
    146     return address;
    147   }
    148 
    149   SocketAddress GetRemoteAddress() const {
    150     sockaddr_storage addr_storage = {0};
    151     socklen_t addrlen = sizeof(addr_storage);
    152     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    153     int result = ::getpeername(s_, addr, &addrlen);
    154     SocketAddress address;
    155     if (result >= 0) {
    156       SocketAddressFromSockAddrStorage(addr_storage, &address);
    157     } else {
    158       LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
    159                       << s_;
    160     }
    161     return address;
    162   }
    163 
    164   int Bind(const SocketAddress& bind_addr) {
    165     sockaddr_storage addr_storage;
    166     size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
    167     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    168     int err = ::bind(s_, addr, static_cast<int>(len));
    169     UpdateLastError();
    170 #ifdef _DEBUG
    171     if (0 == err) {
    172       dbg_addr_ = "Bound @ ";
    173       dbg_addr_.append(GetLocalAddress().ToString());
    174     }
    175 #endif  // _DEBUG
    176     return err;
    177   }
    178 
    179   int Connect(const SocketAddress& addr) {
    180     // TODO: Implicit creation is required to reconnect...
    181     // ...but should we make it more explicit?
    182     if (state_ != CS_CLOSED) {
    183       SetError(EALREADY);
    184       return SOCKET_ERROR;
    185     }
    186     if (addr.IsUnresolved()) {
    187       LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
    188       resolver_ = new AsyncResolver();
    189       resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
    190       resolver_->Start(addr);
    191       state_ = CS_CONNECTING;
    192       return 0;
    193     }
    194 
    195     return DoConnect(addr);
    196   }
    197 
    198   int DoConnect(const SocketAddress& connect_addr) {
    199     if ((s_ == INVALID_SOCKET) &&
    200         !Create(connect_addr.family(), SOCK_STREAM)) {
    201       return SOCKET_ERROR;
    202     }
    203     sockaddr_storage addr_storage;
    204     size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
    205     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    206     int err = ::connect(s_, addr, static_cast<int>(len));
    207     UpdateLastError();
    208     if (err == 0) {
    209       state_ = CS_CONNECTED;
    210     } else if (IsBlockingError(GetError())) {
    211       state_ = CS_CONNECTING;
    212       enabled_events_ |= DE_CONNECT;
    213     } else {
    214       return SOCKET_ERROR;
    215     }
    216 
    217     enabled_events_ |= DE_READ | DE_WRITE;
    218     return 0;
    219   }
    220 
    221   int GetError() const {
    222     CritScope cs(&crit_);
    223     return error_;
    224   }
    225 
    226   void SetError(int error) {
    227     CritScope cs(&crit_);
    228     error_ = error;
    229   }
    230 
    231   ConnState GetState() const {
    232     return state_;
    233   }
    234 
    235   int GetOption(Option opt, int* value) {
    236     int slevel;
    237     int sopt;
    238     if (TranslateOption(opt, &slevel, &sopt) == -1)
    239       return -1;
    240     socklen_t optlen = sizeof(*value);
    241     int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
    242     if (ret != -1 && opt == OPT_DONTFRAGMENT) {
    243 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
    244       *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
    245 #endif
    246     }
    247     return ret;
    248   }
    249 
    250   int SetOption(Option opt, int value) {
    251     int slevel;
    252     int sopt;
    253     if (TranslateOption(opt, &slevel, &sopt) == -1)
    254       return -1;
    255     if (opt == OPT_DONTFRAGMENT) {
    256 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
    257       value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
    258 #endif
    259     }
    260     return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
    261   }
    262 
    263   int Send(const void *pv, size_t cb) {
    264     int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
    265 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
    266         // Suppress SIGPIPE. Without this, attempting to send on a socket whose
    267         // other end is closed will result in a SIGPIPE signal being raised to
    268         // our process, which by default will terminate the process, which we
    269         // don't want. By specifying this flag, we'll just get the error EPIPE
    270         // instead and can handle the error gracefully.
    271         MSG_NOSIGNAL
    272 #else
    273         0
    274 #endif
    275         );
    276     UpdateLastError();
    277     MaybeRemapSendError();
    278     // We have seen minidumps where this may be false.
    279     ASSERT(sent <= static_cast<int>(cb));
    280     if ((sent < 0) && IsBlockingError(GetError())) {
    281       enabled_events_ |= DE_WRITE;
    282     }
    283     return sent;
    284   }
    285 
    286   int SendTo(const void* buffer, size_t length, const SocketAddress& addr) {
    287     sockaddr_storage saddr;
    288     size_t len = addr.ToSockAddrStorage(&saddr);
    289     int sent = ::sendto(
    290         s_, static_cast<const char *>(buffer), static_cast<int>(length),
    291 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
    292         // Suppress SIGPIPE. See above for explanation.
    293         MSG_NOSIGNAL,
    294 #else
    295         0,
    296 #endif
    297         reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
    298     UpdateLastError();
    299     MaybeRemapSendError();
    300     // We have seen minidumps where this may be false.
    301     ASSERT(sent <= static_cast<int>(length));
    302     if ((sent < 0) && IsBlockingError(GetError())) {
    303       enabled_events_ |= DE_WRITE;
    304     }
    305     return sent;
    306   }
    307 
    308   int Recv(void* buffer, size_t length) {
    309     int received = ::recv(s_, static_cast<char*>(buffer),
    310                           static_cast<int>(length), 0);
    311     if ((received == 0) && (length != 0)) {
    312       // Note: on graceful shutdown, recv can return 0.  In this case, we
    313       // pretend it is blocking, and then signal close, so that simplifying
    314       // assumptions can be made about Recv.
    315       LOG(LS_WARNING) << "EOF from socket; deferring close event";
    316       // Must turn this back on so that the select() loop will notice the close
    317       // event.
    318       enabled_events_ |= DE_READ;
    319       SetError(EWOULDBLOCK);
    320       return SOCKET_ERROR;
    321     }
    322     UpdateLastError();
    323     int error = GetError();
    324     bool success = (received >= 0) || IsBlockingError(error);
    325     if (udp_ || success) {
    326       enabled_events_ |= DE_READ;
    327     }
    328     if (!success) {
    329       LOG_F(LS_VERBOSE) << "Error = " << error;
    330     }
    331     return received;
    332   }
    333 
    334   int RecvFrom(void* buffer, size_t length, SocketAddress *out_addr) {
    335     sockaddr_storage addr_storage;
    336     socklen_t addr_len = sizeof(addr_storage);
    337     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    338     int received = ::recvfrom(s_, static_cast<char*>(buffer),
    339                               static_cast<int>(length), 0, addr, &addr_len);
    340     UpdateLastError();
    341     if ((received >= 0) && (out_addr != NULL))
    342       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
    343     int error = GetError();
    344     bool success = (received >= 0) || IsBlockingError(error);
    345     if (udp_ || success) {
    346       enabled_events_ |= DE_READ;
    347     }
    348     if (!success) {
    349       LOG_F(LS_VERBOSE) << "Error = " << error;
    350     }
    351     return received;
    352   }
    353 
    354   int Listen(int backlog) {
    355     int err = ::listen(s_, backlog);
    356     UpdateLastError();
    357     if (err == 0) {
    358       state_ = CS_CONNECTING;
    359       enabled_events_ |= DE_ACCEPT;
    360 #ifdef _DEBUG
    361       dbg_addr_ = "Listening @ ";
    362       dbg_addr_.append(GetLocalAddress().ToString());
    363 #endif  // _DEBUG
    364     }
    365     return err;
    366   }
    367 
    368   AsyncSocket* Accept(SocketAddress *out_addr) {
    369     sockaddr_storage addr_storage;
    370     socklen_t addr_len = sizeof(addr_storage);
    371     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
    372     SOCKET s = ::accept(s_, addr, &addr_len);
    373     UpdateLastError();
    374     if (s == INVALID_SOCKET)
    375       return NULL;
    376     enabled_events_ |= DE_ACCEPT;
    377     if (out_addr != NULL)
    378       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
    379     return ss_->WrapSocket(s);
    380   }
    381 
    382   int Close() {
    383     if (s_ == INVALID_SOCKET)
    384       return 0;
    385     int err = ::closesocket(s_);
    386     UpdateLastError();
    387     s_ = INVALID_SOCKET;
    388     state_ = CS_CLOSED;
    389     enabled_events_ = 0;
    390     if (resolver_) {
    391       resolver_->Destroy(false);
    392       resolver_ = NULL;
    393     }
    394     return err;
    395   }
    396 
    397   int EstimateMTU(uint16* mtu) {
    398     SocketAddress addr = GetRemoteAddress();
    399     if (addr.IsAny()) {
    400       SetError(ENOTCONN);
    401       return -1;
    402     }
    403 
    404 #if defined(WEBRTC_WIN)
    405     // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
    406     WinPing ping;
    407     if (!ping.IsValid()) {
    408       SetError(EINVAL);  // can't think of a better error ID
    409       return -1;
    410     }
    411     int header_size = ICMP_HEADER_SIZE;
    412     if (addr.family() == AF_INET6) {
    413       header_size += IPV6_HEADER_SIZE;
    414     } else if (addr.family() == AF_INET) {
    415       header_size += IP_HEADER_SIZE;
    416     }
    417 
    418     for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
    419       int32 size = PACKET_MAXIMUMS[level] - header_size;
    420       WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
    421                                              ICMP_PING_TIMEOUT_MILLIS,
    422                                              1, false);
    423       if (result == WinPing::PING_FAIL) {
    424         SetError(EINVAL);  // can't think of a better error ID
    425         return -1;
    426       } else if (result != WinPing::PING_TOO_LARGE) {
    427         *mtu = PACKET_MAXIMUMS[level];
    428         return 0;
    429       }
    430     }
    431 
    432     ASSERT(false);
    433     return -1;
    434 #elif defined(WEBRTC_MAC)
    435     // No simple way to do this on Mac OS X.
    436     // SIOCGIFMTU would work if we knew which interface would be used, but
    437     // figuring that out is pretty complicated. For now we'll return an error
    438     // and let the caller pick a default MTU.
    439     SetError(EINVAL);
    440     return -1;
    441 #elif defined(WEBRTC_LINUX)
    442     // Gets the path MTU.
    443     int value;
    444     socklen_t vlen = sizeof(value);
    445     int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
    446     if (err < 0) {
    447       UpdateLastError();
    448       return err;
    449     }
    450 
    451     ASSERT((0 <= value) && (value <= 65536));
    452     *mtu = value;
    453     return 0;
    454 #elif defined(__native_client__)
    455     // Most socket operations, including this, will fail in NaCl's sandbox.
    456     error_ = EACCES;
    457     return -1;
    458 #endif
    459   }
    460 
    461   SocketServer* socketserver() { return ss_; }
    462 
    463  protected:
    464   void OnResolveResult(AsyncResolverInterface* resolver) {
    465     if (resolver != resolver_) {
    466       return;
    467     }
    468 
    469     int error = resolver_->GetError();
    470     if (error == 0) {
    471       error = DoConnect(resolver_->address());
    472     } else {
    473       Close();
    474     }
    475 
    476     if (error) {
    477       SetError(error);
    478       SignalCloseEvent(this, error);
    479     }
    480   }
    481 
    482   void UpdateLastError() {
    483     SetError(LAST_SYSTEM_ERROR);
    484   }
    485 
    486   void MaybeRemapSendError() {
    487 #if defined(WEBRTC_MAC)
    488     // https://developer.apple.com/library/mac/documentation/Darwin/
    489     // Reference/ManPages/man2/sendto.2.html
    490     // ENOBUFS - The output queue for a network interface is full.
    491     // This generally indicates that the interface has stopped sending,
    492     // but may be caused by transient congestion.
    493     if (GetError() == ENOBUFS) {
    494       SetError(EWOULDBLOCK);
    495     }
    496 #endif
    497   }
    498 
    499   static int TranslateOption(Option opt, int* slevel, int* sopt) {
    500     switch (opt) {
    501       case OPT_DONTFRAGMENT:
    502 #if defined(WEBRTC_WIN)
    503         *slevel = IPPROTO_IP;
    504         *sopt = IP_DONTFRAGMENT;
    505         break;
    506 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
    507         LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
    508         return -1;
    509 #elif defined(WEBRTC_POSIX)
    510         *slevel = IPPROTO_IP;
    511         *sopt = IP_MTU_DISCOVER;
    512         break;
    513 #endif
    514       case OPT_RCVBUF:
    515         *slevel = SOL_SOCKET;
    516         *sopt = SO_RCVBUF;
    517         break;
    518       case OPT_SNDBUF:
    519         *slevel = SOL_SOCKET;
    520         *sopt = SO_SNDBUF;
    521         break;
    522       case OPT_NODELAY:
    523         *slevel = IPPROTO_TCP;
    524         *sopt = TCP_NODELAY;
    525         break;
    526       case OPT_DSCP:
    527         LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
    528         return -1;
    529       case OPT_RTP_SENDTIME_EXTN_ID:
    530         return -1;  // No logging is necessary as this not a OS socket option.
    531       default:
    532         ASSERT(false);
    533         return -1;
    534     }
    535     return 0;
    536   }
    537 
    538   PhysicalSocketServer* ss_;
    539   SOCKET s_;
    540   uint8 enabled_events_;
    541   bool udp_;
    542   int error_;
    543   // Protects |error_| that is accessed from different threads.
    544   mutable CriticalSection crit_;
    545   ConnState state_;
    546   AsyncResolver* resolver_;
    547 
    548 #ifdef _DEBUG
    549   std::string dbg_addr_;
    550 #endif  // _DEBUG;
    551 };
    552 
    553 #if defined(WEBRTC_POSIX)
    554 class EventDispatcher : public Dispatcher {
    555  public:
    556   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
    557     if (pipe(afd_) < 0)
    558       LOG(LERROR) << "pipe failed";
    559     ss_->Add(this);
    560   }
    561 
    562   virtual ~EventDispatcher() {
    563     ss_->Remove(this);
    564     close(afd_[0]);
    565     close(afd_[1]);
    566   }
    567 
    568   virtual void Signal() {
    569     CritScope cs(&crit_);
    570     if (!fSignaled_) {
    571       const uint8 b[1] = { 0 };
    572       if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
    573         fSignaled_ = true;
    574       }
    575     }
    576   }
    577 
    578   virtual uint32 GetRequestedEvents() {
    579     return DE_READ;
    580   }
    581 
    582   virtual void OnPreEvent(uint32 ff) {
    583     // It is not possible to perfectly emulate an auto-resetting event with
    584     // pipes.  This simulates it by resetting before the event is handled.
    585 
    586     CritScope cs(&crit_);
    587     if (fSignaled_) {
    588       uint8 b[4];  // Allow for reading more than 1 byte, but expect 1.
    589       VERIFY(1 == read(afd_[0], b, sizeof(b)));
    590       fSignaled_ = false;
    591     }
    592   }
    593 
    594   virtual void OnEvent(uint32 ff, int err) {
    595     ASSERT(false);
    596   }
    597 
    598   virtual int GetDescriptor() {
    599     return afd_[0];
    600   }
    601 
    602   virtual bool IsDescriptorClosed() {
    603     return false;
    604   }
    605 
    606  private:
    607   PhysicalSocketServer *ss_;
    608   int afd_[2];
    609   bool fSignaled_;
    610   CriticalSection crit_;
    611 };
    612 
    613 // These two classes use the self-pipe trick to deliver POSIX signals to our
    614 // select loop. This is the only safe, reliable, cross-platform way to do
    615 // non-trivial things with a POSIX signal in an event-driven program (until
    616 // proper pselect() implementations become ubiquitous).
    617 
    618 class PosixSignalHandler {
    619  public:
    620   // POSIX only specifies 32 signals, but in principle the system might have
    621   // more and the programmer might choose to use them, so we size our array
    622   // for 128.
    623   static const int kNumPosixSignals = 128;
    624 
    625   // There is just a single global instance. (Signal handlers do not get any
    626   // sort of user-defined void * parameter, so they can't access anything that
    627   // isn't global.)
    628   static PosixSignalHandler* Instance() {
    629     LIBJINGLE_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ());
    630     return &instance;
    631   }
    632 
    633   // Returns true if the given signal number is set.
    634   bool IsSignalSet(int signum) const {
    635     ASSERT(signum < ARRAY_SIZE(received_signal_));
    636     if (signum < ARRAY_SIZE(received_signal_)) {
    637       return received_signal_[signum];
    638     } else {
    639       return false;
    640     }
    641   }
    642 
    643   // Clears the given signal number.
    644   void ClearSignal(int signum) {
    645     ASSERT(signum < ARRAY_SIZE(received_signal_));
    646     if (signum < ARRAY_SIZE(received_signal_)) {
    647       received_signal_[signum] = false;
    648     }
    649   }
    650 
    651   // Returns the file descriptor to monitor for signal events.
    652   int GetDescriptor() const {
    653     return afd_[0];
    654   }
    655 
    656   // This is called directly from our real signal handler, so it must be
    657   // signal-handler-safe. That means it cannot assume anything about the
    658   // user-level state of the process, since the handler could be executed at any
    659   // time on any thread.
    660   void OnPosixSignalReceived(int signum) {
    661     if (signum >= ARRAY_SIZE(received_signal_)) {
    662       // We don't have space in our array for this.
    663       return;
    664     }
    665     // Set a flag saying we've seen this signal.
    666     received_signal_[signum] = true;
    667     // Notify application code that we got a signal.
    668     const uint8 b[1] = { 0 };
    669     if (-1 == write(afd_[1], b, sizeof(b))) {
    670       // Nothing we can do here. If there's an error somehow then there's
    671       // nothing we can safely do from a signal handler.
    672       // No, we can't even safely log it.
    673       // But, we still have to check the return value here. Otherwise,
    674       // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
    675       return;
    676     }
    677   }
    678 
    679  private:
    680   PosixSignalHandler() {
    681     if (pipe(afd_) < 0) {
    682       LOG_ERR(LS_ERROR) << "pipe failed";
    683       return;
    684     }
    685     if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
    686       LOG_ERR(LS_WARNING) << "fcntl #1 failed";
    687     }
    688     if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
    689       LOG_ERR(LS_WARNING) << "fcntl #2 failed";
    690     }
    691     memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
    692            0,
    693            sizeof(received_signal_));
    694   }
    695 
    696   ~PosixSignalHandler() {
    697     int fd1 = afd_[0];
    698     int fd2 = afd_[1];
    699     // We clobber the stored file descriptor numbers here or else in principle
    700     // a signal that happens to be delivered during application termination
    701     // could erroneously write a zero byte to an unrelated file handle in
    702     // OnPosixSignalReceived() if some other file happens to be opened later
    703     // during shutdown and happens to be given the same file descriptor number
    704     // as our pipe had. Unfortunately even with this precaution there is still a
    705     // race where that could occur if said signal happens to be handled
    706     // concurrently with this code and happens to have already read the value of
    707     // afd_[1] from memory before we clobber it, but that's unlikely.
    708     afd_[0] = -1;
    709     afd_[1] = -1;
    710     close(fd1);
    711     close(fd2);
    712   }
    713 
    714   int afd_[2];
    715   // These are boolean flags that will be set in our signal handler and read
    716   // and cleared from Wait(). There is a race involved in this, but it is
    717   // benign. The signal handler sets the flag before signaling the pipe, so
    718   // we'll never end up blocking in select() while a flag is still true.
    719   // However, if two of the same signal arrive close to each other then it's
    720   // possible that the second time the handler may set the flag while it's still
    721   // true, meaning that signal will be missed. But the first occurrence of it
    722   // will still be handled, so this isn't a problem.
    723   // Volatile is not necessary here for correctness, but this data _is_ volatile
    724   // so I've marked it as such.
    725   volatile uint8 received_signal_[kNumPosixSignals];
    726 };
    727 
    728 class PosixSignalDispatcher : public Dispatcher {
    729  public:
    730   PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
    731     owner_->Add(this);
    732   }
    733 
    734   virtual ~PosixSignalDispatcher() {
    735     owner_->Remove(this);
    736   }
    737 
    738   virtual uint32 GetRequestedEvents() {
    739     return DE_READ;
    740   }
    741 
    742   virtual void OnPreEvent(uint32 ff) {
    743     // Events might get grouped if signals come very fast, so we read out up to
    744     // 16 bytes to make sure we keep the pipe empty.
    745     uint8 b[16];
    746     ssize_t ret = read(GetDescriptor(), b, sizeof(b));
    747     if (ret < 0) {
    748       LOG_ERR(LS_WARNING) << "Error in read()";
    749     } else if (ret == 0) {
    750       LOG(LS_WARNING) << "Should have read at least one byte";
    751     }
    752   }
    753 
    754   virtual void OnEvent(uint32 ff, int err) {
    755     for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
    756          ++signum) {
    757       if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
    758         PosixSignalHandler::Instance()->ClearSignal(signum);
    759         HandlerMap::iterator i = handlers_.find(signum);
    760         if (i == handlers_.end()) {
    761           // This can happen if a signal is delivered to our process at around
    762           // the same time as we unset our handler for it. It is not an error
    763           // condition, but it's unusual enough to be worth logging.
    764           LOG(LS_INFO) << "Received signal with no handler: " << signum;
    765         } else {
    766           // Otherwise, execute our handler.
    767           (*i->second)(signum);
    768         }
    769       }
    770     }
    771   }
    772 
    773   virtual int GetDescriptor() {
    774     return PosixSignalHandler::Instance()->GetDescriptor();
    775   }
    776 
    777   virtual bool IsDescriptorClosed() {
    778     return false;
    779   }
    780 
    781   void SetHandler(int signum, void (*handler)(int)) {
    782     handlers_[signum] = handler;
    783   }
    784 
    785   void ClearHandler(int signum) {
    786     handlers_.erase(signum);
    787   }
    788 
    789   bool HasHandlers() {
    790     return !handlers_.empty();
    791   }
    792 
    793  private:
    794   typedef std::map<int, void (*)(int)> HandlerMap;
    795 
    796   HandlerMap handlers_;
    797   // Our owner.
    798   PhysicalSocketServer *owner_;
    799 };
    800 
    801 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
    802  public:
    803   explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
    804   }
    805   SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
    806   }
    807 
    808   virtual ~SocketDispatcher() {
    809     Close();
    810   }
    811 
    812   bool Initialize() {
    813     ss_->Add(this);
    814     fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
    815     return true;
    816   }
    817 
    818   virtual bool Create(int type) {
    819     return Create(AF_INET, type);
    820   }
    821 
    822   virtual bool Create(int family, int type) {
    823     // Change the socket to be non-blocking.
    824     if (!PhysicalSocket::Create(family, type))
    825       return false;
    826 
    827     return Initialize();
    828   }
    829 
    830   virtual int GetDescriptor() {
    831     return s_;
    832   }
    833 
    834   virtual bool IsDescriptorClosed() {
    835     // We don't have a reliable way of distinguishing end-of-stream
    836     // from readability.  So test on each readable call.  Is this
    837     // inefficient?  Probably.
    838     char ch;
    839     ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
    840     if (res > 0) {
    841       // Data available, so not closed.
    842       return false;
    843     } else if (res == 0) {
    844       // EOF, so closed.
    845       return true;
    846     } else {  // error
    847       switch (errno) {
    848         // Returned if we've already closed s_.
    849         case EBADF:
    850         // Returned during ungraceful peer shutdown.
    851         case ECONNRESET:
    852           return true;
    853         default:
    854           // Assume that all other errors are just blocking errors, meaning the
    855           // connection is still good but we just can't read from it right now.
    856           // This should only happen when connecting (and at most once), because
    857           // in all other cases this function is only called if the file
    858           // descriptor is already known to be in the readable state. However,
    859           // it's not necessary a problem if we spuriously interpret a
    860           // "connection lost"-type error as a blocking error, because typically
    861           // the next recv() will get EOF, so we'll still eventually notice that
    862           // the socket is closed.
    863           LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
    864           return false;
    865       }
    866     }
    867   }
    868 
    869   virtual uint32 GetRequestedEvents() {
    870     return enabled_events_;
    871   }
    872 
    873   virtual void OnPreEvent(uint32 ff) {
    874     if ((ff & DE_CONNECT) != 0)
    875       state_ = CS_CONNECTED;
    876     if ((ff & DE_CLOSE) != 0)
    877       state_ = CS_CLOSED;
    878   }
    879 
    880   virtual void OnEvent(uint32 ff, int err) {
    881     // Make sure we deliver connect/accept first. Otherwise, consumers may see
    882     // something like a READ followed by a CONNECT, which would be odd.
    883     if ((ff & DE_CONNECT) != 0) {
    884       enabled_events_ &= ~DE_CONNECT;
    885       SignalConnectEvent(this);
    886     }
    887     if ((ff & DE_ACCEPT) != 0) {
    888       enabled_events_ &= ~DE_ACCEPT;
    889       SignalReadEvent(this);
    890     }
    891     if ((ff & DE_READ) != 0) {
    892       enabled_events_ &= ~DE_READ;
    893       SignalReadEvent(this);
    894     }
    895     if ((ff & DE_WRITE) != 0) {
    896       enabled_events_ &= ~DE_WRITE;
    897       SignalWriteEvent(this);
    898     }
    899     if ((ff & DE_CLOSE) != 0) {
    900       // The socket is now dead to us, so stop checking it.
    901       enabled_events_ = 0;
    902       SignalCloseEvent(this, err);
    903     }
    904   }
    905 
    906   virtual int Close() {
    907     if (s_ == INVALID_SOCKET)
    908       return 0;
    909 
    910     ss_->Remove(this);
    911     return PhysicalSocket::Close();
    912   }
    913 };
    914 
    915 class FileDispatcher: public Dispatcher, public AsyncFile {
    916  public:
    917   FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
    918     set_readable(true);
    919 
    920     ss_->Add(this);
    921 
    922     fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
    923   }
    924 
    925   virtual ~FileDispatcher() {
    926     ss_->Remove(this);
    927   }
    928 
    929   SocketServer* socketserver() { return ss_; }
    930 
    931   virtual int GetDescriptor() {
    932     return fd_;
    933   }
    934 
    935   virtual bool IsDescriptorClosed() {
    936     return false;
    937   }
    938 
    939   virtual uint32 GetRequestedEvents() {
    940     return flags_;
    941   }
    942 
    943   virtual void OnPreEvent(uint32 ff) {
    944   }
    945 
    946   virtual void OnEvent(uint32 ff, int err) {
    947     if ((ff & DE_READ) != 0)
    948       SignalReadEvent(this);
    949     if ((ff & DE_WRITE) != 0)
    950       SignalWriteEvent(this);
    951     if ((ff & DE_CLOSE) != 0)
    952       SignalCloseEvent(this, err);
    953   }
    954 
    955   virtual bool readable() {
    956     return (flags_ & DE_READ) != 0;
    957   }
    958 
    959   virtual void set_readable(bool value) {
    960     flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
    961   }
    962 
    963   virtual bool writable() {
    964     return (flags_ & DE_WRITE) != 0;
    965   }
    966 
    967   virtual void set_writable(bool value) {
    968     flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
    969   }
    970 
    971  private:
    972   PhysicalSocketServer* ss_;
    973   int fd_;
    974   int flags_;
    975 };
    976 
    977 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
    978   return new FileDispatcher(fd, this);
    979 }
    980 
    981 #endif // WEBRTC_POSIX
    982 
    983 #if defined(WEBRTC_WIN)
    984 static uint32 FlagsToEvents(uint32 events) {
    985   uint32 ffFD = FD_CLOSE;
    986   if (events & DE_READ)
    987     ffFD |= FD_READ;
    988   if (events & DE_WRITE)
    989     ffFD |= FD_WRITE;
    990   if (events & DE_CONNECT)
    991     ffFD |= FD_CONNECT;
    992   if (events & DE_ACCEPT)
    993     ffFD |= FD_ACCEPT;
    994   return ffFD;
    995 }
    996 
    997 class EventDispatcher : public Dispatcher {
    998  public:
    999   EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
   1000     hev_ = WSACreateEvent();
   1001     if (hev_) {
   1002       ss_->Add(this);
   1003     }
   1004   }
   1005 
   1006   ~EventDispatcher() {
   1007     if (hev_ != NULL) {
   1008       ss_->Remove(this);
   1009       WSACloseEvent(hev_);
   1010       hev_ = NULL;
   1011     }
   1012   }
   1013 
   1014   virtual void Signal() {
   1015     if (hev_ != NULL)
   1016       WSASetEvent(hev_);
   1017   }
   1018 
   1019   virtual uint32 GetRequestedEvents() {
   1020     return 0;
   1021   }
   1022 
   1023   virtual void OnPreEvent(uint32 ff) {
   1024     WSAResetEvent(hev_);
   1025   }
   1026 
   1027   virtual void OnEvent(uint32 ff, int err) {
   1028   }
   1029 
   1030   virtual WSAEVENT GetWSAEvent() {
   1031     return hev_;
   1032   }
   1033 
   1034   virtual SOCKET GetSocket() {
   1035     return INVALID_SOCKET;
   1036   }
   1037 
   1038   virtual bool CheckSignalClose() { return false; }
   1039 
   1040 private:
   1041   PhysicalSocketServer* ss_;
   1042   WSAEVENT hev_;
   1043 };
   1044 
   1045 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
   1046  public:
   1047   static int next_id_;
   1048   int id_;
   1049   bool signal_close_;
   1050   int signal_err_;
   1051 
   1052   SocketDispatcher(PhysicalSocketServer* ss)
   1053       : PhysicalSocket(ss),
   1054         id_(0),
   1055         signal_close_(false) {
   1056   }
   1057 
   1058   SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
   1059       : PhysicalSocket(ss, s),
   1060         id_(0),
   1061         signal_close_(false) {
   1062   }
   1063 
   1064   virtual ~SocketDispatcher() {
   1065     Close();
   1066   }
   1067 
   1068   bool Initialize() {
   1069     ASSERT(s_ != INVALID_SOCKET);
   1070     // Must be a non-blocking
   1071     u_long argp = 1;
   1072     ioctlsocket(s_, FIONBIO, &argp);
   1073     ss_->Add(this);
   1074     return true;
   1075   }
   1076 
   1077   virtual bool Create(int type) {
   1078     return Create(AF_INET, type);
   1079   }
   1080 
   1081   virtual bool Create(int family, int type) {
   1082     // Create socket
   1083     if (!PhysicalSocket::Create(family, type))
   1084       return false;
   1085 
   1086     if (!Initialize())
   1087       return false;
   1088 
   1089     do { id_ = ++next_id_; } while (id_ == 0);
   1090     return true;
   1091   }
   1092 
   1093   virtual int Close() {
   1094     if (s_ == INVALID_SOCKET)
   1095       return 0;
   1096 
   1097     id_ = 0;
   1098     signal_close_ = false;
   1099     ss_->Remove(this);
   1100     return PhysicalSocket::Close();
   1101   }
   1102 
   1103   virtual uint32 GetRequestedEvents() {
   1104     return enabled_events_;
   1105   }
   1106 
   1107   virtual void OnPreEvent(uint32 ff) {
   1108     if ((ff & DE_CONNECT) != 0)
   1109       state_ = CS_CONNECTED;
   1110     // We set CS_CLOSED from CheckSignalClose.
   1111   }
   1112 
   1113   virtual void OnEvent(uint32 ff, int err) {
   1114     int cache_id = id_;
   1115     // Make sure we deliver connect/accept first. Otherwise, consumers may see
   1116     // something like a READ followed by a CONNECT, which would be odd.
   1117     if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
   1118       if (ff != DE_CONNECT)
   1119         LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
   1120       enabled_events_ &= ~DE_CONNECT;
   1121 #ifdef _DEBUG
   1122       dbg_addr_ = "Connected @ ";
   1123       dbg_addr_.append(GetRemoteAddress().ToString());
   1124 #endif  // _DEBUG
   1125       SignalConnectEvent(this);
   1126     }
   1127     if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
   1128       enabled_events_ &= ~DE_ACCEPT;
   1129       SignalReadEvent(this);
   1130     }
   1131     if ((ff & DE_READ) != 0) {
   1132       enabled_events_ &= ~DE_READ;
   1133       SignalReadEvent(this);
   1134     }
   1135     if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
   1136       enabled_events_ &= ~DE_WRITE;
   1137       SignalWriteEvent(this);
   1138     }
   1139     if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
   1140       signal_close_ = true;
   1141       signal_err_ = err;
   1142     }
   1143   }
   1144 
   1145   virtual WSAEVENT GetWSAEvent() {
   1146     return WSA_INVALID_EVENT;
   1147   }
   1148 
   1149   virtual SOCKET GetSocket() {
   1150     return s_;
   1151   }
   1152 
   1153   virtual bool CheckSignalClose() {
   1154     if (!signal_close_)
   1155       return false;
   1156 
   1157     char ch;
   1158     if (recv(s_, &ch, 1, MSG_PEEK) > 0)
   1159       return false;
   1160 
   1161     state_ = CS_CLOSED;
   1162     signal_close_ = false;
   1163     SignalCloseEvent(this, signal_err_);
   1164     return true;
   1165   }
   1166 };
   1167 
   1168 int SocketDispatcher::next_id_ = 0;
   1169 
   1170 #endif  // WEBRTC_WIN
   1171 
   1172 // Sets the value of a boolean value to false when signaled.
   1173 class Signaler : public EventDispatcher {
   1174  public:
   1175   Signaler(PhysicalSocketServer* ss, bool* pf)
   1176       : EventDispatcher(ss), pf_(pf) {
   1177   }
   1178   virtual ~Signaler() { }
   1179 
   1180   void OnEvent(uint32 ff, int err) {
   1181     if (pf_)
   1182       *pf_ = false;
   1183   }
   1184 
   1185  private:
   1186   bool *pf_;
   1187 };
   1188 
   1189 PhysicalSocketServer::PhysicalSocketServer()
   1190     : fWait_(false) {
   1191   signal_wakeup_ = new Signaler(this, &fWait_);
   1192 #if defined(WEBRTC_WIN)
   1193   socket_ev_ = WSACreateEvent();
   1194 #endif
   1195 }
   1196 
   1197 PhysicalSocketServer::~PhysicalSocketServer() {
   1198 #if defined(WEBRTC_WIN)
   1199   WSACloseEvent(socket_ev_);
   1200 #endif
   1201 #if defined(WEBRTC_POSIX)
   1202   signal_dispatcher_.reset();
   1203 #endif
   1204   delete signal_wakeup_;
   1205   ASSERT(dispatchers_.empty());
   1206 }
   1207 
   1208 void PhysicalSocketServer::WakeUp() {
   1209   signal_wakeup_->Signal();
   1210 }
   1211 
   1212 Socket* PhysicalSocketServer::CreateSocket(int type) {
   1213   return CreateSocket(AF_INET, type);
   1214 }
   1215 
   1216 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
   1217   PhysicalSocket* socket = new PhysicalSocket(this);
   1218   if (socket->Create(family, type)) {
   1219     return socket;
   1220   } else {
   1221     delete socket;
   1222     return 0;
   1223   }
   1224 }
   1225 
   1226 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
   1227   return CreateAsyncSocket(AF_INET, type);
   1228 }
   1229 
   1230 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
   1231   SocketDispatcher* dispatcher = new SocketDispatcher(this);
   1232   if (dispatcher->Create(family, type)) {
   1233     return dispatcher;
   1234   } else {
   1235     delete dispatcher;
   1236     return 0;
   1237   }
   1238 }
   1239 
   1240 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
   1241   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
   1242   if (dispatcher->Initialize()) {
   1243     return dispatcher;
   1244   } else {
   1245     delete dispatcher;
   1246     return 0;
   1247   }
   1248 }
   1249 
   1250 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
   1251   CritScope cs(&crit_);
   1252   // Prevent duplicates. This can cause dead dispatchers to stick around.
   1253   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
   1254                                            dispatchers_.end(),
   1255                                            pdispatcher);
   1256   if (pos != dispatchers_.end())
   1257     return;
   1258   dispatchers_.push_back(pdispatcher);
   1259 }
   1260 
   1261 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
   1262   CritScope cs(&crit_);
   1263   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
   1264                                            dispatchers_.end(),
   1265                                            pdispatcher);
   1266   // We silently ignore duplicate calls to Add, so we should silently ignore
   1267   // the (expected) symmetric calls to Remove. Note that this may still hide
   1268   // a real issue, so we at least log a warning about it.
   1269   if (pos == dispatchers_.end()) {
   1270     LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
   1271                     << "dispatcher, potentially from a duplicate call to Add.";
   1272     return;
   1273   }
   1274   size_t index = pos - dispatchers_.begin();
   1275   dispatchers_.erase(pos);
   1276   for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
   1277        ++it) {
   1278     if (index < **it) {
   1279       --**it;
   1280     }
   1281   }
   1282 }
   1283 
   1284 #if defined(WEBRTC_POSIX)
   1285 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
   1286   // Calculate timing information
   1287 
   1288   struct timeval *ptvWait = NULL;
   1289   struct timeval tvWait;
   1290   struct timeval tvStop;
   1291   if (cmsWait != kForever) {
   1292     // Calculate wait timeval
   1293     tvWait.tv_sec = cmsWait / 1000;
   1294     tvWait.tv_usec = (cmsWait % 1000) * 1000;
   1295     ptvWait = &tvWait;
   1296 
   1297     // Calculate when to return in a timeval
   1298     gettimeofday(&tvStop, NULL);
   1299     tvStop.tv_sec += tvWait.tv_sec;
   1300     tvStop.tv_usec += tvWait.tv_usec;
   1301     if (tvStop.tv_usec >= 1000000) {
   1302       tvStop.tv_usec -= 1000000;
   1303       tvStop.tv_sec += 1;
   1304     }
   1305   }
   1306 
   1307   // Zero all fd_sets. Don't need to do this inside the loop since
   1308   // select() zeros the descriptors not signaled
   1309 
   1310   fd_set fdsRead;
   1311   FD_ZERO(&fdsRead);
   1312   fd_set fdsWrite;
   1313   FD_ZERO(&fdsWrite);
   1314 
   1315   fWait_ = true;
   1316 
   1317   while (fWait_) {
   1318     int fdmax = -1;
   1319     {
   1320       CritScope cr(&crit_);
   1321       for (size_t i = 0; i < dispatchers_.size(); ++i) {
   1322         // Query dispatchers for read and write wait state
   1323         Dispatcher *pdispatcher = dispatchers_[i];
   1324         ASSERT(pdispatcher);
   1325         if (!process_io && (pdispatcher != signal_wakeup_))
   1326           continue;
   1327         int fd = pdispatcher->GetDescriptor();
   1328         if (fd > fdmax)
   1329           fdmax = fd;
   1330 
   1331         uint32 ff = pdispatcher->GetRequestedEvents();
   1332         if (ff & (DE_READ | DE_ACCEPT))
   1333           FD_SET(fd, &fdsRead);
   1334         if (ff & (DE_WRITE | DE_CONNECT))
   1335           FD_SET(fd, &fdsWrite);
   1336       }
   1337     }
   1338 
   1339     // Wait then call handlers as appropriate
   1340     // < 0 means error
   1341     // 0 means timeout
   1342     // > 0 means count of descriptors ready
   1343     int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
   1344 
   1345     // If error, return error.
   1346     if (n < 0) {
   1347       if (errno != EINTR) {
   1348         LOG_E(LS_ERROR, EN, errno) << "select";
   1349         return false;
   1350       }
   1351       // Else ignore the error and keep going. If this EINTR was for one of the
   1352       // signals managed by this PhysicalSocketServer, the
   1353       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
   1354       // iteration.
   1355     } else if (n == 0) {
   1356       // If timeout, return success
   1357       return true;
   1358     } else {
   1359       // We have signaled descriptors
   1360       CritScope cr(&crit_);
   1361       for (size_t i = 0; i < dispatchers_.size(); ++i) {
   1362         Dispatcher *pdispatcher = dispatchers_[i];
   1363         int fd = pdispatcher->GetDescriptor();
   1364         uint32 ff = 0;
   1365         int errcode = 0;
   1366 
   1367         // Reap any error code, which can be signaled through reads or writes.
   1368         // TODO: Should we set errcode if getsockopt fails?
   1369         if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
   1370           socklen_t len = sizeof(errcode);
   1371           ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
   1372         }
   1373 
   1374         // Check readable descriptors. If we're waiting on an accept, signal
   1375         // that. Otherwise we're waiting for data, check to see if we're
   1376         // readable or really closed.
   1377         // TODO: Only peek at TCP descriptors.
   1378         if (FD_ISSET(fd, &fdsRead)) {
   1379           FD_CLR(fd, &fdsRead);
   1380           if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
   1381             ff |= DE_ACCEPT;
   1382           } else if (errcode || pdispatcher->IsDescriptorClosed()) {
   1383             ff |= DE_CLOSE;
   1384           } else {
   1385             ff |= DE_READ;
   1386           }
   1387         }
   1388 
   1389         // Check writable descriptors. If we're waiting on a connect, detect
   1390         // success versus failure by the reaped error code.
   1391         if (FD_ISSET(fd, &fdsWrite)) {
   1392           FD_CLR(fd, &fdsWrite);
   1393           if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
   1394             if (!errcode) {
   1395               ff |= DE_CONNECT;
   1396             } else {
   1397               ff |= DE_CLOSE;
   1398             }
   1399           } else {
   1400             ff |= DE_WRITE;
   1401           }
   1402         }
   1403 
   1404         // Tell the descriptor about the event.
   1405         if (ff != 0) {
   1406           pdispatcher->OnPreEvent(ff);
   1407           pdispatcher->OnEvent(ff, errcode);
   1408         }
   1409       }
   1410     }
   1411 
   1412     // Recalc the time remaining to wait. Doing it here means it doesn't get
   1413     // calced twice the first time through the loop
   1414     if (ptvWait) {
   1415       ptvWait->tv_sec = 0;
   1416       ptvWait->tv_usec = 0;
   1417       struct timeval tvT;
   1418       gettimeofday(&tvT, NULL);
   1419       if ((tvStop.tv_sec > tvT.tv_sec)
   1420           || ((tvStop.tv_sec == tvT.tv_sec)
   1421               && (tvStop.tv_usec > tvT.tv_usec))) {
   1422         ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
   1423         ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
   1424         if (ptvWait->tv_usec < 0) {
   1425           ASSERT(ptvWait->tv_sec > 0);
   1426           ptvWait->tv_usec += 1000000;
   1427           ptvWait->tv_sec -= 1;
   1428         }
   1429       }
   1430     }
   1431   }
   1432 
   1433   return true;
   1434 }
   1435 
   1436 static void GlobalSignalHandler(int signum) {
   1437   PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
   1438 }
   1439 
   1440 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
   1441                                                  void (*handler)(int)) {
   1442   // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
   1443   // otherwise set one.
   1444   if (handler == SIG_IGN || handler == SIG_DFL) {
   1445     if (!InstallSignal(signum, handler)) {
   1446       return false;
   1447     }
   1448     if (signal_dispatcher_) {
   1449       signal_dispatcher_->ClearHandler(signum);
   1450       if (!signal_dispatcher_->HasHandlers()) {
   1451         signal_dispatcher_.reset();
   1452       }
   1453     }
   1454   } else {
   1455     if (!signal_dispatcher_) {
   1456       signal_dispatcher_.reset(new PosixSignalDispatcher(this));
   1457     }
   1458     signal_dispatcher_->SetHandler(signum, handler);
   1459     if (!InstallSignal(signum, &GlobalSignalHandler)) {
   1460       return false;
   1461     }
   1462   }
   1463   return true;
   1464 }
   1465 
   1466 Dispatcher* PhysicalSocketServer::signal_dispatcher() {
   1467   return signal_dispatcher_.get();
   1468 }
   1469 
   1470 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
   1471   struct sigaction act;
   1472   // It doesn't really matter what we set this mask to.
   1473   if (sigemptyset(&act.sa_mask) != 0) {
   1474     LOG_ERR(LS_ERROR) << "Couldn't set mask";
   1475     return false;
   1476   }
   1477   act.sa_handler = handler;
   1478 #if !defined(__native_client__)
   1479   // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
   1480   // and it's a nuisance. Though some syscalls still return EINTR and there's no
   1481   // real standard for which ones. :(
   1482   act.sa_flags = SA_RESTART;
   1483 #else
   1484   act.sa_flags = 0;
   1485 #endif
   1486   if (sigaction(signum, &act, NULL) != 0) {
   1487     LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
   1488     return false;
   1489   }
   1490   return true;
   1491 }
   1492 #endif  // WEBRTC_POSIX
   1493 
   1494 #if defined(WEBRTC_WIN)
   1495 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
   1496   int cmsTotal = cmsWait;
   1497   int cmsElapsed = 0;
   1498   uint32 msStart = Time();
   1499 
   1500   fWait_ = true;
   1501   while (fWait_) {
   1502     std::vector<WSAEVENT> events;
   1503     std::vector<Dispatcher *> event_owners;
   1504 
   1505     events.push_back(socket_ev_);
   1506 
   1507     {
   1508       CritScope cr(&crit_);
   1509       size_t i = 0;
   1510       iterators_.push_back(&i);
   1511       // Don't track dispatchers_.size(), because we want to pick up any new
   1512       // dispatchers that were added while processing the loop.
   1513       while (i < dispatchers_.size()) {
   1514         Dispatcher* disp = dispatchers_[i++];
   1515         if (!process_io && (disp != signal_wakeup_))
   1516           continue;
   1517         SOCKET s = disp->GetSocket();
   1518         if (disp->CheckSignalClose()) {
   1519           // We just signalled close, don't poll this socket
   1520         } else if (s != INVALID_SOCKET) {
   1521           WSAEventSelect(s,
   1522                          events[0],
   1523                          FlagsToEvents(disp->GetRequestedEvents()));
   1524         } else {
   1525           events.push_back(disp->GetWSAEvent());
   1526           event_owners.push_back(disp);
   1527         }
   1528       }
   1529       ASSERT(iterators_.back() == &i);
   1530       iterators_.pop_back();
   1531     }
   1532 
   1533     // Which is shorter, the delay wait or the asked wait?
   1534 
   1535     int cmsNext;
   1536     if (cmsWait == kForever) {
   1537       cmsNext = cmsWait;
   1538     } else {
   1539       cmsNext = _max(0, cmsTotal - cmsElapsed);
   1540     }
   1541 
   1542     // Wait for one of the events to signal
   1543     DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
   1544                                         &events[0],
   1545                                         false,
   1546                                         cmsNext,
   1547                                         false);
   1548 
   1549     if (dw == WSA_WAIT_FAILED) {
   1550       // Failed?
   1551       // TODO: need a better strategy than this!
   1552       WSAGetLastError();
   1553       ASSERT(false);
   1554       return false;
   1555     } else if (dw == WSA_WAIT_TIMEOUT) {
   1556       // Timeout?
   1557       return true;
   1558     } else {
   1559       // Figure out which one it is and call it
   1560       CritScope cr(&crit_);
   1561       int index = dw - WSA_WAIT_EVENT_0;
   1562       if (index > 0) {
   1563         --index; // The first event is the socket event
   1564         event_owners[index]->OnPreEvent(0);
   1565         event_owners[index]->OnEvent(0, 0);
   1566       } else if (process_io) {
   1567         size_t i = 0, end = dispatchers_.size();
   1568         iterators_.push_back(&i);
   1569         iterators_.push_back(&end);  // Don't iterate over new dispatchers.
   1570         while (i < end) {
   1571           Dispatcher* disp = dispatchers_[i++];
   1572           SOCKET s = disp->GetSocket();
   1573           if (s == INVALID_SOCKET)
   1574             continue;
   1575 
   1576           WSANETWORKEVENTS wsaEvents;
   1577           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
   1578           if (err == 0) {
   1579 
   1580 #if LOGGING
   1581             {
   1582               if ((wsaEvents.lNetworkEvents & FD_READ) &&
   1583                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
   1584                 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
   1585                              << wsaEvents.iErrorCode[FD_READ_BIT];
   1586               }
   1587               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
   1588                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
   1589                 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
   1590                              << wsaEvents.iErrorCode[FD_WRITE_BIT];
   1591               }
   1592               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
   1593                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
   1594                 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
   1595                              << wsaEvents.iErrorCode[FD_CONNECT_BIT];
   1596               }
   1597               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
   1598                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
   1599                 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
   1600                              << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
   1601               }
   1602               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
   1603                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
   1604                 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
   1605                              << wsaEvents.iErrorCode[FD_CLOSE_BIT];
   1606               }
   1607             }
   1608 #endif
   1609             uint32 ff = 0;
   1610             int errcode = 0;
   1611             if (wsaEvents.lNetworkEvents & FD_READ)
   1612               ff |= DE_READ;
   1613             if (wsaEvents.lNetworkEvents & FD_WRITE)
   1614               ff |= DE_WRITE;
   1615             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
   1616               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
   1617                 ff |= DE_CONNECT;
   1618               } else {
   1619                 ff |= DE_CLOSE;
   1620                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
   1621               }
   1622             }
   1623             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
   1624               ff |= DE_ACCEPT;
   1625             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
   1626               ff |= DE_CLOSE;
   1627               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
   1628             }
   1629             if (ff != 0) {
   1630               disp->OnPreEvent(ff);
   1631               disp->OnEvent(ff, errcode);
   1632             }
   1633           }
   1634         }
   1635         ASSERT(iterators_.back() == &end);
   1636         iterators_.pop_back();
   1637         ASSERT(iterators_.back() == &i);
   1638         iterators_.pop_back();
   1639       }
   1640 
   1641       // Reset the network event until new activity occurs
   1642       WSAResetEvent(socket_ev_);
   1643     }
   1644 
   1645     // Break?
   1646     if (!fWait_)
   1647       break;
   1648     cmsElapsed = TimeSince(msStart);
   1649     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
   1650        break;
   1651     }
   1652   }
   1653 
   1654   // Done
   1655   return true;
   1656 }
   1657 #endif  // WEBRTC_WIN
   1658 
   1659 }  // namespace rtc
   1660