1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/udp/udp_socket_libevent.h" 6 7 #include <errno.h> 8 #include <fcntl.h> 9 #include <netdb.h> 10 #include <sys/socket.h> 11 #include <netinet/in.h> 12 13 #include "base/callback.h" 14 #include "base/logging.h" 15 #include "base/message_loop/message_loop.h" 16 #include "base/metrics/stats_counters.h" 17 #include "base/posix/eintr_wrapper.h" 18 #include "base/rand_util.h" 19 #include "net/base/io_buffer.h" 20 #include "net/base/ip_endpoint.h" 21 #include "net/base/net_errors.h" 22 #include "net/base/net_log.h" 23 #include "net/base/net_util.h" 24 #include "net/udp/udp_net_log_parameters.h" 25 26 namespace { 27 28 const int kBindRetries = 10; 29 const int kPortStart = 1024; 30 const int kPortEnd = 65535; 31 32 } // namespace 33 34 namespace net { 35 36 UDPSocketLibevent::UDPSocketLibevent( 37 DatagramSocket::BindType bind_type, 38 const RandIntCallback& rand_int_cb, 39 net::NetLog* net_log, 40 const net::NetLog::Source& source) 41 : socket_(kInvalidSocket), 42 addr_family_(0), 43 socket_options_(SOCKET_OPTION_MULTICAST_LOOP), 44 multicast_time_to_live_(1), 45 bind_type_(bind_type), 46 rand_int_cb_(rand_int_cb), 47 read_watcher_(this), 48 write_watcher_(this), 49 read_buf_len_(0), 50 recv_from_address_(NULL), 51 write_buf_len_(0), 52 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)) { 53 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE, 54 source.ToEventParametersCallback()); 55 if (bind_type == DatagramSocket::RANDOM_BIND) 56 DCHECK(!rand_int_cb.is_null()); 57 } 58 59 UDPSocketLibevent::~UDPSocketLibevent() { 60 Close(); 61 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE); 62 } 63 64 void UDPSocketLibevent::Close() { 65 DCHECK(CalledOnValidThread()); 66 67 if (!is_connected()) 68 return; 69 70 // Zero out any pending read/write callback state. 71 read_buf_ = NULL; 72 read_buf_len_ = 0; 73 read_callback_.Reset(); 74 recv_from_address_ = NULL; 75 write_buf_ = NULL; 76 write_buf_len_ = 0; 77 write_callback_.Reset(); 78 send_to_address_.reset(); 79 80 bool ok = read_socket_watcher_.StopWatchingFileDescriptor(); 81 DCHECK(ok); 82 ok = write_socket_watcher_.StopWatchingFileDescriptor(); 83 DCHECK(ok); 84 85 if (HANDLE_EINTR(close(socket_)) < 0) 86 PLOG(ERROR) << "close"; 87 88 socket_ = kInvalidSocket; 89 addr_family_ = 0; 90 } 91 92 int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const { 93 DCHECK(CalledOnValidThread()); 94 DCHECK(address); 95 if (!is_connected()) 96 return ERR_SOCKET_NOT_CONNECTED; 97 98 if (!remote_address_.get()) { 99 SockaddrStorage storage; 100 if (getpeername(socket_, storage.addr, &storage.addr_len)) 101 return MapSystemError(errno); 102 scoped_ptr<IPEndPoint> address(new IPEndPoint()); 103 if (!address->FromSockAddr(storage.addr, storage.addr_len)) 104 return ERR_FAILED; 105 remote_address_.reset(address.release()); 106 } 107 108 *address = *remote_address_; 109 return OK; 110 } 111 112 int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const { 113 DCHECK(CalledOnValidThread()); 114 DCHECK(address); 115 if (!is_connected()) 116 return ERR_SOCKET_NOT_CONNECTED; 117 118 if (!local_address_.get()) { 119 SockaddrStorage storage; 120 if (getsockname(socket_, storage.addr, &storage.addr_len)) 121 return MapSystemError(errno); 122 scoped_ptr<IPEndPoint> address(new IPEndPoint()); 123 if (!address->FromSockAddr(storage.addr, storage.addr_len)) 124 return ERR_FAILED; 125 local_address_.reset(address.release()); 126 net_log_.AddEvent(NetLog::TYPE_UDP_LOCAL_ADDRESS, 127 CreateNetLogUDPConnectCallback(local_address_.get())); 128 } 129 130 *address = *local_address_; 131 return OK; 132 } 133 134 int UDPSocketLibevent::Read(IOBuffer* buf, 135 int buf_len, 136 const CompletionCallback& callback) { 137 return RecvFrom(buf, buf_len, NULL, callback); 138 } 139 140 int UDPSocketLibevent::RecvFrom(IOBuffer* buf, 141 int buf_len, 142 IPEndPoint* address, 143 const CompletionCallback& callback) { 144 DCHECK(CalledOnValidThread()); 145 DCHECK_NE(kInvalidSocket, socket_); 146 DCHECK(read_callback_.is_null()); 147 DCHECK(!recv_from_address_); 148 DCHECK(!callback.is_null()); // Synchronous operation not supported 149 DCHECK_GT(buf_len, 0); 150 151 int nread = InternalRecvFrom(buf, buf_len, address); 152 if (nread != ERR_IO_PENDING) 153 return nread; 154 155 if (!base::MessageLoopForIO::current()->WatchFileDescriptor( 156 socket_, true, base::MessageLoopForIO::WATCH_READ, 157 &read_socket_watcher_, &read_watcher_)) { 158 PLOG(ERROR) << "WatchFileDescriptor failed on read"; 159 int result = MapSystemError(errno); 160 LogRead(result, NULL, 0, NULL); 161 return result; 162 } 163 164 read_buf_ = buf; 165 read_buf_len_ = buf_len; 166 recv_from_address_ = address; 167 read_callback_ = callback; 168 return ERR_IO_PENDING; 169 } 170 171 int UDPSocketLibevent::Write(IOBuffer* buf, 172 int buf_len, 173 const CompletionCallback& callback) { 174 return SendToOrWrite(buf, buf_len, NULL, callback); 175 } 176 177 int UDPSocketLibevent::SendTo(IOBuffer* buf, 178 int buf_len, 179 const IPEndPoint& address, 180 const CompletionCallback& callback) { 181 return SendToOrWrite(buf, buf_len, &address, callback); 182 } 183 184 int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf, 185 int buf_len, 186 const IPEndPoint* address, 187 const CompletionCallback& callback) { 188 DCHECK(CalledOnValidThread()); 189 DCHECK_NE(kInvalidSocket, socket_); 190 DCHECK(write_callback_.is_null()); 191 DCHECK(!callback.is_null()); // Synchronous operation not supported 192 DCHECK_GT(buf_len, 0); 193 194 int result = InternalSendTo(buf, buf_len, address); 195 if (result != ERR_IO_PENDING) 196 return result; 197 198 if (!base::MessageLoopForIO::current()->WatchFileDescriptor( 199 socket_, true, base::MessageLoopForIO::WATCH_WRITE, 200 &write_socket_watcher_, &write_watcher_)) { 201 DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno; 202 int result = MapSystemError(errno); 203 LogWrite(result, NULL, NULL); 204 return result; 205 } 206 207 write_buf_ = buf; 208 write_buf_len_ = buf_len; 209 DCHECK(!send_to_address_.get()); 210 if (address) { 211 send_to_address_.reset(new IPEndPoint(*address)); 212 } 213 write_callback_ = callback; 214 return ERR_IO_PENDING; 215 } 216 217 int UDPSocketLibevent::Connect(const IPEndPoint& address) { 218 net_log_.BeginEvent(NetLog::TYPE_UDP_CONNECT, 219 CreateNetLogUDPConnectCallback(&address)); 220 int rv = InternalConnect(address); 221 if (rv != OK) 222 Close(); 223 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT, rv); 224 return rv; 225 } 226 227 int UDPSocketLibevent::InternalConnect(const IPEndPoint& address) { 228 DCHECK(CalledOnValidThread()); 229 DCHECK(!is_connected()); 230 DCHECK(!remote_address_.get()); 231 int rv = CreateSocket(address); 232 if (rv < 0) 233 return rv; 234 235 if (bind_type_ == DatagramSocket::RANDOM_BIND) 236 rv = RandomBind(address); 237 // else connect() does the DatagramSocket::DEFAULT_BIND 238 239 if (rv < 0) { 240 Close(); 241 return rv; 242 } 243 244 SockaddrStorage storage; 245 if (!address.ToSockAddr(storage.addr, &storage.addr_len)) { 246 Close(); 247 return ERR_ADDRESS_INVALID; 248 } 249 250 rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len)); 251 if (rv < 0) { 252 // Close() may change the current errno. Map errno beforehand. 253 int result = MapSystemError(errno); 254 Close(); 255 return result; 256 } 257 258 remote_address_.reset(new IPEndPoint(address)); 259 return rv; 260 } 261 262 int UDPSocketLibevent::Bind(const IPEndPoint& address) { 263 DCHECK(CalledOnValidThread()); 264 DCHECK(!is_connected()); 265 int rv = CreateSocket(address); 266 if (rv < 0) 267 return rv; 268 269 rv = SetSocketOptions(); 270 if (rv < 0) { 271 Close(); 272 return rv; 273 } 274 rv = DoBind(address); 275 if (rv < 0) { 276 Close(); 277 return rv; 278 } 279 local_address_.reset(); 280 return rv; 281 } 282 283 bool UDPSocketLibevent::SetReceiveBufferSize(int32 size) { 284 DCHECK(CalledOnValidThread()); 285 int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF, 286 reinterpret_cast<const char*>(&size), sizeof(size)); 287 DCHECK(!rv) << "Could not set socket receive buffer size: " << errno; 288 return rv == 0; 289 } 290 291 bool UDPSocketLibevent::SetSendBufferSize(int32 size) { 292 DCHECK(CalledOnValidThread()); 293 int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF, 294 reinterpret_cast<const char*>(&size), sizeof(size)); 295 DCHECK(!rv) << "Could not set socket send buffer size: " << errno; 296 return rv == 0; 297 } 298 299 void UDPSocketLibevent::AllowAddressReuse() { 300 DCHECK(CalledOnValidThread()); 301 DCHECK(!is_connected()); 302 303 socket_options_ |= SOCKET_OPTION_REUSE_ADDRESS; 304 } 305 306 void UDPSocketLibevent::AllowBroadcast() { 307 DCHECK(CalledOnValidThread()); 308 DCHECK(!is_connected()); 309 310 socket_options_ |= SOCKET_OPTION_BROADCAST; 311 } 312 313 void UDPSocketLibevent::ReadWatcher::OnFileCanReadWithoutBlocking(int) { 314 if (!socket_->read_callback_.is_null()) 315 socket_->DidCompleteRead(); 316 } 317 318 void UDPSocketLibevent::WriteWatcher::OnFileCanWriteWithoutBlocking(int) { 319 if (!socket_->write_callback_.is_null()) 320 socket_->DidCompleteWrite(); 321 } 322 323 void UDPSocketLibevent::DoReadCallback(int rv) { 324 DCHECK_NE(rv, ERR_IO_PENDING); 325 DCHECK(!read_callback_.is_null()); 326 327 // since Run may result in Read being called, clear read_callback_ up front. 328 CompletionCallback c = read_callback_; 329 read_callback_.Reset(); 330 c.Run(rv); 331 } 332 333 void UDPSocketLibevent::DoWriteCallback(int rv) { 334 DCHECK_NE(rv, ERR_IO_PENDING); 335 DCHECK(!write_callback_.is_null()); 336 337 // since Run may result in Write being called, clear write_callback_ up front. 338 CompletionCallback c = write_callback_; 339 write_callback_.Reset(); 340 c.Run(rv); 341 } 342 343 void UDPSocketLibevent::DidCompleteRead() { 344 int result = 345 InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_); 346 if (result != ERR_IO_PENDING) { 347 read_buf_ = NULL; 348 read_buf_len_ = 0; 349 recv_from_address_ = NULL; 350 bool ok = read_socket_watcher_.StopWatchingFileDescriptor(); 351 DCHECK(ok); 352 DoReadCallback(result); 353 } 354 } 355 356 void UDPSocketLibevent::LogRead(int result, 357 const char* bytes, 358 socklen_t addr_len, 359 const sockaddr* addr) const { 360 if (result < 0) { 361 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result); 362 return; 363 } 364 365 if (net_log_.IsLoggingAllEvents()) { 366 DCHECK(addr_len > 0); 367 DCHECK(addr); 368 369 IPEndPoint address; 370 bool is_address_valid = address.FromSockAddr(addr, addr_len); 371 net_log_.AddEvent( 372 NetLog::TYPE_UDP_BYTES_RECEIVED, 373 CreateNetLogUDPDataTranferCallback( 374 result, bytes, 375 is_address_valid ? &address : NULL)); 376 } 377 378 base::StatsCounter read_bytes("udp.read_bytes"); 379 read_bytes.Add(result); 380 } 381 382 int UDPSocketLibevent::CreateSocket(const IPEndPoint& address) { 383 addr_family_ = address.GetSockAddrFamily(); 384 socket_ = socket(addr_family_, SOCK_DGRAM, 0); 385 if (socket_ == kInvalidSocket) 386 return MapSystemError(errno); 387 if (SetNonBlocking(socket_)) { 388 const int err = MapSystemError(errno); 389 Close(); 390 return err; 391 } 392 return OK; 393 } 394 395 void UDPSocketLibevent::DidCompleteWrite() { 396 int result = 397 InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get()); 398 399 if (result != ERR_IO_PENDING) { 400 write_buf_ = NULL; 401 write_buf_len_ = 0; 402 send_to_address_.reset(); 403 write_socket_watcher_.StopWatchingFileDescriptor(); 404 DoWriteCallback(result); 405 } 406 } 407 408 void UDPSocketLibevent::LogWrite(int result, 409 const char* bytes, 410 const IPEndPoint* address) const { 411 if (result < 0) { 412 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result); 413 return; 414 } 415 416 if (net_log_.IsLoggingAllEvents()) { 417 net_log_.AddEvent( 418 NetLog::TYPE_UDP_BYTES_SENT, 419 CreateNetLogUDPDataTranferCallback(result, bytes, address)); 420 } 421 422 base::StatsCounter write_bytes("udp.write_bytes"); 423 write_bytes.Add(result); 424 } 425 426 int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len, 427 IPEndPoint* address) { 428 int bytes_transferred; 429 int flags = 0; 430 431 SockaddrStorage storage; 432 433 bytes_transferred = 434 HANDLE_EINTR(recvfrom(socket_, 435 buf->data(), 436 buf_len, 437 flags, 438 storage.addr, 439 &storage.addr_len)); 440 int result; 441 if (bytes_transferred >= 0) { 442 result = bytes_transferred; 443 if (address && !address->FromSockAddr(storage.addr, storage.addr_len)) 444 result = ERR_FAILED; 445 } else { 446 result = MapSystemError(errno); 447 } 448 if (result != ERR_IO_PENDING) 449 LogRead(result, buf->data(), storage.addr_len, storage.addr); 450 return result; 451 } 452 453 int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len, 454 const IPEndPoint* address) { 455 SockaddrStorage storage; 456 struct sockaddr* addr = storage.addr; 457 if (!address) { 458 addr = NULL; 459 storage.addr_len = 0; 460 } else { 461 if (!address->ToSockAddr(storage.addr, &storage.addr_len)) { 462 int result = ERR_FAILED; 463 LogWrite(result, NULL, NULL); 464 return result; 465 } 466 } 467 468 int result = HANDLE_EINTR(sendto(socket_, 469 buf->data(), 470 buf_len, 471 0, 472 addr, 473 storage.addr_len)); 474 if (result < 0) 475 result = MapSystemError(errno); 476 if (result != ERR_IO_PENDING) 477 LogWrite(result, buf->data(), address); 478 return result; 479 } 480 481 int UDPSocketLibevent::SetSocketOptions() { 482 int true_value = 1; 483 if (socket_options_ & SOCKET_OPTION_REUSE_ADDRESS) { 484 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &true_value, 485 sizeof(true_value)); 486 if (rv < 0) 487 return MapSystemError(errno); 488 } 489 if (socket_options_ & SOCKET_OPTION_BROADCAST) { 490 int rv; 491 #if defined(OS_MACOSX) 492 // SO_REUSEPORT on OSX permits multiple processes to each receive 493 // UDP multicast or broadcast datagrams destined for the bound 494 // port. 495 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &true_value, 496 sizeof(true_value)); 497 #else 498 rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &true_value, 499 sizeof(true_value)); 500 #endif // defined(OS_MACOSX) 501 if (rv < 0) 502 return MapSystemError(errno); 503 } 504 505 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) { 506 int rv; 507 if (addr_family_ == AF_INET) { 508 u_char loop = 0; 509 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP, 510 &loop, sizeof(loop)); 511 } else { 512 u_int loop = 0; 513 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, 514 &loop, sizeof(loop)); 515 } 516 if (rv < 0) 517 return MapSystemError(errno); 518 } 519 if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) { 520 int rv; 521 if (addr_family_ == AF_INET) { 522 u_char ttl = multicast_time_to_live_; 523 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL, 524 &ttl, sizeof(ttl)); 525 } else { 526 // Signed interger. -1 to use route default. 527 int ttl = multicast_time_to_live_; 528 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, 529 &ttl, sizeof(ttl)); 530 } 531 if (rv < 0) 532 return MapSystemError(errno); 533 } 534 return OK; 535 } 536 537 int UDPSocketLibevent::DoBind(const IPEndPoint& address) { 538 SockaddrStorage storage; 539 if (!address.ToSockAddr(storage.addr, &storage.addr_len)) 540 return ERR_ADDRESS_INVALID; 541 int rv = bind(socket_, storage.addr, storage.addr_len); 542 return rv < 0 ? MapSystemError(errno) : rv; 543 } 544 545 int UDPSocketLibevent::RandomBind(const IPEndPoint& address) { 546 DCHECK(bind_type_ == DatagramSocket::RANDOM_BIND && !rand_int_cb_.is_null()); 547 548 // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s. 549 IPAddressNumber ip(address.address().size()); 550 551 for (int i = 0; i < kBindRetries; ++i) { 552 int rv = DoBind(IPEndPoint(ip, rand_int_cb_.Run(kPortStart, kPortEnd))); 553 if (rv == OK || rv != ERR_ADDRESS_IN_USE) 554 return rv; 555 } 556 return DoBind(IPEndPoint(ip, 0)); 557 } 558 559 int UDPSocketLibevent::JoinGroup(const IPAddressNumber& group_address) const { 560 DCHECK(CalledOnValidThread()); 561 if (!is_connected()) 562 return ERR_SOCKET_NOT_CONNECTED; 563 564 switch (group_address.size()) { 565 case kIPv4AddressSize: { 566 if (addr_family_ != AF_INET) 567 return ERR_ADDRESS_INVALID; 568 ip_mreq mreq; 569 mreq.imr_interface.s_addr = INADDR_ANY; 570 memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize); 571 int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, 572 &mreq, sizeof(mreq)); 573 if (rv < 0) 574 return MapSystemError(errno); 575 return OK; 576 } 577 case kIPv6AddressSize: { 578 if (addr_family_ != AF_INET6) 579 return ERR_ADDRESS_INVALID; 580 ipv6_mreq mreq; 581 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface. 582 memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize); 583 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP, 584 &mreq, sizeof(mreq)); 585 if (rv < 0) 586 return MapSystemError(errno); 587 return OK; 588 } 589 default: 590 NOTREACHED() << "Invalid address family"; 591 return ERR_ADDRESS_INVALID; 592 } 593 } 594 595 int UDPSocketLibevent::LeaveGroup(const IPAddressNumber& group_address) const { 596 DCHECK(CalledOnValidThread()); 597 598 if (!is_connected()) 599 return ERR_SOCKET_NOT_CONNECTED; 600 601 switch (group_address.size()) { 602 case kIPv4AddressSize: { 603 if (addr_family_ != AF_INET) 604 return ERR_ADDRESS_INVALID; 605 ip_mreq mreq; 606 mreq.imr_interface.s_addr = INADDR_ANY; 607 memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize); 608 int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP, 609 &mreq, sizeof(mreq)); 610 if (rv < 0) 611 return MapSystemError(errno); 612 return OK; 613 } 614 case kIPv6AddressSize: { 615 if (addr_family_ != AF_INET6) 616 return ERR_ADDRESS_INVALID; 617 ipv6_mreq mreq; 618 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface. 619 memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize); 620 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP, 621 &mreq, sizeof(mreq)); 622 if (rv < 0) 623 return MapSystemError(errno); 624 return OK; 625 } 626 default: 627 NOTREACHED() << "Invalid address family"; 628 return ERR_ADDRESS_INVALID; 629 } 630 } 631 632 int UDPSocketLibevent::SetMulticastTimeToLive(int time_to_live) { 633 DCHECK(CalledOnValidThread()); 634 if (is_connected()) 635 return ERR_SOCKET_IS_CONNECTED; 636 637 if (time_to_live < 0 || time_to_live > 255) 638 return ERR_INVALID_ARGUMENT; 639 multicast_time_to_live_ = time_to_live; 640 return OK; 641 } 642 643 int UDPSocketLibevent::SetMulticastLoopbackMode(bool loopback) { 644 DCHECK(CalledOnValidThread()); 645 if (is_connected()) 646 return ERR_SOCKET_IS_CONNECTED; 647 648 if (loopback) 649 socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP; 650 else 651 socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP; 652 return OK; 653 } 654 } // namespace net 655