Home | History | Annotate | Download | only in udp
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/udp/udp_socket_libevent.h"
      6 
      7 #include <errno.h>
      8 #include <fcntl.h>
      9 #include <netdb.h>
     10 #include <sys/socket.h>
     11 
     12 #include "base/eintr_wrapper.h"
     13 #include "base/logging.h"
     14 #include "base/message_loop.h"
     15 #include "base/metrics/stats_counters.h"
     16 #include "net/base/io_buffer.h"
     17 #include "net/base/ip_endpoint.h"
     18 #include "net/base/net_errors.h"
     19 #include "net/base/net_log.h"
     20 #include "net/base/net_util.h"
     21 #if defined(OS_POSIX)
     22 #include <netinet/in.h>
     23 #endif
     24 #if defined(USE_SYSTEM_LIBEVENT)
     25 #include <event.h>
     26 #else
     27 #include "third_party/libevent/event.h"
     28 #endif
     29 
     30 namespace net {
     31 
     32 UDPSocketLibevent::UDPSocketLibevent(net::NetLog* net_log,
     33                                      const net::NetLog::Source& source)
     34     : socket_(kInvalidSocket),
     35       read_watcher_(this),
     36       write_watcher_(this),
     37       read_buf_len_(0),
     38       recv_from_address_(NULL),
     39       write_buf_len_(0),
     40       read_callback_(NULL),
     41       write_callback_(NULL),
     42       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)) {
     43   scoped_refptr<NetLog::EventParameters> params;
     44   if (source.is_valid())
     45     params = new NetLogSourceParameter("source_dependency", source);
     46   net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE, params);
     47 }
     48 
     49 UDPSocketLibevent::~UDPSocketLibevent() {
     50   Close();
     51   net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE, NULL);
     52 }
     53 
     54 void UDPSocketLibevent::Close() {
     55   DCHECK(CalledOnValidThread());
     56 
     57   if (!is_connected())
     58     return;
     59 
     60   // Zero out any pending read/write callback state.
     61   read_buf_ = NULL;
     62   read_buf_len_ = 0;
     63   read_callback_ = NULL;
     64   recv_from_address_ = NULL;
     65   write_buf_ = NULL;
     66   write_buf_len_ = 0;
     67   write_callback_ = NULL;
     68   send_to_address_.reset();
     69 
     70   bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
     71   DCHECK(ok);
     72   ok = write_socket_watcher_.StopWatchingFileDescriptor();
     73   DCHECK(ok);
     74 
     75   if (HANDLE_EINTR(close(socket_)) < 0)
     76     PLOG(ERROR) << "close";
     77 
     78   socket_ = kInvalidSocket;
     79 }
     80 
     81 int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
     82   DCHECK(CalledOnValidThread());
     83   DCHECK(address);
     84   if (!is_connected())
     85     return ERR_SOCKET_NOT_CONNECTED;
     86 
     87   if (!remote_address_.get()) {
     88     struct sockaddr_storage addr_storage;
     89     socklen_t addr_len = sizeof(addr_storage);
     90     struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
     91     if (getpeername(socket_, addr, &addr_len))
     92       return MapSystemError(errno);
     93     scoped_ptr<IPEndPoint> address(new IPEndPoint());
     94     if (!address->FromSockAddr(addr, addr_len))
     95       return ERR_FAILED;
     96     remote_address_.reset(address.release());
     97   }
     98 
     99   *address = *remote_address_;
    100   return OK;
    101 }
    102 
    103 int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
    104   DCHECK(CalledOnValidThread());
    105   DCHECK(address);
    106   if (!is_connected())
    107     return ERR_SOCKET_NOT_CONNECTED;
    108 
    109   if (!local_address_.get()) {
    110     struct sockaddr_storage addr_storage;
    111     socklen_t addr_len = sizeof(addr_storage);
    112     struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
    113     if (getsockname(socket_, addr, &addr_len))
    114       return MapSystemError(errno);
    115     scoped_ptr<IPEndPoint> address(new IPEndPoint());
    116     if (!address->FromSockAddr(addr, addr_len))
    117       return ERR_FAILED;
    118     local_address_.reset(address.release());
    119   }
    120 
    121   *address = *local_address_;
    122   return OK;
    123 }
    124 
    125 int UDPSocketLibevent::Read(IOBuffer* buf,
    126                             int buf_len,
    127                             CompletionCallback* callback) {
    128   return RecvFrom(buf, buf_len, NULL, callback);
    129 }
    130 
    131 int UDPSocketLibevent::RecvFrom(IOBuffer* buf,
    132                                 int buf_len,
    133                                 IPEndPoint* address,
    134                                 CompletionCallback* callback) {
    135   DCHECK(CalledOnValidThread());
    136   DCHECK_NE(kInvalidSocket, socket_);
    137   DCHECK(!read_callback_);
    138   DCHECK(!recv_from_address_);
    139   DCHECK(callback);  // Synchronous operation not supported
    140   DCHECK_GT(buf_len, 0);
    141 
    142   int nread = InternalRecvFrom(buf, buf_len, address);
    143   if (nread != ERR_IO_PENDING)
    144     return nread;
    145 
    146   if (!MessageLoopForIO::current()->WatchFileDescriptor(
    147           socket_, true, MessageLoopForIO::WATCH_READ,
    148           &read_socket_watcher_, &read_watcher_)) {
    149     PLOG(ERROR) << "WatchFileDescriptor failed on read";
    150     return MapSystemError(errno);
    151   }
    152 
    153   read_buf_ = buf;
    154   read_buf_len_ = buf_len;
    155   recv_from_address_ = address;
    156   read_callback_ = callback;
    157   return ERR_IO_PENDING;
    158 }
    159 
    160 int UDPSocketLibevent::Write(IOBuffer* buf,
    161                              int buf_len,
    162                              CompletionCallback* callback) {
    163   return SendToOrWrite(buf, buf_len, NULL, callback);
    164 }
    165 
    166 int UDPSocketLibevent::SendTo(IOBuffer* buf,
    167                               int buf_len,
    168                               const IPEndPoint& address,
    169                               CompletionCallback* callback) {
    170   return SendToOrWrite(buf, buf_len, &address, callback);
    171 }
    172 
    173 int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf,
    174                                      int buf_len,
    175                                      const IPEndPoint* address,
    176                                      CompletionCallback* callback) {
    177   DCHECK(CalledOnValidThread());
    178   DCHECK_NE(kInvalidSocket, socket_);
    179   DCHECK(!write_callback_);
    180   DCHECK(callback);  // Synchronous operation not supported
    181   DCHECK_GT(buf_len, 0);
    182 
    183   int nwrite = InternalSendTo(buf, buf_len, address);
    184   if (nwrite >= 0) {
    185     base::StatsCounter write_bytes("udp.write_bytes");
    186     write_bytes.Add(nwrite);
    187     return nwrite;
    188   }
    189   if (errno != EAGAIN && errno != EWOULDBLOCK)
    190     return MapSystemError(errno);
    191 
    192   if (!MessageLoopForIO::current()->WatchFileDescriptor(
    193           socket_, true, MessageLoopForIO::WATCH_WRITE,
    194           &write_socket_watcher_, &write_watcher_)) {
    195     DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
    196     return MapSystemError(errno);
    197   }
    198 
    199   write_buf_ = buf;
    200   write_buf_len_ = buf_len;
    201   DCHECK(!send_to_address_.get());
    202   if (address) {
    203     send_to_address_.reset(new IPEndPoint(*address));
    204   }
    205   write_callback_ = callback;
    206   return ERR_IO_PENDING;
    207 }
    208 
    209 int UDPSocketLibevent::Connect(const IPEndPoint& address) {
    210   DCHECK(!is_connected());
    211   DCHECK(!remote_address_.get());
    212   int rv = CreateSocket(address);
    213   if (rv < 0)
    214     return rv;
    215 
    216   struct sockaddr_storage addr_storage;
    217   size_t addr_len = sizeof(addr_storage);
    218   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
    219   if (!address.ToSockAddr(addr, &addr_len))
    220     return ERR_FAILED;
    221 
    222   rv = HANDLE_EINTR(connect(socket_, addr, addr_len));
    223   if (rv < 0)
    224     return MapSystemError(errno);
    225 
    226   remote_address_.reset(new IPEndPoint(address));
    227   return rv;
    228 }
    229 
    230 int UDPSocketLibevent::Bind(const IPEndPoint& address) {
    231   DCHECK(!is_connected());
    232   DCHECK(!local_address_.get());
    233   int rv = CreateSocket(address);
    234   if (rv < 0)
    235     return rv;
    236 
    237   struct sockaddr_storage addr_storage;
    238   size_t addr_len = sizeof(addr_storage);
    239   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
    240   if (!address.ToSockAddr(addr, &addr_len))
    241     return ERR_FAILED;
    242 
    243   rv = bind(socket_, addr, addr_len);
    244   if (rv < 0)
    245     return MapSystemError(errno);
    246 
    247   local_address_.reset();
    248   return rv;
    249 }
    250 
    251 void UDPSocketLibevent::DoReadCallback(int rv) {
    252   DCHECK_NE(rv, ERR_IO_PENDING);
    253   DCHECK(read_callback_);
    254 
    255   // since Run may result in Read being called, clear read_callback_ up front.
    256   CompletionCallback* c = read_callback_;
    257   read_callback_ = NULL;
    258   c->Run(rv);
    259 }
    260 
    261 void UDPSocketLibevent::DoWriteCallback(int rv) {
    262   DCHECK_NE(rv, ERR_IO_PENDING);
    263   DCHECK(write_callback_);
    264 
    265   // since Run may result in Write being called, clear write_callback_ up front.
    266   CompletionCallback* c = write_callback_;
    267   write_callback_ = NULL;
    268   c->Run(rv);
    269 }
    270 
    271 void UDPSocketLibevent::DidCompleteRead() {
    272   int result = InternalRecvFrom(read_buf_, read_buf_len_, recv_from_address_);
    273   if (result != ERR_IO_PENDING) {
    274     read_buf_ = NULL;
    275     read_buf_len_ = 0;
    276     recv_from_address_ = NULL;
    277     bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
    278     DCHECK(ok);
    279     DoReadCallback(result);
    280   }
    281 }
    282 
    283 int UDPSocketLibevent::CreateSocket(const IPEndPoint& address) {
    284   socket_ = socket(address.GetFamily(), SOCK_DGRAM, 0);
    285   if (socket_ == kInvalidSocket)
    286     return MapSystemError(errno);
    287   if (SetNonBlocking(socket_)) {
    288     const int err = MapSystemError(errno);
    289     Close();
    290     return err;
    291   }
    292   return OK;
    293 }
    294 
    295 void UDPSocketLibevent::DidCompleteWrite() {
    296   int result = InternalSendTo(write_buf_, write_buf_len_,
    297                               send_to_address_.get());
    298   if (result >= 0) {
    299     base::StatsCounter write_bytes("udp.write_bytes");
    300     write_bytes.Add(result);
    301   } else {
    302     result = MapSystemError(errno);
    303   }
    304 
    305   if (result != ERR_IO_PENDING) {
    306     write_buf_ = NULL;
    307     write_buf_len_ = 0;
    308     send_to_address_.reset();
    309     write_socket_watcher_.StopWatchingFileDescriptor();
    310     DoWriteCallback(result);
    311   }
    312 }
    313 
    314 int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len,
    315                                         IPEndPoint* address) {
    316   int bytes_transferred;
    317   int flags = 0;
    318 
    319   struct sockaddr_storage addr_storage;
    320   socklen_t addr_len = sizeof(addr_storage);
    321   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
    322 
    323   bytes_transferred =
    324       HANDLE_EINTR(recvfrom(socket_,
    325                             buf->data(),
    326                             buf_len,
    327                             flags,
    328                             addr,
    329                             &addr_len));
    330   int result;
    331   if (bytes_transferred >= 0) {
    332     result = bytes_transferred;
    333     base::StatsCounter read_bytes("udp.read_bytes");
    334     read_bytes.Add(bytes_transferred);
    335     if (address) {
    336       if (!address->FromSockAddr(addr, addr_len))
    337         result = ERR_FAILED;
    338     }
    339   } else {
    340     result = MapSystemError(errno);
    341   }
    342   return result;
    343 }
    344 
    345 int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len,
    346                                       const IPEndPoint* address) {
    347   struct sockaddr_storage addr_storage;
    348   size_t addr_len = sizeof(addr_storage);
    349   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
    350 
    351   if (!address) {
    352     addr = NULL;
    353     addr_len = 0;
    354   } else {
    355     if (!address->ToSockAddr(addr, &addr_len))
    356       return ERR_FAILED;
    357   }
    358 
    359   return HANDLE_EINTR(sendto(socket_,
    360                              buf->data(),
    361                              buf_len,
    362                              0,
    363                              addr,
    364                              addr_len));
    365 }
    366 
    367 }  // namespace net
    368