1 /* 2 * libjingle 3 * Copyright 2012 Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/app/webrtc/datachannel.h" 29 30 #include <string> 31 32 #include "talk/app/webrtc/mediastreamprovider.h" 33 #include "talk/app/webrtc/sctputils.h" 34 #include "talk/media/sctp/sctpdataengine.h" 35 #include "webrtc/base/logging.h" 36 #include "webrtc/base/refcount.h" 37 38 namespace webrtc { 39 40 static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024; 41 static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024; 42 43 enum { 44 MSG_CHANNELREADY, 45 }; 46 47 bool SctpSidAllocator::AllocateSid(rtc::SSLRole role, int* sid) { 48 int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1; 49 while (!IsSidAvailable(potential_sid)) { 50 potential_sid += 2; 51 if (potential_sid > static_cast<int>(cricket::kMaxSctpSid)) { 52 return false; 53 } 54 } 55 56 *sid = potential_sid; 57 used_sids_.insert(potential_sid); 58 return true; 59 } 60 61 bool SctpSidAllocator::ReserveSid(int sid) { 62 if (!IsSidAvailable(sid)) { 63 return false; 64 } 65 used_sids_.insert(sid); 66 return true; 67 } 68 69 void SctpSidAllocator::ReleaseSid(int sid) { 70 auto it = used_sids_.find(sid); 71 if (it != used_sids_.end()) { 72 used_sids_.erase(it); 73 } 74 } 75 76 bool SctpSidAllocator::IsSidAvailable(int sid) const { 77 if (sid < 0 || sid > static_cast<int>(cricket::kMaxSctpSid)) { 78 return false; 79 } 80 return used_sids_.find(sid) == used_sids_.end(); 81 } 82 83 DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {} 84 85 DataChannel::PacketQueue::~PacketQueue() { 86 Clear(); 87 } 88 89 bool DataChannel::PacketQueue::Empty() const { 90 return packets_.empty(); 91 } 92 93 DataBuffer* DataChannel::PacketQueue::Front() { 94 return packets_.front(); 95 } 96 97 void DataChannel::PacketQueue::Pop() { 98 if (packets_.empty()) { 99 return; 100 } 101 102 byte_count_ -= packets_.front()->size(); 103 packets_.pop_front(); 104 } 105 106 void DataChannel::PacketQueue::Push(DataBuffer* packet) { 107 byte_count_ += packet->size(); 108 packets_.push_back(packet); 109 } 110 111 void DataChannel::PacketQueue::Clear() { 112 while (!packets_.empty()) { 113 delete packets_.front(); 114 packets_.pop_front(); 115 } 116 byte_count_ = 0; 117 } 118 119 void DataChannel::PacketQueue::Swap(PacketQueue* other) { 120 size_t other_byte_count = other->byte_count_; 121 other->byte_count_ = byte_count_; 122 byte_count_ = other_byte_count; 123 124 other->packets_.swap(packets_); 125 } 126 127 rtc::scoped_refptr<DataChannel> DataChannel::Create( 128 DataChannelProviderInterface* provider, 129 cricket::DataChannelType dct, 130 const std::string& label, 131 const InternalDataChannelInit& config) { 132 rtc::scoped_refptr<DataChannel> channel( 133 new rtc::RefCountedObject<DataChannel>(provider, dct, label)); 134 if (!channel->Init(config)) { 135 return NULL; 136 } 137 return channel; 138 } 139 140 DataChannel::DataChannel( 141 DataChannelProviderInterface* provider, 142 cricket::DataChannelType dct, 143 const std::string& label) 144 : label_(label), 145 observer_(NULL), 146 state_(kConnecting), 147 data_channel_type_(dct), 148 provider_(provider), 149 handshake_state_(kHandshakeInit), 150 connected_to_provider_(false), 151 send_ssrc_set_(false), 152 receive_ssrc_set_(false), 153 writable_(false), 154 send_ssrc_(0), 155 receive_ssrc_(0) { 156 } 157 158 bool DataChannel::Init(const InternalDataChannelInit& config) { 159 if (data_channel_type_ == cricket::DCT_RTP) { 160 if (config.reliable || 161 config.id != -1 || 162 config.maxRetransmits != -1 || 163 config.maxRetransmitTime != -1) { 164 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " 165 << "invalid DataChannelInit."; 166 return false; 167 } 168 handshake_state_ = kHandshakeReady; 169 } else if (data_channel_type_ == cricket::DCT_SCTP) { 170 if (config.id < -1 || 171 config.maxRetransmits < -1 || 172 config.maxRetransmitTime < -1) { 173 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to " 174 << "invalid DataChannelInit."; 175 return false; 176 } 177 if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) { 178 LOG(LS_ERROR) << 179 "maxRetransmits and maxRetransmitTime should not be both set."; 180 return false; 181 } 182 config_ = config; 183 184 switch (config_.open_handshake_role) { 185 case webrtc::InternalDataChannelInit::kNone: // pre-negotiated 186 handshake_state_ = kHandshakeReady; 187 break; 188 case webrtc::InternalDataChannelInit::kOpener: 189 handshake_state_ = kHandshakeShouldSendOpen; 190 break; 191 case webrtc::InternalDataChannelInit::kAcker: 192 handshake_state_ = kHandshakeShouldSendAck; 193 break; 194 }; 195 196 // Try to connect to the transport in case the transport channel already 197 // exists. 198 OnTransportChannelCreated(); 199 200 // Checks if the transport is ready to send because the initial channel 201 // ready signal may have been sent before the DataChannel creation. 202 // This has to be done async because the upper layer objects (e.g. 203 // Chrome glue and WebKit) are not wired up properly until after this 204 // function returns. 205 if (provider_->ReadyToSendData()) { 206 rtc::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL); 207 } 208 } 209 210 return true; 211 } 212 213 DataChannel::~DataChannel() {} 214 215 void DataChannel::RegisterObserver(DataChannelObserver* observer) { 216 observer_ = observer; 217 DeliverQueuedReceivedData(); 218 } 219 220 void DataChannel::UnregisterObserver() { 221 observer_ = NULL; 222 } 223 224 bool DataChannel::reliable() const { 225 if (data_channel_type_ == cricket::DCT_RTP) { 226 return false; 227 } else { 228 return config_.maxRetransmits == -1 && 229 config_.maxRetransmitTime == -1; 230 } 231 } 232 233 uint64_t DataChannel::buffered_amount() const { 234 return queued_send_data_.byte_count(); 235 } 236 237 void DataChannel::Close() { 238 if (state_ == kClosed) 239 return; 240 send_ssrc_ = 0; 241 send_ssrc_set_ = false; 242 SetState(kClosing); 243 UpdateState(); 244 } 245 246 bool DataChannel::Send(const DataBuffer& buffer) { 247 if (state_ != kOpen) { 248 return false; 249 } 250 251 // TODO(jiayl): the spec is unclear about if the remote side should get the 252 // onmessage event. We need to figure out the expected behavior and change the 253 // code accordingly. 254 if (buffer.size() == 0) { 255 return true; 256 } 257 258 // If the queue is non-empty, we're waiting for SignalReadyToSend, 259 // so just add to the end of the queue and keep waiting. 260 if (!queued_send_data_.Empty()) { 261 // Only SCTP DataChannel queues the outgoing data when the transport is 262 // blocked. 263 ASSERT(data_channel_type_ == cricket::DCT_SCTP); 264 if (!QueueSendDataMessage(buffer)) { 265 Close(); 266 } 267 return true; 268 } 269 270 bool success = SendDataMessage(buffer, true); 271 if (data_channel_type_ == cricket::DCT_RTP) { 272 return success; 273 } 274 275 // Always return true for SCTP DataChannel per the spec. 276 return true; 277 } 278 279 void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { 280 ASSERT(data_channel_type_ == cricket::DCT_RTP); 281 282 if (receive_ssrc_set_) { 283 return; 284 } 285 receive_ssrc_ = receive_ssrc; 286 receive_ssrc_set_ = true; 287 UpdateState(); 288 } 289 290 // The remote peer request that this channel shall be closed. 291 void DataChannel::RemotePeerRequestClose() { 292 DoClose(); 293 } 294 295 void DataChannel::SetSctpSid(int sid) { 296 ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); 297 if (config_.id == sid) { 298 return; 299 } 300 301 config_.id = sid; 302 provider_->AddSctpDataStream(sid); 303 } 304 305 void DataChannel::OnTransportChannelCreated() { 306 ASSERT(data_channel_type_ == cricket::DCT_SCTP); 307 if (!connected_to_provider_) { 308 connected_to_provider_ = provider_->ConnectDataChannel(this); 309 } 310 // The sid may have been unassigned when provider_->ConnectDataChannel was 311 // done. So always add the streams even if connected_to_provider_ is true. 312 if (config_.id >= 0) { 313 provider_->AddSctpDataStream(config_.id); 314 } 315 } 316 317 // The underlying transport channel was destroyed. 318 // This function makes sure the DataChannel is disconnected and changes state to 319 // kClosed. 320 void DataChannel::OnTransportChannelDestroyed() { 321 DoClose(); 322 } 323 324 void DataChannel::SetSendSsrc(uint32_t send_ssrc) { 325 ASSERT(data_channel_type_ == cricket::DCT_RTP); 326 if (send_ssrc_set_) { 327 return; 328 } 329 send_ssrc_ = send_ssrc; 330 send_ssrc_set_ = true; 331 UpdateState(); 332 } 333 334 void DataChannel::OnMessage(rtc::Message* msg) { 335 switch (msg->message_id) { 336 case MSG_CHANNELREADY: 337 OnChannelReady(true); 338 break; 339 } 340 } 341 342 void DataChannel::OnDataReceived(cricket::DataChannel* channel, 343 const cricket::ReceiveDataParams& params, 344 const rtc::Buffer& payload) { 345 uint32_t expected_ssrc = 346 (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id; 347 if (params.ssrc != expected_ssrc) { 348 return; 349 } 350 351 if (params.type == cricket::DMT_CONTROL) { 352 ASSERT(data_channel_type_ == cricket::DCT_SCTP); 353 if (handshake_state_ != kHandshakeWaitingForAck) { 354 // Ignore it if we are not expecting an ACK message. 355 LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, " 356 << "sid = " << params.ssrc; 357 return; 358 } 359 if (ParseDataChannelOpenAckMessage(payload)) { 360 // We can send unordered as soon as we receive the ACK message. 361 handshake_state_ = kHandshakeReady; 362 LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = " 363 << params.ssrc; 364 } else { 365 LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = " 366 << params.ssrc; 367 } 368 return; 369 } 370 371 ASSERT(params.type == cricket::DMT_BINARY || 372 params.type == cricket::DMT_TEXT); 373 374 LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc; 375 // We can send unordered as soon as we receive any DATA message since the 376 // remote side must have received the OPEN (and old clients do not send 377 // OPEN_ACK). 378 if (handshake_state_ == kHandshakeWaitingForAck) { 379 handshake_state_ = kHandshakeReady; 380 } 381 382 bool binary = (params.type == cricket::DMT_BINARY); 383 rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary)); 384 if (state_ == kOpen && observer_) { 385 observer_->OnMessage(*buffer.get()); 386 } else { 387 if (queued_received_data_.byte_count() + payload.size() > 388 kMaxQueuedReceivedDataBytes) { 389 LOG(LS_ERROR) << "Queued received data exceeds the max buffer size."; 390 391 queued_received_data_.Clear(); 392 if (data_channel_type_ != cricket::DCT_RTP) { 393 Close(); 394 } 395 396 return; 397 } 398 queued_received_data_.Push(buffer.release()); 399 } 400 } 401 402 void DataChannel::OnStreamClosedRemotely(uint32_t sid) { 403 if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id) { 404 Close(); 405 } 406 } 407 408 void DataChannel::OnChannelReady(bool writable) { 409 writable_ = writable; 410 if (!writable) { 411 return; 412 } 413 414 SendQueuedControlMessages(); 415 SendQueuedDataMessages(); 416 UpdateState(); 417 } 418 419 void DataChannel::DoClose() { 420 if (state_ == kClosed) 421 return; 422 423 receive_ssrc_set_ = false; 424 send_ssrc_set_ = false; 425 SetState(kClosing); 426 UpdateState(); 427 } 428 429 void DataChannel::UpdateState() { 430 // UpdateState determines what to do from a few state variables. Include 431 // all conditions required for each state transition here for 432 // clarity. OnChannelReady(true) will send any queued data and then invoke 433 // UpdateState(). 434 switch (state_) { 435 case kConnecting: { 436 if (send_ssrc_set_ == receive_ssrc_set_) { 437 if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) { 438 connected_to_provider_ = provider_->ConnectDataChannel(this); 439 } 440 if (connected_to_provider_) { 441 if (handshake_state_ == kHandshakeShouldSendOpen) { 442 rtc::Buffer payload; 443 WriteDataChannelOpenMessage(label_, config_, &payload); 444 SendControlMessage(payload); 445 } else if (handshake_state_ == kHandshakeShouldSendAck) { 446 rtc::Buffer payload; 447 WriteDataChannelOpenAckMessage(&payload); 448 SendControlMessage(payload); 449 } 450 if (writable_ && 451 (handshake_state_ == kHandshakeReady || 452 handshake_state_ == kHandshakeWaitingForAck)) { 453 SetState(kOpen); 454 // If we have received buffers before the channel got writable. 455 // Deliver them now. 456 DeliverQueuedReceivedData(); 457 } 458 } 459 } 460 break; 461 } 462 case kOpen: { 463 break; 464 } 465 case kClosing: { 466 if (queued_send_data_.Empty() && queued_control_data_.Empty()) { 467 if (connected_to_provider_) { 468 DisconnectFromProvider(); 469 } 470 471 if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) { 472 SetState(kClosed); 473 } 474 } 475 break; 476 } 477 case kClosed: 478 break; 479 } 480 } 481 482 void DataChannel::SetState(DataState state) { 483 if (state_ == state) { 484 return; 485 } 486 487 state_ = state; 488 if (observer_) { 489 observer_->OnStateChange(); 490 } 491 if (state_ == kClosed) { 492 SignalClosed(this); 493 } 494 } 495 496 void DataChannel::DisconnectFromProvider() { 497 if (!connected_to_provider_) 498 return; 499 500 provider_->DisconnectDataChannel(this); 501 connected_to_provider_ = false; 502 503 if (data_channel_type_ == cricket::DCT_SCTP && config_.id >= 0) { 504 provider_->RemoveSctpDataStream(config_.id); 505 } 506 } 507 508 void DataChannel::DeliverQueuedReceivedData() { 509 if (!observer_) { 510 return; 511 } 512 513 while (!queued_received_data_.Empty()) { 514 rtc::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front()); 515 observer_->OnMessage(*buffer); 516 queued_received_data_.Pop(); 517 } 518 } 519 520 void DataChannel::SendQueuedDataMessages() { 521 if (queued_send_data_.Empty()) { 522 return; 523 } 524 525 ASSERT(state_ == kOpen || state_ == kClosing); 526 527 uint64_t start_buffered_amount = buffered_amount(); 528 while (!queued_send_data_.Empty()) { 529 DataBuffer* buffer = queued_send_data_.Front(); 530 if (!SendDataMessage(*buffer, false)) { 531 // Leave the message in the queue if sending is aborted. 532 break; 533 } 534 queued_send_data_.Pop(); 535 delete buffer; 536 } 537 538 if (observer_ && buffered_amount() < start_buffered_amount) { 539 observer_->OnBufferedAmountChange(start_buffered_amount); 540 } 541 } 542 543 bool DataChannel::SendDataMessage(const DataBuffer& buffer, 544 bool queue_if_blocked) { 545 cricket::SendDataParams send_params; 546 547 if (data_channel_type_ == cricket::DCT_SCTP) { 548 send_params.ordered = config_.ordered; 549 // Send as ordered if it is still going through OPEN/ACK signaling. 550 if (handshake_state_ != kHandshakeReady && !config_.ordered) { 551 send_params.ordered = true; 552 LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel " 553 << "because the OPEN_ACK message has not been received."; 554 } 555 556 send_params.max_rtx_count = config_.maxRetransmits; 557 send_params.max_rtx_ms = config_.maxRetransmitTime; 558 send_params.ssrc = config_.id; 559 } else { 560 send_params.ssrc = send_ssrc_; 561 } 562 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; 563 564 cricket::SendDataResult send_result = cricket::SDR_SUCCESS; 565 bool success = provider_->SendData(send_params, buffer.data, &send_result); 566 567 if (success) { 568 return true; 569 } 570 571 if (data_channel_type_ != cricket::DCT_SCTP) { 572 return false; 573 } 574 575 if (send_result == cricket::SDR_BLOCK) { 576 if (!queue_if_blocked || QueueSendDataMessage(buffer)) { 577 return false; 578 } 579 } 580 // Close the channel if the error is not SDR_BLOCK, or if queuing the 581 // message failed. 582 LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, " 583 << "send_result = " << send_result; 584 Close(); 585 586 return false; 587 } 588 589 bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { 590 size_t start_buffered_amount = buffered_amount(); 591 if (start_buffered_amount >= kMaxQueuedSendDataBytes) { 592 LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; 593 return false; 594 } 595 queued_send_data_.Push(new DataBuffer(buffer)); 596 597 // The buffer can have length zero, in which case there is no change. 598 if (observer_ && buffered_amount() > start_buffered_amount) { 599 observer_->OnBufferedAmountChange(start_buffered_amount); 600 } 601 return true; 602 } 603 604 void DataChannel::SendQueuedControlMessages() { 605 PacketQueue control_packets; 606 control_packets.Swap(&queued_control_data_); 607 608 while (!control_packets.Empty()) { 609 rtc::scoped_ptr<DataBuffer> buf(control_packets.Front()); 610 SendControlMessage(buf->data); 611 control_packets.Pop(); 612 } 613 } 614 615 void DataChannel::QueueControlMessage(const rtc::Buffer& buffer) { 616 queued_control_data_.Push(new DataBuffer(buffer, true)); 617 } 618 619 bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) { 620 bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; 621 622 ASSERT(data_channel_type_ == cricket::DCT_SCTP && 623 writable_ && 624 config_.id >= 0 && 625 (!is_open_message || !config_.negotiated)); 626 627 cricket::SendDataParams send_params; 628 send_params.ssrc = config_.id; 629 // Send data as ordered before we receive any message from the remote peer to 630 // make sure the remote peer will not receive any data before it receives the 631 // OPEN message. 632 send_params.ordered = config_.ordered || is_open_message; 633 send_params.type = cricket::DMT_CONTROL; 634 635 cricket::SendDataResult send_result = cricket::SDR_SUCCESS; 636 bool retval = provider_->SendData(send_params, buffer, &send_result); 637 if (retval) { 638 LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id; 639 640 if (handshake_state_ == kHandshakeShouldSendAck) { 641 handshake_state_ = kHandshakeReady; 642 } else if (handshake_state_ == kHandshakeShouldSendOpen) { 643 handshake_state_ = kHandshakeWaitingForAck; 644 } 645 } else if (send_result == cricket::SDR_BLOCK) { 646 QueueControlMessage(buffer); 647 } else { 648 LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send" 649 << " the CONTROL message, send_result = " << send_result; 650 Close(); 651 } 652 return retval; 653 } 654 655 } // namespace webrtc 656