Home | History | Annotate | Download | only in glue
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "jingle/glue/channel_socket_adapter.h"
      6 
      7 #include <limits>
      8 
      9 #include "base/callback.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "net/base/io_buffer.h"
     13 #include "net/base/net_errors.h"
     14 #include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
     15 
     16 namespace jingle_glue {
     17 
     18 TransportChannelSocketAdapter::TransportChannelSocketAdapter(
     19     cricket::TransportChannel* channel)
     20     : message_loop_(base::MessageLoop::current()),
     21       channel_(channel),
     22       closed_error_code_(net::OK) {
     23   DCHECK(channel_);
     24 
     25   channel_->SignalReadPacket.connect(
     26       this, &TransportChannelSocketAdapter::OnNewPacket);
     27   channel_->SignalWritableState.connect(
     28       this, &TransportChannelSocketAdapter::OnWritableState);
     29   channel_->SignalDestroyed.connect(
     30       this, &TransportChannelSocketAdapter::OnChannelDestroyed);
     31 }
     32 
     33 TransportChannelSocketAdapter::~TransportChannelSocketAdapter() {
     34   if (!destruction_callback_.is_null())
     35     destruction_callback_.Run();
     36 }
     37 
     38 void TransportChannelSocketAdapter::SetOnDestroyedCallback(
     39     const base::Closure& callback) {
     40   destruction_callback_ = callback;
     41 }
     42 
     43 int TransportChannelSocketAdapter::Read(
     44     net::IOBuffer* buf,
     45     int buffer_size,
     46     const net::CompletionCallback& callback) {
     47   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
     48   DCHECK(buf);
     49   DCHECK(!callback.is_null());
     50   CHECK(read_callback_.is_null());
     51 
     52   if (!channel_) {
     53     DCHECK(closed_error_code_ != net::OK);
     54     return closed_error_code_;
     55   }
     56 
     57   read_callback_ = callback;
     58   read_buffer_ = buf;
     59   read_buffer_size_ = buffer_size;
     60 
     61   return net::ERR_IO_PENDING;
     62 }
     63 
     64 int TransportChannelSocketAdapter::Write(
     65     net::IOBuffer* buffer,
     66     int buffer_size,
     67     const net::CompletionCallback& callback) {
     68   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
     69   DCHECK(buffer);
     70   DCHECK(!callback.is_null());
     71   CHECK(write_callback_.is_null());
     72 
     73   if (!channel_) {
     74     DCHECK(closed_error_code_ != net::OK);
     75     return closed_error_code_;
     76   }
     77 
     78   int result;
     79   if (channel_->writable()) {
     80     result = channel_->SendPacket(buffer->data(), buffer_size);
     81     if (result < 0) {
     82       result = net::MapSystemError(channel_->GetError());
     83 
     84       // If the underlying socket returns IO pending where it shouldn't we
     85       // pretend the packet is dropped and return as succeeded because no
     86       // writeable callback will happen.
     87       if (result == net::ERR_IO_PENDING)
     88         result = net::OK;
     89     }
     90   } else {
     91     // Channel is not writable yet.
     92     result = net::ERR_IO_PENDING;
     93     write_callback_ = callback;
     94     write_buffer_ = buffer;
     95     write_buffer_size_ = buffer_size;
     96   }
     97 
     98   return result;
     99 }
    100 
    101 bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) {
    102   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    103   return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0;
    104 }
    105 
    106 bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) {
    107   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    108   return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0;
    109 }
    110 
    111 void TransportChannelSocketAdapter::Close(int error_code) {
    112   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    113 
    114   if (!channel_)  // Already closed.
    115     return;
    116 
    117   DCHECK(error_code != net::OK);
    118   closed_error_code_ = error_code;
    119   channel_->SignalReadPacket.disconnect(this);
    120   channel_->SignalDestroyed.disconnect(this);
    121   channel_ = NULL;
    122 
    123   if (!read_callback_.is_null()) {
    124     net::CompletionCallback callback = read_callback_;
    125     read_callback_.Reset();
    126     read_buffer_ = NULL;
    127     callback.Run(error_code);
    128   }
    129 
    130   if (!write_callback_.is_null()) {
    131     net::CompletionCallback callback = write_callback_;
    132     write_callback_.Reset();
    133     write_buffer_ = NULL;
    134     callback.Run(error_code);
    135   }
    136 }
    137 
    138 void TransportChannelSocketAdapter::OnNewPacket(
    139     cricket::TransportChannel* channel,
    140     const char* data,
    141     size_t data_size,
    142     int flags) {
    143   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    144   DCHECK_EQ(channel, channel_);
    145   if (!read_callback_.is_null()) {
    146     DCHECK(read_buffer_.get());
    147     CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max()));
    148 
    149     if (read_buffer_size_ < static_cast<int>(data_size)) {
    150       LOG(WARNING) << "Data buffer is smaller than the received packet. "
    151                    << "Dropping the data that doesn't fit.";
    152       data_size = read_buffer_size_;
    153     }
    154 
    155     memcpy(read_buffer_->data(), data, data_size);
    156 
    157     net::CompletionCallback callback = read_callback_;
    158     read_callback_.Reset();
    159     read_buffer_ = NULL;
    160 
    161     callback.Run(data_size);
    162   } else {
    163     LOG(WARNING)
    164         << "Data was received without a callback. Dropping the packet.";
    165   }
    166 }
    167 
    168 void TransportChannelSocketAdapter::OnWritableState(
    169     cricket::TransportChannel* channel) {
    170   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    171   // Try to send the packet if there is a pending write.
    172   if (!write_callback_.is_null()) {
    173     int result = channel_->SendPacket(write_buffer_->data(),
    174                                       write_buffer_size_);
    175     if (result < 0)
    176       result = net::MapSystemError(channel_->GetError());
    177 
    178     if (result != net::ERR_IO_PENDING) {
    179       net::CompletionCallback callback = write_callback_;
    180       write_callback_.Reset();
    181       write_buffer_ = NULL;
    182       callback.Run(result);
    183     }
    184   }
    185 }
    186 
    187 void TransportChannelSocketAdapter::OnChannelDestroyed(
    188     cricket::TransportChannel* channel) {
    189   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    190   DCHECK_EQ(channel, channel_);
    191   Close(net::ERR_CONNECTION_ABORTED);
    192 }
    193 
    194 }  // namespace jingle_glue
    195