Home | History | Annotate | Download | only in tunnel
      1 /*
      2  * libjingle
      3  * Copyright 2004--2006, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include <string>
     29 #include "talk/base/basictypes.h"
     30 #include "talk/base/common.h"
     31 #include "talk/base/logging.h"
     32 #include "talk/base/scoped_ptr.h"
     33 #include "talk/base/stringutils.h"
     34 #include "talk/p2p/base/transportchannel.h"
     35 #include "pseudotcpchannel.h"
     36 
     37 using namespace talk_base;
     38 
     39 namespace cricket {
     40 
     41 extern const talk_base::ConstantLabel SESSION_STATES[];
     42 
     43 // MSG_WK_* - worker thread messages
     44 // MSG_ST_* - stream thread messages
     45 // MSG_SI_* - signal thread messages
     46 
     47 enum {
     48   MSG_WK_CLOCK = 1,
     49   MSG_WK_PURGE,
     50   MSG_ST_EVENT,
     51   MSG_SI_DESTROYCHANNEL,
     52   MSG_SI_DESTROY,
     53 };
     54 
     55 struct EventData : public MessageData {
     56   int event, error;
     57   EventData(int ev, int err = 0) : event(ev), error(err) { }
     58 };
     59 
     60 ///////////////////////////////////////////////////////////////////////////////
     61 // PseudoTcpChannel::InternalStream
     62 ///////////////////////////////////////////////////////////////////////////////
     63 
     64 class PseudoTcpChannel::InternalStream : public StreamInterface {
     65 public:
     66   InternalStream(PseudoTcpChannel* parent);
     67   virtual ~InternalStream();
     68 
     69   virtual StreamState GetState() const;
     70   virtual StreamResult Read(void* buffer, size_t buffer_len,
     71                                        size_t* read, int* error);
     72   virtual StreamResult Write(const void* data, size_t data_len,
     73                                         size_t* written, int* error);
     74   virtual void Close();
     75 
     76 private:
     77   // parent_ is accessed and modified exclusively on the event thread, to
     78   // avoid thread contention.  This means that the PseudoTcpChannel cannot go
     79   // away until after it receives a Close() from TunnelStream.
     80   PseudoTcpChannel* parent_;
     81 };
     82 
     83 ///////////////////////////////////////////////////////////////////////////////
     84 // PseudoTcpChannel
     85 // Member object lifetime summaries:
     86 //   session_ - passed in constructor, cleared when channel_ goes away.
     87 //   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
     88 //   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
     89 //     closes.
     90 //   worker_thread_ - created when channel_ is created, purged when channel_ is
     91 //     destroyed.
     92 //   stream_ - created in GetStream, destroyed by owner at arbitrary time.
     93 //   this - created in constructor, destroyed when worker_thread_ and stream_
     94 //     are both gone.
     95 ///////////////////////////////////////////////////////////////////////////////
     96 
     97 //
     98 // Signal thread methods
     99 //
    100 
    101 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
    102   : signal_thread_(session->session_manager()->signaling_thread()),
    103     worker_thread_(NULL),
    104     stream_thread_(stream_thread),
    105     session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
    106     stream_readable_(false), pending_read_event_(false),
    107     ready_to_connect_(false) {
    108   ASSERT(signal_thread_->IsCurrent());
    109   ASSERT(NULL != session_);
    110 }
    111 
    112 PseudoTcpChannel::~PseudoTcpChannel() {
    113   ASSERT(signal_thread_->IsCurrent());
    114   ASSERT(worker_thread_ == NULL);
    115   ASSERT(session_ == NULL);
    116   ASSERT(channel_ == NULL);
    117   ASSERT(stream_ == NULL);
    118   ASSERT(tcp_ == NULL);
    119 }
    120 
    121 bool PseudoTcpChannel::Connect(const std::string& content_name,
    122                                const std::string& channel_name) {
    123   ASSERT(signal_thread_->IsCurrent());
    124   CritScope lock(&cs_);
    125 
    126   if (channel_)
    127     return false;
    128 
    129   ASSERT(session_ != NULL);
    130   worker_thread_ = session_->session_manager()->worker_thread();
    131   content_name_ = content_name;
    132   channel_ = session_->CreateChannel(content_name, channel_name);
    133   channel_name_ = channel_name;
    134   channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
    135 
    136   channel_->SignalDestroyed.connect(this,
    137     &PseudoTcpChannel::OnChannelDestroyed);
    138   channel_->SignalWritableState.connect(this,
    139     &PseudoTcpChannel::OnChannelWritableState);
    140   channel_->SignalReadPacket.connect(this,
    141     &PseudoTcpChannel::OnChannelRead);
    142   channel_->SignalRouteChange.connect(this,
    143     &PseudoTcpChannel::OnChannelConnectionChanged);
    144 
    145   ASSERT(tcp_ == NULL);
    146   tcp_ = new PseudoTcp(this, 0);
    147   if (session_->initiator()) {
    148     // Since we may try several protocols and network adapters that won't work,
    149     // waiting until we get our first writable notification before initiating
    150     // TCP negotiation.
    151     ready_to_connect_ = true;
    152   }
    153 
    154   return true;
    155 }
    156 
    157 StreamInterface* PseudoTcpChannel::GetStream() {
    158   ASSERT(signal_thread_->IsCurrent());
    159   CritScope lock(&cs_);
    160   ASSERT(NULL != session_);
    161   if (!stream_)
    162     stream_ = new PseudoTcpChannel::InternalStream(this);
    163   //TODO("should we disallow creation of new stream at some point?");
    164   return stream_;
    165 }
    166 
    167 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
    168   LOG_F(LS_INFO) << "(" << channel->name() << ")";
    169   ASSERT(signal_thread_->IsCurrent());
    170   CritScope lock(&cs_);
    171   ASSERT(channel == channel_);
    172   signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
    173   // When MSG_WK_PURGE is received, we know there will be no more messages from
    174   // the worker thread.
    175   worker_thread_->Clear(this, MSG_WK_CLOCK);
    176   worker_thread_->Post(this, MSG_WK_PURGE);
    177   session_ = NULL;
    178   channel_ = NULL;
    179   if ((stream_ != NULL)
    180       && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
    181     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
    182   if (tcp_) {
    183     tcp_->Close(true);
    184     AdjustClock();
    185   }
    186   SignalChannelClosed(this);
    187 }
    188 
    189 void PseudoTcpChannel::OnSessionTerminate(Session* session) {
    190   // When the session terminates before we even connected
    191   CritScope lock(&cs_);
    192   if (session_ != NULL && channel_ == NULL) {
    193     ASSERT(session == session_);
    194     ASSERT(worker_thread_ == NULL);
    195     ASSERT(tcp_ == NULL);
    196     LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
    197     session_ = NULL;
    198     if (stream_ != NULL)
    199       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
    200   }
    201 
    202   // Even though session_ is being destroyed, we mustn't clear the pointer,
    203   // since we'll need it to tear down channel_.
    204   //
    205   // TODO(wez): Is it always the case that if channel_ != NULL then we'll get
    206   // a channel-destroyed notification?
    207 }
    208 
    209 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
    210   ASSERT(signal_thread_->IsCurrent());
    211   CritScope lock(&cs_);
    212   ASSERT(tcp_ != NULL);
    213   tcp_->GetOption(opt, value);
    214 }
    215 
    216 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
    217   ASSERT(signal_thread_->IsCurrent());
    218   CritScope lock(&cs_);
    219   ASSERT(tcp_ != NULL);
    220   tcp_->SetOption(opt, value);
    221 }
    222 
    223 //
    224 // Stream thread methods
    225 //
    226 
    227 StreamState PseudoTcpChannel::GetState() const {
    228   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    229   CritScope lock(&cs_);
    230   if (!session_)
    231     return SS_CLOSED;
    232   if (!tcp_)
    233     return SS_OPENING;
    234   switch (tcp_->State()) {
    235     case PseudoTcp::TCP_LISTEN:
    236     case PseudoTcp::TCP_SYN_SENT:
    237     case PseudoTcp::TCP_SYN_RECEIVED:
    238       return SS_OPENING;
    239     case PseudoTcp::TCP_ESTABLISHED:
    240       return SS_OPEN;
    241     case PseudoTcp::TCP_CLOSED:
    242     default:
    243       return SS_CLOSED;
    244   }
    245 }
    246 
    247 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
    248                                     size_t* read, int* error) {
    249   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    250   CritScope lock(&cs_);
    251   if (!tcp_)
    252     return SR_BLOCK;
    253 
    254   stream_readable_ = false;
    255   int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
    256   //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
    257   if (result > 0) {
    258     if (read)
    259       *read = result;
    260     // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
    261     // them here.
    262     stream_readable_ = true;
    263     if (!pending_read_event_) {
    264       pending_read_event_ = true;
    265       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
    266     }
    267     return SR_SUCCESS;
    268   } else if (IsBlockingError(tcp_->GetError())) {
    269     return SR_BLOCK;
    270   } else {
    271     if (error)
    272       *error = tcp_->GetError();
    273     return SR_ERROR;
    274   }
    275   // This spot is never reached.
    276 }
    277 
    278 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
    279                                      size_t* written, int* error) {
    280   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    281   CritScope lock(&cs_);
    282   if (!tcp_)
    283     return SR_BLOCK;
    284   int result = tcp_->Send(static_cast<const char*>(data), data_len);
    285   //LOG_F(LS_VERBOSE) << "Send returned: " << result;
    286   if (result > 0) {
    287     if (written)
    288       *written = result;
    289     return SR_SUCCESS;
    290   } else if (IsBlockingError(tcp_->GetError())) {
    291     return SR_BLOCK;
    292   } else {
    293     if (error)
    294       *error = tcp_->GetError();
    295     return SR_ERROR;
    296   }
    297   // This spot is never reached.
    298 }
    299 
    300 void PseudoTcpChannel::Close() {
    301   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
    302   CritScope lock(&cs_);
    303   stream_ = NULL;
    304   // Clear out any pending event notifications
    305   stream_thread_->Clear(this, MSG_ST_EVENT);
    306   if (tcp_) {
    307     tcp_->Close(false);
    308     AdjustClock();
    309   } else {
    310     CheckDestroy();
    311   }
    312 }
    313 
    314 //
    315 // Worker thread methods
    316 //
    317 
    318 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
    319   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    320   ASSERT(worker_thread_->IsCurrent());
    321   CritScope lock(&cs_);
    322   if (!channel_) {
    323     LOG_F(LS_WARNING) << "NULL channel";
    324     return;
    325   }
    326   ASSERT(channel == channel_);
    327   if (!tcp_) {
    328     LOG_F(LS_WARNING) << "NULL tcp";
    329     return;
    330   }
    331   if (!ready_to_connect_ || !channel->writable())
    332     return;
    333 
    334   ready_to_connect_ = false;
    335   tcp_->Connect();
    336   AdjustClock();
    337 }
    338 
    339 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
    340                                      const char* data, size_t size) {
    341   //LOG_F(LS_VERBOSE) << "(" << size << ")";
    342   ASSERT(worker_thread_->IsCurrent());
    343   CritScope lock(&cs_);
    344   if (!channel_) {
    345     LOG_F(LS_WARNING) << "NULL channel";
    346     return;
    347   }
    348   ASSERT(channel == channel_);
    349   if (!tcp_) {
    350     LOG_F(LS_WARNING) << "NULL tcp";
    351     return;
    352   }
    353   tcp_->NotifyPacket(data, size);
    354   AdjustClock();
    355 }
    356 
    357 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
    358                                                   const SocketAddress& addr) {
    359   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    360   ASSERT(worker_thread_->IsCurrent());
    361   CritScope lock(&cs_);
    362   if (!channel_) {
    363     LOG_F(LS_WARNING) << "NULL channel";
    364     return;
    365   }
    366   ASSERT(channel == channel_);
    367   if (!tcp_) {
    368     LOG_F(LS_WARNING) << "NULL tcp";
    369     return;
    370   }
    371 
    372   uint16 mtu = 1280;  // safe default
    373   talk_base::scoped_ptr<Socket> mtu_socket(
    374       worker_thread_->socketserver()->CreateSocket(SOCK_DGRAM));
    375   if (mtu_socket->Connect(addr) < 0 ||
    376       mtu_socket->EstimateMTU(&mtu) < 0) {
    377     LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
    378                       << mtu_socket->GetError();
    379   }
    380 
    381   LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
    382   tcp_->NotifyMTU(mtu);
    383   AdjustClock();
    384 }
    385 
    386 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
    387   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    388   ASSERT(cs_.CurrentThreadIsOwner());
    389   ASSERT(worker_thread_->IsCurrent());
    390   ASSERT(tcp == tcp_);
    391   if (stream_) {
    392     stream_readable_ = true;
    393     pending_read_event_ = true;
    394     stream_thread_->Post(this, MSG_ST_EVENT,
    395                          new EventData(SE_OPEN | SE_READ | SE_WRITE));
    396   }
    397 }
    398 
    399 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
    400   //LOG_F(LS_VERBOSE);
    401   ASSERT(cs_.CurrentThreadIsOwner());
    402   ASSERT(worker_thread_->IsCurrent());
    403   ASSERT(tcp == tcp_);
    404   if (stream_) {
    405     stream_readable_ = true;
    406     if (!pending_read_event_) {
    407       pending_read_event_ = true;
    408       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
    409     }
    410   }
    411 }
    412 
    413 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
    414   //LOG_F(LS_VERBOSE);
    415   ASSERT(cs_.CurrentThreadIsOwner());
    416   ASSERT(worker_thread_->IsCurrent());
    417   ASSERT(tcp == tcp_);
    418   if (stream_)
    419     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
    420 }
    421 
    422 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
    423   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
    424   ASSERT(cs_.CurrentThreadIsOwner());
    425   ASSERT(worker_thread_->IsCurrent());
    426   ASSERT(tcp == tcp_);
    427   if (stream_)
    428     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
    429 }
    430 
    431 //
    432 // Multi-thread methods
    433 //
    434 
    435 void PseudoTcpChannel::OnMessage(Message* pmsg) {
    436   if (pmsg->message_id == MSG_WK_CLOCK) {
    437 
    438     ASSERT(worker_thread_->IsCurrent());
    439     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
    440     CritScope lock(&cs_);
    441     if (tcp_) {
    442       tcp_->NotifyClock(PseudoTcp::Now());
    443       AdjustClock(false);
    444     }
    445 
    446   } else if (pmsg->message_id == MSG_WK_PURGE) {
    447 
    448     ASSERT(worker_thread_->IsCurrent());
    449     LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
    450     // At this point, we know there are no additional worker thread messages.
    451     CritScope lock(&cs_);
    452     ASSERT(NULL == session_);
    453     ASSERT(NULL == channel_);
    454     worker_thread_ = NULL;
    455     CheckDestroy();
    456 
    457   } else if (pmsg->message_id == MSG_ST_EVENT) {
    458 
    459     ASSERT(stream_thread_->IsCurrent());
    460     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
    461     //             << data->event << ", " << data->error << ")";
    462     ASSERT(stream_ != NULL);
    463     EventData* data = static_cast<EventData*>(pmsg->pdata);
    464     if (data->event & SE_READ) {
    465       CritScope lock(&cs_);
    466       pending_read_event_ = false;
    467     }
    468     stream_->SignalEvent(stream_, data->event, data->error);
    469     delete data;
    470 
    471   } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
    472 
    473     ASSERT(signal_thread_->IsCurrent());
    474     LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
    475     ASSERT(session_ != NULL);
    476     ASSERT(channel_ != NULL);
    477     session_->DestroyChannel(content_name_, channel_->name());
    478 
    479   } else if (pmsg->message_id == MSG_SI_DESTROY) {
    480 
    481     ASSERT(signal_thread_->IsCurrent());
    482     LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
    483     // The message queue is empty, so it is safe to destroy ourselves.
    484     delete this;
    485 
    486   } else {
    487     ASSERT(false);
    488   }
    489 }
    490 
    491 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
    492     PseudoTcp* tcp, const char* buffer, size_t len) {
    493   ASSERT(cs_.CurrentThreadIsOwner());
    494   ASSERT(tcp == tcp_);
    495   ASSERT(NULL != channel_);
    496   int sent = channel_->SendPacket(buffer, len);
    497   if (sent > 0) {
    498     //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
    499     return IPseudoTcpNotify::WR_SUCCESS;
    500   } else if (IsBlockingError(channel_->GetError())) {
    501     LOG_F(LS_VERBOSE) << "Blocking";
    502     return IPseudoTcpNotify::WR_SUCCESS;
    503   } else if (channel_->GetError() == EMSGSIZE) {
    504     LOG_F(LS_ERROR) << "EMSGSIZE";
    505     return IPseudoTcpNotify::WR_TOO_LARGE;
    506   } else {
    507     PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
    508     ASSERT(false);
    509     return IPseudoTcpNotify::WR_FAIL;
    510   }
    511 }
    512 
    513 void PseudoTcpChannel::AdjustClock(bool clear) {
    514   ASSERT(cs_.CurrentThreadIsOwner());
    515   ASSERT(NULL != tcp_);
    516 
    517   long timeout = 0;
    518   if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
    519     ASSERT(NULL != channel_);
    520     // Reset the next clock, by clearing the old and setting a new one.
    521     if (clear)
    522       worker_thread_->Clear(this, MSG_WK_CLOCK);
    523     worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
    524     return;
    525   }
    526 
    527   delete tcp_;
    528   tcp_ = NULL;
    529   ready_to_connect_ = false;
    530 
    531   if (channel_) {
    532     // If TCP has failed, no need for channel_ anymore
    533     signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
    534   }
    535 }
    536 
    537 void PseudoTcpChannel::CheckDestroy() {
    538   ASSERT(cs_.CurrentThreadIsOwner());
    539   if ((worker_thread_ != NULL) || (stream_ != NULL))
    540     return;
    541   signal_thread_->Post(this, MSG_SI_DESTROY);
    542 }
    543 
    544 ///////////////////////////////////////////////////////////////////////////////
    545 // PseudoTcpChannel::InternalStream
    546 ///////////////////////////////////////////////////////////////////////////////
    547 
    548 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
    549   : parent_(parent) {
    550 }
    551 
    552 PseudoTcpChannel::InternalStream::~InternalStream() {
    553   Close();
    554 }
    555 
    556 StreamState PseudoTcpChannel::InternalStream::GetState() const {
    557   if (!parent_)
    558     return SS_CLOSED;
    559   return parent_->GetState();
    560 }
    561 
    562 StreamResult PseudoTcpChannel::InternalStream::Read(
    563     void* buffer, size_t buffer_len, size_t* read, int* error) {
    564   if (!parent_) {
    565     if (error)
    566       *error = ENOTCONN;
    567     return SR_ERROR;
    568   }
    569   return parent_->Read(buffer, buffer_len, read, error);
    570 }
    571 
    572 StreamResult PseudoTcpChannel::InternalStream::Write(
    573     const void* data, size_t data_len,  size_t* written, int* error) {
    574   if (!parent_) {
    575     if (error)
    576       *error = ENOTCONN;
    577     return SR_ERROR;
    578   }
    579   return parent_->Write(data, data_len, written, error);
    580 }
    581 
    582 void PseudoTcpChannel::InternalStream::Close() {
    583   if (!parent_)
    584     return;
    585   parent_->Close();
    586   parent_ = NULL;
    587 }
    588 
    589 ///////////////////////////////////////////////////////////////////////////////
    590 
    591 } // namespace cricket
    592