1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_connection.h" 6 7 #include <string.h> 8 #include <sys/types.h> 9 #include <algorithm> 10 #include <iterator> 11 #include <limits> 12 #include <memory> 13 #include <set> 14 #include <utility> 15 16 #include "base/logging.h" 17 #include "base/stl_util.h" 18 #include "net/base/net_errors.h" 19 #include "net/quic/crypto/quic_decrypter.h" 20 #include "net/quic/crypto/quic_encrypter.h" 21 #include "net/quic/iovector.h" 22 #include "net/quic/quic_bandwidth.h" 23 #include "net/quic/quic_config.h" 24 #include "net/quic/quic_utils.h" 25 26 using base::hash_map; 27 using base::hash_set; 28 using base::StringPiece; 29 using std::list; 30 using std::make_pair; 31 using std::min; 32 using std::max; 33 using std::numeric_limits; 34 using std::vector; 35 using std::set; 36 using std::string; 37 38 int FLAGS_fake_packet_loss_percentage = 0; 39 40 // If true, then QUIC connections will bundle acks with any outgoing packet when 41 // an ack is being delayed. This is an optimization to reduce ack latency and 42 // packet count of pure ack packets. 43 bool FLAGS_bundle_ack_with_outgoing_packet = false; 44 45 namespace net { 46 47 class QuicDecrypter; 48 class QuicEncrypter; 49 50 namespace { 51 52 // The largest gap in packets we'll accept without closing the connection. 53 // This will likely have to be tuned. 54 const QuicPacketSequenceNumber kMaxPacketGap = 5000; 55 56 // Limit the number of FEC groups to two. If we get enough out of order packets 57 // that this becomes limiting, we can revisit. 58 const size_t kMaxFecGroups = 2; 59 60 // Limit the number of undecryptable packets we buffer in 61 // expectation of the CHLO/SHLO arriving. 
62 const size_t kMaxUndecryptablePackets = 10; 63 64 bool Near(QuicPacketSequenceNumber a, QuicPacketSequenceNumber b) { 65 QuicPacketSequenceNumber delta = (a > b) ? a - b : b - a; 66 return delta <= kMaxPacketGap; 67 } 68 69 70 // An alarm that is scheduled to send an ack if a timeout occurs. 71 class AckAlarm : public QuicAlarm::Delegate { 72 public: 73 explicit AckAlarm(QuicConnection* connection) 74 : connection_(connection) { 75 } 76 77 virtual QuicTime OnAlarm() OVERRIDE { 78 connection_->SendAck(); 79 return QuicTime::Zero(); 80 } 81 82 private: 83 QuicConnection* connection_; 84 }; 85 86 // This alarm will be scheduled any time a data-bearing packet is sent out. 87 // When the alarm goes off, the connection checks to see if the oldest packets 88 // have been acked, and retransmit them if they have not. 89 class RetransmissionAlarm : public QuicAlarm::Delegate { 90 public: 91 explicit RetransmissionAlarm(QuicConnection* connection) 92 : connection_(connection) { 93 } 94 95 virtual QuicTime OnAlarm() OVERRIDE { 96 connection_->OnRetransmissionTimeout(); 97 return QuicTime::Zero(); 98 } 99 100 private: 101 QuicConnection* connection_; 102 }; 103 104 // An alarm that is scheduled when the sent scheduler requires a 105 // a delay before sending packets and fires when the packet may be sent. 106 class SendAlarm : public QuicAlarm::Delegate { 107 public: 108 explicit SendAlarm(QuicConnection* connection) 109 : connection_(connection) { 110 } 111 112 virtual QuicTime OnAlarm() OVERRIDE { 113 connection_->WriteIfNotBlocked(); 114 // Never reschedule the alarm, since OnCanWrite does that. 
115 return QuicTime::Zero(); 116 } 117 118 private: 119 QuicConnection* connection_; 120 }; 121 122 class TimeoutAlarm : public QuicAlarm::Delegate { 123 public: 124 explicit TimeoutAlarm(QuicConnection* connection) 125 : connection_(connection) { 126 } 127 128 virtual QuicTime OnAlarm() OVERRIDE { 129 connection_->CheckForTimeout(); 130 // Never reschedule the alarm, since CheckForTimeout does that. 131 return QuicTime::Zero(); 132 } 133 134 private: 135 QuicConnection* connection_; 136 }; 137 138 // Indicates if any of the frames are intended to be sent with FORCE. 139 // Returns FORCE when one of the frames is a CONNECTION_CLOSE_FRAME. 140 net::QuicConnection::Force HasForcedFrames( 141 const RetransmittableFrames* retransmittable_frames) { 142 if (!retransmittable_frames) { 143 return net::QuicConnection::NO_FORCE; 144 } 145 for (size_t i = 0; i < retransmittable_frames->frames().size(); ++i) { 146 if (retransmittable_frames->frames()[i].type == CONNECTION_CLOSE_FRAME) { 147 return net::QuicConnection::FORCE; 148 } 149 } 150 return net::QuicConnection::NO_FORCE; 151 } 152 153 net::IsHandshake HasCryptoHandshake( 154 const RetransmittableFrames* retransmittable_frames) { 155 if (!retransmittable_frames) { 156 return net::NOT_HANDSHAKE; 157 } 158 for (size_t i = 0; i < retransmittable_frames->frames().size(); ++i) { 159 if (retransmittable_frames->frames()[i].type == STREAM_FRAME && 160 retransmittable_frames->frames()[i].stream_frame->stream_id == 161 kCryptoStreamId) { 162 return net::IS_HANDSHAKE; 163 } 164 } 165 return net::NOT_HANDSHAKE; 166 } 167 168 } // namespace 169 170 #define ENDPOINT (is_server_ ? 
"Server: " : " Client: ") 171 172 QuicConnection::QuicConnection(QuicGuid guid, 173 IPEndPoint address, 174 QuicConnectionHelperInterface* helper, 175 QuicPacketWriter* writer, 176 bool is_server, 177 const QuicVersionVector& supported_versions) 178 : framer_(supported_versions, 179 helper->GetClock()->ApproximateNow(), 180 is_server), 181 helper_(helper), 182 writer_(writer), 183 encryption_level_(ENCRYPTION_NONE), 184 clock_(helper->GetClock()), 185 random_generator_(helper->GetRandomGenerator()), 186 guid_(guid), 187 peer_address_(address), 188 largest_seen_packet_with_ack_(0), 189 pending_version_negotiation_packet_(false), 190 write_blocked_(false), 191 received_packet_manager_(kTCP), 192 ack_alarm_(helper->CreateAlarm(new AckAlarm(this))), 193 retransmission_alarm_(helper->CreateAlarm(new RetransmissionAlarm(this))), 194 send_alarm_(helper->CreateAlarm(new SendAlarm(this))), 195 resume_writes_alarm_(helper->CreateAlarm(new SendAlarm(this))), 196 timeout_alarm_(helper->CreateAlarm(new TimeoutAlarm(this))), 197 debug_visitor_(NULL), 198 packet_creator_(guid_, &framer_, random_generator_, is_server), 199 packet_generator_(this, NULL, &packet_creator_), 200 idle_network_timeout_( 201 QuicTime::Delta::FromSeconds(kDefaultInitialTimeoutSecs)), 202 overall_connection_timeout_(QuicTime::Delta::Infinite()), 203 creation_time_(clock_->ApproximateNow()), 204 time_of_last_received_packet_(clock_->ApproximateNow()), 205 time_of_last_sent_packet_(clock_->ApproximateNow()), 206 sequence_number_of_last_inorder_packet_(0), 207 sent_packet_manager_(is_server, this, clock_, kTCP), 208 version_negotiation_state_(START_NEGOTIATION), 209 is_server_(is_server), 210 connected_(true), 211 address_migrating_(false) { 212 if (!is_server_) { 213 // Pacing will be enabled if the client negotiates it. 
214 sent_packet_manager_.MaybeEnablePacing(); 215 } 216 DVLOG(1) << ENDPOINT << "Created connection with guid: " << guid; 217 timeout_alarm_->Set(clock_->ApproximateNow().Add(idle_network_timeout_)); 218 framer_.set_visitor(this); 219 framer_.set_received_entropy_calculator(&received_packet_manager_); 220 } 221 222 QuicConnection::~QuicConnection() { 223 STLDeleteElements(&undecryptable_packets_); 224 STLDeleteValues(&group_map_); 225 for (QueuedPacketList::iterator it = queued_packets_.begin(); 226 it != queued_packets_.end(); ++it) { 227 delete it->packet; 228 } 229 } 230 231 void QuicConnection::SetFromConfig(const QuicConfig& config) { 232 DCHECK_LT(0u, config.server_initial_congestion_window()); 233 SetIdleNetworkTimeout(config.idle_connection_state_lifetime()); 234 sent_packet_manager_.SetFromConfig(config); 235 // TODO(satyamshekhar): Set congestion control and ICSL also. 236 } 237 238 bool QuicConnection::SelectMutualVersion( 239 const QuicVersionVector& available_versions) { 240 // Try to find the highest mutual version by iterating over supported 241 // versions, starting with the highest, and breaking out of the loop once we 242 // find a matching version in the provided available_versions vector. 243 const QuicVersionVector& supported_versions = framer_.supported_versions(); 244 for (size_t i = 0; i < supported_versions.size(); ++i) { 245 const QuicVersion& version = supported_versions[i]; 246 if (std::find(available_versions.begin(), available_versions.end(), 247 version) != available_versions.end()) { 248 framer_.set_version(version); 249 return true; 250 } 251 } 252 253 return false; 254 } 255 256 void QuicConnection::OnError(QuicFramer* framer) { 257 // Packets that we cannot decrypt are dropped. 258 // TODO(rch): add stats to measure this. 
// Framer callback for a packet whose version differs from the one this
// connection currently uses (see the DCHECK_NE below).
//
// Only a server is expected to get here: a client logs a DFATAL, closes the
// connection with QUIC_INTERNAL_ERROR and returns false.
//
// Returns true only when |received_version| was adopted: negotiation is marked
// complete, the visitor is notified and the framer is switched to that
// version. Every other path returns false.
bool QuicConnection::OnProtocolVersionMismatch(QuicVersion received_version) {
  DVLOG(1) << ENDPOINT << "Received packet with mismatched version "
           << received_version;
  // TODO(satyamshekhar): Implement no server state in this mode.
  if (!is_server_) {
    LOG(DFATAL) << ENDPOINT << "Framer called OnProtocolVersionMismatch. "
                << "Closing connection.";
    CloseConnection(QUIC_INTERNAL_ERROR, false);
    return false;
  }
  DCHECK_NE(version(), received_version);

  if (debug_visitor_) {
    debug_visitor_->OnProtocolVersionMismatch(received_version);
  }

  switch (version_negotiation_state_) {
    case START_NEGOTIATION:
      // First mismatch: if the version is unsupported, answer with a version
      // negotiation packet and remember that negotiation has started.
      if (!framer_.IsSupportedVersion(received_version)) {
        SendVersionNegotiationPacket();
        version_negotiation_state_ = NEGOTIATION_IN_PROGRESS;
        return false;
      }
      break;

    case NEGOTIATION_IN_PROGRESS:
      // Still unsupported: send the version negotiation packet again. The
      // state is already NEGOTIATION_IN_PROGRESS, so it is not touched.
      if (!framer_.IsSupportedVersion(received_version)) {
        SendVersionNegotiationPacket();
        return false;
      }
      break;

    case NEGOTIATED_VERSION:
      // Might be old packets that were sent by the client before the version
      // was negotiated. Drop these.
      return false;

    default:
      // NOTE(review): there is no return here, so if the DCHECK does not
      // abort, control continues into the adoption code below.
      DCHECK(false);
  }

  // Reached only when |received_version| is supported (or via the default
  // case above): adopt it.
  version_negotiation_state_ = NEGOTIATED_VERSION;
  visitor_->OnSuccessfulVersionNegotiation(received_version);
  DVLOG(1) << ENDPOINT << "version negotiated " << received_version;

  // Store the new version.
  framer_.set_version(received_version);

  // TODO(satyamshekhar): Store the sequence number of this packet and close the
  // connection if we ever received a packet with incorrect version and whose
  // sequence number is greater.
  return true;
}
// Framer callback invoked with a parsed packet header.
//
// Runs a series of checks and returns false as soon as one fails:
//   1. ProcessValidatedPacket() must succeed.
//   2. The header's GUID must match guid_.
//   3. The sequence number must be Near() the last accepted header's sequence
//      number; otherwise a connection close is sent.
//   4. received_packet_manager_ must still be awaiting this sequence number.
//   5. While version negotiation is unfinished, a server requires the version
//      flag (otherwise it closes the connection); a client stops sending its
//      version. Both then mark negotiation as complete and notify the visitor.
//
// On success the header is stored in last_header_ and true is returned.
bool QuicConnection::OnPacketHeader(const QuicPacketHeader& header) {
  if (debug_visitor_) {
    debug_visitor_->OnPacketHeader(header);
  }

  if (!ProcessValidatedPacket()) {
    return false;
  }

  // Provisionally count the packet as dropped; this is decremented again
  // below if we fall through to return true.
  ++stats_.packets_dropped;

  if (header.public_header.guid != guid_) {
    DVLOG(1) << ENDPOINT << "Ignoring packet from unexpected GUID: "
             << header.public_header.guid << " instead of " << guid_;
    return false;
  }

  if (!Near(header.packet_sequence_number,
            last_header_.packet_sequence_number)) {
    DVLOG(1) << ENDPOINT << "Packet " << header.packet_sequence_number
             << " out of bounds. Discarding";
    SendConnectionCloseWithDetails(QUIC_INVALID_PACKET_HEADER,
                                   "Packet sequence number out of bounds");
    return false;
  }

  // If this packet has already been seen, or is one that the sender
  // has told us will not be retransmitted, then stop processing the packet.
  if (!received_packet_manager_.IsAwaitingPacket(
          header.packet_sequence_number)) {
    return false;
  }

  if (version_negotiation_state_ != NEGOTIATED_VERSION) {
    if (is_server_) {
      if (!header.public_header.version_flag) {
        DLOG(WARNING) << ENDPOINT << "Got packet without version flag before "
                      << "version negotiated.";
        // Packets should have the version flag till version negotiation is
        // done.
        CloseConnection(QUIC_INVALID_VERSION, false);
        return false;
      } else {
        // The packet carries exactly one version and it is ours, so the
        // negotiation is complete from the server's point of view.
        DCHECK_EQ(1u, header.public_header.versions.size());
        DCHECK_EQ(header.public_header.versions[0], version());
        version_negotiation_state_ = NEGOTIATED_VERSION;
        visitor_->OnSuccessfulVersionNegotiation(version());
      }
    } else {
      DCHECK(!header.public_header.version_flag);
      // If the client gets a packet without the version flag from the server
      // it should stop sending version since the version negotiation is done.
      packet_creator_.StopSendingVersion();
      version_negotiation_state_ = NEGOTIATED_VERSION;
      visitor_->OnSuccessfulVersionNegotiation(version());
    }
  }

  DCHECK_EQ(NEGOTIATED_VERSION, version_negotiation_state_);

  // The packet was accepted: undo the provisional drop count from above.
  --stats_.packets_dropped;
  DVLOG(1) << ENDPOINT << "Received packet header: " << header;
  last_header_ = header;
  DCHECK(connected_);
  return true;
}
// Applies an ack frame that OnAckFrame() validated and queued.
//
// Records the enclosing packet as the newest one that carried an ack, feeds
// the ack to the received-packet manager, closes FEC groups and drops sent
// entropy that the ack makes obsolete, hands the ack to the sent-packet
// manager, attempts a write if that produced pending retransmissions, and
// finally re-arms the retransmission alarm when the sent-packet manager asks
// for it.
void QuicConnection::ProcessAckFrame(const QuicAckFrame& incoming_ack) {
  // OnAckFrame() ignores ack frames from packets at or below this number.
  largest_seen_packet_with_ack_ = last_header_.packet_sequence_number;

  received_packet_manager_.UpdatePacketInformationReceivedByPeer(incoming_ack);
  received_packet_manager_.UpdatePacketInformationSentByPeer(incoming_ack);
  // Possibly close any FecGroups which are now irrelevant.
  CloseFecGroupsBefore(incoming_ack.sent_info.least_unacked + 1);

  sent_entropy_manager_.ClearEntropyBefore(
      received_packet_manager_.least_packet_awaited_by_peer() - 1);

  // The return value says whether the retransmission alarm must be reset.
  bool reset_retransmission_alarm =
      sent_packet_manager_.OnIncomingAck(incoming_ack.received_info,
                                         time_of_last_received_packet_);
  if (sent_packet_manager_.HasPendingRetransmissions()) {
    WriteIfNotBlocked();
  }

  if (reset_retransmission_alarm) {
    retransmission_alarm_->Cancel();
    // Re-arm the retransmission alarm only if there are still unacked
    // packets. (The original comment also mentioned FEC alarms, but only
    // retransmission_alarm_ is touched here.)
    if (sent_packet_manager_.HasUnackedPackets()) {
      QuicTime::Delta retransmission_delay =
          sent_packet_manager_.GetRetransmissionDelay();
      retransmission_alarm_->Set(
          clock_->ApproximateNow().Add(retransmission_delay));
    }
  }
}
555 return false; 556 } 557 558 if (incoming_ack.sent_info.least_unacked > 559 last_header_.packet_sequence_number) { 560 DLOG(ERROR) << ENDPOINT << "Peer sent least_unacked:" 561 << incoming_ack.sent_info.least_unacked 562 << " greater than the enclosing packet sequence number:" 563 << last_header_.packet_sequence_number; 564 return false; 565 } 566 567 if (!incoming_ack.received_info.missing_packets.empty() && 568 *incoming_ack.received_info.missing_packets.rbegin() > 569 incoming_ack.received_info.largest_observed) { 570 DLOG(ERROR) << ENDPOINT << "Peer sent missing packet: " 571 << *incoming_ack.received_info.missing_packets.rbegin() 572 << " which is greater than largest observed: " 573 << incoming_ack.received_info.largest_observed; 574 return false; 575 } 576 577 if (!incoming_ack.received_info.missing_packets.empty() && 578 *incoming_ack.received_info.missing_packets.begin() < 579 received_packet_manager_.least_packet_awaited_by_peer()) { 580 DLOG(ERROR) << ENDPOINT << "Peer sent missing packet: " 581 << *incoming_ack.received_info.missing_packets.begin() 582 << " which is smaller than least_packet_awaited_by_peer_: " 583 << received_packet_manager_.least_packet_awaited_by_peer(); 584 return false; 585 } 586 587 if (!sent_entropy_manager_.IsValidEntropy( 588 incoming_ack.received_info.largest_observed, 589 incoming_ack.received_info.missing_packets, 590 incoming_ack.received_info.entropy_hash)) { 591 DLOG(ERROR) << ENDPOINT << "Peer sent invalid entropy."; 592 return false; 593 } 594 595 return true; 596 } 597 598 void QuicConnection::OnFecData(const QuicFecData& fec) { 599 DCHECK_EQ(IN_FEC_GROUP, last_header_.is_in_fec_group); 600 DCHECK_NE(0u, last_header_.fec_group); 601 QuicFecGroup* group = GetFecGroup(); 602 if (group != NULL) { 603 group->UpdateFec(last_header_.packet_sequence_number, 604 last_header_.entropy_flag, fec); 605 } 606 } 607 608 bool QuicConnection::OnRstStreamFrame(const QuicRstStreamFrame& frame) { 609 DCHECK(connected_); 610 if 
// Framer callback for a CONNECTION_CLOSE frame.
//
// The frame is only logged and queued in last_close_frames_ here; the
// connection itself is closed later, in OnPacketComplete(), which calls
// CloseConnection() with the error code of the first queued close frame.
// Returns connected_.
bool QuicConnection::OnConnectionCloseFrame(
    const QuicConnectionCloseFrame& frame) {
  DCHECK(connected_);
  if (debug_visitor_) {
    debug_visitor_->OnConnectionCloseFrame(frame);
  }
  DVLOG(1) << ENDPOINT << "Connection " << guid() << " closed with error "
           << QuicUtils::ErrorToString(frame.error_code)
           << " " << frame.error_details;
  // Defer the actual close until the whole packet has been parsed.
  last_close_frames_.push_back(frame);
  return connected_;
}
663 bool send_ack_immediately = received_packet_manager_.IsMissing( 664 last_header_.packet_sequence_number); 665 666 // Ensure the visitor can process the stream frames before recording and 667 // processing the rest of the packet. 668 if (last_stream_frames_.empty() || 669 visitor_->OnStreamFrames(last_stream_frames_)) { 670 received_packet_manager_.RecordPacketReceived(last_size_, 671 last_header_, 672 time_of_last_received_packet_, 673 last_packet_revived_); 674 for (size_t i = 0; i < last_stream_frames_.size(); ++i) { 675 stats_.stream_bytes_received += 676 last_stream_frames_[i].data.TotalBufferSize(); 677 } 678 } 679 680 // Process stream resets, then acks, then congestion feedback. 681 for (size_t i = 0; i < last_goaway_frames_.size(); ++i) { 682 visitor_->OnGoAway(last_goaway_frames_[i]); 683 } 684 for (size_t i = 0; i < last_rst_frames_.size(); ++i) { 685 visitor_->OnRstStream(last_rst_frames_[i]); 686 } 687 for (size_t i = 0; i < last_ack_frames_.size(); ++i) { 688 ProcessAckFrame(last_ack_frames_[i]); 689 } 690 for (size_t i = 0; i < last_congestion_frames_.size(); ++i) { 691 sent_packet_manager_.OnIncomingQuicCongestionFeedbackFrame( 692 last_congestion_frames_[i], time_of_last_received_packet_); 693 } 694 if (!last_close_frames_.empty()) { 695 CloseConnection(last_close_frames_[0].error_code, true); 696 DCHECK(!connected_); 697 } 698 699 // If there are new missing packets to report, send an ack immediately. 
700 if (received_packet_manager_.HasNewMissingPackets()) { 701 send_ack_immediately = true; 702 } 703 704 MaybeSendInResponseToPacket(send_ack_immediately, 705 last_packet_should_instigate_ack); 706 707 ClearLastFrames(); 708 } 709 710 void QuicConnection::ClearLastFrames() { 711 last_stream_frames_.clear(); 712 last_goaway_frames_.clear(); 713 last_rst_frames_.clear(); 714 last_ack_frames_.clear(); 715 last_congestion_frames_.clear(); 716 } 717 718 QuicAckFrame* QuicConnection::CreateAckFrame() { 719 QuicAckFrame* outgoing_ack = new QuicAckFrame(); 720 received_packet_manager_.UpdateReceivedPacketInfo( 721 &(outgoing_ack->received_info), clock_->ApproximateNow()); 722 UpdateSentPacketInfo(&(outgoing_ack->sent_info)); 723 DVLOG(1) << ENDPOINT << "Creating ack frame: " << *outgoing_ack; 724 return outgoing_ack; 725 } 726 727 QuicCongestionFeedbackFrame* QuicConnection::CreateFeedbackFrame() { 728 return new QuicCongestionFeedbackFrame(outgoing_congestion_feedback_); 729 } 730 731 bool QuicConnection::ShouldLastPacketInstigateAck() { 732 if (!last_stream_frames_.empty() || 733 !last_goaway_frames_.empty() || 734 !last_rst_frames_.empty()) { 735 return true; 736 } 737 738 // If the peer is still waiting for a packet that we are no 739 // longer planning to send, we should send an ack to raise 740 // the high water mark. 741 if (!last_ack_frames_.empty() && 742 !last_ack_frames_.back().received_info.missing_packets.empty()) { 743 return sent_packet_manager_.GetLeastUnackedSentPacket() > 744 *last_ack_frames_.back().received_info.missing_packets.begin(); 745 } 746 return false; 747 } 748 749 void QuicConnection::MaybeSendInResponseToPacket( 750 bool send_ack_immediately, 751 bool last_packet_should_instigate_ack) { 752 // |include_ack| is false since we decide about ack bundling below. 753 ScopedPacketBundler bundler(this, false); 754 755 if (last_packet_should_instigate_ack) { 756 // In general, we ack every second packet. 
When we don't ack the first 757 // packet, we set the delayed ack alarm. Thus, if the ack alarm is set 758 // then we know this is the second packet, and we should send an ack. 759 if (send_ack_immediately || ack_alarm_->IsSet()) { 760 SendAck(); 761 DCHECK(!ack_alarm_->IsSet()); 762 } else { 763 ack_alarm_->Set(clock_->ApproximateNow().Add( 764 sent_packet_manager_.DelayedAckTime())); 765 DVLOG(1) << "Ack timer set; next packet or timer will trigger ACK."; 766 } 767 } 768 769 if (!last_ack_frames_.empty()) { 770 // Now the we have received an ack, we might be able to send packets which 771 // are queued locally, or drain streams which are blocked. 772 QuicTime::Delta delay = sent_packet_manager_.TimeUntilSend( 773 time_of_last_received_packet_, NOT_RETRANSMISSION, 774 HAS_RETRANSMITTABLE_DATA, NOT_HANDSHAKE); 775 if (delay.IsZero()) { 776 send_alarm_->Cancel(); 777 WriteIfNotBlocked(); 778 } else if (!delay.IsInfinite()) { 779 send_alarm_->Cancel(); 780 send_alarm_->Set(time_of_last_received_packet_.Add(delay)); 781 } 782 } 783 } 784 785 void QuicConnection::SendVersionNegotiationPacket() { 786 scoped_ptr<QuicEncryptedPacket> version_packet( 787 packet_creator_.SerializeVersionNegotiationPacket( 788 framer_.supported_versions())); 789 // TODO(satyamshekhar): implement zero server state negotiation. 790 WriteResult result = 791 writer_->WritePacket(version_packet->data(), version_packet->length(), 792 self_address().address(), peer_address(), this); 793 if (result.status == WRITE_STATUS_BLOCKED) { 794 write_blocked_ = true; 795 } 796 if (result.status == WRITE_STATUS_OK || 797 (result.status == WRITE_STATUS_BLOCKED && 798 writer_->IsWriteBlockedDataBuffered())) { 799 pending_version_negotiation_packet_ = false; 800 return; 801 } 802 if (result.status == WRITE_STATUS_ERROR) { 803 // We can't send an error as the socket is presumably borked. 
804 CloseConnection(QUIC_PACKET_WRITE_ERROR, false); 805 } 806 pending_version_negotiation_packet_ = true; 807 } 808 809 QuicConsumedData QuicConnection::SendStreamData( 810 QuicStreamId id, 811 const IOVector& data, 812 QuicStreamOffset offset, 813 bool fin, 814 QuicAckNotifier::DelegateInterface* delegate) { 815 if (!fin && data.Empty()) { 816 LOG(DFATAL) << "Attempt to send empty stream frame"; 817 } 818 819 // This notifier will be owned by the AckNotifierManager (or deleted below if 820 // no data or FIN was consumed). 821 QuicAckNotifier* notifier = NULL; 822 if (delegate) { 823 notifier = new QuicAckNotifier(delegate); 824 } 825 826 // Opportunistically bundle an ack with this outgoing packet, unless it's the 827 // crypto stream. 828 ScopedPacketBundler ack_bundler(this, id != kCryptoStreamId); 829 QuicConsumedData consumed_data = 830 packet_generator_.ConsumeData(id, data, offset, fin, notifier); 831 832 if (notifier && 833 (consumed_data.bytes_consumed == 0 && !consumed_data.fin_consumed)) { 834 // No data was consumed, nor was a fin consumed, so delete the notifier. 835 delete notifier; 836 } 837 838 return consumed_data; 839 } 840 841 void QuicConnection::SendRstStream(QuicStreamId id, 842 QuicRstStreamErrorCode error) { 843 DVLOG(1) << "Sending RST_STREAM: " << id << " code: " << error; 844 // Opportunistically bundle an ack with this outgoing packet. 845 ScopedPacketBundler ack_bundler(this, true); 846 packet_generator_.AddControlFrame( 847 QuicFrame(new QuicRstStreamFrame(id, error))); 848 } 849 850 const QuicConnectionStats& QuicConnection::GetStats() { 851 // Update rtt and estimated bandwidth. 
852 stats_.rtt = sent_packet_manager_.SmoothedRtt().ToMicroseconds(); 853 stats_.estimated_bandwidth = 854 sent_packet_manager_.BandwidthEstimate().ToBytesPerSecond(); 855 return stats_; 856 } 857 858 void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address, 859 const IPEndPoint& peer_address, 860 const QuicEncryptedPacket& packet) { 861 if (!connected_) { 862 return; 863 } 864 if (debug_visitor_) { 865 debug_visitor_->OnPacketReceived(self_address, peer_address, packet); 866 } 867 last_packet_revived_ = false; 868 last_size_ = packet.length(); 869 870 address_migrating_ = false; 871 872 if (peer_address_.address().empty()) { 873 peer_address_ = peer_address; 874 } 875 if (self_address_.address().empty()) { 876 self_address_ = self_address; 877 } 878 879 if (!(peer_address == peer_address_ && self_address == self_address_)) { 880 address_migrating_ = true; 881 } 882 883 stats_.bytes_received += packet.length(); 884 ++stats_.packets_received; 885 886 if (!framer_.ProcessPacket(packet)) { 887 // If we are unable to decrypt this packet, it might be 888 // because the CHLO or SHLO packet was lost. 889 if (encryption_level_ != ENCRYPTION_FORWARD_SECURE && 890 framer_.error() == QUIC_DECRYPTION_FAILURE && 891 undecryptable_packets_.size() < kMaxUndecryptablePackets) { 892 QueueUndecryptablePacket(packet); 893 } 894 DVLOG(1) << ENDPOINT << "Unable to process packet. Last packet processed: " 895 << last_header_.packet_sequence_number; 896 return; 897 } 898 MaybeProcessUndecryptablePackets(); 899 MaybeProcessRevivedPacket(); 900 } 901 902 bool QuicConnection::OnCanWrite() { 903 write_blocked_ = false; 904 return DoWrite(); 905 } 906 907 bool QuicConnection::WriteIfNotBlocked() { 908 if (write_blocked_) { 909 return false; 910 } 911 return DoWrite(); 912 } 913 914 bool QuicConnection::DoWrite() { 915 DCHECK(!write_blocked_); 916 WriteQueuedPackets(); 917 918 WritePendingRetransmissions(); 919 920 IsHandshake pending_handshake = visitor_->HasPendingHandshake() ? 
921 IS_HANDSHAKE : NOT_HANDSHAKE; 922 // Sending queued packets may have caused the socket to become write blocked, 923 // or the congestion manager to prohibit sending. If we've sent everything 924 // we had queued and we're still not blocked, let the visitor know it can 925 // write more. 926 if (CanWrite(NOT_RETRANSMISSION, HAS_RETRANSMITTABLE_DATA, 927 pending_handshake)) { 928 // Set |include_ack| to false in bundler; ack inclusion happens elsewhere. 929 scoped_ptr<ScopedPacketBundler> bundler( 930 new ScopedPacketBundler(this, false)); 931 bool all_bytes_written = visitor_->OnCanWrite(); 932 bundler.reset(); 933 // After the visitor writes, it may have caused the socket to become write 934 // blocked or the congestion manager to prohibit sending, so check again. 935 pending_handshake = visitor_->HasPendingHandshake() ? IS_HANDSHAKE 936 : NOT_HANDSHAKE; 937 if (!all_bytes_written && !resume_writes_alarm_->IsSet() && 938 CanWrite(NOT_RETRANSMISSION, HAS_RETRANSMITTABLE_DATA, 939 pending_handshake)) { 940 // We're not write blocked, but some stream didn't write out all of its 941 // bytes. Register for 'immediate' resumption so we'll keep writing after 942 // other quic connections have had a chance to use the socket. 
943 resume_writes_alarm_->Set(clock_->ApproximateNow()); 944 } 945 } 946 947 return !write_blocked_; 948 } 949 950 bool QuicConnection::ProcessValidatedPacket() { 951 if (address_migrating_) { 952 SendConnectionCloseWithDetails( 953 QUIC_ERROR_MIGRATING_ADDRESS, 954 "Address migration is not yet a supported feature"); 955 return false; 956 } 957 time_of_last_received_packet_ = clock_->Now(); 958 DVLOG(1) << ENDPOINT << "time of last received packet: " 959 << time_of_last_received_packet_.ToDebuggingValue(); 960 961 if (is_server_ && encryption_level_ == ENCRYPTION_NONE && 962 last_size_ > options()->max_packet_length) { 963 options()->max_packet_length = last_size_; 964 } 965 return true; 966 } 967 968 bool QuicConnection::WriteQueuedPackets() { 969 DCHECK(!write_blocked_); 970 971 if (pending_version_negotiation_packet_) { 972 SendVersionNegotiationPacket(); 973 } 974 975 QueuedPacketList::iterator packet_iterator = queued_packets_.begin(); 976 while (!write_blocked_ && packet_iterator != queued_packets_.end()) { 977 if (WritePacket(packet_iterator->encryption_level, 978 packet_iterator->sequence_number, 979 packet_iterator->packet, 980 packet_iterator->transmission_type, 981 packet_iterator->retransmittable, 982 packet_iterator->handshake, 983 packet_iterator->forced)) { 984 packet_iterator = queued_packets_.erase(packet_iterator); 985 } else { 986 // Continue, because some queued packets may still be writable. 987 // This can happen if a retransmit send fail. 988 ++packet_iterator; 989 } 990 } 991 992 return !write_blocked_; 993 } 994 995 void QuicConnection::WritePendingRetransmissions() { 996 // Keep writing as long as there's a pending retransmission which can be 997 // written. 
998 while (sent_packet_manager_.HasPendingRetransmissions()) { 999 const QuicSentPacketManager::PendingRetransmission pending = 1000 sent_packet_manager_.NextPendingRetransmission(); 1001 if (HasForcedFrames(&pending.retransmittable_frames) == NO_FORCE && 1002 !CanWrite(pending.transmission_type, HAS_RETRANSMITTABLE_DATA, 1003 HasCryptoHandshake(&pending.retransmittable_frames))) { 1004 break; 1005 } 1006 1007 // Re-packetize the frames with a new sequence number for retransmission. 1008 // Retransmitted data packets do not use FEC, even when it's enabled. 1009 // Retransmitted packets use the same sequence number length as the 1010 // original. 1011 // Flush the packet creator before making a new packet. 1012 // TODO(ianswett): Implement ReserializeAllFrames as a separate path that 1013 // does not require the creator to be flushed. 1014 Flush(); 1015 SerializedPacket serialized_packet = packet_creator_.ReserializeAllFrames( 1016 pending.retransmittable_frames.frames(), 1017 pending.sequence_number_length); 1018 1019 DVLOG(1) << ENDPOINT << "Retransmitting " << pending.sequence_number 1020 << " as " << serialized_packet.sequence_number; 1021 if (debug_visitor_) { 1022 debug_visitor_->OnPacketRetransmitted( 1023 pending.sequence_number, serialized_packet.sequence_number); 1024 } 1025 sent_packet_manager_.OnRetransmittedPacket( 1026 pending.sequence_number, serialized_packet.sequence_number); 1027 1028 SendOrQueuePacket(pending.retransmittable_frames.encryption_level(), 1029 serialized_packet, 1030 pending.transmission_type); 1031 } 1032 } 1033 1034 void QuicConnection::RetransmitUnackedPackets( 1035 RetransmissionType retransmission_type) { 1036 sent_packet_manager_.RetransmitUnackedPackets(retransmission_type); 1037 1038 WriteIfNotBlocked(); 1039 } 1040 1041 bool QuicConnection::ShouldGeneratePacket( 1042 TransmissionType transmission_type, 1043 HasRetransmittableData retransmittable, 1044 IsHandshake handshake) { 1045 // We should serialize handshake packets 
immediately to ensure that they 1046 // end up sent at the right encryption level. 1047 if (handshake == IS_HANDSHAKE) { 1048 return true; 1049 } 1050 1051 return CanWrite(transmission_type, retransmittable, handshake); 1052 } 1053 1054 bool QuicConnection::CanWrite(TransmissionType transmission_type, 1055 HasRetransmittableData retransmittable, 1056 IsHandshake handshake) { 1057 if (write_blocked_) { 1058 return false; 1059 } 1060 1061 // TODO(rch): consider removing this check so that if an ACK comes in 1062 // before the alarm goes it, we might be able send out a packet. 1063 // This check assumes that if the send alarm is set, it applies equally to all 1064 // types of transmissions. 1065 if (send_alarm_->IsSet()) { 1066 DVLOG(1) << "Send alarm set. Not sending."; 1067 return false; 1068 } 1069 1070 QuicTime now = clock_->Now(); 1071 QuicTime::Delta delay = sent_packet_manager_.TimeUntilSend( 1072 now, transmission_type, retransmittable, handshake); 1073 if (delay.IsInfinite()) { 1074 return false; 1075 } 1076 1077 // If the scheduler requires a delay, then we can not send this packet now. 1078 if (!delay.IsZero()) { 1079 send_alarm_->Cancel(); 1080 send_alarm_->Set(now.Add(delay)); 1081 DVLOG(1) << "Delaying sending."; 1082 return false; 1083 } 1084 return true; 1085 } 1086 1087 void QuicConnection::SetupRetransmissionAlarm( 1088 QuicPacketSequenceNumber sequence_number) { 1089 if (!sent_packet_manager_.HasRetransmittableFrames(sequence_number)) { 1090 DVLOG(1) << ENDPOINT << "Will not retransmit packet " << sequence_number; 1091 return; 1092 } 1093 1094 // Do not set the retransmission alarm if we're already handling one, since 1095 // it will be reset when OnRetransmissionTimeout completes. 
1096 if (retransmission_alarm_->IsSet()) { 1097 return; 1098 } 1099 1100 QuicTime::Delta retransmission_delay = 1101 sent_packet_manager_.GetRetransmissionDelay(); 1102 retransmission_alarm_->Set( 1103 clock_->ApproximateNow().Add(retransmission_delay)); 1104 } 1105 1106 bool QuicConnection::WritePacket(EncryptionLevel level, 1107 QuicPacketSequenceNumber sequence_number, 1108 QuicPacket* packet, 1109 TransmissionType transmission_type, 1110 HasRetransmittableData retransmittable, 1111 IsHandshake handshake, 1112 Force forced) { 1113 if (ShouldDiscardPacket(level, sequence_number, retransmittable)) { 1114 delete packet; 1115 return true; 1116 } 1117 1118 // If we're write blocked, we know we can't write. 1119 if (write_blocked_) { 1120 return false; 1121 } 1122 1123 // If we are not forced and we can't write, then simply return false; 1124 if (forced == NO_FORCE && 1125 !CanWrite(transmission_type, retransmittable, handshake)) { 1126 return false; 1127 } 1128 1129 // Some encryption algorithms require the packet sequence numbers not be 1130 // repeated. 1131 DCHECK_LE(sequence_number_of_last_inorder_packet_, sequence_number); 1132 // Only increase this when packets have not been queued. Once they're queued 1133 // due to a write block, there is the chance of sending forced and other 1134 // higher priority packets out of order. 1135 if (queued_packets_.empty()) { 1136 sequence_number_of_last_inorder_packet_ = sequence_number; 1137 } 1138 1139 scoped_ptr<QuicEncryptedPacket> encrypted( 1140 framer_.EncryptPacket(level, sequence_number, *packet)); 1141 if (encrypted.get() == NULL) { 1142 LOG(DFATAL) << ENDPOINT << "Failed to encrypt packet number " 1143 << sequence_number; 1144 CloseConnection(QUIC_ENCRYPTION_FAILURE, false); 1145 return false; 1146 } 1147 1148 // If it's the ConnectionClose packet, the only FORCED frame type, 1149 // clone a copy for resending later by the TimeWaitListManager. 
1150 if (forced == FORCE) { 1151 DCHECK(connection_close_packet_.get() == NULL); 1152 connection_close_packet_.reset(encrypted->Clone()); 1153 } 1154 1155 if (encrypted->length() > options()->max_packet_length) { 1156 LOG(DFATAL) << "Writing an encrypted packet larger than max_packet_length:" 1157 << options()->max_packet_length << " encrypted length: " 1158 << encrypted->length(); 1159 } 1160 DVLOG(1) << ENDPOINT << "Sending packet number " << sequence_number 1161 << " : " << (packet->is_fec_packet() ? "FEC " : 1162 (retransmittable == HAS_RETRANSMITTABLE_DATA 1163 ? "data bearing " : " ack only ")) 1164 << ", encryption level: " 1165 << QuicUtils::EncryptionLevelToString(level) 1166 << ", length:" << packet->length() << ", encrypted length:" 1167 << encrypted->length(); 1168 DVLOG(2) << ENDPOINT << "packet(" << sequence_number << "): " << std::endl 1169 << QuicUtils::StringToHexASCIIDump(packet->AsStringPiece()); 1170 1171 DCHECK(encrypted->length() <= kMaxPacketSize) 1172 << "Packet " << sequence_number << " will not be read; too large: " 1173 << packet->length() << " " << encrypted->length() << " " 1174 << " forced: " << (forced == FORCE ? "yes" : "no"); 1175 1176 DCHECK(pending_write_.get() == NULL); 1177 pending_write_.reset(new PendingWrite(sequence_number, transmission_type, 1178 retransmittable, level, 1179 packet->is_fec_packet(), 1180 packet->length())); 1181 1182 WriteResult result = 1183 writer_->WritePacket(encrypted->data(), encrypted->length(), 1184 self_address().address(), peer_address(), this); 1185 if (result.error_code == ERR_IO_PENDING) { 1186 DCHECK_EQ(WRITE_STATUS_BLOCKED, result.status); 1187 } 1188 if (debug_visitor_) { 1189 // Pass the write result to the visitor. 
1190 debug_visitor_->OnPacketSent(sequence_number, level, *encrypted, result); 1191 } 1192 if (result.status == WRITE_STATUS_BLOCKED) { 1193 // TODO(satyashekhar): It might be more efficient (fewer system calls), if 1194 // all connections share this variable i.e this becomes a part of 1195 // PacketWriterInterface. 1196 write_blocked_ = true; 1197 // If the socket buffers the the data, then the packet should not 1198 // be queued and sent again, which would result in an unnecessary 1199 // duplicate packet being sent. The helper must call OnPacketSent 1200 // when the packet is actually sent. 1201 if (writer_->IsWriteBlockedDataBuffered()) { 1202 delete packet; 1203 return true; 1204 } 1205 pending_write_.reset(); 1206 return false; 1207 } 1208 1209 if (OnPacketSent(result)) { 1210 delete packet; 1211 return true; 1212 } 1213 return false; 1214 } 1215 1216 bool QuicConnection::ShouldDiscardPacket( 1217 EncryptionLevel level, 1218 QuicPacketSequenceNumber sequence_number, 1219 HasRetransmittableData retransmittable) { 1220 if (!connected_) { 1221 DVLOG(1) << ENDPOINT 1222 << "Not sending packet as connection is disconnected."; 1223 return true; 1224 } 1225 1226 if (encryption_level_ == ENCRYPTION_FORWARD_SECURE && 1227 level == ENCRYPTION_NONE) { 1228 // Drop packets that are NULL encrypted since the peer won't accept them 1229 // anymore. 
1230 DVLOG(1) << ENDPOINT << "Dropping packet: " << sequence_number 1231 << " since the packet is NULL encrypted."; 1232 sent_packet_manager_.DiscardUnackedPacket(sequence_number); 1233 return true; 1234 } 1235 1236 if (retransmittable == HAS_RETRANSMITTABLE_DATA) { 1237 if (!sent_packet_manager_.IsUnacked(sequence_number)) { 1238 // This is a crazy edge case, but if we retransmit a packet, 1239 // (but have to queue it for some reason) then receive an ack 1240 // for the previous transmission (but not the retransmission) 1241 // then receive a truncated ACK which causes us to raise the 1242 // high water mark, all before we're able to send the packet 1243 // then we can simply drop it. 1244 DVLOG(1) << ENDPOINT << "Dropping packet: " << sequence_number 1245 << " since it has already been acked."; 1246 return true; 1247 } 1248 1249 if (sent_packet_manager_.IsPreviousTransmission(sequence_number)) { 1250 // If somehow we have already retransmitted this packet *before* 1251 // we actually send it for the first time (I think this is probably 1252 // impossible in the real world), then don't bother sending it. 1253 // We don't want to call DiscardUnackedPacket because in this case 1254 // the peer has not yet ACK'd the data. We need the subsequent 1255 // retransmission to be sent. 
1256 DVLOG(1) << ENDPOINT << "Dropping packet: " << sequence_number 1257 << " since it has already been retransmitted."; 1258 return true; 1259 } 1260 1261 if (!sent_packet_manager_.HasRetransmittableFrames(sequence_number)) { 1262 DVLOG(1) << ENDPOINT << "Dropping packet: " << sequence_number 1263 << " since a previous transmission has been acked."; 1264 sent_packet_manager_.DiscardUnackedPacket(sequence_number); 1265 return true; 1266 } 1267 } 1268 1269 return false; 1270 } 1271 1272 bool QuicConnection::OnPacketSent(WriteResult result) { 1273 DCHECK_NE(WRITE_STATUS_BLOCKED, result.status); 1274 if (pending_write_.get() == NULL) { 1275 LOG(DFATAL) << "OnPacketSent called without a pending write."; 1276 return false; 1277 } 1278 1279 QuicPacketSequenceNumber sequence_number = pending_write_->sequence_number; 1280 TransmissionType transmission_type = pending_write_->transmission_type; 1281 HasRetransmittableData retransmittable = pending_write_->retransmittable; 1282 bool is_fec_packet = pending_write_->is_fec_packet; 1283 size_t length = pending_write_->length; 1284 pending_write_.reset(); 1285 1286 if (result.status == WRITE_STATUS_ERROR) { 1287 DVLOG(1) << "Write failed with error code: " << result.error_code; 1288 // We can't send an error as the socket is presumably borked. 1289 CloseConnection(QUIC_PACKET_WRITE_ERROR, false); 1290 return false; 1291 } 1292 1293 QuicTime now = clock_->Now(); 1294 if (transmission_type == NOT_RETRANSMISSION) { 1295 time_of_last_sent_packet_ = now; 1296 } 1297 DVLOG(1) << ENDPOINT << "time of last sent packet: " 1298 << now.ToDebuggingValue(); 1299 1300 // Set the retransmit alarm only when we have sent the packet to the client 1301 // and not when it goes to the pending queue, otherwise we will end up adding 1302 // an entry to retransmission_timeout_ every time we attempt a write. 
1303 if (retransmittable == HAS_RETRANSMITTABLE_DATA || is_fec_packet) { 1304 SetupRetransmissionAlarm(sequence_number); 1305 } 1306 1307 // TODO(ianswett): Change the sequence number length and other packet creator 1308 // options by a more explicit API than setting a struct value directly. 1309 packet_creator_.UpdateSequenceNumberLength( 1310 received_packet_manager_.least_packet_awaited_by_peer(), 1311 sent_packet_manager_.BandwidthEstimate().ToBytesPerPeriod( 1312 sent_packet_manager_.SmoothedRtt())); 1313 1314 sent_packet_manager_.OnPacketSent(sequence_number, now, length, 1315 transmission_type, retransmittable); 1316 1317 stats_.bytes_sent += result.bytes_written; 1318 ++stats_.packets_sent; 1319 1320 if (transmission_type == NACK_RETRANSMISSION || 1321 transmission_type == RTO_RETRANSMISSION) { 1322 stats_.bytes_retransmitted += result.bytes_written; 1323 ++stats_.packets_retransmitted; 1324 } 1325 1326 return true; 1327 } 1328 1329 bool QuicConnection::OnSerializedPacket( 1330 const SerializedPacket& serialized_packet) { 1331 if (serialized_packet.retransmittable_frames) { 1332 serialized_packet.retransmittable_frames-> 1333 set_encryption_level(encryption_level_); 1334 } 1335 sent_packet_manager_.OnSerializedPacket(serialized_packet); 1336 // The TransmissionType is NOT_RETRANSMISSION because all retransmissions 1337 // serialize packets and invoke SendOrQueuePacket directly. 
1338 return SendOrQueuePacket(encryption_level_, 1339 serialized_packet, 1340 NOT_RETRANSMISSION); 1341 } 1342 1343 QuicPacketSequenceNumber QuicConnection::GetNextPacketSequenceNumber() { 1344 return packet_creator_.sequence_number() + 1; 1345 } 1346 1347 bool QuicConnection::SendOrQueuePacket(EncryptionLevel level, 1348 const SerializedPacket& packet, 1349 TransmissionType transmission_type) { 1350 IsHandshake handshake = HasCryptoHandshake(packet.retransmittable_frames); 1351 Force forced = HasForcedFrames(packet.retransmittable_frames); 1352 HasRetransmittableData retransmittable = 1353 (transmission_type != NOT_RETRANSMISSION || 1354 packet.retransmittable_frames != NULL) ? 1355 HAS_RETRANSMITTABLE_DATA : NO_RETRANSMITTABLE_DATA; 1356 sent_entropy_manager_.RecordPacketEntropyHash(packet.sequence_number, 1357 packet.entropy_hash); 1358 if (WritePacket(level, packet.sequence_number, packet.packet, 1359 transmission_type, retransmittable, handshake, forced)) { 1360 return true; 1361 } 1362 queued_packets_.push_back(QueuedPacket(packet.sequence_number, packet.packet, 1363 level, transmission_type, 1364 retransmittable, handshake, forced)); 1365 return false; 1366 } 1367 1368 void QuicConnection::UpdateSentPacketInfo(SentPacketInfo* sent_info) { 1369 sent_info->least_unacked = sent_packet_manager_.GetLeastUnackedSentPacket(); 1370 sent_info->entropy_hash = sent_entropy_manager_.EntropyHash( 1371 sent_info->least_unacked - 1); 1372 } 1373 1374 void QuicConnection::SendAck() { 1375 ack_alarm_->Cancel(); 1376 // TODO(rch): delay this until the CreateFeedbackFrame 1377 // method is invoked. This requires changes SetShouldSendAck 1378 // to be a no-arg method, and re-jiggering its implementation. 
1379 bool send_feedback = false; 1380 if (received_packet_manager_.GenerateCongestionFeedback( 1381 &outgoing_congestion_feedback_)) { 1382 DVLOG(1) << ENDPOINT << "Sending feedback: " 1383 << outgoing_congestion_feedback_; 1384 send_feedback = true; 1385 } 1386 1387 packet_generator_.SetShouldSendAck(send_feedback); 1388 } 1389 1390 void QuicConnection::OnRetransmissionTimeout() { 1391 if (!sent_packet_manager_.HasUnackedPackets()) { 1392 return; 1393 } 1394 1395 ++stats_.rto_count; 1396 1397 sent_packet_manager_.OnRetransmissionTimeout(); 1398 1399 WriteIfNotBlocked(); 1400 1401 // Ensure the retransmission alarm is always set if there are unacked packets. 1402 if (sent_packet_manager_.HasUnackedPackets() && !HasQueuedData() && 1403 !retransmission_alarm_->IsSet()) { 1404 QuicTime rto_timeout = clock_->ApproximateNow().Add( 1405 sent_packet_manager_.GetRetransmissionDelay()); 1406 retransmission_alarm_->Set(rto_timeout); 1407 } 1408 } 1409 1410 void QuicConnection::SetEncrypter(EncryptionLevel level, 1411 QuicEncrypter* encrypter) { 1412 framer_.SetEncrypter(level, encrypter); 1413 } 1414 1415 const QuicEncrypter* QuicConnection::encrypter(EncryptionLevel level) const { 1416 return framer_.encrypter(level); 1417 } 1418 1419 void QuicConnection::SetDefaultEncryptionLevel( 1420 EncryptionLevel level) { 1421 encryption_level_ = level; 1422 } 1423 1424 void QuicConnection::SetDecrypter(QuicDecrypter* decrypter) { 1425 framer_.SetDecrypter(decrypter); 1426 } 1427 1428 void QuicConnection::SetAlternativeDecrypter(QuicDecrypter* decrypter, 1429 bool latch_once_used) { 1430 framer_.SetAlternativeDecrypter(decrypter, latch_once_used); 1431 } 1432 1433 const QuicDecrypter* QuicConnection::decrypter() const { 1434 return framer_.decrypter(); 1435 } 1436 1437 const QuicDecrypter* QuicConnection::alternative_decrypter() const { 1438 return framer_.alternative_decrypter(); 1439 } 1440 1441 void QuicConnection::QueueUndecryptablePacket( 1442 const QuicEncryptedPacket& packet) { 
1443 DVLOG(1) << ENDPOINT << "Queueing undecryptable packet."; 1444 undecryptable_packets_.push_back(packet.Clone()); 1445 } 1446 1447 void QuicConnection::MaybeProcessUndecryptablePackets() { 1448 if (undecryptable_packets_.empty() || 1449 encryption_level_ == ENCRYPTION_NONE) { 1450 return; 1451 } 1452 1453 while (connected_ && !undecryptable_packets_.empty()) { 1454 DVLOG(1) << ENDPOINT << "Attempting to process undecryptable packet"; 1455 QuicEncryptedPacket* packet = undecryptable_packets_.front(); 1456 if (!framer_.ProcessPacket(*packet) && 1457 framer_.error() == QUIC_DECRYPTION_FAILURE) { 1458 DVLOG(1) << ENDPOINT << "Unable to process undecryptable packet..."; 1459 break; 1460 } 1461 DVLOG(1) << ENDPOINT << "Processed undecryptable packet!"; 1462 delete packet; 1463 undecryptable_packets_.pop_front(); 1464 } 1465 1466 // Once forward secure encryption is in use, there will be no 1467 // new keys installed and hence any undecryptable packets will 1468 // never be able to be decrypted. 
1469 if (encryption_level_ == ENCRYPTION_FORWARD_SECURE) { 1470 STLDeleteElements(&undecryptable_packets_); 1471 } 1472 } 1473 1474 void QuicConnection::MaybeProcessRevivedPacket() { 1475 QuicFecGroup* group = GetFecGroup(); 1476 if (!connected_ || group == NULL || !group->CanRevive()) { 1477 return; 1478 } 1479 QuicPacketHeader revived_header; 1480 char revived_payload[kMaxPacketSize]; 1481 size_t len = group->Revive(&revived_header, revived_payload, kMaxPacketSize); 1482 revived_header.public_header.guid = guid_; 1483 revived_header.public_header.version_flag = false; 1484 revived_header.public_header.reset_flag = false; 1485 revived_header.fec_flag = false; 1486 revived_header.is_in_fec_group = NOT_IN_FEC_GROUP; 1487 revived_header.fec_group = 0; 1488 group_map_.erase(last_header_.fec_group); 1489 delete group; 1490 1491 last_packet_revived_ = true; 1492 if (debug_visitor_) { 1493 debug_visitor_->OnRevivedPacket(revived_header, 1494 StringPiece(revived_payload, len)); 1495 } 1496 1497 ++stats_.packets_revived; 1498 framer_.ProcessRevivedPacket(&revived_header, 1499 StringPiece(revived_payload, len)); 1500 } 1501 1502 QuicFecGroup* QuicConnection::GetFecGroup() { 1503 QuicFecGroupNumber fec_group_num = last_header_.fec_group; 1504 if (fec_group_num == 0) { 1505 return NULL; 1506 } 1507 if (group_map_.count(fec_group_num) == 0) { 1508 if (group_map_.size() >= kMaxFecGroups) { // Too many groups 1509 if (fec_group_num < group_map_.begin()->first) { 1510 // The group being requested is a group we've seen before and deleted. 1511 // Don't recreate it. 1512 return NULL; 1513 } 1514 // Clear the lowest group number. 
1515 delete group_map_.begin()->second; 1516 group_map_.erase(group_map_.begin()); 1517 } 1518 group_map_[fec_group_num] = new QuicFecGroup(); 1519 } 1520 return group_map_[fec_group_num]; 1521 } 1522 1523 void QuicConnection::SendConnectionClose(QuicErrorCode error) { 1524 SendConnectionCloseWithDetails(error, string()); 1525 } 1526 1527 void QuicConnection::SendConnectionCloseWithDetails(QuicErrorCode error, 1528 const string& details) { 1529 if (!write_blocked_) { 1530 SendConnectionClosePacket(error, details); 1531 } 1532 CloseConnection(error, false); 1533 } 1534 1535 void QuicConnection::SendConnectionClosePacket(QuicErrorCode error, 1536 const string& details) { 1537 DVLOG(1) << ENDPOINT << "Force closing " << guid() << " with error " 1538 << QuicUtils::ErrorToString(error) << " (" << error << ") " 1539 << details; 1540 ScopedPacketBundler ack_bundler(this, true); 1541 QuicConnectionCloseFrame* frame = new QuicConnectionCloseFrame(); 1542 frame->error_code = error; 1543 frame->error_details = details; 1544 packet_generator_.AddControlFrame(QuicFrame(frame)); 1545 Flush(); 1546 } 1547 1548 void QuicConnection::CloseConnection(QuicErrorCode error, bool from_peer) { 1549 DCHECK(connected_); 1550 connected_ = false; 1551 visitor_->OnConnectionClosed(error, from_peer); 1552 // Cancel the alarms so they don't trigger any action now that the 1553 // connection is closed. 1554 ack_alarm_->Cancel(); 1555 resume_writes_alarm_->Cancel(); 1556 retransmission_alarm_->Cancel(); 1557 send_alarm_->Cancel(); 1558 timeout_alarm_->Cancel(); 1559 } 1560 1561 void QuicConnection::SendGoAway(QuicErrorCode error, 1562 QuicStreamId last_good_stream_id, 1563 const string& reason) { 1564 DVLOG(1) << ENDPOINT << "Going away with error " 1565 << QuicUtils::ErrorToString(error) 1566 << " (" << error << ")"; 1567 1568 // Opportunistically bundle an ack with this outgoing packet. 
1569 ScopedPacketBundler ack_bundler(this, true); 1570 packet_generator_.AddControlFrame( 1571 QuicFrame(new QuicGoAwayFrame(error, last_good_stream_id, reason))); 1572 } 1573 1574 void QuicConnection::CloseFecGroupsBefore( 1575 QuicPacketSequenceNumber sequence_number) { 1576 FecGroupMap::iterator it = group_map_.begin(); 1577 while (it != group_map_.end()) { 1578 // If this is the current group or the group doesn't protect this packet 1579 // we can ignore it. 1580 if (last_header_.fec_group == it->first || 1581 !it->second->ProtectsPacketsBefore(sequence_number)) { 1582 ++it; 1583 continue; 1584 } 1585 QuicFecGroup* fec_group = it->second; 1586 DCHECK(!fec_group->CanRevive()); 1587 FecGroupMap::iterator next = it; 1588 ++next; 1589 group_map_.erase(it); 1590 delete fec_group; 1591 it = next; 1592 } 1593 } 1594 1595 void QuicConnection::Flush() { 1596 packet_generator_.FlushAllQueuedFrames(); 1597 } 1598 1599 bool QuicConnection::HasQueuedData() const { 1600 return pending_version_negotiation_packet_ || 1601 !queued_packets_.empty() || packet_generator_.HasQueuedFrames(); 1602 } 1603 1604 void QuicConnection::SetIdleNetworkTimeout(QuicTime::Delta timeout) { 1605 if (timeout < idle_network_timeout_) { 1606 idle_network_timeout_ = timeout; 1607 CheckForTimeout(); 1608 } else { 1609 idle_network_timeout_ = timeout; 1610 } 1611 } 1612 1613 void QuicConnection::SetOverallConnectionTimeout(QuicTime::Delta timeout) { 1614 if (timeout < overall_connection_timeout_) { 1615 overall_connection_timeout_ = timeout; 1616 CheckForTimeout(); 1617 } else { 1618 overall_connection_timeout_ = timeout; 1619 } 1620 } 1621 1622 bool QuicConnection::CheckForTimeout() { 1623 QuicTime now = clock_->ApproximateNow(); 1624 QuicTime time_of_last_packet = std::max(time_of_last_received_packet_, 1625 time_of_last_sent_packet_); 1626 1627 // |delta| can be < 0 as |now| is approximate time but |time_of_last_packet| 1628 // is accurate time. 
However, this should not change the behavior of 1629 // timeout handling. 1630 QuicTime::Delta delta = now.Subtract(time_of_last_packet); 1631 DVLOG(1) << ENDPOINT << "last packet " 1632 << time_of_last_packet.ToDebuggingValue() 1633 << " now:" << now.ToDebuggingValue() 1634 << " delta:" << delta.ToMicroseconds() 1635 << " network_timeout: " << idle_network_timeout_.ToMicroseconds(); 1636 if (delta >= idle_network_timeout_) { 1637 DVLOG(1) << ENDPOINT << "Connection timedout due to no network activity."; 1638 SendConnectionClose(QUIC_CONNECTION_TIMED_OUT); 1639 return true; 1640 } 1641 1642 // Next timeout delta. 1643 QuicTime::Delta timeout = idle_network_timeout_.Subtract(delta); 1644 1645 if (!overall_connection_timeout_.IsInfinite()) { 1646 QuicTime::Delta connected_time = now.Subtract(creation_time_); 1647 DVLOG(1) << ENDPOINT << "connection time: " 1648 << connected_time.ToMilliseconds() << " overall timeout: " 1649 << overall_connection_timeout_.ToMilliseconds(); 1650 if (connected_time >= overall_connection_timeout_) { 1651 DVLOG(1) << ENDPOINT << 1652 "Connection timedout due to overall connection timeout."; 1653 SendConnectionClose(QUIC_CONNECTION_TIMED_OUT); 1654 return true; 1655 } 1656 1657 // Take the min timeout. 1658 QuicTime::Delta connection_timeout = 1659 overall_connection_timeout_.Subtract(connected_time); 1660 if (connection_timeout < timeout) { 1661 timeout = connection_timeout; 1662 } 1663 } 1664 1665 timeout_alarm_->Cancel(); 1666 timeout_alarm_->Set(clock_->ApproximateNow().Add(timeout)); 1667 return false; 1668 } 1669 1670 QuicConnection::ScopedPacketBundler::ScopedPacketBundler( 1671 QuicConnection* connection, 1672 bool include_ack) 1673 : connection_(connection), 1674 already_in_batch_mode_(connection->packet_generator_.InBatchMode()) { 1675 // Move generator into batch mode. If caller wants us to include an ack, 1676 // check the delayed-ack timer to see if there's ack info to be sent. 
1677 if (!already_in_batch_mode_) { 1678 DVLOG(1) << "Entering Batch Mode."; 1679 connection_->packet_generator_.StartBatchOperations(); 1680 } 1681 if (include_ack && connection_->ack_alarm_->IsSet()) { 1682 DVLOG(1) << "Bundling ack with outgoing packet."; 1683 connection_->SendAck(); 1684 } 1685 } 1686 1687 QuicConnection::ScopedPacketBundler::~ScopedPacketBundler() { 1688 // If we changed the generator's batch state, restore original batch state. 1689 if (!already_in_batch_mode_) { 1690 DVLOG(1) << "Leaving Batch Mode."; 1691 connection_->packet_generator_.FinishBatchOperations(); 1692 } 1693 DCHECK_EQ(already_in_batch_mode_, 1694 connection_->packet_generator_.InBatchMode()); 1695 } 1696 1697 } // namespace net 1698