Home | History | Annotate | Download | only in tunnel
      1 /*
      2  * libjingle
      3  * Copyright 2004--2006, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include <string>
     29 #include "talk/base/basictypes.h"
     30 #include "talk/base/common.h"
     31 #include "talk/base/logging.h"
     32 #include "talk/base/scoped_ptr.h"
     33 #include "talk/base/stringutils.h"
     34 #include "talk/p2p/base/candidate.h"
     35 #include "talk/p2p/base/transportchannel.h"
     36 #include "pseudotcpchannel.h"
     37 
     38 using namespace talk_base;
     39 
     40 namespace cricket {
     41 
     42 extern const talk_base::ConstantLabel SESSION_STATES[];
     43 
     44 // MSG_WK_* - worker thread messages
     45 // MSG_ST_* - stream thread messages
     46 // MSG_SI_* - signal thread messages
     47 
     48 enum {
     49   MSG_WK_CLOCK = 1,
     50   MSG_WK_PURGE,
     51   MSG_ST_EVENT,
     52   MSG_SI_DESTROYCHANNEL,
     53   MSG_SI_DESTROY,
     54 };
     55 
     56 struct EventData : public MessageData {
     57   int event, error;
     58   EventData(int ev, int err = 0) : event(ev), error(err) { }
     59 };
     60 
     61 ///////////////////////////////////////////////////////////////////////////////
     62 // PseudoTcpChannel::InternalStream
     63 ///////////////////////////////////////////////////////////////////////////////
     64 
     65 class PseudoTcpChannel::InternalStream : public StreamInterface {
     66 public:
     67   InternalStream(PseudoTcpChannel* parent);
     68   virtual ~InternalStream();
     69 
     70   virtual StreamState GetState() const;
     71   virtual StreamResult Read(void* buffer, size_t buffer_len,
     72                                        size_t* read, int* error);
     73   virtual StreamResult Write(const void* data, size_t data_len,
     74                                         size_t* written, int* error);
     75   virtual void Close();
     76 
     77 private:
     78   // parent_ is accessed and modified exclusively on the event thread, to
     79   // avoid thread contention.  This means that the PseudoTcpChannel cannot go
     80   // away until after it receives a Close() from TunnelStream.
     81   PseudoTcpChannel* parent_;
     82 };
     83 
     84 ///////////////////////////////////////////////////////////////////////////////
     85 // PseudoTcpChannel
     86 // Member object lifetime summaries:
     87 //   session_ - passed in constructor, cleared when channel_ goes away.
     88 //   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
     89 //   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
     90 //     closes.
     91 //   worker_thread_ - created when channel_ is created, purged when channel_ is
     92 //     destroyed.
     93 //   stream_ - created in GetStream, destroyed by owner at arbitrary time.
     94 //   this - created in constructor, destroyed when worker_thread_ and stream_
     95 //     are both gone.
     96 ///////////////////////////////////////////////////////////////////////////////
     97 
     98 //
     99 // Signal thread methods
    100 //
    101 
    102 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
    103   : signal_thread_(session->session_manager()->signaling_thread()),
    104     worker_thread_(NULL),
    105     stream_thread_(stream_thread),
    106     session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
    107     stream_readable_(false), pending_read_event_(false),
    108     ready_to_connect_(false) {
    109   ASSERT(signal_thread_->IsCurrent());
    110   ASSERT(NULL != session_);
    111 }
    112 
    113 PseudoTcpChannel::~PseudoTcpChannel() {
    114   ASSERT(signal_thread_->IsCurrent());
    115   ASSERT(worker_thread_ == NULL);
    116   ASSERT(session_ == NULL);
    117   ASSERT(channel_ == NULL);
    118   ASSERT(stream_ == NULL);
    119   ASSERT(tcp_ == NULL);
    120 }
    121 
    122 bool PseudoTcpChannel::Connect(const std::string& content_name,
    123                                const std::string& channel_name,
    124                                int component) {
    125   ASSERT(signal_thread_->IsCurrent());
    126   CritScope lock(&cs_);
    127 
    128   if (channel_)
    129     return false;
    130 
    131   ASSERT(session_ != NULL);
    132   worker_thread_ = session_->session_manager()->worker_thread();
    133   content_name_ = content_name;
    134   channel_ = session_->CreateChannel(
    135       content_name, channel_name, component);
    136   channel_name_ = channel_name;
    137   channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
    138 
    139   channel_->SignalDestroyed.connect(this,
    140     &PseudoTcpChannel::OnChannelDestroyed);
    141   channel_->SignalWritableState.connect(this,
    142     &PseudoTcpChannel::OnChannelWritableState);
    143   channel_->SignalReadPacket.connect(this,
    144     &PseudoTcpChannel::OnChannelRead);
    145   channel_->SignalRouteChange.connect(this,
    146     &PseudoTcpChannel::OnChannelConnectionChanged);
    147 
    148   ASSERT(tcp_ == NULL);
    149   tcp_ = new PseudoTcp(this, 0);
    150   if (session_->initiator()) {
    151     // Since we may try several protocols and network adapters that won't work,
    152     // waiting until we get our first writable notification before initiating
    153     // TCP negotiation.
    154     ready_to_connect_ = true;
    155   }
    156 
    157   return true;
    158 }
    159 
    160 StreamInterface* PseudoTcpChannel::GetStream() {
    161   ASSERT(signal_thread_->IsCurrent());
    162   CritScope lock(&cs_);
    163   ASSERT(NULL != session_);
    164   if (!stream_)
    165     stream_ = new PseudoTcpChannel::InternalStream(this);
    166   //TODO("should we disallow creation of new stream at some point?");
    167   return stream_;
    168 }
    169 
    170 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
    171   LOG_F(LS_INFO) << "(" << channel->component() << ")";
    172   ASSERT(signal_thread_->IsCurrent());
    173   CritScope lock(&cs_);
    174   ASSERT(channel == channel_);
    175   signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
    176   // When MSG_WK_PURGE is received, we know there will be no more messages from
    177   // the worker thread.
    178   worker_thread_->Clear(this, MSG_WK_CLOCK);
    179   worker_thread_->Post(this, MSG_WK_PURGE);
    180   session_ = NULL;
    181   channel_ = NULL;
    182   if ((stream_ != NULL)
    183       && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
    184     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
    185   if (tcp_) {
    186     tcp_->Close(true);
    187     AdjustClock();
    188   }
    189   SignalChannelClosed(this);
    190 }
    191 
    192 void PseudoTcpChannel::OnSessionTerminate(Session* session) {
    193   // When the session terminates before we even connected
    194   CritScope lock(&cs_);
    195   if (session_ != NULL && channel_ == NULL) {
    196     ASSERT(session == session_);
    197     ASSERT(worker_thread_ == NULL);
    198     ASSERT(tcp_ == NULL);
    199     LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
    200     session_ = NULL;
    201     if (stream_ != NULL)
    202       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
    203   }
    204 
    205   // Even though session_ is being destroyed, we mustn't clear the pointer,
    206   // since we'll need it to tear down channel_.
    207   //
    208   // TODO: Is it always the case that if channel_ != NULL then we'll get
    209   // a channel-destroyed notification?
    210 }
    211 
    212 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
    213   ASSERT(signal_thread_->IsCurrent());
    214   CritScope lock(&cs_);
    215   ASSERT(tcp_ != NULL);
    216   tcp_->GetOption(opt, value);
    217 }
    218 
    219 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
    220   ASSERT(signal_thread_->IsCurrent());
    221   CritScope lock(&cs_);
    222   ASSERT(tcp_ != NULL);
    223   tcp_->SetOption(opt, value);
    224 }
    225 
    226 //
    227 // Stream thread methods
    228 //
    229 
    230 StreamState PseudoTcpChannel::GetState() const {
    231   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    232   CritScope lock(&cs_);
    233   if (!session_)
    234     return SS_CLOSED;
    235   if (!tcp_)
    236     return SS_OPENING;
    237   switch (tcp_->State()) {
    238     case PseudoTcp::TCP_LISTEN:
    239     case PseudoTcp::TCP_SYN_SENT:
    240     case PseudoTcp::TCP_SYN_RECEIVED:
    241       return SS_OPENING;
    242     case PseudoTcp::TCP_ESTABLISHED:
    243       return SS_OPEN;
    244     case PseudoTcp::TCP_CLOSED:
    245     default:
    246       return SS_CLOSED;
    247   }
    248 }
    249 
    250 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
    251                                     size_t* read, int* error) {
    252   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    253   CritScope lock(&cs_);
    254   if (!tcp_)
    255     return SR_BLOCK;
    256 
    257   stream_readable_ = false;
    258   int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
    259   //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
    260   if (result > 0) {
    261     if (read)
    262       *read = result;
    263     // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
    264     // them here.
    265     stream_readable_ = true;
    266     if (!pending_read_event_) {
    267       pending_read_event_ = true;
    268       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
    269     }
    270     return SR_SUCCESS;
    271   } else if (IsBlockingError(tcp_->GetError())) {
    272     return SR_BLOCK;
    273   } else {
    274     if (error)
    275       *error = tcp_->GetError();
    276     return SR_ERROR;
    277   }
    278   // This spot is never reached.
    279 }
    280 
    281 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
    282                                      size_t* written, int* error) {
    283   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    284   CritScope lock(&cs_);
    285   if (!tcp_)
    286     return SR_BLOCK;
    287   int result = tcp_->Send(static_cast<const char*>(data), data_len);
    288   //LOG_F(LS_VERBOSE) << "Send returned: " << result;
    289   if (result > 0) {
    290     if (written)
    291       *written = result;
    292     return SR_SUCCESS;
    293   } else if (IsBlockingError(tcp_->GetError())) {
    294     return SR_BLOCK;
    295   } else {
    296     if (error)
    297       *error = tcp_->GetError();
    298     return SR_ERROR;
    299   }
    300   // This spot is never reached.
    301 }
    302 
    303 void PseudoTcpChannel::Close() {
    304   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    305   CritScope lock(&cs_);
    306   stream_ = NULL;
    307   // Clear out any pending event notifications
    308   stream_thread_->Clear(this, MSG_ST_EVENT);
    309   if (tcp_) {
    310     tcp_->Close(false);
    311     AdjustClock();
    312   } else {
    313     CheckDestroy();
    314   }
    315 }
    316 
    317 //
    318 // Worker thread methods
    319 //
    320 
    321 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
    322   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    323   ASSERT(worker_thread_->IsCurrent());
    324   CritScope lock(&cs_);
    325   if (!channel_) {
    326     LOG_F(LS_WARNING) << "NULL channel";
    327     return;
    328   }
    329   ASSERT(channel == channel_);
    330   if (!tcp_) {
    331     LOG_F(LS_WARNING) << "NULL tcp";
    332     return;
    333   }
    334   if (!ready_to_connect_ || !channel->writable())
    335     return;
    336 
    337   ready_to_connect_ = false;
    338   tcp_->Connect();
    339   AdjustClock();
    340 }
    341 
    342 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
    343                                      const char* data, size_t size,
    344                                      const talk_base::PacketTime& packet_time,
    345                                      int flags) {
    346   //LOG_F(LS_VERBOSE) << "(" << size << ")";
    347   ASSERT(worker_thread_->IsCurrent());
    348   CritScope lock(&cs_);
    349   if (!channel_) {
    350     LOG_F(LS_WARNING) << "NULL channel";
    351     return;
    352   }
    353   ASSERT(channel == channel_);
    354   if (!tcp_) {
    355     LOG_F(LS_WARNING) << "NULL tcp";
    356     return;
    357   }
    358   tcp_->NotifyPacket(data, size);
    359   AdjustClock();
    360 }
    361 
    362 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
    363                                                   const Candidate& candidate) {
    364   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    365   ASSERT(worker_thread_->IsCurrent());
    366   CritScope lock(&cs_);
    367   if (!channel_) {
    368     LOG_F(LS_WARNING) << "NULL channel";
    369     return;
    370   }
    371   ASSERT(channel == channel_);
    372   if (!tcp_) {
    373     LOG_F(LS_WARNING) << "NULL tcp";
    374     return;
    375   }
    376 
    377   uint16 mtu = 1280;  // safe default
    378   int family = candidate.address().family();
    379   Socket* socket =
    380       worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM);
    381   talk_base::scoped_ptr<Socket> mtu_socket(socket);
    382   if (socket == NULL) {
    383     LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU.";
    384   } else {
    385     if (mtu_socket->Connect(candidate.address()) < 0 ||
    386         mtu_socket->EstimateMTU(&mtu) < 0) {
    387       LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
    388                         << mtu_socket->GetError();
    389     }
    390   }
    391 
    392   LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
    393   tcp_->NotifyMTU(mtu);
    394   AdjustClock();
    395 }
    396 
    397 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
    398   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    399   ASSERT(cs_.CurrentThreadIsOwner());
    400   ASSERT(worker_thread_->IsCurrent());
    401   ASSERT(tcp == tcp_);
    402   if (stream_) {
    403     stream_readable_ = true;
    404     pending_read_event_ = true;
    405     stream_thread_->Post(this, MSG_ST_EVENT,
    406                          new EventData(SE_OPEN | SE_READ | SE_WRITE));
    407   }
    408 }
    409 
    410 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
    411   //LOG_F(LS_VERBOSE);
    412   ASSERT(cs_.CurrentThreadIsOwner());
    413   ASSERT(worker_thread_->IsCurrent());
    414   ASSERT(tcp == tcp_);
    415   if (stream_) {
    416     stream_readable_ = true;
    417     if (!pending_read_event_) {
    418       pending_read_event_ = true;
    419       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
    420     }
    421   }
    422 }
    423 
    424 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
    425   //LOG_F(LS_VERBOSE);
    426   ASSERT(cs_.CurrentThreadIsOwner());
    427   ASSERT(worker_thread_->IsCurrent());
    428   ASSERT(tcp == tcp_);
    429   if (stream_)
    430     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
    431 }
    432 
    433 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
    434   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    435   ASSERT(cs_.CurrentThreadIsOwner());
    436   ASSERT(worker_thread_->IsCurrent());
    437   ASSERT(tcp == tcp_);
    438   if (stream_)
    439     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
    440 }
    441 
    442 //
    443 // Multi-thread methods
    444 //
    445 
    446 void PseudoTcpChannel::OnMessage(Message* pmsg) {
    447   if (pmsg->message_id == MSG_WK_CLOCK) {
    448 
    449     ASSERT(worker_thread_->IsCurrent());
    450     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
    451     CritScope lock(&cs_);
    452     if (tcp_) {
    453       tcp_->NotifyClock(PseudoTcp::Now());
    454       AdjustClock(false);
    455     }
    456 
    457   } else if (pmsg->message_id == MSG_WK_PURGE) {
    458 
    459     ASSERT(worker_thread_->IsCurrent());
    460     LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
    461     // At this point, we know there are no additional worker thread messages.
    462     CritScope lock(&cs_);
    463     ASSERT(NULL == session_);
    464     ASSERT(NULL == channel_);
    465     worker_thread_ = NULL;
    466     CheckDestroy();
    467 
    468   } else if (pmsg->message_id == MSG_ST_EVENT) {
    469 
    470     ASSERT(stream_thread_->IsCurrent());
    471     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
    472     //             << data->event << ", " << data->error << ")";
    473     ASSERT(stream_ != NULL);
    474     EventData* data = static_cast<EventData*>(pmsg->pdata);
    475     if (data->event & SE_READ) {
    476       CritScope lock(&cs_);
    477       pending_read_event_ = false;
    478     }
    479     stream_->SignalEvent(stream_, data->event, data->error);
    480     delete data;
    481 
    482   } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
    483 
    484     ASSERT(signal_thread_->IsCurrent());
    485     LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
    486     ASSERT(session_ != NULL);
    487     ASSERT(channel_ != NULL);
    488     session_->DestroyChannel(content_name_, channel_->component());
    489 
    490   } else if (pmsg->message_id == MSG_SI_DESTROY) {
    491 
    492     ASSERT(signal_thread_->IsCurrent());
    493     LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
    494     // The message queue is empty, so it is safe to destroy ourselves.
    495     delete this;
    496 
    497   } else {
    498     ASSERT(false);
    499   }
    500 }
    501 
    502 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
    503     PseudoTcp* tcp, const char* buffer, size_t len) {
    504   ASSERT(cs_.CurrentThreadIsOwner());
    505   ASSERT(tcp == tcp_);
    506   ASSERT(NULL != channel_);
    507   talk_base::PacketOptions packet_options;
    508   int sent = channel_->SendPacket(buffer, len, packet_options);
    509   if (sent > 0) {
    510     //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
    511     return IPseudoTcpNotify::WR_SUCCESS;
    512   } else if (IsBlockingError(channel_->GetError())) {
    513     LOG_F(LS_VERBOSE) << "Blocking";
    514     return IPseudoTcpNotify::WR_SUCCESS;
    515   } else if (channel_->GetError() == EMSGSIZE) {
    516     LOG_F(LS_ERROR) << "EMSGSIZE";
    517     return IPseudoTcpNotify::WR_TOO_LARGE;
    518   } else {
    519     PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
    520     ASSERT(false);
    521     return IPseudoTcpNotify::WR_FAIL;
    522   }
    523 }
    524 
    525 void PseudoTcpChannel::AdjustClock(bool clear) {
    526   ASSERT(cs_.CurrentThreadIsOwner());
    527   ASSERT(NULL != tcp_);
    528 
    529   long timeout = 0;
    530   if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
    531     ASSERT(NULL != channel_);
    532     // Reset the next clock, by clearing the old and setting a new one.
    533     if (clear)
    534       worker_thread_->Clear(this, MSG_WK_CLOCK);
    535     worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
    536     return;
    537   }
    538 
    539   delete tcp_;
    540   tcp_ = NULL;
    541   ready_to_connect_ = false;
    542 
    543   if (channel_) {
    544     // If TCP has failed, no need for channel_ anymore
    545     signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
    546   }
    547 }
    548 
    549 void PseudoTcpChannel::CheckDestroy() {
    550   ASSERT(cs_.CurrentThreadIsOwner());
    551   if ((worker_thread_ != NULL) || (stream_ != NULL))
    552     return;
    553   signal_thread_->Post(this, MSG_SI_DESTROY);
    554 }
    555 
    556 ///////////////////////////////////////////////////////////////////////////////
    557 // PseudoTcpChannel::InternalStream
    558 ///////////////////////////////////////////////////////////////////////////////
    559 
    560 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
    561   : parent_(parent) {
    562 }
    563 
    564 PseudoTcpChannel::InternalStream::~InternalStream() {
    565   Close();
    566 }
    567 
    568 StreamState PseudoTcpChannel::InternalStream::GetState() const {
    569   if (!parent_)
    570     return SS_CLOSED;
    571   return parent_->GetState();
    572 }
    573 
    574 StreamResult PseudoTcpChannel::InternalStream::Read(
    575     void* buffer, size_t buffer_len, size_t* read, int* error) {
    576   if (!parent_) {
    577     if (error)
    578       *error = ENOTCONN;
    579     return SR_ERROR;
    580   }
    581   return parent_->Read(buffer, buffer_len, read, error);
    582 }
    583 
    584 StreamResult PseudoTcpChannel::InternalStream::Write(
    585     const void* data, size_t data_len,  size_t* written, int* error) {
    586   if (!parent_) {
    587     if (error)
    588       *error = ENOTCONN;
    589     return SR_ERROR;
    590   }
    591   return parent_->Write(data, data_len, written, error);
    592 }
    593 
    594 void PseudoTcpChannel::InternalStream::Close() {
    595   if (!parent_)
    596     return;
    597   parent_->Close();
    598   parent_ = NULL;
    599 }
    600 
    601 ///////////////////////////////////////////////////////////////////////////////
    602 
    603 } // namespace cricket
    604