Home | History | Annotate | Download | only in tunnel
      1 /*
      2  * libjingle
      3  * Copyright 2004--2006, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include <string>
     29 #include "talk/base/basictypes.h"
     30 #include "talk/base/common.h"
     31 #include "talk/base/logging.h"
     32 #include "talk/base/scoped_ptr.h"
     33 #include "talk/base/stringutils.h"
     34 #include "talk/p2p/base/candidate.h"
     35 #include "talk/p2p/base/transportchannel.h"
     36 #include "pseudotcpchannel.h"
     37 
     38 using namespace talk_base;
     39 
     40 namespace cricket {
     41 
     42 extern const talk_base::ConstantLabel SESSION_STATES[];
     43 
     44 // MSG_WK_* - worker thread messages
     45 // MSG_ST_* - stream thread messages
     46 // MSG_SI_* - signal thread messages
     47 
     48 enum {
     49   MSG_WK_CLOCK = 1,
     50   MSG_WK_PURGE,
     51   MSG_ST_EVENT,
     52   MSG_SI_DESTROYCHANNEL,
     53   MSG_SI_DESTROY,
     54 };
     55 
     56 struct EventData : public MessageData {
     57   int event, error;
     58   EventData(int ev, int err = 0) : event(ev), error(err) { }
     59 };
     60 
     61 ///////////////////////////////////////////////////////////////////////////////
     62 // PseudoTcpChannel::InternalStream
     63 ///////////////////////////////////////////////////////////////////////////////
     64 
     65 class PseudoTcpChannel::InternalStream : public StreamInterface {
     66 public:
     67   InternalStream(PseudoTcpChannel* parent);
     68   virtual ~InternalStream();
     69 
     70   virtual StreamState GetState() const;
     71   virtual StreamResult Read(void* buffer, size_t buffer_len,
     72                                        size_t* read, int* error);
     73   virtual StreamResult Write(const void* data, size_t data_len,
     74                                         size_t* written, int* error);
     75   virtual void Close();
     76 
     77 private:
     78   // parent_ is accessed and modified exclusively on the event thread, to
     79   // avoid thread contention.  This means that the PseudoTcpChannel cannot go
     80   // away until after it receives a Close() from TunnelStream.
     81   PseudoTcpChannel* parent_;
     82 };
     83 
     84 ///////////////////////////////////////////////////////////////////////////////
     85 // PseudoTcpChannel
     86 // Member object lifetime summaries:
     87 //   session_ - passed in constructor, cleared when channel_ goes away.
     88 //   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
     89 //   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
     90 //     closes.
     91 //   worker_thread_ - created when channel_ is created, purged when channel_ is
     92 //     destroyed.
     93 //   stream_ - created in GetStream, destroyed by owner at arbitrary time.
     94 //   this - created in constructor, destroyed when worker_thread_ and stream_
     95 //     are both gone.
     96 ///////////////////////////////////////////////////////////////////////////////
     97 
     98 //
     99 // Signal thread methods
    100 //
    101 
    102 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
    103   : signal_thread_(session->session_manager()->signaling_thread()),
    104     worker_thread_(NULL),
    105     stream_thread_(stream_thread),
    106     session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
    107     stream_readable_(false), pending_read_event_(false),
    108     ready_to_connect_(false) {
    109   ASSERT(signal_thread_->IsCurrent());
    110   ASSERT(NULL != session_);
    111 }
    112 
    113 PseudoTcpChannel::~PseudoTcpChannel() {
    114   ASSERT(signal_thread_->IsCurrent());
    115   ASSERT(worker_thread_ == NULL);
    116   ASSERT(session_ == NULL);
    117   ASSERT(channel_ == NULL);
    118   ASSERT(stream_ == NULL);
    119   ASSERT(tcp_ == NULL);
    120 }
    121 
    122 bool PseudoTcpChannel::Connect(const std::string& content_name,
    123                                const std::string& channel_name,
    124                                int component) {
    125   ASSERT(signal_thread_->IsCurrent());
    126   CritScope lock(&cs_);
    127 
    128   if (channel_)
    129     return false;
    130 
    131   ASSERT(session_ != NULL);
    132   worker_thread_ = session_->session_manager()->worker_thread();
    133   content_name_ = content_name;
    134   channel_ = session_->CreateChannel(
    135       content_name, channel_name, component);
    136   channel_name_ = channel_name;
    137   channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
    138 
    139   channel_->SignalDestroyed.connect(this,
    140     &PseudoTcpChannel::OnChannelDestroyed);
    141   channel_->SignalWritableState.connect(this,
    142     &PseudoTcpChannel::OnChannelWritableState);
    143   channel_->SignalReadPacket.connect(this,
    144     &PseudoTcpChannel::OnChannelRead);
    145   channel_->SignalRouteChange.connect(this,
    146     &PseudoTcpChannel::OnChannelConnectionChanged);
    147 
    148   ASSERT(tcp_ == NULL);
    149   tcp_ = new PseudoTcp(this, 0);
    150   if (session_->initiator()) {
    151     // Since we may try several protocols and network adapters that won't work,
    152     // waiting until we get our first writable notification before initiating
    153     // TCP negotiation.
    154     ready_to_connect_ = true;
    155   }
    156 
    157   return true;
    158 }
    159 
    160 StreamInterface* PseudoTcpChannel::GetStream() {
    161   ASSERT(signal_thread_->IsCurrent());
    162   CritScope lock(&cs_);
    163   ASSERT(NULL != session_);
    164   if (!stream_)
    165     stream_ = new PseudoTcpChannel::InternalStream(this);
    166   //TODO("should we disallow creation of new stream at some point?");
    167   return stream_;
    168 }
    169 
    170 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
    171   LOG_F(LS_INFO) << "(" << channel->component() << ")";
    172   ASSERT(signal_thread_->IsCurrent());
    173   CritScope lock(&cs_);
    174   ASSERT(channel == channel_);
    175   signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
    176   // When MSG_WK_PURGE is received, we know there will be no more messages from
    177   // the worker thread.
    178   worker_thread_->Clear(this, MSG_WK_CLOCK);
    179   worker_thread_->Post(this, MSG_WK_PURGE);
    180   session_ = NULL;
    181   channel_ = NULL;
    182   if ((stream_ != NULL)
    183       && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
    184     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
    185   if (tcp_) {
    186     tcp_->Close(true);
    187     AdjustClock();
    188   }
    189   SignalChannelClosed(this);
    190 }
    191 
    192 void PseudoTcpChannel::OnSessionTerminate(Session* session) {
    193   // When the session terminates before we even connected
    194   CritScope lock(&cs_);
    195   if (session_ != NULL && channel_ == NULL) {
    196     ASSERT(session == session_);
    197     ASSERT(worker_thread_ == NULL);
    198     ASSERT(tcp_ == NULL);
    199     LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
    200     session_ = NULL;
    201     if (stream_ != NULL)
    202       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
    203   }
    204 
    205   // Even though session_ is being destroyed, we mustn't clear the pointer,
    206   // since we'll need it to tear down channel_.
    207   //
    208   // TODO: Is it always the case that if channel_ != NULL then we'll get
    209   // a channel-destroyed notification?
    210 }
    211 
    212 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
    213   ASSERT(signal_thread_->IsCurrent());
    214   CritScope lock(&cs_);
    215   ASSERT(tcp_ != NULL);
    216   tcp_->GetOption(opt, value);
    217 }
    218 
    219 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
    220   ASSERT(signal_thread_->IsCurrent());
    221   CritScope lock(&cs_);
    222   ASSERT(tcp_ != NULL);
    223   tcp_->SetOption(opt, value);
    224 }
    225 
    226 //
    227 // Stream thread methods
    228 //
    229 
    230 StreamState PseudoTcpChannel::GetState() const {
    231   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    232   CritScope lock(&cs_);
    233   if (!session_)
    234     return SS_CLOSED;
    235   if (!tcp_)
    236     return SS_OPENING;
    237   switch (tcp_->State()) {
    238     case PseudoTcp::TCP_LISTEN:
    239     case PseudoTcp::TCP_SYN_SENT:
    240     case PseudoTcp::TCP_SYN_RECEIVED:
    241       return SS_OPENING;
    242     case PseudoTcp::TCP_ESTABLISHED:
    243       return SS_OPEN;
    244     case PseudoTcp::TCP_CLOSED:
    245     default:
    246       return SS_CLOSED;
    247   }
    248 }
    249 
    250 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
    251                                     size_t* read, int* error) {
    252   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    253   CritScope lock(&cs_);
    254   if (!tcp_)
    255     return SR_BLOCK;
    256 
    257   stream_readable_ = false;
    258   int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
    259   //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
    260   if (result > 0) {
    261     if (read)
    262       *read = result;
    263     // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
    264     // them here.
    265     stream_readable_ = true;
    266     if (!pending_read_event_) {
    267       pending_read_event_ = true;
    268       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
    269     }
    270     return SR_SUCCESS;
    271   } else if (IsBlockingError(tcp_->GetError())) {
    272     return SR_BLOCK;
    273   } else {
    274     if (error)
    275       *error = tcp_->GetError();
    276     return SR_ERROR;
    277   }
    278   // This spot is never reached.
    279 }
    280 
    281 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
    282                                      size_t* written, int* error) {
    283   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    284   CritScope lock(&cs_);
    285   if (!tcp_)
    286     return SR_BLOCK;
    287   int result = tcp_->Send(static_cast<const char*>(data), data_len);
    288   //LOG_F(LS_VERBOSE) << "Send returned: " << result;
    289   if (result > 0) {
    290     if (written)
    291       *written = result;
    292     return SR_SUCCESS;
    293   } else if (IsBlockingError(tcp_->GetError())) {
    294     return SR_BLOCK;
    295   } else {
    296     if (error)
    297       *error = tcp_->GetError();
    298     return SR_ERROR;
    299   }
    300   // This spot is never reached.
    301 }
    302 
    303 void PseudoTcpChannel::Close() {
    304   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    305   CritScope lock(&cs_);
    306   stream_ = NULL;
    307   // Clear out any pending event notifications
    308   stream_thread_->Clear(this, MSG_ST_EVENT);
    309   if (tcp_) {
    310     tcp_->Close(false);
    311     AdjustClock();
    312   } else {
    313     CheckDestroy();
    314   }
    315 }
    316 
    317 //
    318 // Worker thread methods
    319 //
    320 
    321 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
    322   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    323   ASSERT(worker_thread_->IsCurrent());
    324   CritScope lock(&cs_);
    325   if (!channel_) {
    326     LOG_F(LS_WARNING) << "NULL channel";
    327     return;
    328   }
    329   ASSERT(channel == channel_);
    330   if (!tcp_) {
    331     LOG_F(LS_WARNING) << "NULL tcp";
    332     return;
    333   }
    334   if (!ready_to_connect_ || !channel->writable())
    335     return;
    336 
    337   ready_to_connect_ = false;
    338   tcp_->Connect();
    339   AdjustClock();
    340 }
    341 
    342 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
    343                                      const char* data, size_t size, int flags) {
    344   //LOG_F(LS_VERBOSE) << "(" << size << ")";
    345   ASSERT(worker_thread_->IsCurrent());
    346   CritScope lock(&cs_);
    347   if (!channel_) {
    348     LOG_F(LS_WARNING) << "NULL channel";
    349     return;
    350   }
    351   ASSERT(channel == channel_);
    352   if (!tcp_) {
    353     LOG_F(LS_WARNING) << "NULL tcp";
    354     return;
    355   }
    356   tcp_->NotifyPacket(data, size);
    357   AdjustClock();
    358 }
    359 
    360 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
    361                                                   const Candidate& candidate) {
    362   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    363   ASSERT(worker_thread_->IsCurrent());
    364   CritScope lock(&cs_);
    365   if (!channel_) {
    366     LOG_F(LS_WARNING) << "NULL channel";
    367     return;
    368   }
    369   ASSERT(channel == channel_);
    370   if (!tcp_) {
    371     LOG_F(LS_WARNING) << "NULL tcp";
    372     return;
    373   }
    374 
    375   uint16 mtu = 1280;  // safe default
    376   int family = candidate.address().family();
    377   Socket* socket =
    378       worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM);
    379   talk_base::scoped_ptr<Socket> mtu_socket(socket);
    380   if (socket == NULL) {
    381     LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU.";
    382   } else {
    383     if (mtu_socket->Connect(candidate.address()) < 0 ||
    384         mtu_socket->EstimateMTU(&mtu) < 0) {
    385       LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
    386                         << mtu_socket->GetError();
    387     }
    388   }
    389 
    390   LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
    391   tcp_->NotifyMTU(mtu);
    392   AdjustClock();
    393 }
    394 
    395 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
    396   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    397   ASSERT(cs_.CurrentThreadIsOwner());
    398   ASSERT(worker_thread_->IsCurrent());
    399   ASSERT(tcp == tcp_);
    400   if (stream_) {
    401     stream_readable_ = true;
    402     pending_read_event_ = true;
    403     stream_thread_->Post(this, MSG_ST_EVENT,
    404                          new EventData(SE_OPEN | SE_READ | SE_WRITE));
    405   }
    406 }
    407 
    408 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
    409   //LOG_F(LS_VERBOSE);
    410   ASSERT(cs_.CurrentThreadIsOwner());
    411   ASSERT(worker_thread_->IsCurrent());
    412   ASSERT(tcp == tcp_);
    413   if (stream_) {
    414     stream_readable_ = true;
    415     if (!pending_read_event_) {
    416       pending_read_event_ = true;
    417       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
    418     }
    419   }
    420 }
    421 
    422 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
    423   //LOG_F(LS_VERBOSE);
    424   ASSERT(cs_.CurrentThreadIsOwner());
    425   ASSERT(worker_thread_->IsCurrent());
    426   ASSERT(tcp == tcp_);
    427   if (stream_)
    428     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
    429 }
    430 
    431 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
    432   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    433   ASSERT(cs_.CurrentThreadIsOwner());
    434   ASSERT(worker_thread_->IsCurrent());
    435   ASSERT(tcp == tcp_);
    436   if (stream_)
    437     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
    438 }
    439 
    440 //
    441 // Multi-thread methods
    442 //
    443 
    444 void PseudoTcpChannel::OnMessage(Message* pmsg) {
    445   if (pmsg->message_id == MSG_WK_CLOCK) {
    446 
    447     ASSERT(worker_thread_->IsCurrent());
    448     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
    449     CritScope lock(&cs_);
    450     if (tcp_) {
    451       tcp_->NotifyClock(PseudoTcp::Now());
    452       AdjustClock(false);
    453     }
    454 
    455   } else if (pmsg->message_id == MSG_WK_PURGE) {
    456 
    457     ASSERT(worker_thread_->IsCurrent());
    458     LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
    459     // At this point, we know there are no additional worker thread messages.
    460     CritScope lock(&cs_);
    461     ASSERT(NULL == session_);
    462     ASSERT(NULL == channel_);
    463     worker_thread_ = NULL;
    464     CheckDestroy();
    465 
    466   } else if (pmsg->message_id == MSG_ST_EVENT) {
    467 
    468     ASSERT(stream_thread_->IsCurrent());
    469     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
    470     //             << data->event << ", " << data->error << ")";
    471     ASSERT(stream_ != NULL);
    472     EventData* data = static_cast<EventData*>(pmsg->pdata);
    473     if (data->event & SE_READ) {
    474       CritScope lock(&cs_);
    475       pending_read_event_ = false;
    476     }
    477     stream_->SignalEvent(stream_, data->event, data->error);
    478     delete data;
    479 
    480   } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
    481 
    482     ASSERT(signal_thread_->IsCurrent());
    483     LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
    484     ASSERT(session_ != NULL);
    485     ASSERT(channel_ != NULL);
    486     session_->DestroyChannel(content_name_, channel_->component());
    487 
    488   } else if (pmsg->message_id == MSG_SI_DESTROY) {
    489 
    490     ASSERT(signal_thread_->IsCurrent());
    491     LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
    492     // The message queue is empty, so it is safe to destroy ourselves.
    493     delete this;
    494 
    495   } else {
    496     ASSERT(false);
    497   }
    498 }
    499 
    500 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
    501     PseudoTcp* tcp, const char* buffer, size_t len) {
    502   ASSERT(cs_.CurrentThreadIsOwner());
    503   ASSERT(tcp == tcp_);
    504   ASSERT(NULL != channel_);
    505   int sent = channel_->SendPacket(buffer, len);
    506   if (sent > 0) {
    507     //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
    508     return IPseudoTcpNotify::WR_SUCCESS;
    509   } else if (IsBlockingError(channel_->GetError())) {
    510     LOG_F(LS_VERBOSE) << "Blocking";
    511     return IPseudoTcpNotify::WR_SUCCESS;
    512   } else if (channel_->GetError() == EMSGSIZE) {
    513     LOG_F(LS_ERROR) << "EMSGSIZE";
    514     return IPseudoTcpNotify::WR_TOO_LARGE;
    515   } else {
    516     PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
    517     ASSERT(false);
    518     return IPseudoTcpNotify::WR_FAIL;
    519   }
    520 }
    521 
    522 void PseudoTcpChannel::AdjustClock(bool clear) {
    523   ASSERT(cs_.CurrentThreadIsOwner());
    524   ASSERT(NULL != tcp_);
    525 
    526   long timeout = 0;
    527   if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
    528     ASSERT(NULL != channel_);
    529     // Reset the next clock, by clearing the old and setting a new one.
    530     if (clear)
    531       worker_thread_->Clear(this, MSG_WK_CLOCK);
    532     worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
    533     return;
    534   }
    535 
    536   delete tcp_;
    537   tcp_ = NULL;
    538   ready_to_connect_ = false;
    539 
    540   if (channel_) {
    541     // If TCP has failed, no need for channel_ anymore
    542     signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
    543   }
    544 }
    545 
    546 void PseudoTcpChannel::CheckDestroy() {
    547   ASSERT(cs_.CurrentThreadIsOwner());
    548   if ((worker_thread_ != NULL) || (stream_ != NULL))
    549     return;
    550   signal_thread_->Post(this, MSG_SI_DESTROY);
    551 }
    552 
    553 ///////////////////////////////////////////////////////////////////////////////
    554 // PseudoTcpChannel::InternalStream
    555 ///////////////////////////////////////////////////////////////////////////////
    556 
    557 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
    558   : parent_(parent) {
    559 }
    560 
    561 PseudoTcpChannel::InternalStream::~InternalStream() {
    562   Close();
    563 }
    564 
    565 StreamState PseudoTcpChannel::InternalStream::GetState() const {
    566   if (!parent_)
    567     return SS_CLOSED;
    568   return parent_->GetState();
    569 }
    570 
    571 StreamResult PseudoTcpChannel::InternalStream::Read(
    572     void* buffer, size_t buffer_len, size_t* read, int* error) {
    573   if (!parent_) {
    574     if (error)
    575       *error = ENOTCONN;
    576     return SR_ERROR;
    577   }
    578   return parent_->Read(buffer, buffer_len, read, error);
    579 }
    580 
    581 StreamResult PseudoTcpChannel::InternalStream::Write(
    582     const void* data, size_t data_len,  size_t* written, int* error) {
    583   if (!parent_) {
    584     if (error)
    585       *error = ENOTCONN;
    586     return SR_ERROR;
    587   }
    588   return parent_->Write(data, data_len, written, error);
    589 }
    590 
    591 void PseudoTcpChannel::InternalStream::Close() {
    592   if (!parent_)
    593     return;
    594   parent_->Close();
    595   parent_ = NULL;
    596 }
    597 
    598 ///////////////////////////////////////////////////////////////////////////////
    599 
    600 } // namespace cricket
    601