Home | History | Annotate | Download | only in webrtc
      1 /*
      2  * libjingle
      3  * Copyright 2012, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 #include "talk/app/webrtc/datachannel.h"
     28 
     29 #include <string>
     30 
     31 #include "talk/app/webrtc/mediastreamprovider.h"
     32 #include "talk/app/webrtc/sctputils.h"
     33 #include "talk/base/logging.h"
     34 #include "talk/base/refcount.h"
     35 
     36 namespace webrtc {
     37 
     38 static size_t kMaxQueuedReceivedDataPackets = 100;
     39 static size_t kMaxQueuedSendDataPackets = 100;
     40 
     41 enum {
     42   MSG_CHANNELREADY,
     43 };
     44 
     45 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
     46     DataChannelProviderInterface* provider,
     47     cricket::DataChannelType dct,
     48     const std::string& label,
     49     const InternalDataChannelInit& config) {
     50   talk_base::scoped_refptr<DataChannel> channel(
     51       new talk_base::RefCountedObject<DataChannel>(provider, dct, label));
     52   if (!channel->Init(config)) {
     53     return NULL;
     54   }
     55   return channel;
     56 }
     57 
     58 DataChannel::DataChannel(
     59     DataChannelProviderInterface* provider,
     60     cricket::DataChannelType dct,
     61     const std::string& label)
     62     : label_(label),
     63       observer_(NULL),
     64       state_(kConnecting),
     65       data_channel_type_(dct),
     66       provider_(provider),
     67       waiting_for_open_ack_(false),
     68       was_ever_writable_(false),
     69       connected_to_provider_(false),
     70       send_ssrc_set_(false),
     71       receive_ssrc_set_(false),
     72       send_ssrc_(0),
     73       receive_ssrc_(0) {
     74 }
     75 
     76 bool DataChannel::Init(const InternalDataChannelInit& config) {
     77   if (data_channel_type_ == cricket::DCT_RTP &&
     78       (config.reliable ||
     79        config.id != -1 ||
     80        config.maxRetransmits != -1 ||
     81        config.maxRetransmitTime != -1)) {
     82     LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
     83                   << "invalid DataChannelInit.";
     84     return false;
     85   } else if (data_channel_type_ == cricket::DCT_SCTP) {
     86     if (config.id < -1 ||
     87         config.maxRetransmits < -1 ||
     88         config.maxRetransmitTime < -1) {
     89       LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
     90                     << "invalid DataChannelInit.";
     91       return false;
     92     }
     93     if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) {
     94       LOG(LS_ERROR) <<
     95           "maxRetransmits and maxRetransmitTime should not be both set.";
     96       return false;
     97     }
     98     config_ = config;
     99 
    100     // Try to connect to the transport in case the transport channel already
    101     // exists.
    102     OnTransportChannelCreated();
    103 
    104     // Checks if the transport is ready to send because the initial channel
    105     // ready signal may have been sent before the DataChannel creation.
    106     // This has to be done async because the upper layer objects (e.g.
    107     // Chrome glue and WebKit) are not wired up properly until after this
    108     // function returns.
    109     if (provider_->ReadyToSendData()) {
    110       talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
    111     }
    112   }
    113 
    114   return true;
    115 }
    116 
    117 DataChannel::~DataChannel() {
    118   ClearQueuedReceivedData();
    119   ClearQueuedSendData();
    120   ClearQueuedControlData();
    121 }
    122 
    123 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
    124   observer_ = observer;
    125   DeliverQueuedReceivedData();
    126 }
    127 
    128 void DataChannel::UnregisterObserver() {
    129   observer_ = NULL;
    130 }
    131 
    132 bool DataChannel::reliable() const {
    133   if (data_channel_type_ == cricket::DCT_RTP) {
    134     return false;
    135   } else {
    136     return config_.maxRetransmits == -1 &&
    137            config_.maxRetransmitTime == -1;
    138   }
    139 }
    140 
    141 uint64 DataChannel::buffered_amount() const {
    142   uint64 buffered_amount = 0;
    143   for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
    144       it != queued_send_data_.end();
    145       ++it) {
    146     buffered_amount += (*it)->size();
    147   }
    148   return buffered_amount;
    149 }
    150 
    151 void DataChannel::Close() {
    152   if (state_ == kClosed)
    153     return;
    154   send_ssrc_ = 0;
    155   send_ssrc_set_ = false;
    156   SetState(kClosing);
    157   UpdateState();
    158 }
    159 
    160 bool DataChannel::Send(const DataBuffer& buffer) {
    161   if (state_ != kOpen) {
    162     return false;
    163   }
    164   // If the queue is non-empty, we're waiting for SignalReadyToSend,
    165   // so just add to the end of the queue and keep waiting.
    166   if (!queued_send_data_.empty()) {
    167     if (!QueueSendData(buffer)) {
    168       if (data_channel_type_ == cricket::DCT_RTP) {
    169         return false;
    170       }
    171       Close();
    172     }
    173     return true;
    174   }
    175 
    176   cricket::SendDataResult send_result;
    177   if (!InternalSendWithoutQueueing(buffer, &send_result)) {
    178     if (data_channel_type_ == cricket::DCT_RTP) {
    179       return false;
    180     }
    181     if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) {
    182       Close();
    183     }
    184   }
    185   return true;
    186 }
    187 
    188 void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
    189   queued_control_data_.push(buffer);
    190 }
    191 
    192 bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
    193   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
    194          was_ever_writable_ &&
    195          config_.id >= 0 &&
    196          !config_.negotiated);
    197 
    198   talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
    199 
    200   cricket::SendDataParams send_params;
    201   send_params.ssrc = config_.id;
    202   send_params.ordered = true;
    203   send_params.type = cricket::DMT_CONTROL;
    204 
    205   cricket::SendDataResult send_result;
    206   bool retval = provider_->SendData(send_params, *buffer, &send_result);
    207   if (retval) {
    208     LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id;
    209     // Send data as ordered before we receive any mesage from the remote peer
    210     // to make sure the remote peer will not receive any data before it receives
    211     // the OPEN message.
    212     waiting_for_open_ack_ = true;
    213   } else if (send_result == cricket::SDR_BLOCK) {
    214     // Link is congested.  Queue for later.
    215     QueueControl(buffer.release());
    216   } else {
    217     LOG(LS_ERROR) << "Failed to send OPEN message with result "
    218                   << send_result << " on channel " << config_.id;
    219   }
    220   return retval;
    221 }
    222 
    223 bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) {
    224   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
    225          was_ever_writable_ &&
    226          config_.id >= 0);
    227 
    228   talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
    229 
    230   cricket::SendDataParams send_params;
    231   send_params.ssrc = config_.id;
    232   send_params.ordered = config_.ordered;
    233   send_params.type = cricket::DMT_CONTROL;
    234 
    235   cricket::SendDataResult send_result;
    236   bool retval = provider_->SendData(send_params, *buffer, &send_result);
    237   if (retval) {
    238     LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id;
    239   } else if (send_result == cricket::SDR_BLOCK) {
    240     // Link is congested.  Queue for later.
    241     QueueControl(buffer.release());
    242   } else {
    243     LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result "
    244                   << send_result << " on channel " << config_.id;
    245   }
    246   return retval;
    247 }
    248 
    249 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
    250   ASSERT(data_channel_type_ == cricket::DCT_RTP);
    251 
    252   if (receive_ssrc_set_) {
    253     return;
    254   }
    255   receive_ssrc_ = receive_ssrc;
    256   receive_ssrc_set_ = true;
    257   UpdateState();
    258 }
    259 
    260 // The remote peer request that this channel shall be closed.
    261 void DataChannel::RemotePeerRequestClose() {
    262   DoClose();
    263 }
    264 
    265 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
    266   ASSERT(data_channel_type_ == cricket::DCT_RTP);
    267   if (send_ssrc_set_) {
    268     return;
    269   }
    270   send_ssrc_ = send_ssrc;
    271   send_ssrc_set_ = true;
    272   UpdateState();
    273 }
    274 
    275 void DataChannel::OnMessage(talk_base::Message* msg) {
    276   switch (msg->message_id) {
    277     case MSG_CHANNELREADY:
    278       OnChannelReady(true);
    279       break;
    280   }
    281 }
    282 
    283 // The underlaying data engine is closing.
    284 // This function makes sure the DataChannel is disconnected and changes state to
    285 // kClosed.
    286 void DataChannel::OnDataEngineClose() {
    287   DoClose();
    288 }
    289 
    290 void DataChannel::OnDataReceived(cricket::DataChannel* channel,
    291                                  const cricket::ReceiveDataParams& params,
    292                                  const talk_base::Buffer& payload) {
    293   uint32 expected_ssrc =
    294       (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
    295   if (params.ssrc != expected_ssrc) {
    296     return;
    297   }
    298 
    299   if (params.type == cricket::DMT_CONTROL) {
    300     ASSERT(data_channel_type_ == cricket::DCT_SCTP);
    301     if (!waiting_for_open_ack_) {
    302       // Ignore it if we are not expecting an ACK message.
    303       LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
    304                       << "sid = " << params.ssrc;
    305       return;
    306     }
    307     if (ParseDataChannelOpenAckMessage(payload)) {
    308       // We can send unordered as soon as we receive the ACK message.
    309       waiting_for_open_ack_ = false;
    310       LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
    311                    << params.ssrc;
    312     } else {
    313       LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = "
    314                       << params.ssrc;
    315     }
    316     return;
    317   }
    318 
    319   ASSERT(params.type == cricket::DMT_BINARY ||
    320          params.type == cricket::DMT_TEXT);
    321 
    322   LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc;
    323   // We can send unordered as soon as we receive any DATA message since the
    324   // remote side must have received the OPEN (and old clients do not send
    325   // OPEN_ACK).
    326   waiting_for_open_ack_ = false;
    327 
    328   bool binary = (params.type == cricket::DMT_BINARY);
    329   talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
    330   if (was_ever_writable_ && observer_) {
    331     observer_->OnMessage(*buffer.get());
    332   } else {
    333     if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
    334       LOG(LS_ERROR)
    335           << "Queued received data exceeds the max number of packets.";
    336       ClearQueuedReceivedData();
    337     }
    338     queued_received_data_.push(buffer.release());
    339   }
    340 }
    341 
    342 void DataChannel::OnChannelReady(bool writable) {
    343   if (!writable) {
    344     return;
    345   }
    346   // Update the readyState and send the queued control message if the channel
    347   // is writable for the first time; otherwise it means the channel was blocked
    348   // for sending and now unblocked, so send the queued data now.
    349   if (!was_ever_writable_) {
    350     was_ever_writable_ = true;
    351 
    352     if (data_channel_type_ == cricket::DCT_SCTP) {
    353       if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
    354         talk_base::Buffer* payload = new talk_base::Buffer;
    355         WriteDataChannelOpenMessage(label_, config_, payload);
    356         SendOpenMessage(payload);
    357       } else if (config_.open_handshake_role ==
    358                  InternalDataChannelInit::kAcker) {
    359         talk_base::Buffer* payload = new talk_base::Buffer;
    360         WriteDataChannelOpenAckMessage(payload);
    361         SendOpenAckMessage(payload);
    362       }
    363     }
    364 
    365     UpdateState();
    366     ASSERT(queued_send_data_.empty());
    367   } else if (state_ == kOpen) {
    368     DeliverQueuedSendData();
    369   }
    370 }
    371 
    372 void DataChannel::DoClose() {
    373   if (state_ == kClosed)
    374     return;
    375 
    376   receive_ssrc_set_ = false;
    377   send_ssrc_set_ = false;
    378   SetState(kClosing);
    379   UpdateState();
    380 }
    381 
    382 void DataChannel::UpdateState() {
    383   switch (state_) {
    384     case kConnecting: {
    385       if (send_ssrc_set_ == receive_ssrc_set_) {
    386         if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
    387           connected_to_provider_ = provider_->ConnectDataChannel(this);
    388         }
    389         if (was_ever_writable_) {
    390           // TODO(jiayl): Do not transition to kOpen if we failed to send the
    391           // OPEN message.
    392           DeliverQueuedControlData();
    393           SetState(kOpen);
    394           // If we have received buffers before the channel got writable.
    395           // Deliver them now.
    396           DeliverQueuedReceivedData();
    397         }
    398       }
    399       break;
    400     }
    401     case kOpen: {
    402       break;
    403     }
    404     case kClosing: {
    405       DisconnectFromTransport();
    406 
    407       if (!send_ssrc_set_ && !receive_ssrc_set_) {
    408         SetState(kClosed);
    409       }
    410       break;
    411     }
    412     case kClosed:
    413       break;
    414   }
    415 }
    416 
    417 void DataChannel::SetState(DataState state) {
    418   if (state_ == state)
    419     return;
    420 
    421   state_ = state;
    422   if (observer_) {
    423     observer_->OnStateChange();
    424   }
    425 }
    426 
    427 void DataChannel::DisconnectFromTransport() {
    428   if (!connected_to_provider_)
    429     return;
    430 
    431   provider_->DisconnectDataChannel(this);
    432   connected_to_provider_ = false;
    433 
    434   if (data_channel_type_ == cricket::DCT_SCTP) {
    435     provider_->RemoveSctpDataStream(config_.id);
    436   }
    437 }
    438 
    439 void DataChannel::DeliverQueuedReceivedData() {
    440   if (!was_ever_writable_ || !observer_) {
    441     return;
    442   }
    443 
    444   while (!queued_received_data_.empty()) {
    445     DataBuffer* buffer = queued_received_data_.front();
    446     observer_->OnMessage(*buffer);
    447     queued_received_data_.pop();
    448     delete buffer;
    449   }
    450 }
    451 
    452 void DataChannel::ClearQueuedReceivedData() {
    453   while (!queued_received_data_.empty()) {
    454     DataBuffer* buffer = queued_received_data_.front();
    455     queued_received_data_.pop();
    456     delete buffer;
    457   }
    458 }
    459 
    460 void DataChannel::DeliverQueuedSendData() {
    461   ASSERT(was_ever_writable_ && state_ == kOpen);
    462 
    463   // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
    464   // that the readyState is open. According to the standard, the channel should
    465   // not become open before the OPEN message is sent.
    466   DeliverQueuedControlData();
    467 
    468   while (!queued_send_data_.empty()) {
    469     DataBuffer* buffer = queued_send_data_.front();
    470     cricket::SendDataResult send_result;
    471     if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
    472       LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
    473                       << send_result;
    474       break;
    475     }
    476     queued_send_data_.pop_front();
    477     delete buffer;
    478   }
    479 }
    480 
    481 void DataChannel::ClearQueuedControlData() {
    482   while (!queued_control_data_.empty()) {
    483     const talk_base::Buffer *buf = queued_control_data_.front();
    484     queued_control_data_.pop();
    485     delete buf;
    486   }
    487 }
    488 
    489 void DataChannel::DeliverQueuedControlData() {
    490   ASSERT(was_ever_writable_);
    491   while (!queued_control_data_.empty()) {
    492     const talk_base::Buffer* buf = queued_control_data_.front();
    493     queued_control_data_.pop();
    494     if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
    495       SendOpenMessage(buf);
    496     } else {
    497       ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker);
    498       SendOpenAckMessage(buf);
    499     }
    500   }
    501 }
    502 
    503 void DataChannel::ClearQueuedSendData() {
    504   while (!queued_send_data_.empty()) {
    505     DataBuffer* buffer = queued_send_data_.front();
    506     queued_send_data_.pop_front();
    507     delete buffer;
    508   }
    509 }
    510 
    511 bool DataChannel::InternalSendWithoutQueueing(
    512     const DataBuffer& buffer, cricket::SendDataResult* send_result) {
    513   cricket::SendDataParams send_params;
    514 
    515   if (data_channel_type_ == cricket::DCT_SCTP) {
    516     send_params.ordered = config_.ordered;
    517     // Send as ordered if it is waiting for the OPEN_ACK message.
    518     if (waiting_for_open_ack_ && !config_.ordered) {
    519       send_params.ordered = true;
    520       LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
    521                       << "because the OPEN_ACK message has not been received.";
    522     }
    523 
    524     send_params.max_rtx_count = config_.maxRetransmits;
    525     send_params.max_rtx_ms = config_.maxRetransmitTime;
    526     send_params.ssrc = config_.id;
    527   } else {
    528     send_params.ssrc = send_ssrc_;
    529   }
    530   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
    531 
    532   return provider_->SendData(send_params, buffer.data, send_result);
    533 }
    534 
    535 bool DataChannel::QueueSendData(const DataBuffer& buffer) {
    536   if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) {
    537     LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
    538     return false;
    539   }
    540   queued_send_data_.push_back(new DataBuffer(buffer));
    541   return true;
    542 }
    543 
    544 void DataChannel::SetSctpSid(int sid) {
    545   ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
    546   config_.id = sid;
    547   provider_->AddSctpDataStream(sid);
    548 }
    549 
    550 void DataChannel::OnTransportChannelCreated() {
    551   ASSERT(data_channel_type_ == cricket::DCT_SCTP);
    552   if (!connected_to_provider_) {
    553     connected_to_provider_ = provider_->ConnectDataChannel(this);
    554   }
    555   // The sid may have been unassigned when provider_->ConnectDataChannel was
    556   // done. So always add the streams even if connected_to_provider_ is true.
    557   if (config_.id >= 0) {
    558     provider_->AddSctpDataStream(config_.id);
    559   }
    560 }
    561 
    562 }  // namespace webrtc
    563