1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "jingle/glue/pseudotcp_adapter.h" 6 7 #include "base/compiler_specific.h" 8 #include "base/logging.h" 9 #include "base/time/time.h" 10 #include "net/base/address_list.h" 11 #include "net/base/completion_callback.h" 12 #include "net/base/io_buffer.h" 13 #include "net/base/net_errors.h" 14 #include "net/base/net_util.h" 15 16 using cricket::PseudoTcp; 17 18 namespace { 19 const int kReadBufferSize = 65536; // Maximum size of a packet. 20 const uint16 kDefaultMtu = 1280; 21 } // namespace 22 23 namespace jingle_glue { 24 25 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, 26 public base::RefCounted<Core> { 27 public: 28 Core(net::Socket* socket); 29 30 // Functions used to implement net::StreamSocket. 31 int Read(net::IOBuffer* buffer, int buffer_size, 32 const net::CompletionCallback& callback); 33 int Write(net::IOBuffer* buffer, int buffer_size, 34 const net::CompletionCallback& callback); 35 int Connect(const net::CompletionCallback& callback); 36 void Disconnect(); 37 bool IsConnected() const; 38 39 // cricket::IPseudoTcpNotify interface. 40 // These notifications are triggered from NotifyPacket. 41 virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE; 42 virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE; 43 virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE; 44 // This is triggered by NotifyClock or NotifyPacket. 45 virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE; 46 // This is triggered by NotifyClock, NotifyPacket, Recv and Send. 47 virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp, 48 const char* buffer, size_t len) OVERRIDE; 49 50 void SetAckDelay(int delay_ms); 51 void SetNoDelay(bool no_delay); 52 void SetReceiveBufferSize(int32 size); 53 void SetSendBufferSize(int32 size); 54 void SetWriteWaitsForSend(bool write_waits_for_send); 55 56 void DeleteSocket(); 57 58 private: 59 friend class base::RefCounted<Core>; 60 virtual ~Core(); 61 62 // These are invoked by the underlying Socket, and may trigger callbacks. 63 // They hold a reference to |this| while running, to protect from deletion. 64 void OnRead(int result); 65 void OnWritten(int result); 66 67 // These may trigger callbacks, so the holder must hold a reference on 68 // the stack while calling them. 69 void DoReadFromSocket(); 70 void HandleReadResults(int result); 71 void HandleTcpClock(); 72 73 // Checks if current write has completed in the write-waits-for-send 74 // mode. 75 void CheckWriteComplete(); 76 77 // This re-sets |timer| without triggering callbacks. 78 void AdjustClock(); 79 80 net::CompletionCallback connect_callback_; 81 net::CompletionCallback read_callback_; 82 net::CompletionCallback write_callback_; 83 84 cricket::PseudoTcp pseudo_tcp_; 85 scoped_ptr<net::Socket> socket_; 86 87 scoped_refptr<net::IOBuffer> read_buffer_; 88 int read_buffer_size_; 89 scoped_refptr<net::IOBuffer> write_buffer_; 90 int write_buffer_size_; 91 92 // Whether we need to wait for data to be sent before completing write. 93 bool write_waits_for_send_; 94 95 // Set to true in the write-waits-for-send mode when we've 96 // successfully writtend data to the send buffer and waiting for the 97 // data to be sent to the remote end. 98 bool waiting_write_position_; 99 100 // Number of the bytes written by the last write stored while we wait 101 // for the data to be sent (i.e. when waiting_write_position_ = true). 102 int last_write_result_; 103 104 bool socket_write_pending_; 105 scoped_refptr<net::IOBuffer> socket_read_buffer_; 106 107 base::OneShotTimer<Core> timer_; 108 109 DISALLOW_COPY_AND_ASSIGN(Core); 110 }; 111 112 113 PseudoTcpAdapter::Core::Core(net::Socket* socket) 114 : pseudo_tcp_(this, 0), 115 socket_(socket), 116 write_waits_for_send_(false), 117 waiting_write_position_(false), 118 socket_write_pending_(false) { 119 // Doesn't trigger callbacks. 120 pseudo_tcp_.NotifyMTU(kDefaultMtu); 121 } 122 123 PseudoTcpAdapter::Core::~Core() { 124 } 125 126 int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size, 127 const net::CompletionCallback& callback) { 128 DCHECK(read_callback_.is_null()); 129 130 // Reference the Core in case a callback deletes the adapter. 131 scoped_refptr<Core> core(this); 132 133 int result = pseudo_tcp_.Recv(buffer->data(), buffer_size); 134 if (result < 0) { 135 result = net::MapSystemError(pseudo_tcp_.GetError()); 136 DCHECK(result < 0); 137 } 138 139 if (result == net::ERR_IO_PENDING) { 140 read_buffer_ = buffer; 141 read_buffer_size_ = buffer_size; 142 read_callback_ = callback; 143 } 144 145 AdjustClock(); 146 147 return result; 148 } 149 150 int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size, 151 const net::CompletionCallback& callback) { 152 DCHECK(write_callback_.is_null()); 153 154 // Reference the Core in case a callback deletes the adapter. 155 scoped_refptr<Core> core(this); 156 157 int result = pseudo_tcp_.Send(buffer->data(), buffer_size); 158 if (result < 0) { 159 result = net::MapSystemError(pseudo_tcp_.GetError()); 160 DCHECK(result < 0); 161 } 162 163 AdjustClock(); 164 165 if (result == net::ERR_IO_PENDING) { 166 write_buffer_ = buffer; 167 write_buffer_size_ = buffer_size; 168 write_callback_ = callback; 169 return result; 170 } 171 172 if (result < 0) 173 return result; 174 175 // Need to wait until the data is sent to the peer when 176 // send-confirmation mode is enabled. 177 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) { 178 DCHECK(!waiting_write_position_); 179 waiting_write_position_ = true; 180 last_write_result_ = result; 181 write_buffer_ = buffer; 182 write_buffer_size_ = buffer_size; 183 write_callback_ = callback; 184 return net::ERR_IO_PENDING; 185 } 186 187 return result; 188 } 189 190 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) { 191 DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN); 192 193 // Reference the Core in case a callback deletes the adapter. 194 scoped_refptr<Core> core(this); 195 196 // Start the connection attempt. 197 int result = pseudo_tcp_.Connect(); 198 if (result < 0) 199 return net::ERR_FAILED; 200 201 AdjustClock(); 202 203 connect_callback_ = callback; 204 DoReadFromSocket(); 205 206 return net::ERR_IO_PENDING; 207 } 208 209 void PseudoTcpAdapter::Core::Disconnect() { 210 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket. 211 read_callback_.Reset(); 212 read_buffer_ = NULL; 213 write_callback_.Reset(); 214 write_buffer_ = NULL; 215 connect_callback_.Reset(); 216 217 // TODO(wez): Connect should succeed if called after Disconnect, which 218 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp 219 // and create a new one in Connect. 220 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other 221 // effect. This should be addressed in PseudoTcp, really. 222 // In the meantime we can fake OnTcpClosed notification and tear down the 223 // PseudoTcp. 224 pseudo_tcp_.Close(true); 225 } 226 227 bool PseudoTcpAdapter::Core::IsConnected() const { 228 return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED; 229 } 230 231 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) { 232 DCHECK(tcp == &pseudo_tcp_); 233 234 if (!connect_callback_.is_null()) { 235 net::CompletionCallback callback = connect_callback_; 236 connect_callback_.Reset(); 237 callback.Run(net::OK); 238 } 239 240 OnTcpReadable(tcp); 241 OnTcpWriteable(tcp); 242 } 243 244 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) { 245 DCHECK_EQ(tcp, &pseudo_tcp_); 246 if (read_callback_.is_null()) 247 return; 248 249 int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_); 250 if (result < 0) { 251 result = net::MapSystemError(pseudo_tcp_.GetError()); 252 DCHECK(result < 0); 253 if (result == net::ERR_IO_PENDING) 254 return; 255 } 256 257 AdjustClock(); 258 259 net::CompletionCallback callback = read_callback_; 260 read_callback_.Reset(); 261 read_buffer_ = NULL; 262 callback.Run(result); 263 } 264 265 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) { 266 DCHECK_EQ(tcp, &pseudo_tcp_); 267 if (write_callback_.is_null()) 268 return; 269 270 if (waiting_write_position_) { 271 CheckWriteComplete(); 272 return; 273 } 274 275 int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_); 276 if (result < 0) { 277 result = net::MapSystemError(pseudo_tcp_.GetError()); 278 DCHECK(result < 0); 279 if (result == net::ERR_IO_PENDING) 280 return; 281 } 282 283 AdjustClock(); 284 285 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) { 286 DCHECK(!waiting_write_position_); 287 waiting_write_position_ = true; 288 last_write_result_ = result; 289 return; 290 } 291 292 net::CompletionCallback callback = write_callback_; 293 write_callback_.Reset(); 294 write_buffer_ = NULL; 295 callback.Run(result); 296 } 297 298 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) { 299 DCHECK_EQ(tcp, &pseudo_tcp_); 300 301 if (!connect_callback_.is_null()) { 302 net::CompletionCallback callback = connect_callback_; 303 connect_callback_.Reset(); 304 callback.Run(net::MapSystemError(error)); 305 } 306 307 if (!read_callback_.is_null()) { 308 net::CompletionCallback callback = read_callback_; 309 read_callback_.Reset(); 310 callback.Run(net::MapSystemError(error)); 311 } 312 313 if (!write_callback_.is_null()) { 314 net::CompletionCallback callback = write_callback_; 315 write_callback_.Reset(); 316 callback.Run(net::MapSystemError(error)); 317 } 318 } 319 320 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) { 321 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms); 322 } 323 324 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) { 325 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0); 326 } 327 328 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) { 329 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size); 330 } 331 332 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) { 333 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size); 334 } 335 336 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) { 337 write_waits_for_send_ = write_waits_for_send; 338 } 339 340 void PseudoTcpAdapter::Core::DeleteSocket() { 341 socket_.reset(); 342 } 343 344 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket( 345 PseudoTcp* tcp, 346 const char* buffer, 347 size_t len) { 348 DCHECK_EQ(tcp, &pseudo_tcp_); 349 350 // If we already have a write pending, we behave like a congested network, 351 // returning success for the write, but dropping the packet. PseudoTcp will 352 // back-off and retransmit, adjusting for the perceived congestion. 353 if (socket_write_pending_) 354 return IPseudoTcpNotify::WR_SUCCESS; 355 356 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len); 357 memcpy(write_buffer->data(), buffer, len); 358 359 // Our underlying socket is datagram-oriented, which means it should either 360 // send exactly as many bytes as we requested, or fail. 361 int result; 362 if (socket_.get()) { 363 result = socket_->Write( 364 write_buffer.get(), 365 len, 366 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this))); 367 } else { 368 result = net::ERR_CONNECTION_CLOSED; 369 } 370 if (result == net::ERR_IO_PENDING) { 371 socket_write_pending_ = true; 372 return IPseudoTcpNotify::WR_SUCCESS; 373 } else if (result == net::ERR_MSG_TOO_BIG) { 374 return IPseudoTcpNotify::WR_TOO_LARGE; 375 } else if (result < 0) { 376 return IPseudoTcpNotify::WR_FAIL; 377 } else { 378 return IPseudoTcpNotify::WR_SUCCESS; 379 } 380 } 381 382 void PseudoTcpAdapter::Core::DoReadFromSocket() { 383 if (!socket_read_buffer_.get()) 384 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize); 385 386 int result = 1; 387 while (socket_.get() && result > 0) { 388 result = socket_->Read( 389 socket_read_buffer_.get(), 390 kReadBufferSize, 391 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this))); 392 if (result != net::ERR_IO_PENDING) 393 HandleReadResults(result); 394 } 395 } 396 397 void PseudoTcpAdapter::Core::HandleReadResults(int result) { 398 if (result <= 0) { 399 LOG(ERROR) << "Read returned " << result; 400 return; 401 } 402 403 // TODO(wez): Disconnect on failure of NotifyPacket? 404 pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result); 405 AdjustClock(); 406 407 CheckWriteComplete(); 408 } 409 410 void PseudoTcpAdapter::Core::OnRead(int result) { 411 // Reference the Core in case a callback deletes the adapter. 412 scoped_refptr<Core> core(this); 413 414 HandleReadResults(result); 415 if (result >= 0) 416 DoReadFromSocket(); 417 } 418 419 void PseudoTcpAdapter::Core::OnWritten(int result) { 420 // Reference the Core in case a callback deletes the adapter. 421 scoped_refptr<Core> core(this); 422 423 socket_write_pending_ = false; 424 if (result < 0) { 425 LOG(WARNING) << "Write failed. Error code: " << result; 426 } 427 } 428 429 void PseudoTcpAdapter::Core::AdjustClock() { 430 long timeout = 0; 431 if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) { 432 timer_.Stop(); 433 timer_.Start(FROM_HERE, 434 base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this, 435 &PseudoTcpAdapter::Core::HandleTcpClock); 436 } 437 } 438 439 void PseudoTcpAdapter::Core::HandleTcpClock() { 440 // Reference the Core in case a callback deletes the adapter. 441 scoped_refptr<Core> core(this); 442 443 pseudo_tcp_.NotifyClock(PseudoTcp::Now()); 444 AdjustClock(); 445 446 CheckWriteComplete(); 447 } 448 449 void PseudoTcpAdapter::Core::CheckWriteComplete() { 450 if (!write_callback_.is_null() && waiting_write_position_) { 451 if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) { 452 waiting_write_position_ = false; 453 454 net::CompletionCallback callback = write_callback_; 455 write_callback_.Reset(); 456 write_buffer_ = NULL; 457 callback.Run(last_write_result_); 458 } 459 } 460 } 461 462 // Public interface implemention. 463 464 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket) 465 : core_(new Core(socket)) { 466 } 467 468 PseudoTcpAdapter::~PseudoTcpAdapter() { 469 Disconnect(); 470 471 // Make sure that the underlying socket is destroyed before PseudoTcp. 472 core_->DeleteSocket(); 473 } 474 475 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, 476 const net::CompletionCallback& callback) { 477 DCHECK(CalledOnValidThread()); 478 return core_->Read(buffer, buffer_size, callback); 479 } 480 481 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, 482 const net::CompletionCallback& callback) { 483 DCHECK(CalledOnValidThread()); 484 return core_->Write(buffer, buffer_size, callback); 485 } 486 487 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) { 488 DCHECK(CalledOnValidThread()); 489 490 core_->SetReceiveBufferSize(size); 491 return net::OK; 492 } 493 494 int PseudoTcpAdapter::SetSendBufferSize(int32 size) { 495 DCHECK(CalledOnValidThread()); 496 497 core_->SetSendBufferSize(size); 498 return net::OK; 499 } 500 501 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) { 502 DCHECK(CalledOnValidThread()); 503 504 // net::StreamSocket requires that Connect return OK if already connected. 505 if (IsConnected()) 506 return net::OK; 507 508 return core_->Connect(callback); 509 } 510 511 void PseudoTcpAdapter::Disconnect() { 512 DCHECK(CalledOnValidThread()); 513 core_->Disconnect(); 514 } 515 516 bool PseudoTcpAdapter::IsConnected() const { 517 return core_->IsConnected(); 518 } 519 520 bool PseudoTcpAdapter::IsConnectedAndIdle() const { 521 DCHECK(CalledOnValidThread()); 522 NOTIMPLEMENTED(); 523 return false; 524 } 525 526 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const { 527 DCHECK(CalledOnValidThread()); 528 529 // We don't have a meaningful peer address, but we can't return an 530 // error, so we return a INADDR_ANY instead. 531 net::IPAddressNumber ip_address(net::kIPv4AddressSize); 532 *address = net::IPEndPoint(ip_address, 0); 533 return net::OK; 534 } 535 536 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const { 537 DCHECK(CalledOnValidThread()); 538 NOTIMPLEMENTED(); 539 return net::ERR_FAILED; 540 } 541 542 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const { 543 DCHECK(CalledOnValidThread()); 544 return net_log_; 545 } 546 547 void PseudoTcpAdapter::SetSubresourceSpeculation() { 548 DCHECK(CalledOnValidThread()); 549 NOTIMPLEMENTED(); 550 } 551 552 void PseudoTcpAdapter::SetOmniboxSpeculation() { 553 DCHECK(CalledOnValidThread()); 554 NOTIMPLEMENTED(); 555 } 556 557 bool PseudoTcpAdapter::WasEverUsed() const { 558 DCHECK(CalledOnValidThread()); 559 NOTIMPLEMENTED(); 560 return true; 561 } 562 563 bool PseudoTcpAdapter::UsingTCPFastOpen() const { 564 DCHECK(CalledOnValidThread()); 565 return false; 566 } 567 568 bool PseudoTcpAdapter::WasNpnNegotiated() const { 569 DCHECK(CalledOnValidThread()); 570 return false; 571 } 572 573 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const { 574 DCHECK(CalledOnValidThread()); 575 return net::kProtoUnknown; 576 } 577 578 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) { 579 DCHECK(CalledOnValidThread()); 580 return false; 581 } 582 583 void PseudoTcpAdapter::SetAckDelay(int delay_ms) { 584 DCHECK(CalledOnValidThread()); 585 core_->SetAckDelay(delay_ms); 586 } 587 588 void PseudoTcpAdapter::SetNoDelay(bool no_delay) { 589 DCHECK(CalledOnValidThread()); 590 core_->SetNoDelay(no_delay); 591 } 592 593 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) { 594 DCHECK(CalledOnValidThread()); 595 core_->SetWriteWaitsForSend(write_waits_for_send); 596 } 597 598 } // namespace jingle_glue 599