1 /* 2 * libjingle 3 * Copyright 2004--2006, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <string> 29 #include "talk/base/basictypes.h" 30 #include "talk/base/common.h" 31 #include "talk/base/logging.h" 32 #include "talk/base/scoped_ptr.h" 33 #include "talk/base/stringutils.h" 34 #include "talk/p2p/base/candidate.h" 35 #include "talk/p2p/base/transportchannel.h" 36 #include "pseudotcpchannel.h" 37 38 using namespace talk_base; 39 40 namespace cricket { 41 42 extern const talk_base::ConstantLabel SESSION_STATES[]; 43 44 // MSG_WK_* - worker thread messages 45 // MSG_ST_* - stream thread messages 46 // MSG_SI_* - signal thread messages 47 48 enum { 49 MSG_WK_CLOCK = 1, 50 MSG_WK_PURGE, 51 MSG_ST_EVENT, 52 MSG_SI_DESTROYCHANNEL, 53 MSG_SI_DESTROY, 54 }; 55 56 struct EventData : public MessageData { 57 int event, error; 58 EventData(int ev, int err = 0) : event(ev), error(err) { } 59 }; 60 61 /////////////////////////////////////////////////////////////////////////////// 62 // PseudoTcpChannel::InternalStream 63 /////////////////////////////////////////////////////////////////////////////// 64 65 class PseudoTcpChannel::InternalStream : public StreamInterface { 66 public: 67 InternalStream(PseudoTcpChannel* parent); 68 virtual ~InternalStream(); 69 70 virtual StreamState GetState() const; 71 virtual StreamResult Read(void* buffer, size_t buffer_len, 72 size_t* read, int* error); 73 virtual StreamResult Write(const void* data, size_t data_len, 74 size_t* written, int* error); 75 virtual void Close(); 76 77 private: 78 // parent_ is accessed and modified exclusively on the event thread, to 79 // avoid thread contention. This means that the PseudoTcpChannel cannot go 80 // away until after it receives a Close() from TunnelStream. 81 PseudoTcpChannel* parent_; 82 }; 83 84 /////////////////////////////////////////////////////////////////////////////// 85 // PseudoTcpChannel 86 // Member object lifetime summaries: 87 // session_ - passed in constructor, cleared when channel_ goes away. 88 // channel_ - created in Connect, destroyed when session_ or tcp_ goes away. 89 // tcp_ - created in Connect, destroyed when channel_ goes away, or connection 90 // closes. 91 // worker_thread_ - created when channel_ is created, purged when channel_ is 92 // destroyed. 93 // stream_ - created in GetStream, destroyed by owner at arbitrary time. 94 // this - created in constructor, destroyed when worker_thread_ and stream_ 95 // are both gone. 96 /////////////////////////////////////////////////////////////////////////////// 97 98 // 99 // Signal thread methods 100 // 101 102 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session) 103 : signal_thread_(session->session_manager()->signaling_thread()), 104 worker_thread_(NULL), 105 stream_thread_(stream_thread), 106 session_(session), channel_(NULL), tcp_(NULL), stream_(NULL), 107 stream_readable_(false), pending_read_event_(false), 108 ready_to_connect_(false) { 109 ASSERT(signal_thread_->IsCurrent()); 110 ASSERT(NULL != session_); 111 } 112 113 PseudoTcpChannel::~PseudoTcpChannel() { 114 ASSERT(signal_thread_->IsCurrent()); 115 ASSERT(worker_thread_ == NULL); 116 ASSERT(session_ == NULL); 117 ASSERT(channel_ == NULL); 118 ASSERT(stream_ == NULL); 119 ASSERT(tcp_ == NULL); 120 } 121 122 bool PseudoTcpChannel::Connect(const std::string& content_name, 123 const std::string& channel_name, 124 int component) { 125 ASSERT(signal_thread_->IsCurrent()); 126 CritScope lock(&cs_); 127 128 if (channel_) 129 return false; 130 131 ASSERT(session_ != NULL); 132 worker_thread_ = session_->session_manager()->worker_thread(); 133 content_name_ = content_name; 134 channel_ = session_->CreateChannel( 135 content_name, channel_name, component); 136 channel_name_ = channel_name; 137 channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1); 138 139 channel_->SignalDestroyed.connect(this, 140 &PseudoTcpChannel::OnChannelDestroyed); 141 channel_->SignalWritableState.connect(this, 142 &PseudoTcpChannel::OnChannelWritableState); 143 channel_->SignalReadPacket.connect(this, 144 &PseudoTcpChannel::OnChannelRead); 145 channel_->SignalRouteChange.connect(this, 146 &PseudoTcpChannel::OnChannelConnectionChanged); 147 148 ASSERT(tcp_ == NULL); 149 tcp_ = new PseudoTcp(this, 0); 150 if (session_->initiator()) { 151 // Since we may try several protocols and network adapters that won't work, 152 // waiting until we get our first writable notification before initiating 153 // TCP negotiation. 154 ready_to_connect_ = true; 155 } 156 157 return true; 158 } 159 160 StreamInterface* PseudoTcpChannel::GetStream() { 161 ASSERT(signal_thread_->IsCurrent()); 162 CritScope lock(&cs_); 163 ASSERT(NULL != session_); 164 if (!stream_) 165 stream_ = new PseudoTcpChannel::InternalStream(this); 166 //TODO("should we disallow creation of new stream at some point?"); 167 return stream_; 168 } 169 170 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) { 171 LOG_F(LS_INFO) << "(" << channel->component() << ")"; 172 ASSERT(signal_thread_->IsCurrent()); 173 CritScope lock(&cs_); 174 ASSERT(channel == channel_); 175 signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL); 176 // When MSG_WK_PURGE is received, we know there will be no more messages from 177 // the worker thread. 178 worker_thread_->Clear(this, MSG_WK_CLOCK); 179 worker_thread_->Post(this, MSG_WK_PURGE); 180 session_ = NULL; 181 channel_ = NULL; 182 if ((stream_ != NULL) 183 && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED))) 184 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0)); 185 if (tcp_) { 186 tcp_->Close(true); 187 AdjustClock(); 188 } 189 SignalChannelClosed(this); 190 } 191 192 void PseudoTcpChannel::OnSessionTerminate(Session* session) { 193 // When the session terminates before we even connected 194 CritScope lock(&cs_); 195 if (session_ != NULL && channel_ == NULL) { 196 ASSERT(session == session_); 197 ASSERT(worker_thread_ == NULL); 198 ASSERT(tcp_ == NULL); 199 LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel"; 200 session_ = NULL; 201 if (stream_ != NULL) 202 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1)); 203 } 204 205 // Even though session_ is being destroyed, we mustn't clear the pointer, 206 // since we'll need it to tear down channel_. 207 // 208 // TODO: Is it always the case that if channel_ != NULL then we'll get 209 // a channel-destroyed notification? 210 } 211 212 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) { 213 ASSERT(signal_thread_->IsCurrent()); 214 CritScope lock(&cs_); 215 ASSERT(tcp_ != NULL); 216 tcp_->GetOption(opt, value); 217 } 218 219 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) { 220 ASSERT(signal_thread_->IsCurrent()); 221 CritScope lock(&cs_); 222 ASSERT(tcp_ != NULL); 223 tcp_->SetOption(opt, value); 224 } 225 226 // 227 // Stream thread methods 228 // 229 230 StreamState PseudoTcpChannel::GetState() const { 231 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 232 CritScope lock(&cs_); 233 if (!session_) 234 return SS_CLOSED; 235 if (!tcp_) 236 return SS_OPENING; 237 switch (tcp_->State()) { 238 case PseudoTcp::TCP_LISTEN: 239 case PseudoTcp::TCP_SYN_SENT: 240 case PseudoTcp::TCP_SYN_RECEIVED: 241 return SS_OPENING; 242 case PseudoTcp::TCP_ESTABLISHED: 243 return SS_OPEN; 244 case PseudoTcp::TCP_CLOSED: 245 default: 246 return SS_CLOSED; 247 } 248 } 249 250 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len, 251 size_t* read, int* error) { 252 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 253 CritScope lock(&cs_); 254 if (!tcp_) 255 return SR_BLOCK; 256 257 stream_readable_ = false; 258 int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len); 259 //LOG_F(LS_VERBOSE) << "Recv returned: " << result; 260 if (result > 0) { 261 if (read) 262 *read = result; 263 // PseudoTcp doesn't currently support repeated Readable signals. Simulate 264 // them here. 265 stream_readable_ = true; 266 if (!pending_read_event_) { 267 pending_read_event_ = true; 268 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true); 269 } 270 return SR_SUCCESS; 271 } else if (IsBlockingError(tcp_->GetError())) { 272 return SR_BLOCK; 273 } else { 274 if (error) 275 *error = tcp_->GetError(); 276 return SR_ERROR; 277 } 278 // This spot is never reached. 279 } 280 281 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len, 282 size_t* written, int* error) { 283 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 284 CritScope lock(&cs_); 285 if (!tcp_) 286 return SR_BLOCK; 287 int result = tcp_->Send(static_cast<const char*>(data), data_len); 288 //LOG_F(LS_VERBOSE) << "Send returned: " << result; 289 if (result > 0) { 290 if (written) 291 *written = result; 292 return SR_SUCCESS; 293 } else if (IsBlockingError(tcp_->GetError())) { 294 return SR_BLOCK; 295 } else { 296 if (error) 297 *error = tcp_->GetError(); 298 return SR_ERROR; 299 } 300 // This spot is never reached. 301 } 302 303 void PseudoTcpChannel::Close() { 304 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 305 CritScope lock(&cs_); 306 stream_ = NULL; 307 // Clear out any pending event notifications 308 stream_thread_->Clear(this, MSG_ST_EVENT); 309 if (tcp_) { 310 tcp_->Close(false); 311 AdjustClock(); 312 } else { 313 CheckDestroy(); 314 } 315 } 316 317 // 318 // Worker thread methods 319 // 320 321 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) { 322 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 323 ASSERT(worker_thread_->IsCurrent()); 324 CritScope lock(&cs_); 325 if (!channel_) { 326 LOG_F(LS_WARNING) << "NULL channel"; 327 return; 328 } 329 ASSERT(channel == channel_); 330 if (!tcp_) { 331 LOG_F(LS_WARNING) << "NULL tcp"; 332 return; 333 } 334 if (!ready_to_connect_ || !channel->writable()) 335 return; 336 337 ready_to_connect_ = false; 338 tcp_->Connect(); 339 AdjustClock(); 340 } 341 342 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel, 343 const char* data, size_t size, int flags) { 344 //LOG_F(LS_VERBOSE) << "(" << size << ")"; 345 ASSERT(worker_thread_->IsCurrent()); 346 CritScope lock(&cs_); 347 if (!channel_) { 348 LOG_F(LS_WARNING) << "NULL channel"; 349 return; 350 } 351 ASSERT(channel == channel_); 352 if (!tcp_) { 353 LOG_F(LS_WARNING) << "NULL tcp"; 354 return; 355 } 356 tcp_->NotifyPacket(data, size); 357 AdjustClock(); 358 } 359 360 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel, 361 const Candidate& candidate) { 362 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 363 ASSERT(worker_thread_->IsCurrent()); 364 CritScope lock(&cs_); 365 if (!channel_) { 366 LOG_F(LS_WARNING) << "NULL channel"; 367 return; 368 } 369 ASSERT(channel == channel_); 370 if (!tcp_) { 371 LOG_F(LS_WARNING) << "NULL tcp"; 372 return; 373 } 374 375 uint16 mtu = 1280; // safe default 376 int family = candidate.address().family(); 377 Socket* socket = 378 worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM); 379 talk_base::scoped_ptr<Socket> mtu_socket(socket); 380 if (socket == NULL) { 381 LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU."; 382 } else { 383 if (mtu_socket->Connect(candidate.address()) < 0 || 384 mtu_socket->EstimateMTU(&mtu) < 0) { 385 LOG_F(LS_WARNING) << "Failed to estimate MTU, error=" 386 << mtu_socket->GetError(); 387 } 388 } 389 390 LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes"; 391 tcp_->NotifyMTU(mtu); 392 AdjustClock(); 393 } 394 395 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) { 396 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 397 ASSERT(cs_.CurrentThreadIsOwner()); 398 ASSERT(worker_thread_->IsCurrent()); 399 ASSERT(tcp == tcp_); 400 if (stream_) { 401 stream_readable_ = true; 402 pending_read_event_ = true; 403 stream_thread_->Post(this, MSG_ST_EVENT, 404 new EventData(SE_OPEN | SE_READ | SE_WRITE)); 405 } 406 } 407 408 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) { 409 //LOG_F(LS_VERBOSE); 410 ASSERT(cs_.CurrentThreadIsOwner()); 411 ASSERT(worker_thread_->IsCurrent()); 412 ASSERT(tcp == tcp_); 413 if (stream_) { 414 stream_readable_ = true; 415 if (!pending_read_event_) { 416 pending_read_event_ = true; 417 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ)); 418 } 419 } 420 } 421 422 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) { 423 //LOG_F(LS_VERBOSE); 424 ASSERT(cs_.CurrentThreadIsOwner()); 425 ASSERT(worker_thread_->IsCurrent()); 426 ASSERT(tcp == tcp_); 427 if (stream_) 428 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE)); 429 } 430 431 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) { 432 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 433 ASSERT(cs_.CurrentThreadIsOwner()); 434 ASSERT(worker_thread_->IsCurrent()); 435 ASSERT(tcp == tcp_); 436 if (stream_) 437 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError)); 438 } 439 440 // 441 // Multi-thread methods 442 // 443 444 void PseudoTcpChannel::OnMessage(Message* pmsg) { 445 if (pmsg->message_id == MSG_WK_CLOCK) { 446 447 ASSERT(worker_thread_->IsCurrent()); 448 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)"; 449 CritScope lock(&cs_); 450 if (tcp_) { 451 tcp_->NotifyClock(PseudoTcp::Now()); 452 AdjustClock(false); 453 } 454 455 } else if (pmsg->message_id == MSG_WK_PURGE) { 456 457 ASSERT(worker_thread_->IsCurrent()); 458 LOG_F(LS_INFO) << "(MSG_WK_PURGE)"; 459 // At this point, we know there are no additional worker thread messages. 460 CritScope lock(&cs_); 461 ASSERT(NULL == session_); 462 ASSERT(NULL == channel_); 463 worker_thread_ = NULL; 464 CheckDestroy(); 465 466 } else if (pmsg->message_id == MSG_ST_EVENT) { 467 468 ASSERT(stream_thread_->IsCurrent()); 469 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, " 470 // << data->event << ", " << data->error << ")"; 471 ASSERT(stream_ != NULL); 472 EventData* data = static_cast<EventData*>(pmsg->pdata); 473 if (data->event & SE_READ) { 474 CritScope lock(&cs_); 475 pending_read_event_ = false; 476 } 477 stream_->SignalEvent(stream_, data->event, data->error); 478 delete data; 479 480 } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) { 481 482 ASSERT(signal_thread_->IsCurrent()); 483 LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)"; 484 ASSERT(session_ != NULL); 485 ASSERT(channel_ != NULL); 486 session_->DestroyChannel(content_name_, channel_->component()); 487 488 } else if (pmsg->message_id == MSG_SI_DESTROY) { 489 490 ASSERT(signal_thread_->IsCurrent()); 491 LOG_F(LS_INFO) << "(MSG_SI_DESTROY)"; 492 // The message queue is empty, so it is safe to destroy ourselves. 493 delete this; 494 495 } else { 496 ASSERT(false); 497 } 498 } 499 500 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket( 501 PseudoTcp* tcp, const char* buffer, size_t len) { 502 ASSERT(cs_.CurrentThreadIsOwner()); 503 ASSERT(tcp == tcp_); 504 ASSERT(NULL != channel_); 505 int sent = channel_->SendPacket(buffer, len); 506 if (sent > 0) { 507 //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent"; 508 return IPseudoTcpNotify::WR_SUCCESS; 509 } else if (IsBlockingError(channel_->GetError())) { 510 LOG_F(LS_VERBOSE) << "Blocking"; 511 return IPseudoTcpNotify::WR_SUCCESS; 512 } else if (channel_->GetError() == EMSGSIZE) { 513 LOG_F(LS_ERROR) << "EMSGSIZE"; 514 return IPseudoTcpNotify::WR_TOO_LARGE; 515 } else { 516 PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket"; 517 ASSERT(false); 518 return IPseudoTcpNotify::WR_FAIL; 519 } 520 } 521 522 void PseudoTcpChannel::AdjustClock(bool clear) { 523 ASSERT(cs_.CurrentThreadIsOwner()); 524 ASSERT(NULL != tcp_); 525 526 long timeout = 0; 527 if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) { 528 ASSERT(NULL != channel_); 529 // Reset the next clock, by clearing the old and setting a new one. 530 if (clear) 531 worker_thread_->Clear(this, MSG_WK_CLOCK); 532 worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK); 533 return; 534 } 535 536 delete tcp_; 537 tcp_ = NULL; 538 ready_to_connect_ = false; 539 540 if (channel_) { 541 // If TCP has failed, no need for channel_ anymore 542 signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL); 543 } 544 } 545 546 void PseudoTcpChannel::CheckDestroy() { 547 ASSERT(cs_.CurrentThreadIsOwner()); 548 if ((worker_thread_ != NULL) || (stream_ != NULL)) 549 return; 550 signal_thread_->Post(this, MSG_SI_DESTROY); 551 } 552 553 /////////////////////////////////////////////////////////////////////////////// 554 // PseudoTcpChannel::InternalStream 555 /////////////////////////////////////////////////////////////////////////////// 556 557 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent) 558 : parent_(parent) { 559 } 560 561 PseudoTcpChannel::InternalStream::~InternalStream() { 562 Close(); 563 } 564 565 StreamState PseudoTcpChannel::InternalStream::GetState() const { 566 if (!parent_) 567 return SS_CLOSED; 568 return parent_->GetState(); 569 } 570 571 StreamResult PseudoTcpChannel::InternalStream::Read( 572 void* buffer, size_t buffer_len, size_t* read, int* error) { 573 if (!parent_) { 574 if (error) 575 *error = ENOTCONN; 576 return SR_ERROR; 577 } 578 return parent_->Read(buffer, buffer_len, read, error); 579 } 580 581 StreamResult PseudoTcpChannel::InternalStream::Write( 582 const void* data, size_t data_len, size_t* written, int* error) { 583 if (!parent_) { 584 if (error) 585 *error = ENOTCONN; 586 return SR_ERROR; 587 } 588 return parent_->Write(data, data_len, written, error); 589 } 590 591 void PseudoTcpChannel::InternalStream::Close() { 592 if (!parent_) 593 return; 594 parent_->Close(); 595 parent_ = NULL; 596 } 597 598 /////////////////////////////////////////////////////////////////////////////// 599 600 } // namespace cricket 601