1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "jingle/glue/channel_socket_adapter.h" 6 7 #include <limits> 8 9 #include "base/callback.h" 10 #include "base/logging.h" 11 #include "base/message_loop/message_loop.h" 12 #include "net/base/io_buffer.h" 13 #include "net/base/net_errors.h" 14 #include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" 15 16 namespace jingle_glue { 17 18 TransportChannelSocketAdapter::TransportChannelSocketAdapter( 19 cricket::TransportChannel* channel) 20 : message_loop_(base::MessageLoop::current()), 21 channel_(channel), 22 closed_error_code_(net::OK) { 23 DCHECK(channel_); 24 25 channel_->SignalReadPacket.connect( 26 this, &TransportChannelSocketAdapter::OnNewPacket); 27 channel_->SignalWritableState.connect( 28 this, &TransportChannelSocketAdapter::OnWritableState); 29 channel_->SignalDestroyed.connect( 30 this, &TransportChannelSocketAdapter::OnChannelDestroyed); 31 } 32 33 TransportChannelSocketAdapter::~TransportChannelSocketAdapter() { 34 if (!destruction_callback_.is_null()) 35 destruction_callback_.Run(); 36 } 37 38 void TransportChannelSocketAdapter::SetOnDestroyedCallback( 39 const base::Closure& callback) { 40 destruction_callback_ = callback; 41 } 42 43 int TransportChannelSocketAdapter::Read( 44 net::IOBuffer* buf, 45 int buffer_size, 46 const net::CompletionCallback& callback) { 47 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 48 DCHECK(buf); 49 DCHECK(!callback.is_null()); 50 CHECK(read_callback_.is_null()); 51 52 if (!channel_) { 53 DCHECK(closed_error_code_ != net::OK); 54 return closed_error_code_; 55 } 56 57 read_callback_ = callback; 58 read_buffer_ = buf; 59 read_buffer_size_ = buffer_size; 60 61 return net::ERR_IO_PENDING; 62 } 63 64 int TransportChannelSocketAdapter::Write( 65 net::IOBuffer* buffer, 66 int buffer_size, 67 const net::CompletionCallback& callback) { 68 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 69 DCHECK(buffer); 70 DCHECK(!callback.is_null()); 71 CHECK(write_callback_.is_null()); 72 73 if (!channel_) { 74 DCHECK(closed_error_code_ != net::OK); 75 return closed_error_code_; 76 } 77 78 int result; 79 if (channel_->writable()) { 80 result = channel_->SendPacket(buffer->data(), buffer_size); 81 if (result < 0) { 82 result = net::MapSystemError(channel_->GetError()); 83 84 // If the underlying socket returns IO pending where it shouldn't we 85 // pretend the packet is dropped and return as succeeded because no 86 // writeable callback will happen. 87 if (result == net::ERR_IO_PENDING) 88 result = net::OK; 89 } 90 } else { 91 // Channel is not writable yet. 92 result = net::ERR_IO_PENDING; 93 write_callback_ = callback; 94 write_buffer_ = buffer; 95 write_buffer_size_ = buffer_size; 96 } 97 98 return result; 99 } 100 101 bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) { 102 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 103 return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0; 104 } 105 106 bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) { 107 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 108 return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0; 109 } 110 111 void TransportChannelSocketAdapter::Close(int error_code) { 112 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 113 114 if (!channel_) // Already closed. 115 return; 116 117 DCHECK(error_code != net::OK); 118 closed_error_code_ = error_code; 119 channel_->SignalReadPacket.disconnect(this); 120 channel_->SignalDestroyed.disconnect(this); 121 channel_ = NULL; 122 123 if (!read_callback_.is_null()) { 124 net::CompletionCallback callback = read_callback_; 125 read_callback_.Reset(); 126 read_buffer_ = NULL; 127 callback.Run(error_code); 128 } 129 130 if (!write_callback_.is_null()) { 131 net::CompletionCallback callback = write_callback_; 132 write_callback_.Reset(); 133 write_buffer_ = NULL; 134 callback.Run(error_code); 135 } 136 } 137 138 void TransportChannelSocketAdapter::OnNewPacket( 139 cricket::TransportChannel* channel, 140 const char* data, 141 size_t data_size, 142 int flags) { 143 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 144 DCHECK_EQ(channel, channel_); 145 if (!read_callback_.is_null()) { 146 DCHECK(read_buffer_.get()); 147 CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); 148 149 if (read_buffer_size_ < static_cast<int>(data_size)) { 150 LOG(WARNING) << "Data buffer is smaller than the received packet. " 151 << "Dropping the data that doesn't fit."; 152 data_size = read_buffer_size_; 153 } 154 155 memcpy(read_buffer_->data(), data, data_size); 156 157 net::CompletionCallback callback = read_callback_; 158 read_callback_.Reset(); 159 read_buffer_ = NULL; 160 161 callback.Run(data_size); 162 } else { 163 LOG(WARNING) 164 << "Data was received without a callback. Dropping the packet."; 165 } 166 } 167 168 void TransportChannelSocketAdapter::OnWritableState( 169 cricket::TransportChannel* channel) { 170 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 171 // Try to send the packet if there is a pending write. 172 if (!write_callback_.is_null()) { 173 int result = channel_->SendPacket(write_buffer_->data(), 174 write_buffer_size_); 175 if (result < 0) 176 result = net::MapSystemError(channel_->GetError()); 177 178 if (result != net::ERR_IO_PENDING) { 179 net::CompletionCallback callback = write_callback_; 180 write_callback_.Reset(); 181 write_buffer_ = NULL; 182 callback.Run(result); 183 } 184 } 185 } 186 187 void TransportChannelSocketAdapter::OnChannelDestroyed( 188 cricket::TransportChannel* channel) { 189 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 190 DCHECK_EQ(channel, channel_); 191 Close(net::ERR_CONNECTION_ABORTED); 192 } 193 194 } // namespace jingle_glue 195