1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #if defined(_MSC_VER) && _MSC_VER < 1300 12 #pragma warning(disable:4786) 13 #endif 14 15 #include <assert.h> 16 17 #if defined(WEBRTC_POSIX) 18 #include <string.h> 19 #include <errno.h> 20 #include <fcntl.h> 21 #include <sys/time.h> 22 #include <sys/select.h> 23 #include <unistd.h> 24 #include <signal.h> 25 #endif 26 27 #if defined(WEBRTC_WIN) 28 #define WIN32_LEAN_AND_MEAN 29 #include <windows.h> 30 #include <winsock2.h> 31 #include <ws2tcpip.h> 32 #undef SetPort 33 #endif 34 35 #include <algorithm> 36 #include <map> 37 38 #include "webrtc/base/basictypes.h" 39 #include "webrtc/base/byteorder.h" 40 #include "webrtc/base/common.h" 41 #include "webrtc/base/logging.h" 42 #include "webrtc/base/nethelpers.h" 43 #include "webrtc/base/physicalsocketserver.h" 44 #include "webrtc/base/timeutils.h" 45 #include "webrtc/base/winping.h" 46 #include "webrtc/base/win32socketinit.h" 47 48 // stm: this will tell us if we are on OSX 49 #ifdef HAVE_CONFIG_H 50 #include "config.h" 51 #endif 52 53 #if defined(WEBRTC_POSIX) 54 #include <netinet/tcp.h> // for TCP_NODELAY 55 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h 56 typedef void* SockOptArg; 57 #endif // WEBRTC_POSIX 58 59 #if defined(WEBRTC_WIN) 60 typedef char* SockOptArg; 61 #endif 62 63 namespace rtc { 64 65 #if defined(WEBRTC_WIN) 66 // Standard MTUs, from RFC 1191 67 const uint16 PACKET_MAXIMUMS[] = { 68 65535, // Theoretical maximum, Hyperchannel 69 32000, // Nothing 70 17914, // 16Mb IBM Token Ring 71 8166, // IEEE 802.4 72 //4464, // IEEE 802.5 (4Mb max) 73 4352, // FDDI 74 //2048, // Wideband Network 75 2002, // IEEE 802.5 (4Mb recommended) 76 //1536, // Expermental Ethernet Networks 77 //1500, // Ethernet, Point-to-Point (default) 78 1492, // IEEE 802.3 79 1006, // SLIP, ARPANET 80 //576, // X.25 Networks 81 //544, // DEC IP Portal 82 //512, // NETBIOS 83 508, // IEEE 802/Source-Rt Bridge, ARCNET 84 296, // Point-to-Point (low delay) 85 68, // Official minimum 86 0, // End of list marker 87 }; 88 89 static const int IP_HEADER_SIZE = 20u; 90 static const int IPV6_HEADER_SIZE = 40u; 91 static const int ICMP_HEADER_SIZE = 8u; 92 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; 93 #endif 94 95 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { 96 public: 97 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET) 98 : ss_(ss), s_(s), enabled_events_(0), error_(0), 99 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), 100 resolver_(NULL) { 101 #if defined(WEBRTC_WIN) 102 // EnsureWinsockInit() ensures that winsock is initialized. The default 103 // version of this function doesn't do anything because winsock is 104 // initialized by constructor of a static object. If neccessary libjingle 105 // users can link it with a different version of this function by replacing 106 // win32socketinit.cc. See win32socketinit.cc for more details. 107 EnsureWinsockInit(); 108 #endif 109 if (s_ != INVALID_SOCKET) { 110 enabled_events_ = DE_READ | DE_WRITE; 111 112 int type = SOCK_STREAM; 113 socklen_t len = sizeof(type); 114 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len)); 115 udp_ = (SOCK_DGRAM == type); 116 } 117 } 118 119 virtual ~PhysicalSocket() { 120 Close(); 121 } 122 123 // Creates the underlying OS socket (same as the "socket" function). 124 virtual bool Create(int family, int type) { 125 Close(); 126 s_ = ::socket(family, type, 0); 127 udp_ = (SOCK_DGRAM == type); 128 UpdateLastError(); 129 if (udp_) 130 enabled_events_ = DE_READ | DE_WRITE; 131 return s_ != INVALID_SOCKET; 132 } 133 134 SocketAddress GetLocalAddress() const { 135 sockaddr_storage addr_storage = {0}; 136 socklen_t addrlen = sizeof(addr_storage); 137 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 138 int result = ::getsockname(s_, addr, &addrlen); 139 SocketAddress address; 140 if (result >= 0) { 141 SocketAddressFromSockAddrStorage(addr_storage, &address); 142 } else { 143 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" 144 << s_; 145 } 146 return address; 147 } 148 149 SocketAddress GetRemoteAddress() const { 150 sockaddr_storage addr_storage = {0}; 151 socklen_t addrlen = sizeof(addr_storage); 152 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 153 int result = ::getpeername(s_, addr, &addrlen); 154 SocketAddress address; 155 if (result >= 0) { 156 SocketAddressFromSockAddrStorage(addr_storage, &address); 157 } else { 158 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" 159 << s_; 160 } 161 return address; 162 } 163 164 int Bind(const SocketAddress& bind_addr) { 165 sockaddr_storage addr_storage; 166 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); 167 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 168 int err = ::bind(s_, addr, static_cast<int>(len)); 169 UpdateLastError(); 170 #ifdef _DEBUG 171 if (0 == err) { 172 dbg_addr_ = "Bound @ "; 173 dbg_addr_.append(GetLocalAddress().ToString()); 174 } 175 #endif // _DEBUG 176 return err; 177 } 178 179 int Connect(const SocketAddress& addr) { 180 // TODO: Implicit creation is required to reconnect... 181 // ...but should we make it more explicit? 182 if (state_ != CS_CLOSED) { 183 SetError(EALREADY); 184 return SOCKET_ERROR; 185 } 186 if (addr.IsUnresolved()) { 187 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; 188 resolver_ = new AsyncResolver(); 189 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); 190 resolver_->Start(addr); 191 state_ = CS_CONNECTING; 192 return 0; 193 } 194 195 return DoConnect(addr); 196 } 197 198 int DoConnect(const SocketAddress& connect_addr) { 199 if ((s_ == INVALID_SOCKET) && 200 !Create(connect_addr.family(), SOCK_STREAM)) { 201 return SOCKET_ERROR; 202 } 203 sockaddr_storage addr_storage; 204 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); 205 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 206 int err = ::connect(s_, addr, static_cast<int>(len)); 207 UpdateLastError(); 208 if (err == 0) { 209 state_ = CS_CONNECTED; 210 } else if (IsBlockingError(GetError())) { 211 state_ = CS_CONNECTING; 212 enabled_events_ |= DE_CONNECT; 213 } else { 214 return SOCKET_ERROR; 215 } 216 217 enabled_events_ |= DE_READ | DE_WRITE; 218 return 0; 219 } 220 221 int GetError() const { 222 CritScope cs(&crit_); 223 return error_; 224 } 225 226 void SetError(int error) { 227 CritScope cs(&crit_); 228 error_ = error; 229 } 230 231 ConnState GetState() const { 232 return state_; 233 } 234 235 int GetOption(Option opt, int* value) { 236 int slevel; 237 int sopt; 238 if (TranslateOption(opt, &slevel, &sopt) == -1) 239 return -1; 240 socklen_t optlen = sizeof(*value); 241 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); 242 if (ret != -1 && opt == OPT_DONTFRAGMENT) { 243 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 244 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; 245 #endif 246 } 247 return ret; 248 } 249 250 int SetOption(Option opt, int value) { 251 int slevel; 252 int sopt; 253 if (TranslateOption(opt, &slevel, &sopt) == -1) 254 return -1; 255 if (opt == OPT_DONTFRAGMENT) { 256 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 257 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; 258 #endif 259 } 260 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value)); 261 } 262 263 int Send(const void *pv, size_t cb) { 264 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb, 265 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 266 // Suppress SIGPIPE. Without this, attempting to send on a socket whose 267 // other end is closed will result in a SIGPIPE signal being raised to 268 // our process, which by default will terminate the process, which we 269 // don't want. By specifying this flag, we'll just get the error EPIPE 270 // instead and can handle the error gracefully. 271 MSG_NOSIGNAL 272 #else 273 0 274 #endif 275 ); 276 UpdateLastError(); 277 MaybeRemapSendError(); 278 // We have seen minidumps where this may be false. 279 ASSERT(sent <= static_cast<int>(cb)); 280 if ((sent < 0) && IsBlockingError(GetError())) { 281 enabled_events_ |= DE_WRITE; 282 } 283 return sent; 284 } 285 286 int SendTo(const void* buffer, size_t length, const SocketAddress& addr) { 287 sockaddr_storage saddr; 288 size_t len = addr.ToSockAddrStorage(&saddr); 289 int sent = ::sendto( 290 s_, static_cast<const char *>(buffer), static_cast<int>(length), 291 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 292 // Suppress SIGPIPE. See above for explanation. 293 MSG_NOSIGNAL, 294 #else 295 0, 296 #endif 297 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); 298 UpdateLastError(); 299 MaybeRemapSendError(); 300 // We have seen minidumps where this may be false. 301 ASSERT(sent <= static_cast<int>(length)); 302 if ((sent < 0) && IsBlockingError(GetError())) { 303 enabled_events_ |= DE_WRITE; 304 } 305 return sent; 306 } 307 308 int Recv(void* buffer, size_t length) { 309 int received = ::recv(s_, static_cast<char*>(buffer), 310 static_cast<int>(length), 0); 311 if ((received == 0) && (length != 0)) { 312 // Note: on graceful shutdown, recv can return 0. In this case, we 313 // pretend it is blocking, and then signal close, so that simplifying 314 // assumptions can be made about Recv. 315 LOG(LS_WARNING) << "EOF from socket; deferring close event"; 316 // Must turn this back on so that the select() loop will notice the close 317 // event. 318 enabled_events_ |= DE_READ; 319 SetError(EWOULDBLOCK); 320 return SOCKET_ERROR; 321 } 322 UpdateLastError(); 323 int error = GetError(); 324 bool success = (received >= 0) || IsBlockingError(error); 325 if (udp_ || success) { 326 enabled_events_ |= DE_READ; 327 } 328 if (!success) { 329 LOG_F(LS_VERBOSE) << "Error = " << error; 330 } 331 return received; 332 } 333 334 int RecvFrom(void* buffer, size_t length, SocketAddress *out_addr) { 335 sockaddr_storage addr_storage; 336 socklen_t addr_len = sizeof(addr_storage); 337 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 338 int received = ::recvfrom(s_, static_cast<char*>(buffer), 339 static_cast<int>(length), 0, addr, &addr_len); 340 UpdateLastError(); 341 if ((received >= 0) && (out_addr != NULL)) 342 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 343 int error = GetError(); 344 bool success = (received >= 0) || IsBlockingError(error); 345 if (udp_ || success) { 346 enabled_events_ |= DE_READ; 347 } 348 if (!success) { 349 LOG_F(LS_VERBOSE) << "Error = " << error; 350 } 351 return received; 352 } 353 354 int Listen(int backlog) { 355 int err = ::listen(s_, backlog); 356 UpdateLastError(); 357 if (err == 0) { 358 state_ = CS_CONNECTING; 359 enabled_events_ |= DE_ACCEPT; 360 #ifdef _DEBUG 361 dbg_addr_ = "Listening @ "; 362 dbg_addr_.append(GetLocalAddress().ToString()); 363 #endif // _DEBUG 364 } 365 return err; 366 } 367 368 AsyncSocket* Accept(SocketAddress *out_addr) { 369 sockaddr_storage addr_storage; 370 socklen_t addr_len = sizeof(addr_storage); 371 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 372 SOCKET s = ::accept(s_, addr, &addr_len); 373 UpdateLastError(); 374 if (s == INVALID_SOCKET) 375 return NULL; 376 enabled_events_ |= DE_ACCEPT; 377 if (out_addr != NULL) 378 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 379 return ss_->WrapSocket(s); 380 } 381 382 int Close() { 383 if (s_ == INVALID_SOCKET) 384 return 0; 385 int err = ::closesocket(s_); 386 UpdateLastError(); 387 s_ = INVALID_SOCKET; 388 state_ = CS_CLOSED; 389 enabled_events_ = 0; 390 if (resolver_) { 391 resolver_->Destroy(false); 392 resolver_ = NULL; 393 } 394 return err; 395 } 396 397 int EstimateMTU(uint16* mtu) { 398 SocketAddress addr = GetRemoteAddress(); 399 if (addr.IsAny()) { 400 SetError(ENOTCONN); 401 return -1; 402 } 403 404 #if defined(WEBRTC_WIN) 405 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|. 406 WinPing ping; 407 if (!ping.IsValid()) { 408 SetError(EINVAL); // can't think of a better error ID 409 return -1; 410 } 411 int header_size = ICMP_HEADER_SIZE; 412 if (addr.family() == AF_INET6) { 413 header_size += IPV6_HEADER_SIZE; 414 } else if (addr.family() == AF_INET) { 415 header_size += IP_HEADER_SIZE; 416 } 417 418 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) { 419 int32 size = PACKET_MAXIMUMS[level] - header_size; 420 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size, 421 ICMP_PING_TIMEOUT_MILLIS, 422 1, false); 423 if (result == WinPing::PING_FAIL) { 424 SetError(EINVAL); // can't think of a better error ID 425 return -1; 426 } else if (result != WinPing::PING_TOO_LARGE) { 427 *mtu = PACKET_MAXIMUMS[level]; 428 return 0; 429 } 430 } 431 432 ASSERT(false); 433 return -1; 434 #elif defined(WEBRTC_MAC) 435 // No simple way to do this on Mac OS X. 436 // SIOCGIFMTU would work if we knew which interface would be used, but 437 // figuring that out is pretty complicated. For now we'll return an error 438 // and let the caller pick a default MTU. 439 SetError(EINVAL); 440 return -1; 441 #elif defined(WEBRTC_LINUX) 442 // Gets the path MTU. 443 int value; 444 socklen_t vlen = sizeof(value); 445 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen); 446 if (err < 0) { 447 UpdateLastError(); 448 return err; 449 } 450 451 ASSERT((0 <= value) && (value <= 65536)); 452 *mtu = value; 453 return 0; 454 #elif defined(__native_client__) 455 // Most socket operations, including this, will fail in NaCl's sandbox. 456 error_ = EACCES; 457 return -1; 458 #endif 459 } 460 461 SocketServer* socketserver() { return ss_; } 462 463 protected: 464 void OnResolveResult(AsyncResolverInterface* resolver) { 465 if (resolver != resolver_) { 466 return; 467 } 468 469 int error = resolver_->GetError(); 470 if (error == 0) { 471 error = DoConnect(resolver_->address()); 472 } else { 473 Close(); 474 } 475 476 if (error) { 477 SetError(error); 478 SignalCloseEvent(this, error); 479 } 480 } 481 482 void UpdateLastError() { 483 SetError(LAST_SYSTEM_ERROR); 484 } 485 486 void MaybeRemapSendError() { 487 #if defined(WEBRTC_MAC) 488 // https://developer.apple.com/library/mac/documentation/Darwin/ 489 // Reference/ManPages/man2/sendto.2.html 490 // ENOBUFS - The output queue for a network interface is full. 491 // This generally indicates that the interface has stopped sending, 492 // but may be caused by transient congestion. 493 if (GetError() == ENOBUFS) { 494 SetError(EWOULDBLOCK); 495 } 496 #endif 497 } 498 499 static int TranslateOption(Option opt, int* slevel, int* sopt) { 500 switch (opt) { 501 case OPT_DONTFRAGMENT: 502 #if defined(WEBRTC_WIN) 503 *slevel = IPPROTO_IP; 504 *sopt = IP_DONTFRAGMENT; 505 break; 506 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) 507 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; 508 return -1; 509 #elif defined(WEBRTC_POSIX) 510 *slevel = IPPROTO_IP; 511 *sopt = IP_MTU_DISCOVER; 512 break; 513 #endif 514 case OPT_RCVBUF: 515 *slevel = SOL_SOCKET; 516 *sopt = SO_RCVBUF; 517 break; 518 case OPT_SNDBUF: 519 *slevel = SOL_SOCKET; 520 *sopt = SO_SNDBUF; 521 break; 522 case OPT_NODELAY: 523 *slevel = IPPROTO_TCP; 524 *sopt = TCP_NODELAY; 525 break; 526 case OPT_DSCP: 527 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported."; 528 return -1; 529 case OPT_RTP_SENDTIME_EXTN_ID: 530 return -1; // No logging is necessary as this not a OS socket option. 531 default: 532 ASSERT(false); 533 return -1; 534 } 535 return 0; 536 } 537 538 PhysicalSocketServer* ss_; 539 SOCKET s_; 540 uint8 enabled_events_; 541 bool udp_; 542 int error_; 543 // Protects |error_| that is accessed from different threads. 544 mutable CriticalSection crit_; 545 ConnState state_; 546 AsyncResolver* resolver_; 547 548 #ifdef _DEBUG 549 std::string dbg_addr_; 550 #endif // _DEBUG; 551 }; 552 553 #if defined(WEBRTC_POSIX) 554 class EventDispatcher : public Dispatcher { 555 public: 556 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { 557 if (pipe(afd_) < 0) 558 LOG(LERROR) << "pipe failed"; 559 ss_->Add(this); 560 } 561 562 virtual ~EventDispatcher() { 563 ss_->Remove(this); 564 close(afd_[0]); 565 close(afd_[1]); 566 } 567 568 virtual void Signal() { 569 CritScope cs(&crit_); 570 if (!fSignaled_) { 571 const uint8 b[1] = { 0 }; 572 if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) { 573 fSignaled_ = true; 574 } 575 } 576 } 577 578 virtual uint32 GetRequestedEvents() { 579 return DE_READ; 580 } 581 582 virtual void OnPreEvent(uint32 ff) { 583 // It is not possible to perfectly emulate an auto-resetting event with 584 // pipes. This simulates it by resetting before the event is handled. 585 586 CritScope cs(&crit_); 587 if (fSignaled_) { 588 uint8 b[4]; // Allow for reading more than 1 byte, but expect 1. 589 VERIFY(1 == read(afd_[0], b, sizeof(b))); 590 fSignaled_ = false; 591 } 592 } 593 594 virtual void OnEvent(uint32 ff, int err) { 595 ASSERT(false); 596 } 597 598 virtual int GetDescriptor() { 599 return afd_[0]; 600 } 601 602 virtual bool IsDescriptorClosed() { 603 return false; 604 } 605 606 private: 607 PhysicalSocketServer *ss_; 608 int afd_[2]; 609 bool fSignaled_; 610 CriticalSection crit_; 611 }; 612 613 // These two classes use the self-pipe trick to deliver POSIX signals to our 614 // select loop. This is the only safe, reliable, cross-platform way to do 615 // non-trivial things with a POSIX signal in an event-driven program (until 616 // proper pselect() implementations become ubiquitous). 617 618 class PosixSignalHandler { 619 public: 620 // POSIX only specifies 32 signals, but in principle the system might have 621 // more and the programmer might choose to use them, so we size our array 622 // for 128. 623 static const int kNumPosixSignals = 128; 624 625 // There is just a single global instance. (Signal handlers do not get any 626 // sort of user-defined void * parameter, so they can't access anything that 627 // isn't global.) 628 static PosixSignalHandler* Instance() { 629 LIBJINGLE_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ()); 630 return &instance; 631 } 632 633 // Returns true if the given signal number is set. 634 bool IsSignalSet(int signum) const { 635 ASSERT(signum < ARRAY_SIZE(received_signal_)); 636 if (signum < ARRAY_SIZE(received_signal_)) { 637 return received_signal_[signum]; 638 } else { 639 return false; 640 } 641 } 642 643 // Clears the given signal number. 644 void ClearSignal(int signum) { 645 ASSERT(signum < ARRAY_SIZE(received_signal_)); 646 if (signum < ARRAY_SIZE(received_signal_)) { 647 received_signal_[signum] = false; 648 } 649 } 650 651 // Returns the file descriptor to monitor for signal events. 652 int GetDescriptor() const { 653 return afd_[0]; 654 } 655 656 // This is called directly from our real signal handler, so it must be 657 // signal-handler-safe. That means it cannot assume anything about the 658 // user-level state of the process, since the handler could be executed at any 659 // time on any thread. 660 void OnPosixSignalReceived(int signum) { 661 if (signum >= ARRAY_SIZE(received_signal_)) { 662 // We don't have space in our array for this. 663 return; 664 } 665 // Set a flag saying we've seen this signal. 666 received_signal_[signum] = true; 667 // Notify application code that we got a signal. 668 const uint8 b[1] = { 0 }; 669 if (-1 == write(afd_[1], b, sizeof(b))) { 670 // Nothing we can do here. If there's an error somehow then there's 671 // nothing we can safely do from a signal handler. 672 // No, we can't even safely log it. 673 // But, we still have to check the return value here. Otherwise, 674 // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help. 675 return; 676 } 677 } 678 679 private: 680 PosixSignalHandler() { 681 if (pipe(afd_) < 0) { 682 LOG_ERR(LS_ERROR) << "pipe failed"; 683 return; 684 } 685 if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) { 686 LOG_ERR(LS_WARNING) << "fcntl #1 failed"; 687 } 688 if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) { 689 LOG_ERR(LS_WARNING) << "fcntl #2 failed"; 690 } 691 memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)), 692 0, 693 sizeof(received_signal_)); 694 } 695 696 ~PosixSignalHandler() { 697 int fd1 = afd_[0]; 698 int fd2 = afd_[1]; 699 // We clobber the stored file descriptor numbers here or else in principle 700 // a signal that happens to be delivered during application termination 701 // could erroneously write a zero byte to an unrelated file handle in 702 // OnPosixSignalReceived() if some other file happens to be opened later 703 // during shutdown and happens to be given the same file descriptor number 704 // as our pipe had. Unfortunately even with this precaution there is still a 705 // race where that could occur if said signal happens to be handled 706 // concurrently with this code and happens to have already read the value of 707 // afd_[1] from memory before we clobber it, but that's unlikely. 708 afd_[0] = -1; 709 afd_[1] = -1; 710 close(fd1); 711 close(fd2); 712 } 713 714 int afd_[2]; 715 // These are boolean flags that will be set in our signal handler and read 716 // and cleared from Wait(). There is a race involved in this, but it is 717 // benign. The signal handler sets the flag before signaling the pipe, so 718 // we'll never end up blocking in select() while a flag is still true. 719 // However, if two of the same signal arrive close to each other then it's 720 // possible that the second time the handler may set the flag while it's still 721 // true, meaning that signal will be missed. But the first occurrence of it 722 // will still be handled, so this isn't a problem. 723 // Volatile is not necessary here for correctness, but this data _is_ volatile 724 // so I've marked it as such. 725 volatile uint8 received_signal_[kNumPosixSignals]; 726 }; 727 728 class PosixSignalDispatcher : public Dispatcher { 729 public: 730 PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) { 731 owner_->Add(this); 732 } 733 734 virtual ~PosixSignalDispatcher() { 735 owner_->Remove(this); 736 } 737 738 virtual uint32 GetRequestedEvents() { 739 return DE_READ; 740 } 741 742 virtual void OnPreEvent(uint32 ff) { 743 // Events might get grouped if signals come very fast, so we read out up to 744 // 16 bytes to make sure we keep the pipe empty. 745 uint8 b[16]; 746 ssize_t ret = read(GetDescriptor(), b, sizeof(b)); 747 if (ret < 0) { 748 LOG_ERR(LS_WARNING) << "Error in read()"; 749 } else if (ret == 0) { 750 LOG(LS_WARNING) << "Should have read at least one byte"; 751 } 752 } 753 754 virtual void OnEvent(uint32 ff, int err) { 755 for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals; 756 ++signum) { 757 if (PosixSignalHandler::Instance()->IsSignalSet(signum)) { 758 PosixSignalHandler::Instance()->ClearSignal(signum); 759 HandlerMap::iterator i = handlers_.find(signum); 760 if (i == handlers_.end()) { 761 // This can happen if a signal is delivered to our process at around 762 // the same time as we unset our handler for it. It is not an error 763 // condition, but it's unusual enough to be worth logging. 764 LOG(LS_INFO) << "Received signal with no handler: " << signum; 765 } else { 766 // Otherwise, execute our handler. 767 (*i->second)(signum); 768 } 769 } 770 } 771 } 772 773 virtual int GetDescriptor() { 774 return PosixSignalHandler::Instance()->GetDescriptor(); 775 } 776 777 virtual bool IsDescriptorClosed() { 778 return false; 779 } 780 781 void SetHandler(int signum, void (*handler)(int)) { 782 handlers_[signum] = handler; 783 } 784 785 void ClearHandler(int signum) { 786 handlers_.erase(signum); 787 } 788 789 bool HasHandlers() { 790 return !handlers_.empty(); 791 } 792 793 private: 794 typedef std::map<int, void (*)(int)> HandlerMap; 795 796 HandlerMap handlers_; 797 // Our owner. 798 PhysicalSocketServer *owner_; 799 }; 800 801 class SocketDispatcher : public Dispatcher, public PhysicalSocket { 802 public: 803 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) { 804 } 805 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) { 806 } 807 808 virtual ~SocketDispatcher() { 809 Close(); 810 } 811 812 bool Initialize() { 813 ss_->Add(this); 814 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); 815 return true; 816 } 817 818 virtual bool Create(int type) { 819 return Create(AF_INET, type); 820 } 821 822 virtual bool Create(int family, int type) { 823 // Change the socket to be non-blocking. 824 if (!PhysicalSocket::Create(family, type)) 825 return false; 826 827 return Initialize(); 828 } 829 830 virtual int GetDescriptor() { 831 return s_; 832 } 833 834 virtual bool IsDescriptorClosed() { 835 // We don't have a reliable way of distinguishing end-of-stream 836 // from readability. So test on each readable call. Is this 837 // inefficient? Probably. 838 char ch; 839 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK); 840 if (res > 0) { 841 // Data available, so not closed. 842 return false; 843 } else if (res == 0) { 844 // EOF, so closed. 845 return true; 846 } else { // error 847 switch (errno) { 848 // Returned if we've already closed s_. 849 case EBADF: 850 // Returned during ungraceful peer shutdown. 851 case ECONNRESET: 852 return true; 853 default: 854 // Assume that all other errors are just blocking errors, meaning the 855 // connection is still good but we just can't read from it right now. 856 // This should only happen when connecting (and at most once), because 857 // in all other cases this function is only called if the file 858 // descriptor is already known to be in the readable state. However, 859 // it's not necessary a problem if we spuriously interpret a 860 // "connection lost"-type error as a blocking error, because typically 861 // the next recv() will get EOF, so we'll still eventually notice that 862 // the socket is closed. 863 LOG_ERR(LS_WARNING) << "Assuming benign blocking error"; 864 return false; 865 } 866 } 867 } 868 869 virtual uint32 GetRequestedEvents() { 870 return enabled_events_; 871 } 872 873 virtual void OnPreEvent(uint32 ff) { 874 if ((ff & DE_CONNECT) != 0) 875 state_ = CS_CONNECTED; 876 if ((ff & DE_CLOSE) != 0) 877 state_ = CS_CLOSED; 878 } 879 880 virtual void OnEvent(uint32 ff, int err) { 881 // Make sure we deliver connect/accept first. Otherwise, consumers may see 882 // something like a READ followed by a CONNECT, which would be odd. 883 if ((ff & DE_CONNECT) != 0) { 884 enabled_events_ &= ~DE_CONNECT; 885 SignalConnectEvent(this); 886 } 887 if ((ff & DE_ACCEPT) != 0) { 888 enabled_events_ &= ~DE_ACCEPT; 889 SignalReadEvent(this); 890 } 891 if ((ff & DE_READ) != 0) { 892 enabled_events_ &= ~DE_READ; 893 SignalReadEvent(this); 894 } 895 if ((ff & DE_WRITE) != 0) { 896 enabled_events_ &= ~DE_WRITE; 897 SignalWriteEvent(this); 898 } 899 if ((ff & DE_CLOSE) != 0) { 900 // The socket is now dead to us, so stop checking it. 901 enabled_events_ = 0; 902 SignalCloseEvent(this, err); 903 } 904 } 905 906 virtual int Close() { 907 if (s_ == INVALID_SOCKET) 908 return 0; 909 910 ss_->Remove(this); 911 return PhysicalSocket::Close(); 912 } 913 }; 914 915 class FileDispatcher: public Dispatcher, public AsyncFile { 916 public: 917 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { 918 set_readable(true); 919 920 ss_->Add(this); 921 922 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); 923 } 924 925 virtual ~FileDispatcher() { 926 ss_->Remove(this); 927 } 928 929 SocketServer* socketserver() { return ss_; } 930 931 virtual int GetDescriptor() { 932 return fd_; 933 } 934 935 virtual bool IsDescriptorClosed() { 936 return false; 937 } 938 939 virtual uint32 GetRequestedEvents() { 940 return flags_; 941 } 942 943 virtual void OnPreEvent(uint32 ff) { 944 } 945 946 virtual void OnEvent(uint32 ff, int err) { 947 if ((ff & DE_READ) != 0) 948 SignalReadEvent(this); 949 if ((ff & DE_WRITE) != 0) 950 SignalWriteEvent(this); 951 if ((ff & DE_CLOSE) != 0) 952 SignalCloseEvent(this, err); 953 } 954 955 virtual bool readable() { 956 return (flags_ & DE_READ) != 0; 957 } 958 959 virtual void set_readable(bool value) { 960 flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ); 961 } 962 963 virtual bool writable() { 964 return (flags_ & DE_WRITE) != 0; 965 } 966 967 virtual void set_writable(bool value) { 968 flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE); 969 } 970 971 private: 972 PhysicalSocketServer* ss_; 973 int fd_; 974 int flags_; 975 }; 976 977 AsyncFile* PhysicalSocketServer::CreateFile(int fd) { 978 return new FileDispatcher(fd, this); 979 } 980 981 #endif // WEBRTC_POSIX 982 983 #if defined(WEBRTC_WIN) 984 static uint32 FlagsToEvents(uint32 events) { 985 uint32 ffFD = FD_CLOSE; 986 if (events & DE_READ) 987 ffFD |= FD_READ; 988 if (events & DE_WRITE) 989 ffFD |= FD_WRITE; 990 if (events & DE_CONNECT) 991 ffFD |= FD_CONNECT; 992 if (events & DE_ACCEPT) 993 ffFD |= FD_ACCEPT; 994 return ffFD; 995 } 996 997 class EventDispatcher : public Dispatcher { 998 public: 999 EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) { 1000 hev_ = WSACreateEvent(); 1001 if (hev_) { 1002 ss_->Add(this); 1003 } 1004 } 1005 1006 ~EventDispatcher() { 1007 if (hev_ != NULL) { 1008 ss_->Remove(this); 1009 WSACloseEvent(hev_); 1010 hev_ = NULL; 1011 } 1012 } 1013 1014 virtual void Signal() { 1015 if (hev_ != NULL) 1016 WSASetEvent(hev_); 1017 } 1018 1019 virtual uint32 GetRequestedEvents() { 1020 return 0; 1021 } 1022 1023 virtual void OnPreEvent(uint32 ff) { 1024 WSAResetEvent(hev_); 1025 } 1026 1027 virtual void OnEvent(uint32 ff, int err) { 1028 } 1029 1030 virtual WSAEVENT GetWSAEvent() { 1031 return hev_; 1032 } 1033 1034 virtual SOCKET GetSocket() { 1035 return INVALID_SOCKET; 1036 } 1037 1038 virtual bool CheckSignalClose() { return false; } 1039 1040 private: 1041 PhysicalSocketServer* ss_; 1042 WSAEVENT hev_; 1043 }; 1044 1045 class SocketDispatcher : public Dispatcher, public PhysicalSocket { 1046 public: 1047 static int next_id_; 1048 int id_; 1049 bool signal_close_; 1050 int signal_err_; 1051 1052 SocketDispatcher(PhysicalSocketServer* ss) 1053 : PhysicalSocket(ss), 1054 id_(0), 1055 signal_close_(false) { 1056 } 1057 1058 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) 1059 : PhysicalSocket(ss, s), 1060 id_(0), 1061 signal_close_(false) { 1062 } 1063 1064 virtual ~SocketDispatcher() { 1065 Close(); 1066 } 1067 1068 bool Initialize() { 1069 ASSERT(s_ != INVALID_SOCKET); 1070 // Must be a non-blocking 1071 u_long argp = 1; 1072 ioctlsocket(s_, FIONBIO, &argp); 1073 ss_->Add(this); 1074 return true; 1075 } 1076 1077 virtual bool Create(int type) { 1078 return Create(AF_INET, type); 1079 } 1080 1081 virtual bool Create(int family, int type) { 1082 // Create socket 1083 if (!PhysicalSocket::Create(family, type)) 1084 return false; 1085 1086 if (!Initialize()) 1087 return false; 1088 1089 do { id_ = ++next_id_; } while (id_ == 0); 1090 return true; 1091 } 1092 1093 virtual int Close() { 1094 if (s_ == INVALID_SOCKET) 1095 return 0; 1096 1097 id_ = 0; 1098 signal_close_ = false; 1099 ss_->Remove(this); 1100 return PhysicalSocket::Close(); 1101 } 1102 1103 virtual uint32 GetRequestedEvents() { 1104 return enabled_events_; 1105 } 1106 1107 virtual void OnPreEvent(uint32 ff) { 1108 if ((ff & DE_CONNECT) != 0) 1109 state_ = CS_CONNECTED; 1110 // We set CS_CLOSED from CheckSignalClose. 1111 } 1112 1113 virtual void OnEvent(uint32 ff, int err) { 1114 int cache_id = id_; 1115 // Make sure we deliver connect/accept first. Otherwise, consumers may see 1116 // something like a READ followed by a CONNECT, which would be odd. 1117 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { 1118 if (ff != DE_CONNECT) 1119 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; 1120 enabled_events_ &= ~DE_CONNECT; 1121 #ifdef _DEBUG 1122 dbg_addr_ = "Connected @ "; 1123 dbg_addr_.append(GetRemoteAddress().ToString()); 1124 #endif // _DEBUG 1125 SignalConnectEvent(this); 1126 } 1127 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { 1128 enabled_events_ &= ~DE_ACCEPT; 1129 SignalReadEvent(this); 1130 } 1131 if ((ff & DE_READ) != 0) { 1132 enabled_events_ &= ~DE_READ; 1133 SignalReadEvent(this); 1134 } 1135 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { 1136 enabled_events_ &= ~DE_WRITE; 1137 SignalWriteEvent(this); 1138 } 1139 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 1140 signal_close_ = true; 1141 signal_err_ = err; 1142 } 1143 } 1144 1145 virtual WSAEVENT GetWSAEvent() { 1146 return WSA_INVALID_EVENT; 1147 } 1148 1149 virtual SOCKET GetSocket() { 1150 return s_; 1151 } 1152 1153 virtual bool CheckSignalClose() { 1154 if (!signal_close_) 1155 return false; 1156 1157 char ch; 1158 if (recv(s_, &ch, 1, MSG_PEEK) > 0) 1159 return false; 1160 1161 state_ = CS_CLOSED; 1162 signal_close_ = false; 1163 SignalCloseEvent(this, signal_err_); 1164 return true; 1165 } 1166 }; 1167 1168 int SocketDispatcher::next_id_ = 0; 1169 1170 #endif // WEBRTC_WIN 1171 1172 // Sets the value of a boolean value to false when signaled. 1173 class Signaler : public EventDispatcher { 1174 public: 1175 Signaler(PhysicalSocketServer* ss, bool* pf) 1176 : EventDispatcher(ss), pf_(pf) { 1177 } 1178 virtual ~Signaler() { } 1179 1180 void OnEvent(uint32 ff, int err) { 1181 if (pf_) 1182 *pf_ = false; 1183 } 1184 1185 private: 1186 bool *pf_; 1187 }; 1188 1189 PhysicalSocketServer::PhysicalSocketServer() 1190 : fWait_(false) { 1191 signal_wakeup_ = new Signaler(this, &fWait_); 1192 #if defined(WEBRTC_WIN) 1193 socket_ev_ = WSACreateEvent(); 1194 #endif 1195 } 1196 1197 PhysicalSocketServer::~PhysicalSocketServer() { 1198 #if defined(WEBRTC_WIN) 1199 WSACloseEvent(socket_ev_); 1200 #endif 1201 #if defined(WEBRTC_POSIX) 1202 signal_dispatcher_.reset(); 1203 #endif 1204 delete signal_wakeup_; 1205 ASSERT(dispatchers_.empty()); 1206 } 1207 1208 void PhysicalSocketServer::WakeUp() { 1209 signal_wakeup_->Signal(); 1210 } 1211 1212 Socket* PhysicalSocketServer::CreateSocket(int type) { 1213 return CreateSocket(AF_INET, type); 1214 } 1215 1216 Socket* PhysicalSocketServer::CreateSocket(int family, int type) { 1217 PhysicalSocket* socket = new PhysicalSocket(this); 1218 if (socket->Create(family, type)) { 1219 return socket; 1220 } else { 1221 delete socket; 1222 return 0; 1223 } 1224 } 1225 1226 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { 1227 return CreateAsyncSocket(AF_INET, type); 1228 } 1229 1230 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { 1231 SocketDispatcher* dispatcher = new SocketDispatcher(this); 1232 if (dispatcher->Create(family, type)) { 1233 return dispatcher; 1234 } else { 1235 delete dispatcher; 1236 return 0; 1237 } 1238 } 1239 1240 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { 1241 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); 1242 if (dispatcher->Initialize()) { 1243 return dispatcher; 1244 } else { 1245 delete dispatcher; 1246 return 0; 1247 } 1248 } 1249 1250 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1251 CritScope cs(&crit_); 1252 // Prevent duplicates. This can cause dead dispatchers to stick around. 1253 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1254 dispatchers_.end(), 1255 pdispatcher); 1256 if (pos != dispatchers_.end()) 1257 return; 1258 dispatchers_.push_back(pdispatcher); 1259 } 1260 1261 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1262 CritScope cs(&crit_); 1263 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1264 dispatchers_.end(), 1265 pdispatcher); 1266 // We silently ignore duplicate calls to Add, so we should silently ignore 1267 // the (expected) symmetric calls to Remove. Note that this may still hide 1268 // a real issue, so we at least log a warning about it. 1269 if (pos == dispatchers_.end()) { 1270 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1271 << "dispatcher, potentially from a duplicate call to Add."; 1272 return; 1273 } 1274 size_t index = pos - dispatchers_.begin(); 1275 dispatchers_.erase(pos); 1276 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1277 ++it) { 1278 if (index < **it) { 1279 --**it; 1280 } 1281 } 1282 } 1283 1284 #if defined(WEBRTC_POSIX) 1285 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1286 // Calculate timing information 1287 1288 struct timeval *ptvWait = NULL; 1289 struct timeval tvWait; 1290 struct timeval tvStop; 1291 if (cmsWait != kForever) { 1292 // Calculate wait timeval 1293 tvWait.tv_sec = cmsWait / 1000; 1294 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1295 ptvWait = &tvWait; 1296 1297 // Calculate when to return in a timeval 1298 gettimeofday(&tvStop, NULL); 1299 tvStop.tv_sec += tvWait.tv_sec; 1300 tvStop.tv_usec += tvWait.tv_usec; 1301 if (tvStop.tv_usec >= 1000000) { 1302 tvStop.tv_usec -= 1000000; 1303 tvStop.tv_sec += 1; 1304 } 1305 } 1306 1307 // Zero all fd_sets. Don't need to do this inside the loop since 1308 // select() zeros the descriptors not signaled 1309 1310 fd_set fdsRead; 1311 FD_ZERO(&fdsRead); 1312 fd_set fdsWrite; 1313 FD_ZERO(&fdsWrite); 1314 1315 fWait_ = true; 1316 1317 while (fWait_) { 1318 int fdmax = -1; 1319 { 1320 CritScope cr(&crit_); 1321 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1322 // Query dispatchers for read and write wait state 1323 Dispatcher *pdispatcher = dispatchers_[i]; 1324 ASSERT(pdispatcher); 1325 if (!process_io && (pdispatcher != signal_wakeup_)) 1326 continue; 1327 int fd = pdispatcher->GetDescriptor(); 1328 if (fd > fdmax) 1329 fdmax = fd; 1330 1331 uint32 ff = pdispatcher->GetRequestedEvents(); 1332 if (ff & (DE_READ | DE_ACCEPT)) 1333 FD_SET(fd, &fdsRead); 1334 if (ff & (DE_WRITE | DE_CONNECT)) 1335 FD_SET(fd, &fdsWrite); 1336 } 1337 } 1338 1339 // Wait then call handlers as appropriate 1340 // < 0 means error 1341 // 0 means timeout 1342 // > 0 means count of descriptors ready 1343 int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait); 1344 1345 // If error, return error. 1346 if (n < 0) { 1347 if (errno != EINTR) { 1348 LOG_E(LS_ERROR, EN, errno) << "select"; 1349 return false; 1350 } 1351 // Else ignore the error and keep going. If this EINTR was for one of the 1352 // signals managed by this PhysicalSocketServer, the 1353 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1354 // iteration. 1355 } else if (n == 0) { 1356 // If timeout, return success 1357 return true; 1358 } else { 1359 // We have signaled descriptors 1360 CritScope cr(&crit_); 1361 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1362 Dispatcher *pdispatcher = dispatchers_[i]; 1363 int fd = pdispatcher->GetDescriptor(); 1364 uint32 ff = 0; 1365 int errcode = 0; 1366 1367 // Reap any error code, which can be signaled through reads or writes. 1368 // TODO: Should we set errcode if getsockopt fails? 1369 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1370 socklen_t len = sizeof(errcode); 1371 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); 1372 } 1373 1374 // Check readable descriptors. If we're waiting on an accept, signal 1375 // that. Otherwise we're waiting for data, check to see if we're 1376 // readable or really closed. 1377 // TODO: Only peek at TCP descriptors. 1378 if (FD_ISSET(fd, &fdsRead)) { 1379 FD_CLR(fd, &fdsRead); 1380 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { 1381 ff |= DE_ACCEPT; 1382 } else if (errcode || pdispatcher->IsDescriptorClosed()) { 1383 ff |= DE_CLOSE; 1384 } else { 1385 ff |= DE_READ; 1386 } 1387 } 1388 1389 // Check writable descriptors. If we're waiting on a connect, detect 1390 // success versus failure by the reaped error code. 1391 if (FD_ISSET(fd, &fdsWrite)) { 1392 FD_CLR(fd, &fdsWrite); 1393 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { 1394 if (!errcode) { 1395 ff |= DE_CONNECT; 1396 } else { 1397 ff |= DE_CLOSE; 1398 } 1399 } else { 1400 ff |= DE_WRITE; 1401 } 1402 } 1403 1404 // Tell the descriptor about the event. 1405 if (ff != 0) { 1406 pdispatcher->OnPreEvent(ff); 1407 pdispatcher->OnEvent(ff, errcode); 1408 } 1409 } 1410 } 1411 1412 // Recalc the time remaining to wait. Doing it here means it doesn't get 1413 // calced twice the first time through the loop 1414 if (ptvWait) { 1415 ptvWait->tv_sec = 0; 1416 ptvWait->tv_usec = 0; 1417 struct timeval tvT; 1418 gettimeofday(&tvT, NULL); 1419 if ((tvStop.tv_sec > tvT.tv_sec) 1420 || ((tvStop.tv_sec == tvT.tv_sec) 1421 && (tvStop.tv_usec > tvT.tv_usec))) { 1422 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1423 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1424 if (ptvWait->tv_usec < 0) { 1425 ASSERT(ptvWait->tv_sec > 0); 1426 ptvWait->tv_usec += 1000000; 1427 ptvWait->tv_sec -= 1; 1428 } 1429 } 1430 } 1431 } 1432 1433 return true; 1434 } 1435 1436 static void GlobalSignalHandler(int signum) { 1437 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1438 } 1439 1440 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1441 void (*handler)(int)) { 1442 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1443 // otherwise set one. 1444 if (handler == SIG_IGN || handler == SIG_DFL) { 1445 if (!InstallSignal(signum, handler)) { 1446 return false; 1447 } 1448 if (signal_dispatcher_) { 1449 signal_dispatcher_->ClearHandler(signum); 1450 if (!signal_dispatcher_->HasHandlers()) { 1451 signal_dispatcher_.reset(); 1452 } 1453 } 1454 } else { 1455 if (!signal_dispatcher_) { 1456 signal_dispatcher_.reset(new PosixSignalDispatcher(this)); 1457 } 1458 signal_dispatcher_->SetHandler(signum, handler); 1459 if (!InstallSignal(signum, &GlobalSignalHandler)) { 1460 return false; 1461 } 1462 } 1463 return true; 1464 } 1465 1466 Dispatcher* PhysicalSocketServer::signal_dispatcher() { 1467 return signal_dispatcher_.get(); 1468 } 1469 1470 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) { 1471 struct sigaction act; 1472 // It doesn't really matter what we set this mask to. 1473 if (sigemptyset(&act.sa_mask) != 0) { 1474 LOG_ERR(LS_ERROR) << "Couldn't set mask"; 1475 return false; 1476 } 1477 act.sa_handler = handler; 1478 #if !defined(__native_client__) 1479 // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it 1480 // and it's a nuisance. Though some syscalls still return EINTR and there's no 1481 // real standard for which ones. :( 1482 act.sa_flags = SA_RESTART; 1483 #else 1484 act.sa_flags = 0; 1485 #endif 1486 if (sigaction(signum, &act, NULL) != 0) { 1487 LOG_ERR(LS_ERROR) << "Couldn't set sigaction"; 1488 return false; 1489 } 1490 return true; 1491 } 1492 #endif // WEBRTC_POSIX 1493 1494 #if defined(WEBRTC_WIN) 1495 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1496 int cmsTotal = cmsWait; 1497 int cmsElapsed = 0; 1498 uint32 msStart = Time(); 1499 1500 fWait_ = true; 1501 while (fWait_) { 1502 std::vector<WSAEVENT> events; 1503 std::vector<Dispatcher *> event_owners; 1504 1505 events.push_back(socket_ev_); 1506 1507 { 1508 CritScope cr(&crit_); 1509 size_t i = 0; 1510 iterators_.push_back(&i); 1511 // Don't track dispatchers_.size(), because we want to pick up any new 1512 // dispatchers that were added while processing the loop. 1513 while (i < dispatchers_.size()) { 1514 Dispatcher* disp = dispatchers_[i++]; 1515 if (!process_io && (disp != signal_wakeup_)) 1516 continue; 1517 SOCKET s = disp->GetSocket(); 1518 if (disp->CheckSignalClose()) { 1519 // We just signalled close, don't poll this socket 1520 } else if (s != INVALID_SOCKET) { 1521 WSAEventSelect(s, 1522 events[0], 1523 FlagsToEvents(disp->GetRequestedEvents())); 1524 } else { 1525 events.push_back(disp->GetWSAEvent()); 1526 event_owners.push_back(disp); 1527 } 1528 } 1529 ASSERT(iterators_.back() == &i); 1530 iterators_.pop_back(); 1531 } 1532 1533 // Which is shorter, the delay wait or the asked wait? 1534 1535 int cmsNext; 1536 if (cmsWait == kForever) { 1537 cmsNext = cmsWait; 1538 } else { 1539 cmsNext = _max(0, cmsTotal - cmsElapsed); 1540 } 1541 1542 // Wait for one of the events to signal 1543 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), 1544 &events[0], 1545 false, 1546 cmsNext, 1547 false); 1548 1549 if (dw == WSA_WAIT_FAILED) { 1550 // Failed? 1551 // TODO: need a better strategy than this! 1552 WSAGetLastError(); 1553 ASSERT(false); 1554 return false; 1555 } else if (dw == WSA_WAIT_TIMEOUT) { 1556 // Timeout? 1557 return true; 1558 } else { 1559 // Figure out which one it is and call it 1560 CritScope cr(&crit_); 1561 int index = dw - WSA_WAIT_EVENT_0; 1562 if (index > 0) { 1563 --index; // The first event is the socket event 1564 event_owners[index]->OnPreEvent(0); 1565 event_owners[index]->OnEvent(0, 0); 1566 } else if (process_io) { 1567 size_t i = 0, end = dispatchers_.size(); 1568 iterators_.push_back(&i); 1569 iterators_.push_back(&end); // Don't iterate over new dispatchers. 1570 while (i < end) { 1571 Dispatcher* disp = dispatchers_[i++]; 1572 SOCKET s = disp->GetSocket(); 1573 if (s == INVALID_SOCKET) 1574 continue; 1575 1576 WSANETWORKEVENTS wsaEvents; 1577 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); 1578 if (err == 0) { 1579 1580 #if LOGGING 1581 { 1582 if ((wsaEvents.lNetworkEvents & FD_READ) && 1583 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { 1584 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error " 1585 << wsaEvents.iErrorCode[FD_READ_BIT]; 1586 } 1587 if ((wsaEvents.lNetworkEvents & FD_WRITE) && 1588 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) { 1589 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error " 1590 << wsaEvents.iErrorCode[FD_WRITE_BIT]; 1591 } 1592 if ((wsaEvents.lNetworkEvents & FD_CONNECT) && 1593 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) { 1594 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error " 1595 << wsaEvents.iErrorCode[FD_CONNECT_BIT]; 1596 } 1597 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) && 1598 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) { 1599 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error " 1600 << wsaEvents.iErrorCode[FD_ACCEPT_BIT]; 1601 } 1602 if ((wsaEvents.lNetworkEvents & FD_CLOSE) && 1603 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) { 1604 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error " 1605 << wsaEvents.iErrorCode[FD_CLOSE_BIT]; 1606 } 1607 } 1608 #endif 1609 uint32 ff = 0; 1610 int errcode = 0; 1611 if (wsaEvents.lNetworkEvents & FD_READ) 1612 ff |= DE_READ; 1613 if (wsaEvents.lNetworkEvents & FD_WRITE) 1614 ff |= DE_WRITE; 1615 if (wsaEvents.lNetworkEvents & FD_CONNECT) { 1616 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) { 1617 ff |= DE_CONNECT; 1618 } else { 1619 ff |= DE_CLOSE; 1620 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT]; 1621 } 1622 } 1623 if (wsaEvents.lNetworkEvents & FD_ACCEPT) 1624 ff |= DE_ACCEPT; 1625 if (wsaEvents.lNetworkEvents & FD_CLOSE) { 1626 ff |= DE_CLOSE; 1627 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; 1628 } 1629 if (ff != 0) { 1630 disp->OnPreEvent(ff); 1631 disp->OnEvent(ff, errcode); 1632 } 1633 } 1634 } 1635 ASSERT(iterators_.back() == &end); 1636 iterators_.pop_back(); 1637 ASSERT(iterators_.back() == &i); 1638 iterators_.pop_back(); 1639 } 1640 1641 // Reset the network event until new activity occurs 1642 WSAResetEvent(socket_ev_); 1643 } 1644 1645 // Break? 1646 if (!fWait_) 1647 break; 1648 cmsElapsed = TimeSince(msStart); 1649 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { 1650 break; 1651 } 1652 } 1653 1654 // Done 1655 return true; 1656 } 1657 #endif // WEBRTC_WIN 1658 1659 } // namespace rtc 1660