Home | History | Annotate | Download | only in webrtc
      1 /*
      2  * libjingle
      3  * Copyright 2012, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 #include "talk/app/webrtc/datachannel.h"
     28 
     29 #include <string>
     30 
     31 #include "talk/app/webrtc/mediastreamprovider.h"
     32 #include "talk/app/webrtc/sctputils.h"
     33 #include "webrtc/base/logging.h"
     34 #include "webrtc/base/refcount.h"
     35 
     36 namespace webrtc {
     37 
     38 static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
     39 static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
     40 
     41 enum {
     42   MSG_CHANNELREADY,
     43 };
     44 
     45 DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {}
     46 
     47 DataChannel::PacketQueue::~PacketQueue() {
     48   Clear();
     49 }
     50 
     51 bool DataChannel::PacketQueue::Empty() const {
     52   return packets_.empty();
     53 }
     54 
     55 DataBuffer* DataChannel::PacketQueue::Front() {
     56   return packets_.front();
     57 }
     58 
     59 void DataChannel::PacketQueue::Pop() {
     60   if (packets_.empty()) {
     61     return;
     62   }
     63 
     64   byte_count_ -= packets_.front()->size();
     65   packets_.pop_front();
     66 }
     67 
     68 void DataChannel::PacketQueue::Push(DataBuffer* packet) {
     69   byte_count_ += packet->size();
     70   packets_.push_back(packet);
     71 }
     72 
     73 void DataChannel::PacketQueue::Clear() {
     74   while (!packets_.empty()) {
     75     delete packets_.front();
     76     packets_.pop_front();
     77   }
     78   byte_count_ = 0;
     79 }
     80 
     81 void DataChannel::PacketQueue::Swap(PacketQueue* other) {
     82   size_t other_byte_count = other->byte_count_;
     83   other->byte_count_ = byte_count_;
     84   byte_count_ = other_byte_count;
     85 
     86   other->packets_.swap(packets_);
     87 }
     88 
     89 rtc::scoped_refptr<DataChannel> DataChannel::Create(
     90     DataChannelProviderInterface* provider,
     91     cricket::DataChannelType dct,
     92     const std::string& label,
     93     const InternalDataChannelInit& config) {
     94   rtc::scoped_refptr<DataChannel> channel(
     95       new rtc::RefCountedObject<DataChannel>(provider, dct, label));
     96   if (!channel->Init(config)) {
     97     return NULL;
     98   }
     99   return channel;
    100 }
    101 
    102 DataChannel::DataChannel(
    103     DataChannelProviderInterface* provider,
    104     cricket::DataChannelType dct,
    105     const std::string& label)
    106     : label_(label),
    107       observer_(NULL),
    108       state_(kConnecting),
    109       data_channel_type_(dct),
    110       provider_(provider),
    111       waiting_for_open_ack_(false),
    112       was_ever_writable_(false),
    113       connected_to_provider_(false),
    114       send_ssrc_set_(false),
    115       receive_ssrc_set_(false),
    116       send_ssrc_(0),
    117       receive_ssrc_(0) {
    118 }
    119 
    120 bool DataChannel::Init(const InternalDataChannelInit& config) {
    121   if (data_channel_type_ == cricket::DCT_RTP &&
    122       (config.reliable ||
    123        config.id != -1 ||
    124        config.maxRetransmits != -1 ||
    125        config.maxRetransmitTime != -1)) {
    126     LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
    127                   << "invalid DataChannelInit.";
    128     return false;
    129   } else if (data_channel_type_ == cricket::DCT_SCTP) {
    130     if (config.id < -1 ||
    131         config.maxRetransmits < -1 ||
    132         config.maxRetransmitTime < -1) {
    133       LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
    134                     << "invalid DataChannelInit.";
    135       return false;
    136     }
    137     if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) {
    138       LOG(LS_ERROR) <<
    139           "maxRetransmits and maxRetransmitTime should not be both set.";
    140       return false;
    141     }
    142     config_ = config;
    143 
    144     // Try to connect to the transport in case the transport channel already
    145     // exists.
    146     OnTransportChannelCreated();
    147 
    148     // Checks if the transport is ready to send because the initial channel
    149     // ready signal may have been sent before the DataChannel creation.
    150     // This has to be done async because the upper layer objects (e.g.
    151     // Chrome glue and WebKit) are not wired up properly until after this
    152     // function returns.
    153     if (provider_->ReadyToSendData()) {
    154       rtc::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
    155     }
    156   }
    157 
    158   return true;
    159 }
    160 
    161 DataChannel::~DataChannel() {}
    162 
    163 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
    164   observer_ = observer;
    165   DeliverQueuedReceivedData();
    166 }
    167 
    168 void DataChannel::UnregisterObserver() {
    169   observer_ = NULL;
    170 }
    171 
    172 bool DataChannel::reliable() const {
    173   if (data_channel_type_ == cricket::DCT_RTP) {
    174     return false;
    175   } else {
    176     return config_.maxRetransmits == -1 &&
    177            config_.maxRetransmitTime == -1;
    178   }
    179 }
    180 
    181 uint64 DataChannel::buffered_amount() const {
    182   return queued_send_data_.byte_count();
    183 }
    184 
    185 void DataChannel::Close() {
    186   if (state_ == kClosed)
    187     return;
    188   send_ssrc_ = 0;
    189   send_ssrc_set_ = false;
    190   SetState(kClosing);
    191   UpdateState();
    192 }
    193 
    194 bool DataChannel::Send(const DataBuffer& buffer) {
    195   if (state_ != kOpen) {
    196     return false;
    197   }
    198 
    199   // TODO(jiayl): the spec is unclear about if the remote side should get the
    200   // onmessage event. We need to figure out the expected behavior and change the
    201   // code accordingly.
    202   if (buffer.size() == 0) {
    203     return true;
    204   }
    205 
    206   // If the queue is non-empty, we're waiting for SignalReadyToSend,
    207   // so just add to the end of the queue and keep waiting.
    208   if (!queued_send_data_.Empty()) {
    209     // Only SCTP DataChannel queues the outgoing data when the transport is
    210     // blocked.
    211     ASSERT(data_channel_type_ == cricket::DCT_SCTP);
    212     if (!QueueSendDataMessage(buffer)) {
    213       Close();
    214     }
    215     return true;
    216   }
    217 
    218   bool success = SendDataMessage(buffer);
    219   if (data_channel_type_ == cricket::DCT_RTP) {
    220     return success;
    221   }
    222 
    223   // Always return true for SCTP DataChannel per the spec.
    224   return true;
    225 }
    226 
    227 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
    228   ASSERT(data_channel_type_ == cricket::DCT_RTP);
    229 
    230   if (receive_ssrc_set_) {
    231     return;
    232   }
    233   receive_ssrc_ = receive_ssrc;
    234   receive_ssrc_set_ = true;
    235   UpdateState();
    236 }
    237 
    238 // The remote peer request that this channel shall be closed.
    239 void DataChannel::RemotePeerRequestClose() {
    240   DoClose();
    241 }
    242 
    243 void DataChannel::SetSctpSid(int sid) {
    244   ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
    245   if (config_.id == sid)
    246     return;
    247 
    248   config_.id = sid;
    249   provider_->AddSctpDataStream(sid);
    250 }
    251 
    252 void DataChannel::OnTransportChannelCreated() {
    253   ASSERT(data_channel_type_ == cricket::DCT_SCTP);
    254   if (!connected_to_provider_) {
    255     connected_to_provider_ = provider_->ConnectDataChannel(this);
    256   }
    257   // The sid may have been unassigned when provider_->ConnectDataChannel was
    258   // done. So always add the streams even if connected_to_provider_ is true.
    259   if (config_.id >= 0) {
    260     provider_->AddSctpDataStream(config_.id);
    261   }
    262 }
    263 
    264 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
    265   ASSERT(data_channel_type_ == cricket::DCT_RTP);
    266   if (send_ssrc_set_) {
    267     return;
    268   }
    269   send_ssrc_ = send_ssrc;
    270   send_ssrc_set_ = true;
    271   UpdateState();
    272 }
    273 
    274 void DataChannel::OnMessage(rtc::Message* msg) {
    275   switch (msg->message_id) {
    276     case MSG_CHANNELREADY:
    277       OnChannelReady(true);
    278       break;
    279   }
    280 }
    281 
    282 // The underlaying data engine is closing.
    283 // This function makes sure the DataChannel is disconnected and changes state to
    284 // kClosed.
    285 void DataChannel::OnDataEngineClose() {
    286   DoClose();
    287 }
    288 
    289 void DataChannel::OnDataReceived(cricket::DataChannel* channel,
    290                                  const cricket::ReceiveDataParams& params,
    291                                  const rtc::Buffer& payload) {
    292   uint32 expected_ssrc =
    293       (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
    294   if (params.ssrc != expected_ssrc) {
    295     return;
    296   }
    297 
    298   if (params.type == cricket::DMT_CONTROL) {
    299     ASSERT(data_channel_type_ == cricket::DCT_SCTP);
    300     if (!waiting_for_open_ack_) {
    301       // Ignore it if we are not expecting an ACK message.
    302       LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
    303                       << "sid = " << params.ssrc;
    304       return;
    305     }
    306     if (ParseDataChannelOpenAckMessage(payload)) {
    307       // We can send unordered as soon as we receive the ACK message.
    308       waiting_for_open_ack_ = false;
    309       LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
    310                    << params.ssrc;
    311     } else {
    312       LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = "
    313                       << params.ssrc;
    314     }
    315     return;
    316   }
    317 
    318   ASSERT(params.type == cricket::DMT_BINARY ||
    319          params.type == cricket::DMT_TEXT);
    320 
    321   LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc;
    322   // We can send unordered as soon as we receive any DATA message since the
    323   // remote side must have received the OPEN (and old clients do not send
    324   // OPEN_ACK).
    325   waiting_for_open_ack_ = false;
    326 
    327   bool binary = (params.type == cricket::DMT_BINARY);
    328   rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
    329   if (was_ever_writable_ && observer_) {
    330     observer_->OnMessage(*buffer.get());
    331   } else {
    332     if (queued_received_data_.byte_count() + payload.length() >
    333             kMaxQueuedReceivedDataBytes) {
    334       LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
    335 
    336       queued_received_data_.Clear();
    337       if (data_channel_type_ != cricket::DCT_RTP) {
    338         Close();
    339       }
    340 
    341       return;
    342     }
    343     queued_received_data_.Push(buffer.release());
    344   }
    345 }
    346 
    347 void DataChannel::OnChannelReady(bool writable) {
    348   if (!writable) {
    349     return;
    350   }
    351   // Update the readyState and send the queued control message if the channel
    352   // is writable for the first time; otherwise it means the channel was blocked
    353   // for sending and now unblocked, so send the queued data now.
    354   if (!was_ever_writable_) {
    355     was_ever_writable_ = true;
    356 
    357     if (data_channel_type_ == cricket::DCT_SCTP) {
    358       rtc::Buffer payload;
    359 
    360       if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
    361         WriteDataChannelOpenMessage(label_, config_, &payload);
    362         SendControlMessage(payload);
    363       } else if (config_.open_handshake_role ==
    364                      InternalDataChannelInit::kAcker) {
    365         WriteDataChannelOpenAckMessage(&payload);
    366         SendControlMessage(payload);
    367       }
    368     }
    369 
    370     UpdateState();
    371     ASSERT(queued_send_data_.Empty());
    372   } else if (state_ == kOpen) {
    373     // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
    374     // that the readyState is open. According to the standard, the channel
    375     // should not become open before the OPEN message is sent.
    376     SendQueuedControlMessages();
    377 
    378     SendQueuedDataMessages();
    379   }
    380 }
    381 
    382 void DataChannel::DoClose() {
    383   if (state_ == kClosed)
    384     return;
    385 
    386   receive_ssrc_set_ = false;
    387   send_ssrc_set_ = false;
    388   SetState(kClosing);
    389   UpdateState();
    390 }
    391 
    392 void DataChannel::UpdateState() {
    393   switch (state_) {
    394     case kConnecting: {
    395       if (send_ssrc_set_ == receive_ssrc_set_) {
    396         if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
    397           connected_to_provider_ = provider_->ConnectDataChannel(this);
    398         }
    399         if (was_ever_writable_) {
    400           // TODO(jiayl): Do not transition to kOpen if we failed to send the
    401           // OPEN message.
    402           SendQueuedControlMessages();
    403           SetState(kOpen);
    404           // If we have received buffers before the channel got writable.
    405           // Deliver them now.
    406           DeliverQueuedReceivedData();
    407         }
    408       }
    409       break;
    410     }
    411     case kOpen: {
    412       break;
    413     }
    414     case kClosing: {
    415       DisconnectFromTransport();
    416 
    417       if (!send_ssrc_set_ && !receive_ssrc_set_) {
    418         SetState(kClosed);
    419       }
    420       break;
    421     }
    422     case kClosed:
    423       break;
    424   }
    425 }
    426 
    427 void DataChannel::SetState(DataState state) {
    428   if (state_ == state)
    429     return;
    430 
    431   state_ = state;
    432   if (observer_) {
    433     observer_->OnStateChange();
    434   }
    435 }
    436 
    437 void DataChannel::DisconnectFromTransport() {
    438   if (!connected_to_provider_)
    439     return;
    440 
    441   provider_->DisconnectDataChannel(this);
    442   connected_to_provider_ = false;
    443 
    444   if (data_channel_type_ == cricket::DCT_SCTP) {
    445     provider_->RemoveSctpDataStream(config_.id);
    446   }
    447 }
    448 
    449 void DataChannel::DeliverQueuedReceivedData() {
    450   if (!was_ever_writable_ || !observer_) {
    451     return;
    452   }
    453 
    454   while (!queued_received_data_.Empty()) {
    455     rtc::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front());
    456     observer_->OnMessage(*buffer);
    457     queued_received_data_.Pop();
    458   }
    459 }
    460 
    461 void DataChannel::SendQueuedDataMessages() {
    462   ASSERT(was_ever_writable_ && state_ == kOpen);
    463 
    464   PacketQueue packet_buffer;
    465   packet_buffer.Swap(&queued_send_data_);
    466 
    467   while (!packet_buffer.Empty()) {
    468     rtc::scoped_ptr<DataBuffer> buffer(packet_buffer.Front());
    469     SendDataMessage(*buffer);
    470     packet_buffer.Pop();
    471   }
    472 }
    473 
    474 bool DataChannel::SendDataMessage(const DataBuffer& buffer) {
    475   cricket::SendDataParams send_params;
    476 
    477   if (data_channel_type_ == cricket::DCT_SCTP) {
    478     send_params.ordered = config_.ordered;
    479     // Send as ordered if it is waiting for the OPEN_ACK message.
    480     if (waiting_for_open_ack_ && !config_.ordered) {
    481       send_params.ordered = true;
    482       LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
    483                       << "because the OPEN_ACK message has not been received.";
    484     }
    485 
    486     send_params.max_rtx_count = config_.maxRetransmits;
    487     send_params.max_rtx_ms = config_.maxRetransmitTime;
    488     send_params.ssrc = config_.id;
    489   } else {
    490     send_params.ssrc = send_ssrc_;
    491   }
    492   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
    493 
    494   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
    495   bool success = provider_->SendData(send_params, buffer.data, &send_result);
    496 
    497   if (!success && data_channel_type_ == cricket::DCT_SCTP) {
    498     if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) {
    499       LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
    500                     << "send_result = " << send_result;
    501       Close();
    502     }
    503   }
    504   return success;
    505 }
    506 
    507 bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
    508   if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
    509     LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
    510     return false;
    511   }
    512   queued_send_data_.Push(new DataBuffer(buffer));
    513   return true;
    514 }
    515 
    516 void DataChannel::SendQueuedControlMessages() {
    517   ASSERT(was_ever_writable_);
    518 
    519   PacketQueue control_packets;
    520   control_packets.Swap(&queued_control_data_);
    521 
    522   while (!control_packets.Empty()) {
    523     rtc::scoped_ptr<DataBuffer> buf(control_packets.Front());
    524     SendControlMessage(buf->data);
    525     control_packets.Pop();
    526   }
    527 }
    528 
    529 void DataChannel::QueueControlMessage(const rtc::Buffer& buffer) {
    530   queued_control_data_.Push(new DataBuffer(buffer, true));
    531 }
    532 
    533 bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
    534   bool is_open_message =
    535       (config_.open_handshake_role == InternalDataChannelInit::kOpener);
    536 
    537   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
    538          was_ever_writable_ &&
    539          config_.id >= 0 &&
    540          (!is_open_message || !config_.negotiated));
    541 
    542   cricket::SendDataParams send_params;
    543   send_params.ssrc = config_.id;
    544   send_params.ordered = config_.ordered || is_open_message;
    545   send_params.type = cricket::DMT_CONTROL;
    546 
    547   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
    548   bool retval = provider_->SendData(send_params, buffer, &send_result);
    549   if (retval) {
    550     LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
    551 
    552     if (is_open_message) {
    553       // Send data as ordered before we receive any message from the remote peer
    554       // to make sure the remote peer will not receive any data before it
    555       // receives the OPEN message.
    556       waiting_for_open_ack_ = true;
    557     }
    558   } else if (send_result == cricket::SDR_BLOCK) {
    559     QueueControlMessage(buffer);
    560   } else {
    561     LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
    562                   << " the CONTROL message, send_result = " << send_result;
    563     Close();
    564   }
    565   return retval;
    566 }
    567 
    568 }  // namespace webrtc
    569