1 /* 2 * libjingle 3 * Copyright 2004--2006, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <string> 29 #include "talk/base/basictypes.h" 30 #include "talk/base/common.h" 31 #include "talk/base/logging.h" 32 #include "talk/base/scoped_ptr.h" 33 #include "talk/base/stringutils.h" 34 #include "talk/p2p/base/transportchannel.h" 35 #include "pseudotcpchannel.h" 36 37 using namespace talk_base; 38 39 namespace cricket { 40 41 extern const talk_base::ConstantLabel SESSION_STATES[]; 42 43 // MSG_WK_* - worker thread messages 44 // MSG_ST_* - stream thread messages 45 // MSG_SI_* - signal thread messages 46 47 enum { 48 MSG_WK_CLOCK = 1, 49 MSG_WK_PURGE, 50 MSG_ST_EVENT, 51 MSG_SI_DESTROYCHANNEL, 52 MSG_SI_DESTROY, 53 }; 54 55 struct EventData : public MessageData { 56 int event, error; 57 EventData(int ev, int err = 0) : event(ev), error(err) { } 58 }; 59 60 /////////////////////////////////////////////////////////////////////////////// 61 // PseudoTcpChannel::InternalStream 62 /////////////////////////////////////////////////////////////////////////////// 63 64 class PseudoTcpChannel::InternalStream : public StreamInterface { 65 public: 66 InternalStream(PseudoTcpChannel* parent); 67 virtual ~InternalStream(); 68 69 virtual StreamState GetState() const; 70 virtual StreamResult Read(void* buffer, size_t buffer_len, 71 size_t* read, int* error); 72 virtual StreamResult Write(const void* data, size_t data_len, 73 size_t* written, int* error); 74 virtual void Close(); 75 76 private: 77 // parent_ is accessed and modified exclusively on the event thread, to 78 // avoid thread contention. This means that the PseudoTcpChannel cannot go 79 // away until after it receives a Close() from TunnelStream. 80 PseudoTcpChannel* parent_; 81 }; 82 83 /////////////////////////////////////////////////////////////////////////////// 84 // PseudoTcpChannel 85 // Member object lifetime summaries: 86 // session_ - passed in constructor, cleared when channel_ goes away. 87 // channel_ - created in Connect, destroyed when session_ or tcp_ goes away. 88 // tcp_ - created in Connect, destroyed when channel_ goes away, or connection 89 // closes. 90 // worker_thread_ - created when channel_ is created, purged when channel_ is 91 // destroyed. 92 // stream_ - created in GetStream, destroyed by owner at arbitrary time. 93 // this - created in constructor, destroyed when worker_thread_ and stream_ 94 // are both gone. 95 /////////////////////////////////////////////////////////////////////////////// 96 97 // 98 // Signal thread methods 99 // 100 101 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session) 102 : signal_thread_(session->session_manager()->signaling_thread()), 103 worker_thread_(NULL), 104 stream_thread_(stream_thread), 105 session_(session), channel_(NULL), tcp_(NULL), stream_(NULL), 106 stream_readable_(false), pending_read_event_(false), 107 ready_to_connect_(false) { 108 ASSERT(signal_thread_->IsCurrent()); 109 ASSERT(NULL != session_); 110 } 111 112 PseudoTcpChannel::~PseudoTcpChannel() { 113 ASSERT(signal_thread_->IsCurrent()); 114 ASSERT(worker_thread_ == NULL); 115 ASSERT(session_ == NULL); 116 ASSERT(channel_ == NULL); 117 ASSERT(stream_ == NULL); 118 ASSERT(tcp_ == NULL); 119 } 120 121 bool PseudoTcpChannel::Connect(const std::string& content_name, 122 const std::string& channel_name) { 123 ASSERT(signal_thread_->IsCurrent()); 124 CritScope lock(&cs_); 125 126 if (channel_) 127 return false; 128 129 ASSERT(session_ != NULL); 130 worker_thread_ = session_->session_manager()->worker_thread(); 131 content_name_ = content_name; 132 channel_ = session_->CreateChannel(content_name, channel_name); 133 channel_name_ = channel_name; 134 channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1); 135 136 channel_->SignalDestroyed.connect(this, 137 &PseudoTcpChannel::OnChannelDestroyed); 138 channel_->SignalWritableState.connect(this, 139 &PseudoTcpChannel::OnChannelWritableState); 140 channel_->SignalReadPacket.connect(this, 141 &PseudoTcpChannel::OnChannelRead); 142 channel_->SignalRouteChange.connect(this, 143 &PseudoTcpChannel::OnChannelConnectionChanged); 144 145 ASSERT(tcp_ == NULL); 146 tcp_ = new PseudoTcp(this, 0); 147 if (session_->initiator()) { 148 // Since we may try several protocols and network adapters that won't work, 149 // waiting until we get our first writable notification before initiating 150 // TCP negotiation. 151 ready_to_connect_ = true; 152 } 153 154 return true; 155 } 156 157 StreamInterface* PseudoTcpChannel::GetStream() { 158 ASSERT(signal_thread_->IsCurrent()); 159 CritScope lock(&cs_); 160 ASSERT(NULL != session_); 161 if (!stream_) 162 stream_ = new PseudoTcpChannel::InternalStream(this); 163 //TODO("should we disallow creation of new stream at some point?"); 164 return stream_; 165 } 166 167 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) { 168 LOG_F(LS_INFO) << "(" << channel->name() << ")"; 169 ASSERT(signal_thread_->IsCurrent()); 170 CritScope lock(&cs_); 171 ASSERT(channel == channel_); 172 signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL); 173 // When MSG_WK_PURGE is received, we know there will be no more messages from 174 // the worker thread. 175 worker_thread_->Clear(this, MSG_WK_CLOCK); 176 worker_thread_->Post(this, MSG_WK_PURGE); 177 session_ = NULL; 178 channel_ = NULL; 179 if ((stream_ != NULL) 180 && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED))) 181 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0)); 182 if (tcp_) { 183 tcp_->Close(true); 184 AdjustClock(); 185 } 186 SignalChannelClosed(this); 187 } 188 189 void PseudoTcpChannel::OnSessionTerminate(Session* session) { 190 // When the session terminates before we even connected 191 CritScope lock(&cs_); 192 if (session_ != NULL && channel_ == NULL) { 193 ASSERT(session == session_); 194 ASSERT(worker_thread_ == NULL); 195 ASSERT(tcp_ == NULL); 196 LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel"; 197 session_ = NULL; 198 if (stream_ != NULL) 199 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1)); 200 } 201 202 // Even though session_ is being destroyed, we mustn't clear the pointer, 203 // since we'll need it to tear down channel_. 204 // 205 // TODO(wez): Is it always the case that if channel_ != NULL then we'll get 206 // a channel-destroyed notification? 207 } 208 209 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) { 210 ASSERT(signal_thread_->IsCurrent()); 211 CritScope lock(&cs_); 212 ASSERT(tcp_ != NULL); 213 tcp_->GetOption(opt, value); 214 } 215 216 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) { 217 ASSERT(signal_thread_->IsCurrent()); 218 CritScope lock(&cs_); 219 ASSERT(tcp_ != NULL); 220 tcp_->SetOption(opt, value); 221 } 222 223 // 224 // Stream thread methods 225 // 226 227 StreamState PseudoTcpChannel::GetState() const { 228 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 229 CritScope lock(&cs_); 230 if (!session_) 231 return SS_CLOSED; 232 if (!tcp_) 233 return SS_OPENING; 234 switch (tcp_->State()) { 235 case PseudoTcp::TCP_LISTEN: 236 case PseudoTcp::TCP_SYN_SENT: 237 case PseudoTcp::TCP_SYN_RECEIVED: 238 return SS_OPENING; 239 case PseudoTcp::TCP_ESTABLISHED: 240 return SS_OPEN; 241 case PseudoTcp::TCP_CLOSED: 242 default: 243 return SS_CLOSED; 244 } 245 } 246 247 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len, 248 size_t* read, int* error) { 249 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 250 CritScope lock(&cs_); 251 if (!tcp_) 252 return SR_BLOCK; 253 254 stream_readable_ = false; 255 int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len); 256 //LOG_F(LS_VERBOSE) << "Recv returned: " << result; 257 if (result > 0) { 258 if (read) 259 *read = result; 260 // PseudoTcp doesn't currently support repeated Readable signals. Simulate 261 // them here. 262 stream_readable_ = true; 263 if (!pending_read_event_) { 264 pending_read_event_ = true; 265 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true); 266 } 267 return SR_SUCCESS; 268 } else if (IsBlockingError(tcp_->GetError())) { 269 return SR_BLOCK; 270 } else { 271 if (error) 272 *error = tcp_->GetError(); 273 return SR_ERROR; 274 } 275 // This spot is never reached. 276 } 277 278 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len, 279 size_t* written, int* error) { 280 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 281 CritScope lock(&cs_); 282 if (!tcp_) 283 return SR_BLOCK; 284 int result = tcp_->Send(static_cast<const char*>(data), data_len); 285 //LOG_F(LS_VERBOSE) << "Send returned: " << result; 286 if (result > 0) { 287 if (written) 288 *written = result; 289 return SR_SUCCESS; 290 } else if (IsBlockingError(tcp_->GetError())) { 291 return SR_BLOCK; 292 } else { 293 if (error) 294 *error = tcp_->GetError(); 295 return SR_ERROR; 296 } 297 // This spot is never reached. 298 } 299 300 void PseudoTcpChannel::Close() { 301 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 302 CritScope lock(&cs_); 303 stream_ = NULL; 304 // Clear out any pending event notifications 305 stream_thread_->Clear(this, MSG_ST_EVENT); 306 if (tcp_) { 307 tcp_->Close(false); 308 AdjustClock(); 309 } else { 310 CheckDestroy(); 311 } 312 } 313 314 // 315 // Worker thread methods 316 // 317 318 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) { 319 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 320 ASSERT(worker_thread_->IsCurrent()); 321 CritScope lock(&cs_); 322 if (!channel_) { 323 LOG_F(LS_WARNING) << "NULL channel"; 324 return; 325 } 326 ASSERT(channel == channel_); 327 if (!tcp_) { 328 LOG_F(LS_WARNING) << "NULL tcp"; 329 return; 330 } 331 if (!ready_to_connect_ || !channel->writable()) 332 return; 333 334 ready_to_connect_ = false; 335 tcp_->Connect(); 336 AdjustClock(); 337 } 338 339 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel, 340 const char* data, size_t size) { 341 //LOG_F(LS_VERBOSE) << "(" << size << ")"; 342 ASSERT(worker_thread_->IsCurrent()); 343 CritScope lock(&cs_); 344 if (!channel_) { 345 LOG_F(LS_WARNING) << "NULL channel"; 346 return; 347 } 348 ASSERT(channel == channel_); 349 if (!tcp_) { 350 LOG_F(LS_WARNING) << "NULL tcp"; 351 return; 352 } 353 tcp_->NotifyPacket(data, size); 354 AdjustClock(); 355 } 356 357 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel, 358 const SocketAddress& addr) { 359 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 360 ASSERT(worker_thread_->IsCurrent()); 361 CritScope lock(&cs_); 362 if (!channel_) { 363 LOG_F(LS_WARNING) << "NULL channel"; 364 return; 365 } 366 ASSERT(channel == channel_); 367 if (!tcp_) { 368 LOG_F(LS_WARNING) << "NULL tcp"; 369 return; 370 } 371 372 uint16 mtu = 1280; // safe default 373 talk_base::scoped_ptr<Socket> mtu_socket( 374 worker_thread_->socketserver()->CreateSocket(SOCK_DGRAM)); 375 if (mtu_socket->Connect(addr) < 0 || 376 mtu_socket->EstimateMTU(&mtu) < 0) { 377 LOG_F(LS_WARNING) << "Failed to estimate MTU, error=" 378 << mtu_socket->GetError(); 379 } 380 381 LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes"; 382 tcp_->NotifyMTU(mtu); 383 AdjustClock(); 384 } 385 386 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) { 387 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 388 ASSERT(cs_.CurrentThreadIsOwner()); 389 ASSERT(worker_thread_->IsCurrent()); 390 ASSERT(tcp == tcp_); 391 if (stream_) { 392 stream_readable_ = true; 393 pending_read_event_ = true; 394 stream_thread_->Post(this, MSG_ST_EVENT, 395 new EventData(SE_OPEN | SE_READ | SE_WRITE)); 396 } 397 } 398 399 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) { 400 //LOG_F(LS_VERBOSE); 401 ASSERT(cs_.CurrentThreadIsOwner()); 402 ASSERT(worker_thread_->IsCurrent()); 403 ASSERT(tcp == tcp_); 404 if (stream_) { 405 stream_readable_ = true; 406 if (!pending_read_event_) { 407 pending_read_event_ = true; 408 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ)); 409 } 410 } 411 } 412 413 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) { 414 //LOG_F(LS_VERBOSE); 415 ASSERT(cs_.CurrentThreadIsOwner()); 416 ASSERT(worker_thread_->IsCurrent()); 417 ASSERT(tcp == tcp_); 418 if (stream_) 419 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE)); 420 } 421 422 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) { 423 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 424 ASSERT(cs_.CurrentThreadIsOwner()); 425 ASSERT(worker_thread_->IsCurrent()); 426 ASSERT(tcp == tcp_); 427 if (stream_) 428 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError)); 429 } 430 431 // 432 // Multi-thread methods 433 // 434 435 void PseudoTcpChannel::OnMessage(Message* pmsg) { 436 if (pmsg->message_id == MSG_WK_CLOCK) { 437 438 ASSERT(worker_thread_->IsCurrent()); 439 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)"; 440 CritScope lock(&cs_); 441 if (tcp_) { 442 tcp_->NotifyClock(PseudoTcp::Now()); 443 AdjustClock(false); 444 } 445 446 } else if (pmsg->message_id == MSG_WK_PURGE) { 447 448 ASSERT(worker_thread_->IsCurrent()); 449 LOG_F(LS_INFO) << "(MSG_WK_PURGE)"; 450 // At this point, we know there are no additional worker thread messages. 451 CritScope lock(&cs_); 452 ASSERT(NULL == session_); 453 ASSERT(NULL == channel_); 454 worker_thread_ = NULL; 455 CheckDestroy(); 456 457 } else if (pmsg->message_id == MSG_ST_EVENT) { 458 459 ASSERT(stream_thread_->IsCurrent()); 460 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, " 461 // << data->event << ", " << data->error << ")"; 462 ASSERT(stream_ != NULL); 463 EventData* data = static_cast<EventData*>(pmsg->pdata); 464 if (data->event & SE_READ) { 465 CritScope lock(&cs_); 466 pending_read_event_ = false; 467 } 468 stream_->SignalEvent(stream_, data->event, data->error); 469 delete data; 470 471 } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) { 472 473 ASSERT(signal_thread_->IsCurrent()); 474 LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)"; 475 ASSERT(session_ != NULL); 476 ASSERT(channel_ != NULL); 477 session_->DestroyChannel(content_name_, channel_->name()); 478 479 } else if (pmsg->message_id == MSG_SI_DESTROY) { 480 481 ASSERT(signal_thread_->IsCurrent()); 482 LOG_F(LS_INFO) << "(MSG_SI_DESTROY)"; 483 // The message queue is empty, so it is safe to destroy ourselves. 484 delete this; 485 486 } else { 487 ASSERT(false); 488 } 489 } 490 491 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket( 492 PseudoTcp* tcp, const char* buffer, size_t len) { 493 ASSERT(cs_.CurrentThreadIsOwner()); 494 ASSERT(tcp == tcp_); 495 ASSERT(NULL != channel_); 496 int sent = channel_->SendPacket(buffer, len); 497 if (sent > 0) { 498 //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent"; 499 return IPseudoTcpNotify::WR_SUCCESS; 500 } else if (IsBlockingError(channel_->GetError())) { 501 LOG_F(LS_VERBOSE) << "Blocking"; 502 return IPseudoTcpNotify::WR_SUCCESS; 503 } else if (channel_->GetError() == EMSGSIZE) { 504 LOG_F(LS_ERROR) << "EMSGSIZE"; 505 return IPseudoTcpNotify::WR_TOO_LARGE; 506 } else { 507 PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket"; 508 ASSERT(false); 509 return IPseudoTcpNotify::WR_FAIL; 510 } 511 } 512 513 void PseudoTcpChannel::AdjustClock(bool clear) { 514 ASSERT(cs_.CurrentThreadIsOwner()); 515 ASSERT(NULL != tcp_); 516 517 long timeout = 0; 518 if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) { 519 ASSERT(NULL != channel_); 520 // Reset the next clock, by clearing the old and setting a new one. 521 if (clear) 522 worker_thread_->Clear(this, MSG_WK_CLOCK); 523 worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK); 524 return; 525 } 526 527 delete tcp_; 528 tcp_ = NULL; 529 ready_to_connect_ = false; 530 531 if (channel_) { 532 // If TCP has failed, no need for channel_ anymore 533 signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL); 534 } 535 } 536 537 void PseudoTcpChannel::CheckDestroy() { 538 ASSERT(cs_.CurrentThreadIsOwner()); 539 if ((worker_thread_ != NULL) || (stream_ != NULL)) 540 return; 541 signal_thread_->Post(this, MSG_SI_DESTROY); 542 } 543 544 /////////////////////////////////////////////////////////////////////////////// 545 // PseudoTcpChannel::InternalStream 546 /////////////////////////////////////////////////////////////////////////////// 547 548 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent) 549 : parent_(parent) { 550 } 551 552 PseudoTcpChannel::InternalStream::~InternalStream() { 553 Close(); 554 } 555 556 StreamState PseudoTcpChannel::InternalStream::GetState() const { 557 if (!parent_) 558 return SS_CLOSED; 559 return parent_->GetState(); 560 } 561 562 StreamResult PseudoTcpChannel::InternalStream::Read( 563 void* buffer, size_t buffer_len, size_t* read, int* error) { 564 if (!parent_) { 565 if (error) 566 *error = ENOTCONN; 567 return SR_ERROR; 568 } 569 return parent_->Read(buffer, buffer_len, read, error); 570 } 571 572 StreamResult PseudoTcpChannel::InternalStream::Write( 573 const void* data, size_t data_len, size_t* written, int* error) { 574 if (!parent_) { 575 if (error) 576 *error = ENOTCONN; 577 return SR_ERROR; 578 } 579 return parent_->Write(data, data_len, written, error); 580 } 581 582 void PseudoTcpChannel::InternalStream::Close() { 583 if (!parent_) 584 return; 585 parent_->Close(); 586 parent_ = NULL; 587 } 588 589 /////////////////////////////////////////////////////////////////////////////// 590 591 } // namespace cricket 592