Home | History | Annotate | Download | only in webrtc
      1 /*
      2  * libjingle
      3  * Copyright 2012, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 #include "talk/app/webrtc/datachannel.h"
     28 
     29 #include <string>
     30 
     31 #include "talk/app/webrtc/webrtcsession.h"
     32 #include "talk/base/logging.h"
     33 #include "talk/base/refcount.h"
     34 
     35 namespace webrtc {
     36 
     37 static size_t kMaxQueuedReceivedDataPackets = 100;
     38 static size_t kMaxQueuedSendDataPackets = 100;
     39 
     40 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
     41     WebRtcSession* session,
     42     const std::string& label,
     43     const DataChannelInit* config) {
     44   talk_base::scoped_refptr<DataChannel> channel(
     45       new talk_base::RefCountedObject<DataChannel>(session, label));
     46   if (!channel->Init(config)) {
     47     return NULL;
     48   }
     49   return channel;
     50 }
     51 
     52 DataChannel::DataChannel(WebRtcSession* session, const std::string& label)
     53     : label_(label),
     54       observer_(NULL),
     55       state_(kConnecting),
     56       was_ever_writable_(false),
     57       session_(session),
     58       data_session_(NULL),
     59       send_ssrc_set_(false),
     60       send_ssrc_(0),
     61       receive_ssrc_set_(false),
     62       receive_ssrc_(0) {
     63 }
     64 
     65 bool DataChannel::Init(const DataChannelInit* config) {
     66   if (config) {
     67     if (session_->data_channel_type() == cricket::DCT_RTP &&
     68         (config->reliable ||
     69          config->id != -1 ||
     70          config->maxRetransmits != -1 ||
     71          config->maxRetransmitTime != -1)) {
     72       LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
     73                     << "invalid DataChannelInit.";
     74       return false;
     75     } else if (session_->data_channel_type() == cricket::DCT_SCTP) {
     76       if (config->id < -1 ||
     77           config->maxRetransmits < -1 ||
     78           config->maxRetransmitTime < -1) {
     79         LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
     80                       << "invalid DataChannelInit.";
     81         return false;
     82       }
     83       if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
     84         LOG(LS_ERROR) <<
     85             "maxRetransmits and maxRetransmitTime should not be both set.";
     86         return false;
     87       }
     88     }
     89     config_ = *config;
     90   }
     91   return true;
     92 }
     93 
     94 bool DataChannel::HasNegotiationCompleted() {
     95   return send_ssrc_set_ == receive_ssrc_set_;
     96 }
     97 
     98 DataChannel::~DataChannel() {
     99   ClearQueuedReceivedData();
    100   ClearQueuedSendData();
    101 }
    102 
    103 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
    104   observer_ = observer;
    105   DeliverQueuedReceivedData();
    106 }
    107 
    108 void DataChannel::UnregisterObserver() {
    109   observer_ = NULL;
    110 }
    111 
    112 bool DataChannel::reliable() const {
    113   if (session_->data_channel_type() == cricket::DCT_RTP) {
    114     return false;
    115   } else {
    116     return config_.maxRetransmits == -1 &&
    117            config_.maxRetransmitTime == -1;
    118   }
    119 }
    120 
    121 uint64 DataChannel::buffered_amount() const {
    122   uint64 buffered_amount = 0;
    123   for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
    124       it != queued_send_data_.end();
    125       ++it) {
    126     buffered_amount += (*it)->size();
    127   }
    128   return buffered_amount;
    129 }
    130 
    131 void DataChannel::Close() {
    132   if (state_ == kClosed)
    133     return;
    134   send_ssrc_ = 0;
    135   send_ssrc_set_ = false;
    136   SetState(kClosing);
    137   UpdateState();
    138 }
    139 
    140 bool DataChannel::Send(const DataBuffer& buffer) {
    141   if (state_ != kOpen) {
    142     return false;
    143   }
    144   // If the queue is non-empty, we're waiting for SignalReadyToSend,
    145   // so just add to the end of the queue and keep waiting.
    146   if (!queued_send_data_.empty()) {
    147     return QueueSendData(buffer);
    148   }
    149 
    150   cricket::SendDataResult send_result;
    151   if (!InternalSendWithoutQueueing(buffer, &send_result)) {
    152     if (send_result == cricket::SDR_BLOCK) {
    153       return QueueSendData(buffer);
    154     }
    155     // Fail for other results.
    156     // TODO(jiayl): We should close the data channel in this case.
    157     return false;
    158   }
    159   return true;
    160 }
    161 
    162 void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
    163   queued_control_data_.push(buffer);
    164 }
    165 
    166 bool DataChannel::SendControl(const talk_base::Buffer* buffer) {
    167   if (state_ != kOpen) {
    168     QueueControl(buffer);
    169     return true;
    170   }
    171   if (session_->data_channel_type() == cricket::DCT_RTP) {
    172     delete buffer;
    173     return false;
    174   }
    175 
    176   cricket::SendDataParams send_params;
    177   send_params.ssrc = config_.id;
    178   send_params.ordered = true;
    179   send_params.type = cricket::DMT_CONTROL;
    180 
    181   cricket::SendDataResult send_result;
    182   bool retval = session_->data_channel()->SendData(
    183       send_params, *buffer, &send_result);
    184   if (!retval && send_result == cricket::SDR_BLOCK) {
    185     // Link is congested.  Queue for later.
    186     QueueControl(buffer);
    187   } else {
    188     delete buffer;
    189   }
    190   return retval;
    191 }
    192 
    193 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
    194   if (receive_ssrc_set_) {
    195     ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
    196         receive_ssrc_ == send_ssrc_);
    197     return;
    198   }
    199   receive_ssrc_ = receive_ssrc;
    200   receive_ssrc_set_ = true;
    201   UpdateState();
    202 }
    203 
    204 // The remote peer request that this channel shall be closed.
    205 void DataChannel::RemotePeerRequestClose() {
    206   DoClose();
    207 }
    208 
    209 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
    210   if (send_ssrc_set_) {
    211     ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
    212         receive_ssrc_ == send_ssrc_);
    213     return;
    214   }
    215   send_ssrc_ = send_ssrc;
    216   send_ssrc_set_ = true;
    217   UpdateState();
    218 }
    219 
    220 // The underlaying data engine is closing.
    221 // This function make sure the DataChannel is disconneced and change state to
    222 // kClosed.
    223 void DataChannel::OnDataEngineClose() {
    224   DoClose();
    225 }
    226 
    227 void DataChannel::OnDataReceived(cricket::DataChannel* channel,
    228                                  const cricket::ReceiveDataParams& params,
    229                                  const talk_base::Buffer& payload) {
    230   if (params.ssrc != receive_ssrc_) {
    231     return;
    232   }
    233 
    234   bool binary = (params.type == cricket::DMT_BINARY);
    235   talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
    236   if (was_ever_writable_ && observer_) {
    237     observer_->OnMessage(*buffer.get());
    238   } else {
    239     if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
    240       // TODO(jiayl): We should close the data channel in this case.
    241       LOG(LS_ERROR)
    242           << "Queued received data exceeds the max number of packes.";
    243       ClearQueuedReceivedData();
    244     }
    245     queued_received_data_.push(buffer.release());
    246   }
    247 }
    248 
    249 void DataChannel::OnChannelReady(bool writable) {
    250   if (!writable) {
    251     return;
    252   }
    253   // Update the readyState if the channel is writable for the first time;
    254   // otherwise it means the channel was blocked for sending and now unblocked,
    255   // so send the queued data now.
    256   if (!was_ever_writable_) {
    257     was_ever_writable_ = true;
    258     UpdateState();
    259   } else if (state_ == kOpen) {
    260     SendQueuedSendData();
    261   }
    262 }
    263 
    264 void DataChannel::DoClose() {
    265   receive_ssrc_set_ = false;
    266   send_ssrc_set_ = false;
    267   SetState(kClosing);
    268   UpdateState();
    269 }
    270 
    271 void DataChannel::UpdateState() {
    272   switch (state_) {
    273     case kConnecting: {
    274       if (HasNegotiationCompleted()) {
    275         if (!IsConnectedToDataSession()) {
    276           ConnectToDataSession();
    277         }
    278         if (was_ever_writable_) {
    279           SetState(kOpen);
    280           // If we have received buffers before the channel got writable.
    281           // Deliver them now.
    282           DeliverQueuedReceivedData();
    283         }
    284       }
    285       break;
    286     }
    287     case kOpen: {
    288       break;
    289     }
    290     case kClosing: {
    291       if (IsConnectedToDataSession()) {
    292         DisconnectFromDataSession();
    293       }
    294       if (HasNegotiationCompleted()) {
    295         SetState(kClosed);
    296       }
    297       break;
    298     }
    299     case kClosed:
    300       break;
    301   }
    302 }
    303 
    304 void DataChannel::SetState(DataState state) {
    305   state_ = state;
    306   if (observer_) {
    307     observer_->OnStateChange();
    308   }
    309 }
    310 
    311 void DataChannel::ConnectToDataSession() {
    312   if (!session_->data_channel()) {
    313     LOG(LS_ERROR) << "The DataEngine does not exist.";
    314     ASSERT(session_->data_channel() != NULL);
    315     return;
    316   }
    317 
    318   data_session_ = session_->data_channel();
    319   data_session_->SignalReadyToSendData.connect(this,
    320                                                &DataChannel::OnChannelReady);
    321   data_session_->SignalDataReceived.connect(this, &DataChannel::OnDataReceived);
    322   cricket::StreamParams params =
    323     cricket::StreamParams::CreateLegacy(id());
    324   data_session_->media_channel()->AddSendStream(params);
    325   data_session_->media_channel()->AddRecvStream(params);
    326 }
    327 
    328 void DataChannel::DisconnectFromDataSession() {
    329   if (data_session_->media_channel() != NULL) {
    330     data_session_->media_channel()->RemoveSendStream(id());
    331     data_session_->media_channel()->RemoveRecvStream(id());
    332   }
    333   data_session_->SignalReadyToSendData.disconnect(this);
    334   data_session_->SignalDataReceived.disconnect(this);
    335   data_session_ = NULL;
    336 }
    337 
    338 void DataChannel::DeliverQueuedReceivedData() {
    339   if (!was_ever_writable_ || !observer_) {
    340     return;
    341   }
    342 
    343   while (!queued_received_data_.empty()) {
    344     DataBuffer* buffer = queued_received_data_.front();
    345     observer_->OnMessage(*buffer);
    346     queued_received_data_.pop();
    347     delete buffer;
    348   }
    349 }
    350 
    351 void DataChannel::ClearQueuedReceivedData() {
    352   while (!queued_received_data_.empty()) {
    353     DataBuffer* buffer = queued_received_data_.front();
    354     queued_received_data_.pop();
    355     delete buffer;
    356   }
    357 }
    358 
    359 void DataChannel::SendQueuedSendData() {
    360   DeliverQueuedControlData();
    361   if (!was_ever_writable_) {
    362     return;
    363   }
    364 
    365   while (!queued_send_data_.empty()) {
    366     DataBuffer* buffer = queued_send_data_.front();
    367     cricket::SendDataResult send_result;
    368     if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
    369       LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
    370                       << send_result;
    371       break;
    372     }
    373     queued_send_data_.pop_front();
    374     delete buffer;
    375   }
    376 }
    377 
    378 void DataChannel::DeliverQueuedControlData() {
    379   if (was_ever_writable_) {
    380     while (!queued_control_data_.empty()) {
    381       const talk_base::Buffer *buf = queued_control_data_.front();
    382       queued_control_data_.pop();
    383       SendControl(buf);
    384     }
    385   }
    386 }
    387 
    388 void DataChannel::ClearQueuedSendData() {
    389   while (!queued_send_data_.empty()) {
    390     DataBuffer* buffer = queued_send_data_.front();
    391     queued_send_data_.pop_front();
    392     delete buffer;
    393   }
    394 }
    395 
    396 bool DataChannel::InternalSendWithoutQueueing(
    397     const DataBuffer& buffer, cricket::SendDataResult* send_result) {
    398   cricket::SendDataParams send_params;
    399 
    400   send_params.ssrc = send_ssrc_;
    401   if (session_->data_channel_type() == cricket::DCT_SCTP) {
    402     send_params.ordered = config_.ordered;
    403     send_params.max_rtx_count = config_.maxRetransmits;
    404     send_params.max_rtx_ms = config_.maxRetransmitTime;
    405   }
    406   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
    407 
    408   return session_->data_channel()->SendData(send_params, buffer.data,
    409                                             send_result);
    410 }
    411 
    412 bool DataChannel::QueueSendData(const DataBuffer& buffer) {
    413   if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
    414     LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
    415     return false;
    416   }
    417   queued_send_data_.push_back(new DataBuffer(buffer));
    418   return true;
    419 }
    420 
    421 }  // namespace webrtc
    422