1 /* 2 * libjingle SCTP 3 * Copyright 2012 Google Inc, and Robin Seggelmann 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/media/sctp/sctpdataengine.h" 29 30 #include <stdarg.h> 31 #include <stdio.h> 32 #include <sstream> 33 #include <vector> 34 35 #include "talk/base/buffer.h" 36 #include "talk/base/helpers.h" 37 #include "talk/base/logging.h" 38 #include "talk/base/safe_conversions.h" 39 #include "talk/media/base/codec.h" 40 #include "talk/media/base/constants.h" 41 #include "talk/media/base/streamparams.h" 42 #include "usrsctplib/usrsctp.h" 43 44 namespace { 45 typedef cricket::SctpDataMediaChannel::StreamSet StreamSet; 46 // Returns a comma-separated, human-readable list of the stream IDs in 's' 47 std::string ListStreams(const StreamSet& s) { 48 std::stringstream result; 49 bool first = true; 50 for (StreamSet::const_iterator it = s.begin(); it != s.end(); ++it) { 51 if (!first) { 52 result << ", " << *it; 53 } else { 54 result << *it; 55 first = false; 56 } 57 } 58 return result.str(); 59 } 60 61 // Returns a pipe-separated, human-readable list of the SCTP_STREAM_RESET 62 // flags in 'flags' 63 std::string ListFlags(int flags) { 64 std::stringstream result; 65 bool first = true; 66 // Skip past the first 12 chars (strlen("SCTP_STREAM_")) 67 #define MAKEFLAG(X) { X, #X + 12} 68 struct flaginfo_t { 69 int value; 70 const char* name; 71 } flaginfo[] = { 72 MAKEFLAG(SCTP_STREAM_RESET_INCOMING_SSN), 73 MAKEFLAG(SCTP_STREAM_RESET_OUTGOING_SSN), 74 MAKEFLAG(SCTP_STREAM_RESET_DENIED), 75 MAKEFLAG(SCTP_STREAM_RESET_FAILED), 76 MAKEFLAG(SCTP_STREAM_CHANGE_DENIED) 77 }; 78 #undef MAKEFLAG 79 for (int i = 0; i < ARRAY_SIZE(flaginfo); ++i) { 80 if (flags & flaginfo[i].value) { 81 if (!first) result << " | "; 82 result << flaginfo[i].name; 83 first = false; 84 } 85 } 86 return result.str(); 87 } 88 89 // Returns a comma-separated, human-readable list of the integers in 'array'. 90 // All 'num_elems' of them. 91 std::string ListArray(const uint16* array, int num_elems) { 92 std::stringstream result; 93 for (int i = 0; i < num_elems; ++i) { 94 if (i) { 95 result << ", " << array[i]; 96 } else { 97 result << array[i]; 98 } 99 } 100 return result.str(); 101 } 102 } // namespace 103 104 namespace cricket { 105 typedef talk_base::ScopedMessageData<SctpInboundPacket> InboundPacketMessage; 106 typedef talk_base::ScopedMessageData<talk_base::Buffer> OutboundPacketMessage; 107 108 // TODO(ldixon): Find where this is defined, and also check is Sctp really 109 // respects this. 110 static const size_t kSctpMtu = 1280; 111 112 enum { 113 MSG_SCTPINBOUNDPACKET = 1, // MessageData is SctpInboundPacket 114 MSG_SCTPOUTBOUNDPACKET = 2, // MessageData is talk_base:Buffer 115 }; 116 117 struct SctpInboundPacket { 118 talk_base::Buffer buffer; 119 ReceiveDataParams params; 120 // The |flags| parameter is used by SCTP to distinguish notification packets 121 // from other types of packets. 122 int flags; 123 }; 124 125 // Helper for logging SCTP messages. 126 static void debug_sctp_printf(const char *format, ...) { 127 char s[255]; 128 va_list ap; 129 va_start(ap, format); 130 vsnprintf(s, sizeof(s), format, ap); 131 LOG(LS_INFO) << "SCTP: " << s; 132 va_end(ap); 133 } 134 135 // Get the PPID to use for the terminating fragment of this type. 136 static SctpDataMediaChannel::PayloadProtocolIdentifier GetPpid( 137 cricket::DataMessageType type) { 138 switch (type) { 139 default: 140 case cricket::DMT_NONE: 141 return SctpDataMediaChannel::PPID_NONE; 142 case cricket::DMT_CONTROL: 143 return SctpDataMediaChannel::PPID_CONTROL; 144 case cricket::DMT_BINARY: 145 return SctpDataMediaChannel::PPID_BINARY_LAST; 146 case cricket::DMT_TEXT: 147 return SctpDataMediaChannel::PPID_TEXT_LAST; 148 }; 149 } 150 151 static bool GetDataMediaType( 152 SctpDataMediaChannel::PayloadProtocolIdentifier ppid, 153 cricket::DataMessageType *dest) { 154 ASSERT(dest != NULL); 155 switch (ppid) { 156 case SctpDataMediaChannel::PPID_BINARY_PARTIAL: 157 case SctpDataMediaChannel::PPID_BINARY_LAST: 158 *dest = cricket::DMT_BINARY; 159 return true; 160 161 case SctpDataMediaChannel::PPID_TEXT_PARTIAL: 162 case SctpDataMediaChannel::PPID_TEXT_LAST: 163 *dest = cricket::DMT_TEXT; 164 return true; 165 166 case SctpDataMediaChannel::PPID_CONTROL: 167 *dest = cricket::DMT_CONTROL; 168 return true; 169 170 case SctpDataMediaChannel::PPID_NONE: 171 *dest = cricket::DMT_NONE; 172 return true; 173 174 default: 175 return false; 176 } 177 } 178 179 // This is the callback usrsctp uses when there's data to send on the network 180 // that has been wrapped appropriatly for the SCTP protocol. 181 static int OnSctpOutboundPacket(void* addr, void* data, size_t length, 182 uint8_t tos, uint8_t set_df) { 183 SctpDataMediaChannel* channel = static_cast<SctpDataMediaChannel*>(addr); 184 LOG(LS_VERBOSE) << "global OnSctpOutboundPacket():" 185 << "addr: " << addr << "; length: " << length 186 << "; tos: " << std::hex << static_cast<int>(tos) 187 << "; set_df: " << std::hex << static_cast<int>(set_df); 188 // Note: We have to copy the data; the caller will delete it. 189 OutboundPacketMessage* msg = 190 new OutboundPacketMessage(new talk_base::Buffer(data, length)); 191 channel->worker_thread()->Post(channel, MSG_SCTPOUTBOUNDPACKET, msg); 192 return 0; 193 } 194 195 // This is the callback called from usrsctp when data has been received, after 196 // a packet has been interpreted and parsed by usrsctp and found to contain 197 // payload data. It is called by a usrsctp thread. It is assumed this function 198 // will free the memory used by 'data'. 199 static int OnSctpInboundPacket(struct socket* sock, union sctp_sockstore addr, 200 void* data, size_t length, 201 struct sctp_rcvinfo rcv, int flags, 202 void* ulp_info) { 203 SctpDataMediaChannel* channel = static_cast<SctpDataMediaChannel*>(ulp_info); 204 // Post data to the channel's receiver thread (copying it). 205 // TODO(ldixon): Unclear if copy is needed as this method is responsible for 206 // memory cleanup. But this does simplify code. 207 const SctpDataMediaChannel::PayloadProtocolIdentifier ppid = 208 static_cast<SctpDataMediaChannel::PayloadProtocolIdentifier>( 209 talk_base::HostToNetwork32(rcv.rcv_ppid)); 210 cricket::DataMessageType type = cricket::DMT_NONE; 211 if (!GetDataMediaType(ppid, &type) && !(flags & MSG_NOTIFICATION)) { 212 // It's neither a notification nor a recognized data packet. Drop it. 213 LOG(LS_ERROR) << "Received an unknown PPID " << ppid 214 << " on an SCTP packet. Dropping."; 215 } else { 216 SctpInboundPacket* packet = new SctpInboundPacket; 217 packet->buffer.SetData(data, length); 218 packet->params.ssrc = rcv.rcv_sid; 219 packet->params.seq_num = rcv.rcv_ssn; 220 packet->params.timestamp = rcv.rcv_tsn; 221 packet->params.type = type; 222 packet->flags = flags; 223 // The ownership of |packet| transfers to |msg|. 224 InboundPacketMessage* msg = new InboundPacketMessage(packet); 225 channel->worker_thread()->Post(channel, MSG_SCTPINBOUNDPACKET, msg); 226 } 227 free(data); 228 return 1; 229 } 230 231 // Set the initial value of the static SCTP Data Engines reference count. 232 int SctpDataEngine::usrsctp_engines_count = 0; 233 234 SctpDataEngine::SctpDataEngine() { 235 if (usrsctp_engines_count == 0) { 236 // First argument is udp_encapsulation_port, which is not releveant for our 237 // AF_CONN use of sctp. 238 usrsctp_init(0, cricket::OnSctpOutboundPacket, debug_sctp_printf); 239 240 // To turn on/off detailed SCTP debugging. You will also need to have the 241 // SCTP_DEBUG cpp defines flag. 242 // usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); 243 244 // TODO(ldixon): Consider turning this on/off. 245 usrsctp_sysctl_set_sctp_ecn_enable(0); 246 247 // TODO(ldixon): Consider turning this on/off. 248 // This is not needed right now (we don't do dynamic address changes): 249 // If SCTP Auto-ASCONF is enabled, the peer is informed automatically 250 // when a new address is added or removed. This feature is enabled by 251 // default. 252 // usrsctp_sysctl_set_sctp_auto_asconf(0); 253 254 // TODO(ldixon): Consider turning this on/off. 255 // Add a blackhole sysctl. Setting it to 1 results in no ABORTs 256 // being sent in response to INITs, setting it to 2 results 257 // in no ABORTs being sent for received OOTB packets. 258 // This is similar to the TCP sysctl. 259 // 260 // See: http://lakerest.net/pipermail/sctp-coders/2012-January/009438.html 261 // See: http://svnweb.freebsd.org/base?view=revision&revision=229805 262 // usrsctp_sysctl_set_sctp_blackhole(2); 263 264 // Set the number of default outgoing streams. This is the number we'll 265 // send in the SCTP INIT message. The 'appropriate default' in the 266 // second paragraph of 267 // http://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-05#section-6.2 268 // is cricket::kMaxSctpSid. 269 usrsctp_sysctl_set_sctp_nr_outgoing_streams_default( 270 cricket::kMaxSctpSid); 271 } 272 usrsctp_engines_count++; 273 274 cricket::DataCodec codec(kGoogleSctpDataCodecId, kGoogleSctpDataCodecName, 0); 275 codec.SetParam(kCodecParamPort, kSctpDefaultPort); 276 codecs_.push_back(codec); 277 } 278 279 SctpDataEngine::~SctpDataEngine() { 280 usrsctp_engines_count--; 281 LOG(LS_VERBOSE) << "usrsctp_engines_count:" << usrsctp_engines_count; 282 283 if (usrsctp_engines_count == 0) { 284 // usrsctp_finish() may fail if it's called too soon after the channels are 285 // closed. Wait and try again until it succeeds for up to 3 seconds. 286 for (size_t i = 0; i < 300; ++i) { 287 if (usrsctp_finish() == 0) 288 return; 289 290 talk_base::Thread::SleepMs(10); 291 } 292 LOG(LS_ERROR) << "Failed to shutdown usrsctp."; 293 } 294 } 295 296 DataMediaChannel* SctpDataEngine::CreateChannel( 297 DataChannelType data_channel_type) { 298 if (data_channel_type != DCT_SCTP) { 299 return NULL; 300 } 301 return new SctpDataMediaChannel(talk_base::Thread::Current()); 302 } 303 304 SctpDataMediaChannel::SctpDataMediaChannel(talk_base::Thread* thread) 305 : worker_thread_(thread), 306 local_port_(kSctpDefaultPort), 307 remote_port_(kSctpDefaultPort), 308 sock_(NULL), 309 sending_(false), 310 receiving_(false), 311 debug_name_("SctpDataMediaChannel") { 312 } 313 314 SctpDataMediaChannel::~SctpDataMediaChannel() { 315 CloseSctpSocket(); 316 } 317 318 sockaddr_conn SctpDataMediaChannel::GetSctpSockAddr(int port) { 319 sockaddr_conn sconn = {0}; 320 sconn.sconn_family = AF_CONN; 321 #ifdef HAVE_SCONN_LEN 322 sconn.sconn_len = sizeof(sockaddr_conn); 323 #endif 324 // Note: conversion from int to uint16_t happens here. 325 sconn.sconn_port = talk_base::HostToNetwork16(port); 326 sconn.sconn_addr = this; 327 return sconn; 328 } 329 330 bool SctpDataMediaChannel::OpenSctpSocket() { 331 if (sock_) { 332 LOG(LS_VERBOSE) << debug_name_ 333 << "->Ignoring attempt to re-create existing socket."; 334 return false; 335 } 336 sock_ = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, 337 cricket::OnSctpInboundPacket, NULL, 0, this); 338 if (!sock_) { 339 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to create SCTP socket."; 340 return false; 341 } 342 343 // Make the socket non-blocking. Connect, close, shutdown etc will not block 344 // the thread waiting for the socket operation to complete. 345 if (usrsctp_set_non_blocking(sock_, 1) < 0) { 346 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to set SCTP to non blocking."; 347 return false; 348 } 349 350 // This ensures that the usrsctp close call deletes the association. This 351 // prevents usrsctp from calling OnSctpOutboundPacket with references to 352 // this class as the address. 353 linger linger_opt; 354 linger_opt.l_onoff = 1; 355 linger_opt.l_linger = 0; 356 if (usrsctp_setsockopt(sock_, SOL_SOCKET, SO_LINGER, &linger_opt, 357 sizeof(linger_opt))) { 358 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to set SO_LINGER."; 359 return false; 360 } 361 362 // Enable stream ID resets. 363 struct sctp_assoc_value stream_rst; 364 stream_rst.assoc_id = SCTP_ALL_ASSOC; 365 stream_rst.assoc_value = 1; 366 if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, 367 &stream_rst, sizeof(stream_rst))) { 368 LOG_ERRNO(LS_ERROR) << debug_name_ 369 << "Failed to set SCTP_ENABLE_STREAM_RESET."; 370 return false; 371 } 372 373 // Nagle. 374 uint32_t nodelay = 1; 375 if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, 376 sizeof(nodelay))) { 377 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to set SCTP_NODELAY."; 378 return false; 379 } 380 381 // Subscribe to SCTP event notifications. 382 int event_types[] = {SCTP_ASSOC_CHANGE, 383 SCTP_PEER_ADDR_CHANGE, 384 SCTP_SEND_FAILED_EVENT, 385 SCTP_SENDER_DRY_EVENT, 386 SCTP_STREAM_RESET_EVENT}; 387 struct sctp_event event = {0}; 388 event.se_assoc_id = SCTP_ALL_ASSOC; 389 event.se_on = 1; 390 for (size_t i = 0; i < ARRAY_SIZE(event_types); i++) { 391 event.se_type = event_types[i]; 392 if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_EVENT, &event, 393 sizeof(event)) < 0) { 394 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to set SCTP_EVENT type: " 395 << event.se_type; 396 return false; 397 } 398 } 399 400 // Register this class as an address for usrsctp. This is used by SCTP to 401 // direct the packets received (by the created socket) to this class. 402 usrsctp_register_address(this); 403 sending_ = true; 404 return true; 405 } 406 407 void SctpDataMediaChannel::CloseSctpSocket() { 408 sending_ = false; 409 if (sock_) { 410 // We assume that SO_LINGER option is set to close the association when 411 // close is called. This means that any pending packets in usrsctp will be 412 // discarded instead of being sent. 413 usrsctp_close(sock_); 414 sock_ = NULL; 415 usrsctp_deregister_address(this); 416 } 417 } 418 419 bool SctpDataMediaChannel::Connect() { 420 LOG(LS_VERBOSE) << debug_name_ << "->Connect()."; 421 422 // If we already have a socket connection, just return. 423 if (sock_) { 424 LOG(LS_WARNING) << debug_name_ << "->Connect(): Ignored as socket " 425 "is already established."; 426 return true; 427 } 428 429 // If no socket (it was closed) try to start it again. This can happen when 430 // the socket we are connecting to closes, does an sctp shutdown handshake, 431 // or behaves unexpectedly causing us to perform a CloseSctpSocket. 432 if (!sock_ && !OpenSctpSocket()) { 433 return false; 434 } 435 436 // Note: conversion from int to uint16_t happens on assignment. 437 sockaddr_conn local_sconn = GetSctpSockAddr(local_port_); 438 if (usrsctp_bind(sock_, reinterpret_cast<sockaddr *>(&local_sconn), 439 sizeof(local_sconn)) < 0) { 440 LOG_ERRNO(LS_ERROR) << debug_name_ << "->Connect(): " 441 << ("Failed usrsctp_bind"); 442 CloseSctpSocket(); 443 return false; 444 } 445 446 // Note: conversion from int to uint16_t happens on assignment. 447 sockaddr_conn remote_sconn = GetSctpSockAddr(remote_port_); 448 int connect_result = usrsctp_connect( 449 sock_, reinterpret_cast<sockaddr *>(&remote_sconn), sizeof(remote_sconn)); 450 if (connect_result < 0 && errno != SCTP_EINPROGRESS) { 451 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed usrsctp_connect. got errno=" 452 << errno << ", but wanted " << SCTP_EINPROGRESS; 453 CloseSctpSocket(); 454 return false; 455 } 456 return true; 457 } 458 459 void SctpDataMediaChannel::Disconnect() { 460 // TODO(ldixon): Consider calling |usrsctp_shutdown(sock_, ...)| to do a 461 // shutdown handshake and remove the association. 462 CloseSctpSocket(); 463 } 464 465 bool SctpDataMediaChannel::SetSend(bool send) { 466 if (!sending_ && send) { 467 return Connect(); 468 } 469 if (sending_ && !send) { 470 Disconnect(); 471 } 472 return true; 473 } 474 475 bool SctpDataMediaChannel::SetReceive(bool receive) { 476 receiving_ = receive; 477 return true; 478 } 479 480 bool SctpDataMediaChannel::AddSendStream(const StreamParams& stream) { 481 return AddStream(stream); 482 } 483 484 bool SctpDataMediaChannel::RemoveSendStream(uint32 ssrc) { 485 return ResetStream(ssrc); 486 } 487 488 bool SctpDataMediaChannel::AddRecvStream(const StreamParams& stream) { 489 // SCTP DataChannels are always bi-directional and calling AddSendStream will 490 // enable both sending and receiving on the stream. So AddRecvStream is a 491 // no-op. 492 return true; 493 } 494 495 bool SctpDataMediaChannel::RemoveRecvStream(uint32 ssrc) { 496 // SCTP DataChannels are always bi-directional and calling RemoveSendStream 497 // will disable both sending and receiving on the stream. So RemoveRecvStream 498 // is a no-op. 499 return true; 500 } 501 502 bool SctpDataMediaChannel::SendData( 503 const SendDataParams& params, 504 const talk_base::Buffer& payload, 505 SendDataResult* result) { 506 if (result) { 507 // Preset |result| to assume an error. If SendData succeeds, we'll 508 // overwrite |*result| once more at the end. 509 *result = SDR_ERROR; 510 } 511 512 if (!sending_) { 513 LOG(LS_WARNING) << debug_name_ << "->SendData(...): " 514 << "Not sending packet with ssrc=" << params.ssrc 515 << " len=" << payload.length() << " before SetSend(true)."; 516 return false; 517 } 518 519 if (params.type != cricket::DMT_CONTROL && 520 open_streams_.find(params.ssrc) == open_streams_.end()) { 521 LOG(LS_WARNING) << debug_name_ << "->SendData(...): " 522 << "Not sending data because ssrc is unknown: " 523 << params.ssrc; 524 return false; 525 } 526 527 // 528 // Send data using SCTP. 529 ssize_t send_res = 0; // result from usrsctp_sendv. 530 struct sctp_sendv_spa spa = {0}; 531 spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID; 532 spa.sendv_sndinfo.snd_sid = params.ssrc; 533 spa.sendv_sndinfo.snd_ppid = talk_base::HostToNetwork32( 534 GetPpid(params.type)); 535 536 // Ordered implies reliable. 537 if (!params.ordered) { 538 spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED; 539 if (params.max_rtx_count >= 0 || params.max_rtx_ms == 0) { 540 spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; 541 spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; 542 spa.sendv_prinfo.pr_value = params.max_rtx_count; 543 } else { 544 spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; 545 spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; 546 spa.sendv_prinfo.pr_value = params.max_rtx_ms; 547 } 548 } 549 550 // We don't fragment. 551 send_res = usrsctp_sendv(sock_, payload.data(), 552 static_cast<size_t>(payload.length()), 553 NULL, 0, &spa, 554 talk_base::checked_cast<socklen_t>(sizeof(spa)), 555 SCTP_SENDV_SPA, 0); 556 if (send_res < 0) { 557 if (errno == SCTP_EWOULDBLOCK) { 558 *result = SDR_BLOCK; 559 LOG(LS_INFO) << debug_name_ << "->SendData(...): EWOULDBLOCK returned"; 560 } else { 561 LOG_ERRNO(LS_ERROR) << "ERROR:" << debug_name_ 562 << "->SendData(...): " 563 << " usrsctp_sendv: "; 564 } 565 return false; 566 } 567 if (result) { 568 // Only way out now is success. 569 *result = SDR_SUCCESS; 570 } 571 return true; 572 } 573 574 // Called by network interface when a packet has been received. 575 void SctpDataMediaChannel::OnPacketReceived( 576 talk_base::Buffer* packet, const talk_base::PacketTime& packet_time) { 577 LOG(LS_VERBOSE) << debug_name_ << "->OnPacketReceived(...): " << " length=" 578 << packet->length() << ", sending: " << sending_; 579 // Only give receiving packets to usrsctp after if connected. This enables two 580 // peers to each make a connect call, but for them not to receive an INIT 581 // packet before they have called connect; least the last receiver of the INIT 582 // packet will have called connect, and a connection will be established. 583 if (sending_) { 584 // Pass received packet to SCTP stack. Once processed by usrsctp, the data 585 // will be will be given to the global OnSctpInboundData, and then, 586 // marshalled by a Post and handled with OnMessage. 587 usrsctp_conninput(this, packet->data(), packet->length(), 0); 588 } else { 589 // TODO(ldixon): Consider caching the packet for very slightly better 590 // reliability. 591 } 592 } 593 594 void SctpDataMediaChannel::OnInboundPacketFromSctpToChannel( 595 SctpInboundPacket* packet) { 596 LOG(LS_VERBOSE) << debug_name_ << "->OnInboundPacketFromSctpToChannel(...): " 597 << "Received SCTP data:" 598 << " ssrc=" << packet->params.ssrc 599 << " notification: " << (packet->flags & MSG_NOTIFICATION) 600 << " length=" << packet->buffer.length(); 601 // Sending a packet with data == NULL (no data) is SCTPs "close the 602 // connection" message. This sets sock_ = NULL; 603 if (!packet->buffer.length() || !packet->buffer.data()) { 604 LOG(LS_INFO) << debug_name_ << "->OnInboundPacketFromSctpToChannel(...): " 605 "No data, closing."; 606 return; 607 } 608 if (packet->flags & MSG_NOTIFICATION) { 609 OnNotificationFromSctp(&packet->buffer); 610 } else { 611 OnDataFromSctpToChannel(packet->params, &packet->buffer); 612 } 613 } 614 615 void SctpDataMediaChannel::OnDataFromSctpToChannel( 616 const ReceiveDataParams& params, talk_base::Buffer* buffer) { 617 if (receiving_) { 618 LOG(LS_VERBOSE) << debug_name_ << "->OnDataFromSctpToChannel(...): " 619 << "Posting with length: " << buffer->length() 620 << " on stream " << params.ssrc; 621 // Reports all received messages to upper layers, no matter whether the sid 622 // is known. 623 SignalDataReceived(params, buffer->data(), buffer->length()); 624 } else { 625 LOG(LS_WARNING) << debug_name_ << "->OnDataFromSctpToChannel(...): " 626 << "Not receiving packet with sid=" << params.ssrc 627 << " len=" << buffer->length() 628 << " before SetReceive(true)."; 629 } 630 } 631 632 bool SctpDataMediaChannel::AddStream(const StreamParams& stream) { 633 if (!stream.has_ssrcs()) { 634 return false; 635 } 636 637 const uint32 ssrc = stream.first_ssrc(); 638 if (open_streams_.find(ssrc) != open_streams_.end()) { 639 LOG(LS_WARNING) << debug_name_ << "->Add(Send|Recv)Stream(...): " 640 << "Not adding data stream '" << stream.id 641 << "' with ssrc=" << ssrc 642 << " because stream is already open."; 643 return false; 644 } else if (queued_reset_streams_.find(ssrc) != queued_reset_streams_.end() 645 || sent_reset_streams_.find(ssrc) != sent_reset_streams_.end()) { 646 LOG(LS_WARNING) << debug_name_ << "->Add(Send|Recv)Stream(...): " 647 << "Not adding data stream '" << stream.id 648 << "' with ssrc=" << ssrc 649 << " because stream is still closing."; 650 return false; 651 } 652 653 open_streams_.insert(ssrc); 654 return true; 655 } 656 657 bool SctpDataMediaChannel::ResetStream(uint32 ssrc) { 658 // We typically get this called twice for the same stream, once each for 659 // Send and Recv. 660 StreamSet::iterator found = open_streams_.find(ssrc); 661 662 if (found == open_streams_.end()) { 663 LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << ssrc << "): " 664 << "stream not found."; 665 return false; 666 } else { 667 LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << ssrc << "): " 668 << "Removing and queuing RE-CONFIG chunk."; 669 open_streams_.erase(found); 670 } 671 672 // SCTP won't let you have more than one stream reset pending at a time, but 673 // you can close multiple streams in a single reset. So, we keep an internal 674 // queue of streams-to-reset, and send them as one reset message in 675 // SendQueuedStreamResets(). 676 queued_reset_streams_.insert(ssrc); 677 678 // Signal our stream-reset logic that it should try to send now, if it can. 679 SendQueuedStreamResets(); 680 681 // The stream will actually get removed when we get the acknowledgment. 682 return true; 683 } 684 685 void SctpDataMediaChannel::OnNotificationFromSctp(talk_base::Buffer* buffer) { 686 const sctp_notification& notification = 687 reinterpret_cast<const sctp_notification&>(*buffer->data()); 688 ASSERT(notification.sn_header.sn_length == buffer->length()); 689 690 // TODO(ldixon): handle notifications appropriately. 691 switch (notification.sn_header.sn_type) { 692 case SCTP_ASSOC_CHANGE: 693 LOG(LS_VERBOSE) << "SCTP_ASSOC_CHANGE"; 694 OnNotificationAssocChange(notification.sn_assoc_change); 695 break; 696 case SCTP_REMOTE_ERROR: 697 LOG(LS_INFO) << "SCTP_REMOTE_ERROR"; 698 break; 699 case SCTP_SHUTDOWN_EVENT: 700 LOG(LS_INFO) << "SCTP_SHUTDOWN_EVENT"; 701 break; 702 case SCTP_ADAPTATION_INDICATION: 703 LOG(LS_INFO) << "SCTP_ADAPTATION_INDICATION"; 704 break; 705 case SCTP_PARTIAL_DELIVERY_EVENT: 706 LOG(LS_INFO) << "SCTP_PARTIAL_DELIVERY_EVENT"; 707 break; 708 case SCTP_AUTHENTICATION_EVENT: 709 LOG(LS_INFO) << "SCTP_AUTHENTICATION_EVENT"; 710 break; 711 case SCTP_SENDER_DRY_EVENT: 712 LOG(LS_VERBOSE) << "SCTP_SENDER_DRY_EVENT"; 713 SignalReadyToSend(true); 714 break; 715 // TODO(ldixon): Unblock after congestion. 716 case SCTP_NOTIFICATIONS_STOPPED_EVENT: 717 LOG(LS_INFO) << "SCTP_NOTIFICATIONS_STOPPED_EVENT"; 718 break; 719 case SCTP_SEND_FAILED_EVENT: 720 LOG(LS_INFO) << "SCTP_SEND_FAILED_EVENT"; 721 break; 722 case SCTP_STREAM_RESET_EVENT: 723 OnStreamResetEvent(¬ification.sn_strreset_event); 724 break; 725 case SCTP_ASSOC_RESET_EVENT: 726 LOG(LS_INFO) << "SCTP_ASSOC_RESET_EVENT"; 727 break; 728 case SCTP_STREAM_CHANGE_EVENT: 729 LOG(LS_INFO) << "SCTP_STREAM_CHANGE_EVENT"; 730 // An acknowledgment we get after our stream resets have gone through, 731 // if they've failed. We log the message, but don't react -- we don't 732 // keep around the last-transmitted set of SSIDs we wanted to close for 733 // error recovery. It doesn't seem likely to occur, and if so, likely 734 // harmless within the lifetime of a single SCTP association. 735 break; 736 default: 737 LOG(LS_WARNING) << "Unknown SCTP event: " 738 << notification.sn_header.sn_type; 739 break; 740 } 741 } 742 743 void SctpDataMediaChannel::OnNotificationAssocChange( 744 const sctp_assoc_change& change) { 745 switch (change.sac_state) { 746 case SCTP_COMM_UP: 747 LOG(LS_VERBOSE) << "Association change SCTP_COMM_UP"; 748 break; 749 case SCTP_COMM_LOST: 750 LOG(LS_INFO) << "Association change SCTP_COMM_LOST"; 751 break; 752 case SCTP_RESTART: 753 LOG(LS_INFO) << "Association change SCTP_RESTART"; 754 break; 755 case SCTP_SHUTDOWN_COMP: 756 LOG(LS_INFO) << "Association change SCTP_SHUTDOWN_COMP"; 757 break; 758 case SCTP_CANT_STR_ASSOC: 759 LOG(LS_INFO) << "Association change SCTP_CANT_STR_ASSOC"; 760 break; 761 default: 762 LOG(LS_INFO) << "Association change UNKNOWN"; 763 break; 764 } 765 } 766 767 void SctpDataMediaChannel::OnStreamResetEvent( 768 const struct sctp_stream_reset_event* evt) { 769 // A stream reset always involves two RE-CONFIG chunks for us -- we always 770 // simultaneously reset a sid's sequence number in both directions. The 771 // requesting side transmits a RE-CONFIG chunk and waits for the peer to send 772 // one back. Both sides get this SCTP_STREAM_RESET_EVENT when they receive 773 // RE-CONFIGs. 774 const int num_ssrcs = (evt->strreset_length - sizeof(*evt)) / 775 sizeof(evt->strreset_stream_list[0]); 776 LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ 777 << "): Flags = 0x" 778 << std::hex << evt->strreset_flags << " (" 779 << ListFlags(evt->strreset_flags) << ")"; 780 LOG(LS_VERBOSE) << "Assoc = " << evt->strreset_assoc_id << ", Streams = [" 781 << ListArray(evt->strreset_stream_list, num_ssrcs) 782 << "], Open: [" 783 << ListStreams(open_streams_) << "], Q'd: [" 784 << ListStreams(queued_reset_streams_) << "], Sent: [" 785 << ListStreams(sent_reset_streams_) << "]"; 786 787 // If both sides try to reset some streams at the same time (even if they're 788 // disjoint sets), we can get reset failures. 789 if (evt->strreset_flags & SCTP_STREAM_RESET_FAILED) { 790 // OK, just try again. The stream IDs sent over when the RESET_FAILED flag 791 // is set seem to be garbage values. Ignore them. 792 queued_reset_streams_.insert( 793 sent_reset_streams_.begin(), 794 sent_reset_streams_.end()); 795 sent_reset_streams_.clear(); 796 797 } else if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { 798 // Each side gets an event for each direction of a stream. That is, 799 // closing sid k will make each side receive INCOMING and OUTGOING reset 800 // events for k. As per RFC6525, Section 5, paragraph 2, each side will 801 // get an INCOMING event first. 802 for (int i = 0; i < num_ssrcs; i++) { 803 const int stream_id = evt->strreset_stream_list[i]; 804 805 // See if this stream ID was closed by our peer or ourselves. 806 StreamSet::iterator it = sent_reset_streams_.find(stream_id); 807 808 // The reset was requested locally. 809 if (it != sent_reset_streams_.end()) { 810 LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ 811 << "): local sid " << stream_id << " acknowledged."; 812 sent_reset_streams_.erase(it); 813 814 } else if ((it = open_streams_.find(stream_id)) 815 != open_streams_.end()) { 816 // The peer requested the reset. 817 LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ 818 << "): closing sid " << stream_id; 819 open_streams_.erase(it); 820 SignalStreamClosedRemotely(stream_id); 821 822 } else if ((it = queued_reset_streams_.find(stream_id)) 823 != queued_reset_streams_.end()) { 824 // The peer requested the reset, but there was a local reset 825 // queued. 826 LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ 827 << "): double-sided close for sid " << stream_id; 828 // Both sides want the stream closed, and the peer got to send the 829 // RE-CONFIG first. Treat it like the local Remove(Send|Recv)Stream 830 // finished quickly. 831 queued_reset_streams_.erase(it); 832 833 } else { 834 // This stream is unknown. Sometimes this can be from an 835 // RESET_FAILED-related retransmit. 836 LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ 837 << "): Unknown sid " << stream_id; 838 } 839 } 840 } 841 842 // Always try to send the queued RESET because this call indicates that the 843 // last local RESET or remote RESET has made some progress. 844 SendQueuedStreamResets(); 845 } 846 847 // Puts the specified |param| from the codec identified by |id| into |dest| 848 // and returns true. Or returns false if it wasn't there, leaving |dest| 849 // untouched. 850 static bool GetCodecIntParameter(const std::vector<DataCodec>& codecs, 851 int id, const std::string& name, 852 const std::string& param, int* dest) { 853 std::string value; 854 Codec match_pattern; 855 match_pattern.id = id; 856 match_pattern.name = name; 857 for (size_t i = 0; i < codecs.size(); ++i) { 858 if (codecs[i].Matches(match_pattern)) { 859 if (codecs[i].GetParam(param, &value)) { 860 *dest = talk_base::FromString<int>(value); 861 return true; 862 } 863 } 864 } 865 return false; 866 } 867 868 bool SctpDataMediaChannel::SetSendCodecs(const std::vector<DataCodec>& codecs) { 869 return GetCodecIntParameter( 870 codecs, kGoogleSctpDataCodecId, kGoogleSctpDataCodecName, kCodecParamPort, 871 &remote_port_); 872 } 873 874 bool SctpDataMediaChannel::SetRecvCodecs(const std::vector<DataCodec>& codecs) { 875 return GetCodecIntParameter( 876 codecs, kGoogleSctpDataCodecId, kGoogleSctpDataCodecName, kCodecParamPort, 877 &local_port_); 878 } 879 880 void SctpDataMediaChannel::OnPacketFromSctpToNetwork( 881 talk_base::Buffer* buffer) { 882 if (buffer->length() > kSctpMtu) { 883 LOG(LS_ERROR) << debug_name_ << "->OnPacketFromSctpToNetwork(...): " 884 << "SCTP seems to have made a packet that is bigger " 885 "than its official MTU."; 886 } 887 MediaChannel::SendPacket(buffer); 888 } 889 890 bool SctpDataMediaChannel::SendQueuedStreamResets() { 891 if (!sent_reset_streams_.empty() || queued_reset_streams_.empty()) 892 return true; 893 894 LOG(LS_VERBOSE) << "SendQueuedStreamResets[" << debug_name_ << "]: Sending [" 895 << ListStreams(queued_reset_streams_) << "], Open: [" 896 << ListStreams(open_streams_) << "], Sent: [" 897 << ListStreams(sent_reset_streams_) << "]"; 898 899 const size_t num_streams = queued_reset_streams_.size(); 900 const size_t num_bytes = sizeof(struct sctp_reset_streams) 901 + (num_streams * sizeof(uint16)); 902 903 std::vector<uint8> reset_stream_buf(num_bytes, 0); 904 struct sctp_reset_streams* resetp = reinterpret_cast<sctp_reset_streams*>( 905 &reset_stream_buf[0]); 906 resetp->srs_assoc_id = SCTP_ALL_ASSOC; 907 resetp->srs_flags = SCTP_STREAM_RESET_INCOMING | SCTP_STREAM_RESET_OUTGOING; 908 resetp->srs_number_streams = talk_base::checked_cast<uint16_t>(num_streams); 909 int result_idx = 0; 910 for (StreamSet::iterator it = queued_reset_streams_.begin(); 911 it != queued_reset_streams_.end(); ++it) { 912 resetp->srs_stream_list[result_idx++] = *it; 913 } 914 915 int ret = usrsctp_setsockopt( 916 sock_, IPPROTO_SCTP, SCTP_RESET_STREAMS, resetp, 917 talk_base::checked_cast<socklen_t>(reset_stream_buf.size())); 918 if (ret < 0) { 919 LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to send a stream reset for " 920 << num_streams << " streams"; 921 return false; 922 } 923 924 // sent_reset_streams_ is empty, and all the queued_reset_streams_ go into 925 // it now. 926 queued_reset_streams_.swap(sent_reset_streams_); 927 return true; 928 } 929 930 void SctpDataMediaChannel::OnMessage(talk_base::Message* msg) { 931 switch (msg->message_id) { 932 case MSG_SCTPINBOUNDPACKET: { 933 talk_base::scoped_ptr<InboundPacketMessage> pdata( 934 static_cast<InboundPacketMessage*>(msg->pdata)); 935 OnInboundPacketFromSctpToChannel(pdata->data().get()); 936 break; 937 } 938 case MSG_SCTPOUTBOUNDPACKET: { 939 talk_base::scoped_ptr<OutboundPacketMessage> pdata( 940 static_cast<OutboundPacketMessage*>(msg->pdata)); 941 OnPacketFromSctpToNetwork(pdata->data().get()); 942 break; 943 } 944 } 945 } 946 } // namespace cricket 947