1 /* 2 * libjingle 3 * Copyright 2012, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "talk/app/webrtc/datachannel.h" 28 29 #include <string> 30 31 #include "talk/app/webrtc/webrtcsession.h" 32 #include "talk/base/logging.h" 33 #include "talk/base/refcount.h" 34 35 namespace webrtc { 36 37 static size_t kMaxQueuedReceivedDataPackets = 100; 38 static size_t kMaxQueuedSendDataPackets = 100; 39 40 talk_base::scoped_refptr<DataChannel> DataChannel::Create( 41 WebRtcSession* session, 42 const std::string& label, 43 const DataChannelInit* config) { 44 talk_base::scoped_refptr<DataChannel> channel( 45 new talk_base::RefCountedObject<DataChannel>(session, label)); 46 if (!channel->Init(config)) { 47 return NULL; 48 } 49 return channel; 50 } 51 52 DataChannel::DataChannel(WebRtcSession* session, const std::string& label) 53 : label_(label), 54 observer_(NULL), 55 state_(kConnecting), 56 was_ever_writable_(false), 57 session_(session), 58 data_session_(NULL), 59 send_ssrc_set_(false), 60 send_ssrc_(0), 61 receive_ssrc_set_(false), 62 receive_ssrc_(0) { 63 } 64 65 bool DataChannel::Init(const DataChannelInit* config) { 66 if (config) { 67 if (session_->data_channel_type() == cricket::DCT_RTP && 68 (config->reliable || 69 config->id != -1 || 70 config->maxRetransmits != -1 || 71 config->maxRetransmitTime != -1)) { 72 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " 73 << "invalid DataChannelInit."; 74 return false; 75 } else if (session_->data_channel_type() == cricket::DCT_SCTP) { 76 if (config->id < -1 || 77 config->maxRetransmits < -1 || 78 config->maxRetransmitTime < -1) { 79 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to " 80 << "invalid DataChannelInit."; 81 return false; 82 } 83 if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) { 84 LOG(LS_ERROR) << 85 "maxRetransmits and maxRetransmitTime should not be both set."; 86 return false; 87 } 88 } 89 config_ = *config; 90 } 91 return true; 92 } 93 94 bool DataChannel::HasNegotiationCompleted() { 95 return send_ssrc_set_ == receive_ssrc_set_; 96 } 97 98 DataChannel::~DataChannel() { 99 ClearQueuedReceivedData(); 100 ClearQueuedSendData(); 101 } 102 103 void DataChannel::RegisterObserver(DataChannelObserver* observer) { 104 observer_ = observer; 105 DeliverQueuedReceivedData(); 106 } 107 108 void DataChannel::UnregisterObserver() { 109 observer_ = NULL; 110 } 111 112 bool DataChannel::reliable() const { 113 if (session_->data_channel_type() == cricket::DCT_RTP) { 114 return false; 115 } else { 116 return config_.maxRetransmits == -1 && 117 config_.maxRetransmitTime == -1; 118 } 119 } 120 121 uint64 DataChannel::buffered_amount() const { 122 uint64 buffered_amount = 0; 123 for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin(); 124 it != queued_send_data_.end(); 125 ++it) { 126 buffered_amount += (*it)->size(); 127 } 128 return buffered_amount; 129 } 130 131 void DataChannel::Close() { 132 if (state_ == kClosed) 133 return; 134 send_ssrc_ = 0; 135 send_ssrc_set_ = false; 136 SetState(kClosing); 137 UpdateState(); 138 } 139 140 bool DataChannel::Send(const DataBuffer& buffer) { 141 if (state_ != kOpen) { 142 return false; 143 } 144 // If the queue is non-empty, we're waiting for SignalReadyToSend, 145 // so just add to the end of the queue and keep waiting. 146 if (!queued_send_data_.empty()) { 147 return QueueSendData(buffer); 148 } 149 150 cricket::SendDataResult send_result; 151 if (!InternalSendWithoutQueueing(buffer, &send_result)) { 152 if (send_result == cricket::SDR_BLOCK) { 153 return QueueSendData(buffer); 154 } 155 // Fail for other results. 156 // TODO(jiayl): We should close the data channel in this case. 157 return false; 158 } 159 return true; 160 } 161 162 void DataChannel::QueueControl(const talk_base::Buffer* buffer) { 163 queued_control_data_.push(buffer); 164 } 165 166 bool DataChannel::SendControl(const talk_base::Buffer* buffer) { 167 if (state_ != kOpen) { 168 QueueControl(buffer); 169 return true; 170 } 171 if (session_->data_channel_type() == cricket::DCT_RTP) { 172 delete buffer; 173 return false; 174 } 175 176 cricket::SendDataParams send_params; 177 send_params.ssrc = config_.id; 178 send_params.ordered = true; 179 send_params.type = cricket::DMT_CONTROL; 180 181 cricket::SendDataResult send_result; 182 bool retval = session_->data_channel()->SendData( 183 send_params, *buffer, &send_result); 184 if (!retval && send_result == cricket::SDR_BLOCK) { 185 // Link is congested. Queue for later. 186 QueueControl(buffer); 187 } else { 188 delete buffer; 189 } 190 return retval; 191 } 192 193 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) { 194 if (receive_ssrc_set_) { 195 ASSERT(session_->data_channel_type() == cricket::DCT_RTP || 196 receive_ssrc_ == send_ssrc_); 197 return; 198 } 199 receive_ssrc_ = receive_ssrc; 200 receive_ssrc_set_ = true; 201 UpdateState(); 202 } 203 204 // The remote peer request that this channel shall be closed. 205 void DataChannel::RemotePeerRequestClose() { 206 DoClose(); 207 } 208 209 void DataChannel::SetSendSsrc(uint32 send_ssrc) { 210 if (send_ssrc_set_) { 211 ASSERT(session_->data_channel_type() == cricket::DCT_RTP || 212 receive_ssrc_ == send_ssrc_); 213 return; 214 } 215 send_ssrc_ = send_ssrc; 216 send_ssrc_set_ = true; 217 UpdateState(); 218 } 219 220 // The underlaying data engine is closing. 221 // This function make sure the DataChannel is disconneced and change state to 222 // kClosed. 223 void DataChannel::OnDataEngineClose() { 224 DoClose(); 225 } 226 227 void DataChannel::OnDataReceived(cricket::DataChannel* channel, 228 const cricket::ReceiveDataParams& params, 229 const talk_base::Buffer& payload) { 230 if (params.ssrc != receive_ssrc_) { 231 return; 232 } 233 234 bool binary = (params.type == cricket::DMT_BINARY); 235 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary)); 236 if (was_ever_writable_ && observer_) { 237 observer_->OnMessage(*buffer.get()); 238 } else { 239 if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) { 240 // TODO(jiayl): We should close the data channel in this case. 241 LOG(LS_ERROR) 242 << "Queued received data exceeds the max number of packes."; 243 ClearQueuedReceivedData(); 244 } 245 queued_received_data_.push(buffer.release()); 246 } 247 } 248 249 void DataChannel::OnChannelReady(bool writable) { 250 if (!writable) { 251 return; 252 } 253 // Update the readyState if the channel is writable for the first time; 254 // otherwise it means the channel was blocked for sending and now unblocked, 255 // so send the queued data now. 256 if (!was_ever_writable_) { 257 was_ever_writable_ = true; 258 UpdateState(); 259 } else if (state_ == kOpen) { 260 SendQueuedSendData(); 261 } 262 } 263 264 void DataChannel::DoClose() { 265 receive_ssrc_set_ = false; 266 send_ssrc_set_ = false; 267 SetState(kClosing); 268 UpdateState(); 269 } 270 271 void DataChannel::UpdateState() { 272 switch (state_) { 273 case kConnecting: { 274 if (HasNegotiationCompleted()) { 275 if (!IsConnectedToDataSession()) { 276 ConnectToDataSession(); 277 } 278 if (was_ever_writable_) { 279 SetState(kOpen); 280 // If we have received buffers before the channel got writable. 281 // Deliver them now. 282 DeliverQueuedReceivedData(); 283 } 284 } 285 break; 286 } 287 case kOpen: { 288 break; 289 } 290 case kClosing: { 291 if (IsConnectedToDataSession()) { 292 DisconnectFromDataSession(); 293 } 294 if (HasNegotiationCompleted()) { 295 SetState(kClosed); 296 } 297 break; 298 } 299 case kClosed: 300 break; 301 } 302 } 303 304 void DataChannel::SetState(DataState state) { 305 state_ = state; 306 if (observer_) { 307 observer_->OnStateChange(); 308 } 309 } 310 311 void DataChannel::ConnectToDataSession() { 312 if (!session_->data_channel()) { 313 LOG(LS_ERROR) << "The DataEngine does not exist."; 314 ASSERT(session_->data_channel() != NULL); 315 return; 316 } 317 318 data_session_ = session_->data_channel(); 319 data_session_->SignalReadyToSendData.connect(this, 320 &DataChannel::OnChannelReady); 321 data_session_->SignalDataReceived.connect(this, &DataChannel::OnDataReceived); 322 cricket::StreamParams params = 323 cricket::StreamParams::CreateLegacy(id()); 324 data_session_->media_channel()->AddSendStream(params); 325 data_session_->media_channel()->AddRecvStream(params); 326 } 327 328 void DataChannel::DisconnectFromDataSession() { 329 if (data_session_->media_channel() != NULL) { 330 data_session_->media_channel()->RemoveSendStream(id()); 331 data_session_->media_channel()->RemoveRecvStream(id()); 332 } 333 data_session_->SignalReadyToSendData.disconnect(this); 334 data_session_->SignalDataReceived.disconnect(this); 335 data_session_ = NULL; 336 } 337 338 void DataChannel::DeliverQueuedReceivedData() { 339 if (!was_ever_writable_ || !observer_) { 340 return; 341 } 342 343 while (!queued_received_data_.empty()) { 344 DataBuffer* buffer = queued_received_data_.front(); 345 observer_->OnMessage(*buffer); 346 queued_received_data_.pop(); 347 delete buffer; 348 } 349 } 350 351 void DataChannel::ClearQueuedReceivedData() { 352 while (!queued_received_data_.empty()) { 353 DataBuffer* buffer = queued_received_data_.front(); 354 queued_received_data_.pop(); 355 delete buffer; 356 } 357 } 358 359 void DataChannel::SendQueuedSendData() { 360 DeliverQueuedControlData(); 361 if (!was_ever_writable_) { 362 return; 363 } 364 365 while (!queued_send_data_.empty()) { 366 DataBuffer* buffer = queued_send_data_.front(); 367 cricket::SendDataResult send_result; 368 if (!InternalSendWithoutQueueing(*buffer, &send_result)) { 369 LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result " 370 << send_result; 371 break; 372 } 373 queued_send_data_.pop_front(); 374 delete buffer; 375 } 376 } 377 378 void DataChannel::DeliverQueuedControlData() { 379 if (was_ever_writable_) { 380 while (!queued_control_data_.empty()) { 381 const talk_base::Buffer *buf = queued_control_data_.front(); 382 queued_control_data_.pop(); 383 SendControl(buf); 384 } 385 } 386 } 387 388 void DataChannel::ClearQueuedSendData() { 389 while (!queued_send_data_.empty()) { 390 DataBuffer* buffer = queued_send_data_.front(); 391 queued_send_data_.pop_front(); 392 delete buffer; 393 } 394 } 395 396 bool DataChannel::InternalSendWithoutQueueing( 397 const DataBuffer& buffer, cricket::SendDataResult* send_result) { 398 cricket::SendDataParams send_params; 399 400 send_params.ssrc = send_ssrc_; 401 if (session_->data_channel_type() == cricket::DCT_SCTP) { 402 send_params.ordered = config_.ordered; 403 send_params.max_rtx_count = config_.maxRetransmits; 404 send_params.max_rtx_ms = config_.maxRetransmitTime; 405 } 406 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; 407 408 return session_->data_channel()->SendData(send_params, buffer.data, 409 send_result); 410 } 411 412 bool DataChannel::QueueSendData(const DataBuffer& buffer) { 413 if (queued_send_data_.size() > kMaxQueuedSendDataPackets) { 414 LOG(LS_ERROR) << "Can't buffer any more data in the data channel."; 415 return false; 416 } 417 queued_send_data_.push_back(new DataBuffer(buffer)); 418 return true; 419 } 420 421 } // namespace webrtc 422