Home | History | Annotate | Download | only in glue
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "jingle/glue/channel_socket_adapter.h"
      6 
      7 #include <limits>
      8 
      9 #include "base/callback.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "net/base/io_buffer.h"
     13 #include "net/base/net_errors.h"
     14 #include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
     15 
     16 namespace jingle_glue {
     17 
     18 TransportChannelSocketAdapter::TransportChannelSocketAdapter(
     19     cricket::TransportChannel* channel)
     20     : message_loop_(base::MessageLoop::current()),
     21       channel_(channel),
     22       closed_error_code_(net::OK) {
     23   DCHECK(channel_);
     24 
     25   channel_->SignalReadPacket.connect(
     26       this, &TransportChannelSocketAdapter::OnNewPacket);
     27   channel_->SignalWritableState.connect(
     28       this, &TransportChannelSocketAdapter::OnWritableState);
     29   channel_->SignalDestroyed.connect(
     30       this, &TransportChannelSocketAdapter::OnChannelDestroyed);
     31 }
     32 
     33 TransportChannelSocketAdapter::~TransportChannelSocketAdapter() {
     34   if (!destruction_callback_.is_null())
     35     destruction_callback_.Run();
     36 }
     37 
     38 void TransportChannelSocketAdapter::SetOnDestroyedCallback(
     39     const base::Closure& callback) {
     40   destruction_callback_ = callback;
     41 }
     42 
     43 int TransportChannelSocketAdapter::Read(
     44     net::IOBuffer* buf,
     45     int buffer_size,
     46     const net::CompletionCallback& callback) {
     47   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
     48   DCHECK(buf);
     49   DCHECK(!callback.is_null());
     50   CHECK(read_callback_.is_null());
     51 
     52   if (!channel_) {
     53     DCHECK(closed_error_code_ != net::OK);
     54     return closed_error_code_;
     55   }
     56 
     57   read_callback_ = callback;
     58   read_buffer_ = buf;
     59   read_buffer_size_ = buffer_size;
     60 
     61   return net::ERR_IO_PENDING;
     62 }
     63 
     64 int TransportChannelSocketAdapter::Write(
     65     net::IOBuffer* buffer,
     66     int buffer_size,
     67     const net::CompletionCallback& callback) {
     68   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
     69   DCHECK(buffer);
     70   DCHECK(!callback.is_null());
     71   CHECK(write_callback_.is_null());
     72 
     73   if (!channel_) {
     74     DCHECK(closed_error_code_ != net::OK);
     75     return closed_error_code_;
     76   }
     77 
     78   int result;
     79   if (channel_->writable()) {
     80     result = channel_->SendPacket(buffer->data(), buffer_size,
     81                                   talk_base::DSCP_NO_CHANGE);
     82     if (result < 0) {
     83       result = net::MapSystemError(channel_->GetError());
     84 
     85       // If the underlying socket returns IO pending where it shouldn't we
     86       // pretend the packet is dropped and return as succeeded because no
     87       // writeable callback will happen.
     88       if (result == net::ERR_IO_PENDING)
     89         result = net::OK;
     90     }
     91   } else {
     92     // Channel is not writable yet.
     93     result = net::ERR_IO_PENDING;
     94     write_callback_ = callback;
     95     write_buffer_ = buffer;
     96     write_buffer_size_ = buffer_size;
     97   }
     98 
     99   return result;
    100 }
    101 
    102 bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) {
    103   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    104   return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0;
    105 }
    106 
    107 bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) {
    108   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    109   return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0;
    110 }
    111 
    112 void TransportChannelSocketAdapter::Close(int error_code) {
    113   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    114 
    115   if (!channel_)  // Already closed.
    116     return;
    117 
    118   DCHECK(error_code != net::OK);
    119   closed_error_code_ = error_code;
    120   channel_->SignalReadPacket.disconnect(this);
    121   channel_->SignalDestroyed.disconnect(this);
    122   channel_ = NULL;
    123 
    124   if (!read_callback_.is_null()) {
    125     net::CompletionCallback callback = read_callback_;
    126     read_callback_.Reset();
    127     read_buffer_ = NULL;
    128     callback.Run(error_code);
    129   }
    130 
    131   if (!write_callback_.is_null()) {
    132     net::CompletionCallback callback = write_callback_;
    133     write_callback_.Reset();
    134     write_buffer_ = NULL;
    135     callback.Run(error_code);
    136   }
    137 }
    138 
    139 void TransportChannelSocketAdapter::OnNewPacket(
    140     cricket::TransportChannel* channel,
    141     const char* data,
    142     size_t data_size,
    143     const talk_base::PacketTime& packet_time,
    144     int flags) {
    145   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    146   DCHECK_EQ(channel, channel_);
    147   if (!read_callback_.is_null()) {
    148     DCHECK(read_buffer_.get());
    149     CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max()));
    150 
    151     if (read_buffer_size_ < static_cast<int>(data_size)) {
    152       LOG(WARNING) << "Data buffer is smaller than the received packet. "
    153                    << "Dropping the data that doesn't fit.";
    154       data_size = read_buffer_size_;
    155     }
    156 
    157     memcpy(read_buffer_->data(), data, data_size);
    158 
    159     net::CompletionCallback callback = read_callback_;
    160     read_callback_.Reset();
    161     read_buffer_ = NULL;
    162 
    163     callback.Run(data_size);
    164   } else {
    165     LOG(WARNING)
    166         << "Data was received without a callback. Dropping the packet.";
    167   }
    168 }
    169 
    170 void TransportChannelSocketAdapter::OnWritableState(
    171     cricket::TransportChannel* channel) {
    172   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    173   // Try to send the packet if there is a pending write.
    174   if (!write_callback_.is_null()) {
    175     int result = channel_->SendPacket(write_buffer_->data(),
    176                                       write_buffer_size_,
    177                                       talk_base::DSCP_NO_CHANGE);
    178     if (result < 0)
    179       result = net::MapSystemError(channel_->GetError());
    180 
    181     if (result != net::ERR_IO_PENDING) {
    182       net::CompletionCallback callback = write_callback_;
    183       write_callback_.Reset();
    184       write_buffer_ = NULL;
    185       callback.Run(result);
    186     }
    187   }
    188 }
    189 
    190 void TransportChannelSocketAdapter::OnChannelDestroyed(
    191     cricket::TransportChannel* channel) {
    192   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    193   DCHECK_EQ(channel, channel_);
    194   Close(net::ERR_CONNECTION_ABORTED);
    195 }
    196 
    197 }  // namespace jingle_glue
    198