Home | History | Annotate | Download | only in webrtc
      1 /*
      2  * libjingle
      3  * Copyright 2012, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 #include "talk/app/webrtc/datachannel.h"
     28 
     29 #include <string>
     30 
     31 #include "talk/app/webrtc/mediastreamprovider.h"
     32 #include "talk/base/logging.h"
     33 #include "talk/base/refcount.h"
     34 #include "talk/media/sctp/sctputils.h"
     35 
     36 namespace webrtc {
     37 
     38 static size_t kMaxQueuedReceivedDataPackets = 100;
     39 static size_t kMaxQueuedSendDataPackets = 100;
     40 
     41 enum {
     42   MSG_CHANNELREADY,
     43 };
     44 
     45 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
     46     DataChannelProviderInterface* provider,
     47     cricket::DataChannelType dct,
     48     const std::string& label,
     49     const DataChannelInit* config) {
     50   talk_base::scoped_refptr<DataChannel> channel(
     51       new talk_base::RefCountedObject<DataChannel>(provider, dct, label));
     52   if (!channel->Init(config)) {
     53     return NULL;
     54   }
     55   return channel;
     56 }
     57 
     58 DataChannel::DataChannel(
     59     DataChannelProviderInterface* provider,
     60     cricket::DataChannelType dct,
     61     const std::string& label)
     62     : label_(label),
     63       observer_(NULL),
     64       state_(kConnecting),
     65       was_ever_writable_(false),
     66       connected_to_provider_(false),
     67       data_channel_type_(dct),
     68       provider_(provider),
     69       send_ssrc_set_(false),
     70       send_ssrc_(0),
     71       receive_ssrc_set_(false),
     72       receive_ssrc_(0) {
     73 }
     74 
     75 bool DataChannel::Init(const DataChannelInit* config) {
     76   if (data_channel_type_ == cricket::DCT_RTP &&
     77       (config->reliable ||
     78        config->id != -1 ||
     79        config->maxRetransmits != -1 ||
     80        config->maxRetransmitTime != -1)) {
     81     LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
     82                   << "invalid DataChannelInit.";
     83     return false;
     84   } else if (data_channel_type_ == cricket::DCT_SCTP) {
     85     if (config->id < -1 ||
     86         config->maxRetransmits < -1 ||
     87         config->maxRetransmitTime < -1) {
     88       LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
     89                     << "invalid DataChannelInit.";
     90       return false;
     91     }
     92     if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
     93       LOG(LS_ERROR) <<
     94           "maxRetransmits and maxRetransmitTime should not be both set.";
     95       return false;
     96     }
     97     config_ = *config;
     98 
     99     // Try to connect to the transport in case the transport channel already
    100     // exists.
    101     OnTransportChannelCreated();
    102 
    103     // Checks if the transport is ready to send because the initial channel
    104     // ready signal may have been sent before the DataChannel creation.
    105     // This has to be done async because the upper layer objects (e.g.
    106     // Chrome glue and WebKit) are not wired up properly until after this
    107     // function returns.
    108     if (provider_->ReadyToSendData()) {
    109       talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
    110     }
    111   }
    112 
    113   return true;
    114 }
    115 
    116 DataChannel::~DataChannel() {
    117   ClearQueuedReceivedData();
    118   ClearQueuedSendData();
    119   ClearQueuedControlData();
    120 }
    121 
    122 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
    123   observer_ = observer;
    124   DeliverQueuedReceivedData();
    125 }
    126 
    127 void DataChannel::UnregisterObserver() {
    128   observer_ = NULL;
    129 }
    130 
    131 bool DataChannel::reliable() const {
    132   if (data_channel_type_ == cricket::DCT_RTP) {
    133     return false;
    134   } else {
    135     return config_.maxRetransmits == -1 &&
    136            config_.maxRetransmitTime == -1;
    137   }
    138 }
    139 
    140 uint64 DataChannel::buffered_amount() const {
    141   uint64 buffered_amount = 0;
    142   for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
    143       it != queued_send_data_.end();
    144       ++it) {
    145     buffered_amount += (*it)->size();
    146   }
    147   return buffered_amount;
    148 }
    149 
    150 void DataChannel::Close() {
    151   if (state_ == kClosed)
    152     return;
    153   send_ssrc_ = 0;
    154   send_ssrc_set_ = false;
    155   SetState(kClosing);
    156   UpdateState();
    157 }
    158 
    159 bool DataChannel::Send(const DataBuffer& buffer) {
    160   if (state_ != kOpen) {
    161     return false;
    162   }
    163   // If the queue is non-empty, we're waiting for SignalReadyToSend,
    164   // so just add to the end of the queue and keep waiting.
    165   if (!queued_send_data_.empty()) {
    166     return QueueSendData(buffer);
    167   }
    168 
    169   cricket::SendDataResult send_result;
    170   if (!InternalSendWithoutQueueing(buffer, &send_result)) {
    171     if (send_result == cricket::SDR_BLOCK) {
    172       return QueueSendData(buffer);
    173     }
    174     // Fail for other results.
    175     // TODO(jiayl): We should close the data channel in this case.
    176     return false;
    177   }
    178   return true;
    179 }
    180 
    181 void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
    182   queued_control_data_.push(buffer);
    183 }
    184 
    185 bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
    186   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
    187          was_ever_writable_ &&
    188          config_.id >= 0 &&
    189          !config_.negotiated);
    190 
    191   talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
    192 
    193   cricket::SendDataParams send_params;
    194   send_params.ssrc = config_.id;
    195   send_params.ordered = true;
    196   send_params.type = cricket::DMT_CONTROL;
    197 
    198   cricket::SendDataResult send_result;
    199   bool retval = provider_->SendData(send_params, *buffer, &send_result);
    200   if (!retval && send_result == cricket::SDR_BLOCK) {
    201     // Link is congested.  Queue for later.
    202     QueueControl(buffer.release());
    203   }
    204   return retval;
    205 }
    206 
    207 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
    208   ASSERT(data_channel_type_ == cricket::DCT_RTP);
    209 
    210   if (receive_ssrc_set_) {
    211     return;
    212   }
    213   receive_ssrc_ = receive_ssrc;
    214   receive_ssrc_set_ = true;
    215   UpdateState();
    216 }
    217 
    218 // The remote peer request that this channel shall be closed.
    219 void DataChannel::RemotePeerRequestClose() {
    220   DoClose();
    221 }
    222 
    223 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
    224   ASSERT(data_channel_type_ == cricket::DCT_RTP);
    225   if (send_ssrc_set_) {
    226     return;
    227   }
    228   send_ssrc_ = send_ssrc;
    229   send_ssrc_set_ = true;
    230   UpdateState();
    231 }
    232 
    233 void DataChannel::OnMessage(talk_base::Message* msg) {
    234   switch (msg->message_id) {
    235     case MSG_CHANNELREADY:
    236       OnChannelReady(true);
    237       break;
    238   }
    239 }
    240 
    241 // The underlaying data engine is closing.
    242 // This function makes sure the DataChannel is disconnected and changes state to
    243 // kClosed.
    244 void DataChannel::OnDataEngineClose() {
    245   DoClose();
    246 }
    247 
    248 void DataChannel::OnDataReceived(cricket::DataChannel* channel,
    249                                  const cricket::ReceiveDataParams& params,
    250                                  const talk_base::Buffer& payload) {
    251   uint32 expected_ssrc =
    252       (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
    253   if (params.ssrc != expected_ssrc) {
    254     return;
    255   }
    256 
    257   bool binary = (params.type == cricket::DMT_BINARY);
    258   talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
    259   if (was_ever_writable_ && observer_) {
    260     observer_->OnMessage(*buffer.get());
    261   } else {
    262     if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
    263       // TODO(jiayl): We should close the data channel in this case.
    264       LOG(LS_ERROR)
    265           << "Queued received data exceeds the max number of packes.";
    266       ClearQueuedReceivedData();
    267     }
    268     queued_received_data_.push(buffer.release());
    269   }
    270 }
    271 
    272 void DataChannel::OnChannelReady(bool writable) {
    273   if (!writable) {
    274     return;
    275   }
    276   // Update the readyState and send the queued control message if the channel
    277   // is writable for the first time; otherwise it means the channel was blocked
    278   // for sending and now unblocked, so send the queued data now.
    279   if (!was_ever_writable_) {
    280     was_ever_writable_ = true;
    281 
    282     if (data_channel_type_ == cricket::DCT_SCTP && !config_.negotiated) {
    283       talk_base::Buffer* payload = new talk_base::Buffer;
    284       if (!cricket::WriteDataChannelOpenMessage(label_, config_, payload)) {
    285         // TODO(jiayl): close the data channel on this error.
    286         LOG(LS_ERROR) << "Could not write data channel OPEN message";
    287         return;
    288       }
    289       SendOpenMessage(payload);
    290     }
    291 
    292     UpdateState();
    293     ASSERT(queued_send_data_.empty());
    294   } else if (state_ == kOpen) {
    295     DeliverQueuedSendData();
    296   }
    297 }
    298 
    299 void DataChannel::DoClose() {
    300   receive_ssrc_set_ = false;
    301   send_ssrc_set_ = false;
    302   SetState(kClosing);
    303   UpdateState();
    304 }
    305 
    306 void DataChannel::UpdateState() {
    307   switch (state_) {
    308     case kConnecting: {
    309       if (send_ssrc_set_ == receive_ssrc_set_) {
    310         if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
    311           connected_to_provider_ = provider_->ConnectDataChannel(this);
    312         }
    313         if (was_ever_writable_) {
    314           // TODO(jiayl): Do not transition to kOpen if we failed to send the
    315           // OPEN message.
    316           DeliverQueuedControlData();
    317           SetState(kOpen);
    318           // If we have received buffers before the channel got writable.
    319           // Deliver them now.
    320           DeliverQueuedReceivedData();
    321         }
    322       }
    323       break;
    324     }
    325     case kOpen: {
    326       break;
    327     }
    328     case kClosing: {
    329       DisconnectFromTransport();
    330 
    331       if (!send_ssrc_set_ && !receive_ssrc_set_) {
    332         SetState(kClosed);
    333       }
    334       break;
    335     }
    336     case kClosed:
    337       break;
    338   }
    339 }
    340 
    341 void DataChannel::SetState(DataState state) {
    342   state_ = state;
    343   if (observer_) {
    344     observer_->OnStateChange();
    345   }
    346 }
    347 
    348 void DataChannel::DisconnectFromTransport() {
    349   if (!connected_to_provider_)
    350     return;
    351 
    352   provider_->DisconnectDataChannel(this);
    353   connected_to_provider_ = false;
    354 
    355   if (data_channel_type_ == cricket::DCT_SCTP) {
    356     provider_->RemoveSctpDataStream(config_.id);
    357   }
    358 }
    359 
    360 void DataChannel::DeliverQueuedReceivedData() {
    361   if (!was_ever_writable_ || !observer_) {
    362     return;
    363   }
    364 
    365   while (!queued_received_data_.empty()) {
    366     DataBuffer* buffer = queued_received_data_.front();
    367     observer_->OnMessage(*buffer);
    368     queued_received_data_.pop();
    369     delete buffer;
    370   }
    371 }
    372 
    373 void DataChannel::ClearQueuedReceivedData() {
    374   while (!queued_received_data_.empty()) {
    375     DataBuffer* buffer = queued_received_data_.front();
    376     queued_received_data_.pop();
    377     delete buffer;
    378   }
    379 }
    380 
    381 void DataChannel::DeliverQueuedSendData() {
    382   ASSERT(was_ever_writable_ && state_ == kOpen);
    383 
    384   // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
    385   // that the readyState is open. According to the standard, the channel should
    386   // not become open before the OPEN message is sent.
    387   DeliverQueuedControlData();
    388 
    389   while (!queued_send_data_.empty()) {
    390     DataBuffer* buffer = queued_send_data_.front();
    391     cricket::SendDataResult send_result;
    392     if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
    393       LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
    394                       << send_result;
    395       break;
    396     }
    397     queued_send_data_.pop_front();
    398     delete buffer;
    399   }
    400 }
    401 
    402 void DataChannel::ClearQueuedControlData() {
    403   while (!queued_control_data_.empty()) {
    404     const talk_base::Buffer *buf = queued_control_data_.front();
    405     queued_control_data_.pop();
    406     delete buf;
    407   }
    408 }
    409 
    410 void DataChannel::DeliverQueuedControlData() {
    411   ASSERT(was_ever_writable_);
    412   while (!queued_control_data_.empty()) {
    413     const talk_base::Buffer* buf = queued_control_data_.front();
    414     queued_control_data_.pop();
    415     SendOpenMessage(buf);
    416   }
    417 }
    418 
    419 void DataChannel::ClearQueuedSendData() {
    420   while (!queued_send_data_.empty()) {
    421     DataBuffer* buffer = queued_send_data_.front();
    422     queued_send_data_.pop_front();
    423     delete buffer;
    424   }
    425 }
    426 
    427 bool DataChannel::InternalSendWithoutQueueing(
    428     const DataBuffer& buffer, cricket::SendDataResult* send_result) {
    429   cricket::SendDataParams send_params;
    430 
    431   if (data_channel_type_ == cricket::DCT_SCTP) {
    432     send_params.ordered = config_.ordered;
    433     send_params.max_rtx_count = config_.maxRetransmits;
    434     send_params.max_rtx_ms = config_.maxRetransmitTime;
    435     send_params.ssrc = config_.id;
    436   } else {
    437     send_params.ssrc = send_ssrc_;
    438   }
    439   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
    440 
    441   return provider_->SendData(send_params, buffer.data, send_result);
    442 }
    443 
    444 bool DataChannel::QueueSendData(const DataBuffer& buffer) {
    445   if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
    446     LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
    447     return false;
    448   }
    449   queued_send_data_.push_back(new DataBuffer(buffer));
    450   return true;
    451 }
    452 
    453 void DataChannel::SetSctpSid(int sid) {
    454   ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
    455   config_.id = sid;
    456   provider_->AddSctpDataStream(sid);
    457 }
    458 
    459 void DataChannel::OnTransportChannelCreated() {
    460   ASSERT(data_channel_type_ == cricket::DCT_SCTP);
    461   if (!connected_to_provider_) {
    462     connected_to_provider_ = provider_->ConnectDataChannel(this);
    463   }
    464   // The sid may have been unassigned when provider_->ConnectDataChannel was
    465   // done. So always add the streams even if connected_to_provider_ is true.
    466   if (config_.id >= 0) {
    467     provider_->AddSctpDataStream(config_.id);
    468   }
    469 }
    470 
    471 }  // namespace webrtc
    472