Home | History | Annotate | Download | only in glue
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "jingle/glue/pseudotcp_adapter.h"
      6 
      7 #include "base/compiler_specific.h"
      8 #include "base/logging.h"
      9 #include "base/time/time.h"
     10 #include "net/base/address_list.h"
     11 #include "net/base/completion_callback.h"
     12 #include "net/base/io_buffer.h"
     13 #include "net/base/net_errors.h"
     14 #include "net/base/net_util.h"
     15 
     16 using cricket::PseudoTcp;
     17 
     18 namespace {
     19 const int kReadBufferSize = 65536;  // Maximum size of a packet.
     20 const uint16 kDefaultMtu = 1280;
     21 }  // namespace
     22 
     23 namespace jingle_glue {
     24 
     25 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
     26                                public base::RefCounted<Core> {
     27  public:
     28   Core(net::Socket* socket);
     29 
     30   // Functions used to implement net::StreamSocket.
     31   int Read(net::IOBuffer* buffer, int buffer_size,
     32            const net::CompletionCallback& callback);
     33   int Write(net::IOBuffer* buffer, int buffer_size,
     34             const net::CompletionCallback& callback);
     35   int Connect(const net::CompletionCallback& callback);
     36   void Disconnect();
     37   bool IsConnected() const;
     38 
     39   // cricket::IPseudoTcpNotify interface.
     40   // These notifications are triggered from NotifyPacket.
     41   virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE;
     42   virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE;
     43   virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE;
     44   // This is triggered by NotifyClock or NotifyPacket.
     45   virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE;
     46   // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
     47   virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
     48                                      const char* buffer, size_t len) OVERRIDE;
     49 
     50   void SetAckDelay(int delay_ms);
     51   void SetNoDelay(bool no_delay);
     52   void SetReceiveBufferSize(int32 size);
     53   void SetSendBufferSize(int32 size);
     54   void SetWriteWaitsForSend(bool write_waits_for_send);
     55 
     56   void DeleteSocket();
     57 
     58  private:
     59   friend class base::RefCounted<Core>;
     60   virtual ~Core();
     61 
     62   // These are invoked by the underlying Socket, and may trigger callbacks.
     63   // They hold a reference to |this| while running, to protect from deletion.
     64   void OnRead(int result);
     65   void OnWritten(int result);
     66 
     67   // These may trigger callbacks, so the holder must hold a reference on
     68   // the stack while calling them.
     69   void DoReadFromSocket();
     70   void HandleReadResults(int result);
     71   void HandleTcpClock();
     72 
     73   // Checks if current write has completed in the write-waits-for-send
     74   // mode.
     75   void CheckWriteComplete();
     76 
     77   // This re-sets |timer| without triggering callbacks.
     78   void AdjustClock();
     79 
     80   net::CompletionCallback connect_callback_;
     81   net::CompletionCallback read_callback_;
     82   net::CompletionCallback write_callback_;
     83 
     84   cricket::PseudoTcp pseudo_tcp_;
     85   scoped_ptr<net::Socket> socket_;
     86 
     87   scoped_refptr<net::IOBuffer> read_buffer_;
     88   int read_buffer_size_;
     89   scoped_refptr<net::IOBuffer> write_buffer_;
     90   int write_buffer_size_;
     91 
     92   // Whether we need to wait for data to be sent before completing write.
     93   bool write_waits_for_send_;
     94 
     95   // Set to true in the write-waits-for-send mode when we've
     96   // successfully writtend data to the send buffer and waiting for the
     97   // data to be sent to the remote end.
     98   bool waiting_write_position_;
     99 
    100   // Number of the bytes written by the last write stored while we wait
    101   // for the data to be sent (i.e. when waiting_write_position_ = true).
    102   int last_write_result_;
    103 
    104   bool socket_write_pending_;
    105   scoped_refptr<net::IOBuffer> socket_read_buffer_;
    106 
    107   base::OneShotTimer<Core> timer_;
    108 
    109   DISALLOW_COPY_AND_ASSIGN(Core);
    110 };
    111 
    112 
    113 PseudoTcpAdapter::Core::Core(net::Socket* socket)
    114     : pseudo_tcp_(this, 0),
    115       socket_(socket),
    116       write_waits_for_send_(false),
    117       waiting_write_position_(false),
    118       socket_write_pending_(false) {
    119   // Doesn't trigger callbacks.
    120   pseudo_tcp_.NotifyMTU(kDefaultMtu);
    121 }
    122 
    123 PseudoTcpAdapter::Core::~Core() {
    124 }
    125 
    126 int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size,
    127                                  const net::CompletionCallback& callback) {
    128   DCHECK(read_callback_.is_null());
    129 
    130   // Reference the Core in case a callback deletes the adapter.
    131   scoped_refptr<Core> core(this);
    132 
    133   int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
    134   if (result < 0) {
    135     result = net::MapSystemError(pseudo_tcp_.GetError());
    136     DCHECK(result < 0);
    137   }
    138 
    139   if (result == net::ERR_IO_PENDING) {
    140     read_buffer_ = buffer;
    141     read_buffer_size_ = buffer_size;
    142     read_callback_ = callback;
    143   }
    144 
    145   AdjustClock();
    146 
    147   return result;
    148 }
    149 
    150 int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size,
    151                                   const net::CompletionCallback& callback) {
    152   DCHECK(write_callback_.is_null());
    153 
    154   // Reference the Core in case a callback deletes the adapter.
    155   scoped_refptr<Core> core(this);
    156 
    157   int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
    158   if (result < 0) {
    159     result = net::MapSystemError(pseudo_tcp_.GetError());
    160     DCHECK(result < 0);
    161   }
    162 
    163   AdjustClock();
    164 
    165   if (result == net::ERR_IO_PENDING) {
    166     write_buffer_ = buffer;
    167     write_buffer_size_ = buffer_size;
    168     write_callback_ = callback;
    169     return result;
    170   }
    171 
    172   if (result < 0)
    173     return result;
    174 
    175   // Need to wait until the data is sent to the peer when
    176   // send-confirmation mode is enabled.
    177   if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
    178     DCHECK(!waiting_write_position_);
    179     waiting_write_position_ = true;
    180     last_write_result_ = result;
    181     write_buffer_ = buffer;
    182     write_buffer_size_ = buffer_size;
    183     write_callback_ = callback;
    184     return net::ERR_IO_PENDING;
    185   }
    186 
    187   return result;
    188 }
    189 
    190 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
    191   DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
    192 
    193   // Reference the Core in case a callback deletes the adapter.
    194   scoped_refptr<Core> core(this);
    195 
    196   // Start the connection attempt.
    197   int result = pseudo_tcp_.Connect();
    198   if (result < 0)
    199     return net::ERR_FAILED;
    200 
    201   AdjustClock();
    202 
    203   connect_callback_ = callback;
    204   DoReadFromSocket();
    205 
    206   return net::ERR_IO_PENDING;
    207 }
    208 
    209 void PseudoTcpAdapter::Core::Disconnect() {
    210   // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
    211   read_callback_.Reset();
    212   read_buffer_ = NULL;
    213   write_callback_.Reset();
    214   write_buffer_ = NULL;
    215   connect_callback_.Reset();
    216 
    217   // TODO(wez): Connect should succeed if called after Disconnect, which
    218   // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
    219   // and create a new one in Connect.
    220   // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
    221   // effect.  This should be addressed in PseudoTcp, really.
    222   // In the meantime we can fake OnTcpClosed notification and tear down the
    223   // PseudoTcp.
    224   pseudo_tcp_.Close(true);
    225 }
    226 
    227 bool PseudoTcpAdapter::Core::IsConnected() const {
    228   return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
    229 }
    230 
    231 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
    232   DCHECK(tcp == &pseudo_tcp_);
    233 
    234   if (!connect_callback_.is_null()) {
    235     net::CompletionCallback callback = connect_callback_;
    236     connect_callback_.Reset();
    237     callback.Run(net::OK);
    238   }
    239 
    240   OnTcpReadable(tcp);
    241   OnTcpWriteable(tcp);
    242 }
    243 
    244 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
    245   DCHECK_EQ(tcp, &pseudo_tcp_);
    246   if (read_callback_.is_null())
    247     return;
    248 
    249   int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
    250   if (result < 0) {
    251     result = net::MapSystemError(pseudo_tcp_.GetError());
    252     DCHECK(result < 0);
    253     if (result == net::ERR_IO_PENDING)
    254       return;
    255   }
    256 
    257   AdjustClock();
    258 
    259   net::CompletionCallback callback = read_callback_;
    260   read_callback_.Reset();
    261   read_buffer_ = NULL;
    262   callback.Run(result);
    263 }
    264 
    265 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
    266   DCHECK_EQ(tcp, &pseudo_tcp_);
    267   if (write_callback_.is_null())
    268     return;
    269 
    270   if (waiting_write_position_) {
    271     CheckWriteComplete();
    272     return;
    273   }
    274 
    275   int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
    276   if (result < 0) {
    277     result = net::MapSystemError(pseudo_tcp_.GetError());
    278     DCHECK(result < 0);
    279     if (result == net::ERR_IO_PENDING)
    280       return;
    281   }
    282 
    283   AdjustClock();
    284 
    285   if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
    286     DCHECK(!waiting_write_position_);
    287     waiting_write_position_ = true;
    288     last_write_result_ = result;
    289     return;
    290   }
    291 
    292   net::CompletionCallback callback = write_callback_;
    293   write_callback_.Reset();
    294   write_buffer_ = NULL;
    295   callback.Run(result);
    296 }
    297 
    298 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
    299   DCHECK_EQ(tcp, &pseudo_tcp_);
    300 
    301   if (!connect_callback_.is_null()) {
    302     net::CompletionCallback callback = connect_callback_;
    303     connect_callback_.Reset();
    304     callback.Run(net::MapSystemError(error));
    305   }
    306 
    307   if (!read_callback_.is_null()) {
    308     net::CompletionCallback callback = read_callback_;
    309     read_callback_.Reset();
    310     callback.Run(net::MapSystemError(error));
    311   }
    312 
    313   if (!write_callback_.is_null()) {
    314     net::CompletionCallback callback = write_callback_;
    315     write_callback_.Reset();
    316     callback.Run(net::MapSystemError(error));
    317   }
    318 }
    319 
    320 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
    321   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
    322 }
    323 
    324 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
    325   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
    326 }
    327 
    328 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
    329   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
    330 }
    331 
    332 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
    333   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
    334 }
    335 
    336 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
    337   write_waits_for_send_ = write_waits_for_send;
    338 }
    339 
    340 void PseudoTcpAdapter::Core::DeleteSocket() {
    341   socket_.reset();
    342 }
    343 
    344 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
    345     PseudoTcp* tcp,
    346     const char* buffer,
    347     size_t len) {
    348   DCHECK_EQ(tcp, &pseudo_tcp_);
    349 
    350   // If we already have a write pending, we behave like a congested network,
    351   // returning success for the write, but dropping the packet.  PseudoTcp will
    352   // back-off and retransmit, adjusting for the perceived congestion.
    353   if (socket_write_pending_)
    354     return IPseudoTcpNotify::WR_SUCCESS;
    355 
    356   scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
    357   memcpy(write_buffer->data(), buffer, len);
    358 
    359   // Our underlying socket is datagram-oriented, which means it should either
    360   // send exactly as many bytes as we requested, or fail.
    361   int result;
    362   if (socket_.get()) {
    363     result = socket_->Write(
    364         write_buffer.get(),
    365         len,
    366         base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
    367   } else {
    368     result = net::ERR_CONNECTION_CLOSED;
    369   }
    370   if (result == net::ERR_IO_PENDING) {
    371     socket_write_pending_ = true;
    372     return IPseudoTcpNotify::WR_SUCCESS;
    373   } else if (result == net::ERR_MSG_TOO_BIG) {
    374     return IPseudoTcpNotify::WR_TOO_LARGE;
    375   } else if (result < 0) {
    376     return IPseudoTcpNotify::WR_FAIL;
    377   } else {
    378     return IPseudoTcpNotify::WR_SUCCESS;
    379   }
    380 }
    381 
    382 void PseudoTcpAdapter::Core::DoReadFromSocket() {
    383   if (!socket_read_buffer_.get())
    384     socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
    385 
    386   int result = 1;
    387   while (socket_.get() && result > 0) {
    388     result = socket_->Read(
    389         socket_read_buffer_.get(),
    390         kReadBufferSize,
    391         base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
    392     if (result != net::ERR_IO_PENDING)
    393       HandleReadResults(result);
    394   }
    395 }
    396 
    397 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
    398   if (result <= 0) {
    399     LOG(ERROR) << "Read returned " << result;
    400     return;
    401   }
    402 
    403   // TODO(wez): Disconnect on failure of NotifyPacket?
    404   pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
    405   AdjustClock();
    406 
    407   CheckWriteComplete();
    408 }
    409 
    410 void PseudoTcpAdapter::Core::OnRead(int result) {
    411   // Reference the Core in case a callback deletes the adapter.
    412   scoped_refptr<Core> core(this);
    413 
    414   HandleReadResults(result);
    415   if (result >= 0)
    416     DoReadFromSocket();
    417 }
    418 
    419 void PseudoTcpAdapter::Core::OnWritten(int result) {
    420   // Reference the Core in case a callback deletes the adapter.
    421   scoped_refptr<Core> core(this);
    422 
    423   socket_write_pending_ = false;
    424   if (result < 0) {
    425     LOG(WARNING) << "Write failed. Error code: " << result;
    426   }
    427 }
    428 
    429 void PseudoTcpAdapter::Core::AdjustClock() {
    430   long timeout = 0;
    431   if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
    432     timer_.Stop();
    433     timer_.Start(FROM_HERE,
    434                  base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
    435                  &PseudoTcpAdapter::Core::HandleTcpClock);
    436   }
    437 }
    438 
    439 void PseudoTcpAdapter::Core::HandleTcpClock() {
    440   // Reference the Core in case a callback deletes the adapter.
    441   scoped_refptr<Core> core(this);
    442 
    443   pseudo_tcp_.NotifyClock(PseudoTcp::Now());
    444   AdjustClock();
    445 
    446   CheckWriteComplete();
    447 }
    448 
    449 void PseudoTcpAdapter::Core::CheckWriteComplete() {
    450   if (!write_callback_.is_null() && waiting_write_position_) {
    451     if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
    452       waiting_write_position_ = false;
    453 
    454       net::CompletionCallback callback = write_callback_;
    455       write_callback_.Reset();
    456       write_buffer_ = NULL;
    457       callback.Run(last_write_result_);
    458     }
    459   }
    460 }
    461 
    462 // Public interface implementation.
    463 
    464 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
    465     : core_(new Core(socket)) {
    466 }
    467 
    468 PseudoTcpAdapter::~PseudoTcpAdapter() {
    469   Disconnect();
    470 
    471   // Make sure that the underlying socket is destroyed before PseudoTcp.
    472   core_->DeleteSocket();
    473 }
    474 
    475 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
    476                            const net::CompletionCallback& callback) {
    477   DCHECK(CalledOnValidThread());
    478   return core_->Read(buffer, buffer_size, callback);
    479 }
    480 
    481 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
    482                             const net::CompletionCallback& callback) {
    483   DCHECK(CalledOnValidThread());
    484   return core_->Write(buffer, buffer_size, callback);
    485 }
    486 
    487 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
    488   DCHECK(CalledOnValidThread());
    489 
    490   core_->SetReceiveBufferSize(size);
    491   return net::OK;
    492 }
    493 
    494 int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
    495   DCHECK(CalledOnValidThread());
    496 
    497   core_->SetSendBufferSize(size);
    498   return net::OK;
    499 }
    500 
    501 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
    502   DCHECK(CalledOnValidThread());
    503 
    504   // net::StreamSocket requires that Connect return OK if already connected.
    505   if (IsConnected())
    506     return net::OK;
    507 
    508   return core_->Connect(callback);
    509 }
    510 
    511 void PseudoTcpAdapter::Disconnect() {
    512   DCHECK(CalledOnValidThread());
    513   core_->Disconnect();
    514 }
    515 
    516 bool PseudoTcpAdapter::IsConnected() const {
    517   return core_->IsConnected();
    518 }
    519 
    520 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
    521   DCHECK(CalledOnValidThread());
    522   NOTIMPLEMENTED();
    523   return false;
    524 }
    525 
    526 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
    527   DCHECK(CalledOnValidThread());
    528 
    529   // We don't have a meaningful peer address, but we can't return an
    530   // error, so we return a INADDR_ANY instead.
    531   net::IPAddressNumber ip_address(net::kIPv4AddressSize);
    532   *address = net::IPEndPoint(ip_address, 0);
    533   return net::OK;
    534 }
    535 
    536 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
    537   DCHECK(CalledOnValidThread());
    538   NOTIMPLEMENTED();
    539   return net::ERR_FAILED;
    540 }
    541 
    542 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
    543   DCHECK(CalledOnValidThread());
    544   return net_log_;
    545 }
    546 
    547 void PseudoTcpAdapter::SetSubresourceSpeculation() {
    548   DCHECK(CalledOnValidThread());
    549   NOTIMPLEMENTED();
    550 }
    551 
    552 void PseudoTcpAdapter::SetOmniboxSpeculation() {
    553   DCHECK(CalledOnValidThread());
    554   NOTIMPLEMENTED();
    555 }
    556 
    557 bool PseudoTcpAdapter::WasEverUsed() const {
    558   DCHECK(CalledOnValidThread());
    559   NOTIMPLEMENTED();
    560   return true;
    561 }
    562 
    563 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
    564   DCHECK(CalledOnValidThread());
    565   return false;
    566 }
    567 
    568 bool PseudoTcpAdapter::WasNpnNegotiated() const {
    569   DCHECK(CalledOnValidThread());
    570   return false;
    571 }
    572 
    573 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
    574   DCHECK(CalledOnValidThread());
    575   return net::kProtoUnknown;
    576 }
    577 
    578 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
    579   DCHECK(CalledOnValidThread());
    580   return false;
    581 }
    582 
    583 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
    584   DCHECK(CalledOnValidThread());
    585   core_->SetAckDelay(delay_ms);
    586 }
    587 
    588 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
    589   DCHECK(CalledOnValidThread());
    590   core_->SetNoDelay(no_delay);
    591 }
    592 
    593 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
    594   DCHECK(CalledOnValidThread());
    595   core_->SetWriteWaitsForSend(write_waits_for_send);
    596 }
    597 
    598 }  // namespace jingle_glue
    599