1 /* 2 * libjingle 3 * Copyright 2012, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "talk/app/webrtc/datachannel.h" 28 29 #include <string> 30 31 #include "talk/app/webrtc/mediastreamprovider.h" 32 #include "talk/base/logging.h" 33 #include "talk/base/refcount.h" 34 #include "talk/media/sctp/sctputils.h" 35 36 namespace webrtc { 37 38 static size_t kMaxQueuedReceivedDataPackets = 100; 39 static size_t kMaxQueuedSendDataPackets = 100; 40 41 enum { 42 MSG_CHANNELREADY, 43 }; 44 45 talk_base::scoped_refptr<DataChannel> DataChannel::Create( 46 DataChannelProviderInterface* provider, 47 cricket::DataChannelType dct, 48 const std::string& label, 49 const DataChannelInit* config) { 50 talk_base::scoped_refptr<DataChannel> channel( 51 new talk_base::RefCountedObject<DataChannel>(provider, dct, label)); 52 if (!channel->Init(config)) { 53 return NULL; 54 } 55 return channel; 56 } 57 58 DataChannel::DataChannel( 59 DataChannelProviderInterface* provider, 60 cricket::DataChannelType dct, 61 const std::string& label) 62 : label_(label), 63 observer_(NULL), 64 state_(kConnecting), 65 was_ever_writable_(false), 66 connected_to_provider_(false), 67 data_channel_type_(dct), 68 provider_(provider), 69 send_ssrc_set_(false), 70 send_ssrc_(0), 71 receive_ssrc_set_(false), 72 receive_ssrc_(0) { 73 } 74 75 bool DataChannel::Init(const DataChannelInit* config) { 76 if (data_channel_type_ == cricket::DCT_RTP && 77 (config->reliable || 78 config->id != -1 || 79 config->maxRetransmits != -1 || 80 config->maxRetransmitTime != -1)) { 81 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " 82 << "invalid DataChannelInit."; 83 return false; 84 } else if (data_channel_type_ == cricket::DCT_SCTP) { 85 if (config->id < -1 || 86 config->maxRetransmits < -1 || 87 config->maxRetransmitTime < -1) { 88 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to " 89 << "invalid DataChannelInit."; 90 return false; 91 } 92 if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) { 93 LOG(LS_ERROR) << 94 "maxRetransmits and maxRetransmitTime should not be both set."; 95 return false; 96 } 97 config_ = *config; 98 99 // Try to connect to the transport in case the transport channel already 100 // exists. 101 OnTransportChannelCreated(); 102 103 // Checks if the transport is ready to send because the initial channel 104 // ready signal may have been sent before the DataChannel creation. 105 // This has to be done async because the upper layer objects (e.g. 106 // Chrome glue and WebKit) are not wired up properly until after this 107 // function returns. 108 if (provider_->ReadyToSendData()) { 109 talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL); 110 } 111 } 112 113 return true; 114 } 115 116 DataChannel::~DataChannel() { 117 ClearQueuedReceivedData(); 118 ClearQueuedSendData(); 119 ClearQueuedControlData(); 120 } 121 122 void DataChannel::RegisterObserver(DataChannelObserver* observer) { 123 observer_ = observer; 124 DeliverQueuedReceivedData(); 125 } 126 127 void DataChannel::UnregisterObserver() { 128 observer_ = NULL; 129 } 130 131 bool DataChannel::reliable() const { 132 if (data_channel_type_ == cricket::DCT_RTP) { 133 return false; 134 } else { 135 return config_.maxRetransmits == -1 && 136 config_.maxRetransmitTime == -1; 137 } 138 } 139 140 uint64 DataChannel::buffered_amount() const { 141 uint64 buffered_amount = 0; 142 for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin(); 143 it != queued_send_data_.end(); 144 ++it) { 145 buffered_amount += (*it)->size(); 146 } 147 return buffered_amount; 148 } 149 150 void DataChannel::Close() { 151 if (state_ == kClosed) 152 return; 153 send_ssrc_ = 0; 154 send_ssrc_set_ = false; 155 SetState(kClosing); 156 UpdateState(); 157 } 158 159 bool DataChannel::Send(const DataBuffer& buffer) { 160 if (state_ != kOpen) { 161 return false; 162 } 163 // If the queue is non-empty, we're waiting for SignalReadyToSend, 164 // so just add to the end of the queue and keep waiting. 165 if (!queued_send_data_.empty()) { 166 return QueueSendData(buffer); 167 } 168 169 cricket::SendDataResult send_result; 170 if (!InternalSendWithoutQueueing(buffer, &send_result)) { 171 if (send_result == cricket::SDR_BLOCK) { 172 return QueueSendData(buffer); 173 } 174 // Fail for other results. 175 // TODO(jiayl): We should close the data channel in this case. 176 return false; 177 } 178 return true; 179 } 180 181 void DataChannel::QueueControl(const talk_base::Buffer* buffer) { 182 queued_control_data_.push(buffer); 183 } 184 185 bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) { 186 ASSERT(data_channel_type_ == cricket::DCT_SCTP && 187 was_ever_writable_ && 188 config_.id >= 0 && 189 !config_.negotiated); 190 191 talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer); 192 193 cricket::SendDataParams send_params; 194 send_params.ssrc = config_.id; 195 send_params.ordered = true; 196 send_params.type = cricket::DMT_CONTROL; 197 198 cricket::SendDataResult send_result; 199 bool retval = provider_->SendData(send_params, *buffer, &send_result); 200 if (!retval && send_result == cricket::SDR_BLOCK) { 201 // Link is congested. Queue for later. 202 QueueControl(buffer.release()); 203 } 204 return retval; 205 } 206 207 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) { 208 ASSERT(data_channel_type_ == cricket::DCT_RTP); 209 210 if (receive_ssrc_set_) { 211 return; 212 } 213 receive_ssrc_ = receive_ssrc; 214 receive_ssrc_set_ = true; 215 UpdateState(); 216 } 217 218 // The remote peer request that this channel shall be closed. 219 void DataChannel::RemotePeerRequestClose() { 220 DoClose(); 221 } 222 223 void DataChannel::SetSendSsrc(uint32 send_ssrc) { 224 ASSERT(data_channel_type_ == cricket::DCT_RTP); 225 if (send_ssrc_set_) { 226 return; 227 } 228 send_ssrc_ = send_ssrc; 229 send_ssrc_set_ = true; 230 UpdateState(); 231 } 232 233 void DataChannel::OnMessage(talk_base::Message* msg) { 234 switch (msg->message_id) { 235 case MSG_CHANNELREADY: 236 OnChannelReady(true); 237 break; 238 } 239 } 240 241 // The underlaying data engine is closing. 242 // This function makes sure the DataChannel is disconnected and changes state to 243 // kClosed. 244 void DataChannel::OnDataEngineClose() { 245 DoClose(); 246 } 247 248 void DataChannel::OnDataReceived(cricket::DataChannel* channel, 249 const cricket::ReceiveDataParams& params, 250 const talk_base::Buffer& payload) { 251 uint32 expected_ssrc = 252 (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id; 253 if (params.ssrc != expected_ssrc) { 254 return; 255 } 256 257 bool binary = (params.type == cricket::DMT_BINARY); 258 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary)); 259 if (was_ever_writable_ && observer_) { 260 observer_->OnMessage(*buffer.get()); 261 } else { 262 if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) { 263 // TODO(jiayl): We should close the data channel in this case. 264 LOG(LS_ERROR) 265 << "Queued received data exceeds the max number of packes."; 266 ClearQueuedReceivedData(); 267 } 268 queued_received_data_.push(buffer.release()); 269 } 270 } 271 272 void DataChannel::OnChannelReady(bool writable) { 273 if (!writable) { 274 return; 275 } 276 // Update the readyState and send the queued control message if the channel 277 // is writable for the first time; otherwise it means the channel was blocked 278 // for sending and now unblocked, so send the queued data now. 279 if (!was_ever_writable_) { 280 was_ever_writable_ = true; 281 282 if (data_channel_type_ == cricket::DCT_SCTP && !config_.negotiated) { 283 talk_base::Buffer* payload = new talk_base::Buffer; 284 if (!cricket::WriteDataChannelOpenMessage(label_, config_, payload)) { 285 // TODO(jiayl): close the data channel on this error. 286 LOG(LS_ERROR) << "Could not write data channel OPEN message"; 287 return; 288 } 289 SendOpenMessage(payload); 290 } 291 292 UpdateState(); 293 ASSERT(queued_send_data_.empty()); 294 } else if (state_ == kOpen) { 295 DeliverQueuedSendData(); 296 } 297 } 298 299 void DataChannel::DoClose() { 300 receive_ssrc_set_ = false; 301 send_ssrc_set_ = false; 302 SetState(kClosing); 303 UpdateState(); 304 } 305 306 void DataChannel::UpdateState() { 307 switch (state_) { 308 case kConnecting: { 309 if (send_ssrc_set_ == receive_ssrc_set_) { 310 if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) { 311 connected_to_provider_ = provider_->ConnectDataChannel(this); 312 } 313 if (was_ever_writable_) { 314 // TODO(jiayl): Do not transition to kOpen if we failed to send the 315 // OPEN message. 316 DeliverQueuedControlData(); 317 SetState(kOpen); 318 // If we have received buffers before the channel got writable. 319 // Deliver them now. 320 DeliverQueuedReceivedData(); 321 } 322 } 323 break; 324 } 325 case kOpen: { 326 break; 327 } 328 case kClosing: { 329 DisconnectFromTransport(); 330 331 if (!send_ssrc_set_ && !receive_ssrc_set_) { 332 SetState(kClosed); 333 } 334 break; 335 } 336 case kClosed: 337 break; 338 } 339 } 340 341 void DataChannel::SetState(DataState state) { 342 state_ = state; 343 if (observer_) { 344 observer_->OnStateChange(); 345 } 346 } 347 348 void DataChannel::DisconnectFromTransport() { 349 if (!connected_to_provider_) 350 return; 351 352 provider_->DisconnectDataChannel(this); 353 connected_to_provider_ = false; 354 355 if (data_channel_type_ == cricket::DCT_SCTP) { 356 provider_->RemoveSctpDataStream(config_.id); 357 } 358 } 359 360 void DataChannel::DeliverQueuedReceivedData() { 361 if (!was_ever_writable_ || !observer_) { 362 return; 363 } 364 365 while (!queued_received_data_.empty()) { 366 DataBuffer* buffer = queued_received_data_.front(); 367 observer_->OnMessage(*buffer); 368 queued_received_data_.pop(); 369 delete buffer; 370 } 371 } 372 373 void DataChannel::ClearQueuedReceivedData() { 374 while (!queued_received_data_.empty()) { 375 DataBuffer* buffer = queued_received_data_.front(); 376 queued_received_data_.pop(); 377 delete buffer; 378 } 379 } 380 381 void DataChannel::DeliverQueuedSendData() { 382 ASSERT(was_ever_writable_ && state_ == kOpen); 383 384 // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition 385 // that the readyState is open. According to the standard, the channel should 386 // not become open before the OPEN message is sent. 387 DeliverQueuedControlData(); 388 389 while (!queued_send_data_.empty()) { 390 DataBuffer* buffer = queued_send_data_.front(); 391 cricket::SendDataResult send_result; 392 if (!InternalSendWithoutQueueing(*buffer, &send_result)) { 393 LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result " 394 << send_result; 395 break; 396 } 397 queued_send_data_.pop_front(); 398 delete buffer; 399 } 400 } 401 402 void DataChannel::ClearQueuedControlData() { 403 while (!queued_control_data_.empty()) { 404 const talk_base::Buffer *buf = queued_control_data_.front(); 405 queued_control_data_.pop(); 406 delete buf; 407 } 408 } 409 410 void DataChannel::DeliverQueuedControlData() { 411 ASSERT(was_ever_writable_); 412 while (!queued_control_data_.empty()) { 413 const talk_base::Buffer* buf = queued_control_data_.front(); 414 queued_control_data_.pop(); 415 SendOpenMessage(buf); 416 } 417 } 418 419 void DataChannel::ClearQueuedSendData() { 420 while (!queued_send_data_.empty()) { 421 DataBuffer* buffer = queued_send_data_.front(); 422 queued_send_data_.pop_front(); 423 delete buffer; 424 } 425 } 426 427 bool DataChannel::InternalSendWithoutQueueing( 428 const DataBuffer& buffer, cricket::SendDataResult* send_result) { 429 cricket::SendDataParams send_params; 430 431 if (data_channel_type_ == cricket::DCT_SCTP) { 432 send_params.ordered = config_.ordered; 433 send_params.max_rtx_count = config_.maxRetransmits; 434 send_params.max_rtx_ms = config_.maxRetransmitTime; 435 send_params.ssrc = config_.id; 436 } else { 437 send_params.ssrc = send_ssrc_; 438 } 439 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; 440 441 return provider_->SendData(send_params, buffer.data, send_result); 442 } 443 444 bool DataChannel::QueueSendData(const DataBuffer& buffer) { 445 if (queued_send_data_.size() > kMaxQueuedSendDataPackets) { 446 LOG(LS_ERROR) << "Can't buffer any more data in the data channel."; 447 return false; 448 } 449 queued_send_data_.push_back(new DataBuffer(buffer)); 450 return true; 451 } 452 453 void DataChannel::SetSctpSid(int sid) { 454 ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); 455 config_.id = sid; 456 provider_->AddSctpDataStream(sid); 457 } 458 459 void DataChannel::OnTransportChannelCreated() { 460 ASSERT(data_channel_type_ == cricket::DCT_SCTP); 461 if (!connected_to_provider_) { 462 connected_to_provider_ = provider_->ConnectDataChannel(this); 463 } 464 // The sid may have been unassigned when provider_->ConnectDataChannel was 465 // done. So always add the streams even if connected_to_provider_ is true. 466 if (config_.id >= 0) { 467 provider_->AddSctpDataStream(config_.id); 468 } 469 } 470 471 } // namespace webrtc 472