Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/tools/quic/quic_dispatcher.h"
      6 
      7 #include <errno.h>
      8 
      9 #include "base/debug/stack_trace.h"
     10 #include "base/logging.h"
     11 #include "base/stl_util.h"
     12 #include "net/quic/quic_blocked_writer_interface.h"
     13 #include "net/quic/quic_flags.h"
     14 #include "net/quic/quic_utils.h"
     15 #include "net/tools/epoll_server/epoll_server.h"
     16 #include "net/tools/quic/quic_default_packet_writer.h"
     17 #include "net/tools/quic/quic_epoll_connection_helper.h"
     18 #include "net/tools/quic/quic_socket_utils.h"
     19 #include "net/tools/quic/quic_time_wait_list_manager.h"
     20 
     21 namespace net {
     22 
     23 namespace tools {
     24 
     25 using base::StringPiece;
     26 using std::make_pair;
     27 
     28 class DeleteSessionsAlarm : public EpollAlarm {
     29  public:
     30   explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
     31       : dispatcher_(dispatcher) {
     32   }
     33 
     34   virtual int64 OnAlarm() OVERRIDE {
     35     EpollAlarm::OnAlarm();
     36     dispatcher_->DeleteSessions();
     37     return 0;
     38   }
     39 
     40  private:
     41   QuicDispatcher* dispatcher_;
     42 };
     43 
     44 class QuicDispatcher::QuicFramerVisitor : public QuicFramerVisitorInterface {
     45  public:
     46   explicit QuicFramerVisitor(QuicDispatcher* dispatcher)
     47       : dispatcher_(dispatcher),
     48         connection_id_(0) {}
     49 
     50   // QuicFramerVisitorInterface implementation
     51   virtual void OnPacket() OVERRIDE {}
     52   virtual bool OnUnauthenticatedPublicHeader(
     53       const QuicPacketPublicHeader& header) OVERRIDE {
     54     connection_id_ = header.connection_id;
     55     return dispatcher_->OnUnauthenticatedPublicHeader(header);
     56   }
     57   virtual bool OnUnauthenticatedHeader(
     58       const QuicPacketHeader& header) OVERRIDE {
     59     dispatcher_->OnUnauthenticatedHeader(header);
     60     return false;
     61   }
     62   virtual void OnError(QuicFramer* framer) OVERRIDE {
     63     DVLOG(1) << QuicUtils::ErrorToString(framer->error());
     64   }
     65 
     66   virtual bool OnProtocolVersionMismatch(
     67       QuicVersion /*received_version*/) OVERRIDE {
     68     if (dispatcher_->time_wait_list_manager()->IsConnectionIdInTimeWait(
     69             connection_id_)) {
     70       // Keep processing after protocol mismatch - this will be dealt with by
     71       // the TimeWaitListManager.
     72       return true;
     73     } else {
     74       DLOG(DFATAL) << "Version mismatch, connection ID (" << connection_id_
     75                    << ") not in time wait list.";
     76       return false;
     77     }
     78   }
     79 
     80   // The following methods should never get called because we always return
     81   // false from OnUnauthenticatedHeader().  As a result, we never process the
     82   // payload of the packet.
     83   virtual void OnPublicResetPacket(
     84       const QuicPublicResetPacket& /*packet*/) OVERRIDE {
     85     DCHECK(false);
     86   }
     87   virtual void OnVersionNegotiationPacket(
     88       const QuicVersionNegotiationPacket& /*packet*/) OVERRIDE {
     89     DCHECK(false);
     90   }
     91   virtual void OnDecryptedPacket(EncryptionLevel level) OVERRIDE {
     92     DCHECK(false);
     93   }
     94   virtual bool OnPacketHeader(const QuicPacketHeader& /*header*/) OVERRIDE {
     95     DCHECK(false);
     96     return false;
     97   }
     98   virtual void OnRevivedPacket() OVERRIDE {
     99     DCHECK(false);
    100   }
    101   virtual void OnFecProtectedPayload(StringPiece /*payload*/) OVERRIDE {
    102     DCHECK(false);
    103   }
    104   virtual bool OnStreamFrame(const QuicStreamFrame& /*frame*/) OVERRIDE {
    105     DCHECK(false);
    106     return false;
    107   }
    108   virtual bool OnAckFrame(const QuicAckFrame& /*frame*/) OVERRIDE {
    109     DCHECK(false);
    110     return false;
    111   }
    112   virtual bool OnCongestionFeedbackFrame(
    113       const QuicCongestionFeedbackFrame& /*frame*/) OVERRIDE {
    114     DCHECK(false);
    115     return false;
    116   }
    117   virtual bool OnStopWaitingFrame(
    118       const QuicStopWaitingFrame& /*frame*/) OVERRIDE {
    119     DCHECK(false);
    120     return false;
    121   }
    122   virtual bool OnPingFrame(const QuicPingFrame& /*frame*/) OVERRIDE {
    123     DCHECK(false);
    124     return false;
    125   }
    126   virtual bool OnRstStreamFrame(const QuicRstStreamFrame& /*frame*/) OVERRIDE {
    127     DCHECK(false);
    128     return false;
    129   }
    130   virtual bool OnConnectionCloseFrame(
    131       const QuicConnectionCloseFrame & /*frame*/) OVERRIDE {
    132     DCHECK(false);
    133     return false;
    134   }
    135   virtual bool OnGoAwayFrame(const QuicGoAwayFrame& /*frame*/) OVERRIDE {
    136     DCHECK(false);
    137     return false;
    138   }
    139   virtual bool OnWindowUpdateFrame(const QuicWindowUpdateFrame& /*frame*/)
    140       OVERRIDE {
    141     DCHECK(false);
    142     return false;
    143   }
    144   virtual bool OnBlockedFrame(const QuicBlockedFrame& frame) OVERRIDE {
    145     DCHECK(false);
    146     return false;
    147   }
    148   virtual void OnFecData(const QuicFecData& /*fec*/) OVERRIDE {
    149     DCHECK(false);
    150   }
    151   virtual void OnPacketComplete() OVERRIDE {
    152     DCHECK(false);
    153   }
    154 
    155  private:
    156   QuicDispatcher* dispatcher_;
    157 
    158   // Latched in OnUnauthenticatedPublicHeader for use later.
    159   QuicConnectionId connection_id_;
    160 };
    161 
    162 QuicDispatcher::QuicDispatcher(const QuicConfig& config,
    163                                const QuicCryptoServerConfig& crypto_config,
    164                                const QuicVersionVector& supported_versions,
    165                                EpollServer* epoll_server)
    166     : config_(config),
    167       crypto_config_(crypto_config),
    168       delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
    169       epoll_server_(epoll_server),
    170       helper_(new QuicEpollConnectionHelper(epoll_server_)),
    171       supported_versions_(supported_versions),
    172       supported_versions_no_flow_control_(supported_versions),
    173       supported_versions_no_connection_flow_control_(supported_versions),
    174       current_packet_(NULL),
    175       framer_(supported_versions, /*unused*/ QuicTime::Zero(), true),
    176       framer_visitor_(new QuicFramerVisitor(this)) {
    177   framer_.set_visitor(framer_visitor_.get());
    178 }
    179 
    180 QuicDispatcher::~QuicDispatcher() {
    181   STLDeleteValues(&session_map_);
    182   STLDeleteElements(&closed_session_list_);
    183 }
    184 
    185 void QuicDispatcher::Initialize(int fd) {
    186   DCHECK(writer_ == NULL);
    187   writer_.reset(CreateWriter(fd));
    188   time_wait_list_manager_.reset(CreateQuicTimeWaitListManager());
    189 
    190   // Remove all versions > QUIC_VERSION_16 from the
    191   // supported_versions_no_flow_control_ vector.
    192   QuicVersionVector::iterator it =
    193       find(supported_versions_no_flow_control_.begin(),
    194            supported_versions_no_flow_control_.end(), QUIC_VERSION_17);
    195   if (it != supported_versions_no_flow_control_.end()) {
    196     supported_versions_no_flow_control_.erase(
    197         supported_versions_no_flow_control_.begin(), it + 1);
    198   }
    199   CHECK(!supported_versions_no_flow_control_.empty());
    200 
    201   // Remove all versions > QUIC_VERSION_18 from the
    202   // supported_versions_no_connection_flow_control_ vector.
    203   QuicVersionVector::iterator connection_it = find(
    204       supported_versions_no_connection_flow_control_.begin(),
    205       supported_versions_no_connection_flow_control_.end(), QUIC_VERSION_19);
    206   if (connection_it != supported_versions_no_connection_flow_control_.end()) {
    207     supported_versions_no_connection_flow_control_.erase(
    208         supported_versions_no_connection_flow_control_.begin(),
    209         connection_it + 1);
    210   }
    211   CHECK(!supported_versions_no_connection_flow_control_.empty());
    212 }
    213 
    214 void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
    215                                    const IPEndPoint& client_address,
    216                                    const QuicEncryptedPacket& packet) {
    217   current_server_address_ = server_address;
    218   current_client_address_ = client_address;
    219   current_packet_ = &packet;
    220   // ProcessPacket will cause the packet to be dispatched in
    221   // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
    222   // in OnAuthenticatedHeader.
    223   framer_.ProcessPacket(packet);
    224   // TODO(rjshade): Return a status describing if/why a packet was dropped,
    225   //                and log somehow.  Maybe expose as a varz.
    226 }
    227 
    228 bool QuicDispatcher::OnUnauthenticatedPublicHeader(
    229     const QuicPacketPublicHeader& header) {
    230   QuicSession* session = NULL;
    231 
    232   QuicConnectionId connection_id = header.connection_id;
    233   SessionMap::iterator it = session_map_.find(connection_id);
    234   if (it == session_map_.end()) {
    235     if (header.reset_flag) {
    236       return false;
    237     }
    238     if (time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id)) {
    239       return HandlePacketForTimeWait(header);
    240     }
    241 
    242     // Ensure the packet has a version negotiation bit set before creating a new
    243     // session for it.  All initial packets for a new connection are required to
    244     // have the flag set.  Otherwise it may be a stray packet.
    245     if (header.version_flag) {
    246       session = CreateQuicSession(connection_id, current_server_address_,
    247                                   current_client_address_);
    248     }
    249 
    250     if (session == NULL) {
    251       DVLOG(1) << "Failed to create session for " << connection_id;
    252       // Add this connection_id fo the time-wait state, to safely reject future
    253       // packets.
    254 
    255       if (header.version_flag &&
    256           !framer_.IsSupportedVersion(header.versions.front())) {
    257         // TODO(ianswett): Produce a no-version version negotiation packet.
    258         return false;
    259       }
    260 
    261       // Use the version in the packet if possible, otherwise assume the latest.
    262       QuicVersion version = header.version_flag ? header.versions.front() :
    263           supported_versions_.front();
    264       time_wait_list_manager_->AddConnectionIdToTimeWait(
    265           connection_id, version, NULL);
    266       DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id));
    267       return HandlePacketForTimeWait(header);
    268     }
    269     DVLOG(1) << "Created new session for " << connection_id;
    270     session_map_.insert(make_pair(connection_id, session));
    271   } else {
    272     session = it->second;
    273   }
    274 
    275   session->connection()->ProcessUdpPacket(
    276       current_server_address_, current_client_address_, *current_packet_);
    277 
    278   // Do not parse the packet further.  The session will process it completely.
    279   return false;
    280 }
    281 
    282 void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader& header) {
    283   DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(
    284       header.public_header.connection_id));
    285   time_wait_list_manager_->ProcessPacket(current_server_address_,
    286                                          current_client_address_,
    287                                          header.public_header.connection_id,
    288                                          header.packet_sequence_number,
    289                                          *current_packet_);
    290 }
    291 
    292 void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
    293   QuicConnection* connection = it->second->connection();
    294   QuicEncryptedPacket* connection_close_packet =
    295       connection->ReleaseConnectionClosePacket();
    296   write_blocked_list_.erase(connection);
    297   time_wait_list_manager_->AddConnectionIdToTimeWait(it->first,
    298                                                      connection->version(),
    299                                                      connection_close_packet);
    300   session_map_.erase(it);
    301 }
    302 
    303 void QuicDispatcher::DeleteSessions() {
    304   STLDeleteElements(&closed_session_list_);
    305 }
    306 
    307 void QuicDispatcher::OnCanWrite() {
    308   // We got an EPOLLOUT: the socket should not be blocked.
    309   writer_->SetWritable();
    310 
    311   // Give each writer one attempt to write.
    312   int num_writers = write_blocked_list_.size();
    313   for (int i = 0; i < num_writers; ++i) {
    314     if (write_blocked_list_.empty()) {
    315       return;
    316     }
    317     QuicBlockedWriterInterface* blocked_writer =
    318         write_blocked_list_.begin()->first;
    319     write_blocked_list_.erase(write_blocked_list_.begin());
    320     blocked_writer->OnCanWrite();
    321     if (writer_->IsWriteBlocked()) {
    322       // We were unable to write.  Wait for the next EPOLLOUT. The writer is
    323       // responsible for adding itself to the blocked list via OnWriteBlocked().
    324       return;
    325     }
    326   }
    327 }
    328 
    329 bool QuicDispatcher::HasPendingWrites() const {
    330   return !write_blocked_list_.empty();
    331 }
    332 
    333 void QuicDispatcher::Shutdown() {
    334   while (!session_map_.empty()) {
    335     QuicSession* session = session_map_.begin()->second;
    336     session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
    337     // Validate that the session removes itself from the session map on close.
    338     DCHECK(session_map_.empty() || session_map_.begin()->second != session);
    339   }
    340   DeleteSessions();
    341 }
    342 
    343 void QuicDispatcher::OnConnectionClosed(QuicConnectionId connection_id,
    344                                         QuicErrorCode error) {
    345   SessionMap::iterator it = session_map_.find(connection_id);
    346   if (it == session_map_.end()) {
    347     LOG(DFATAL) << "ConnectionId " << connection_id
    348                 << " does not exist in the session map.  "
    349                 << "Error: " << QuicUtils::ErrorToString(error);
    350     LOG(DFATAL) << base::debug::StackTrace().ToString();
    351     return;
    352   }
    353 
    354   DLOG_IF(INFO, error != QUIC_NO_ERROR) << "Closing connection ("
    355                                         << connection_id
    356                                         << ") due to error: "
    357                                         << QuicUtils::ErrorToString(error);
    358 
    359   if (closed_session_list_.empty()) {
    360     epoll_server_->RegisterAlarmApproximateDelta(
    361         0, delete_sessions_alarm_.get());
    362   }
    363   closed_session_list_.push_back(it->second);
    364   CleanUpSession(it);
    365 }
    366 
    367 void QuicDispatcher::OnWriteBlocked(QuicBlockedWriterInterface* writer) {
    368   DCHECK(writer_->IsWriteBlocked());
    369   write_blocked_list_.insert(make_pair(writer, true));
    370 }
    371 
    372 QuicPacketWriter* QuicDispatcher::CreateWriter(int fd) {
    373   return new QuicDefaultPacketWriter(fd);
    374 }
    375 
    376 QuicSession* QuicDispatcher::CreateQuicSession(
    377     QuicConnectionId connection_id,
    378     const IPEndPoint& server_address,
    379     const IPEndPoint& client_address) {
    380   QuicServerSession* session = new QuicServerSession(
    381       config_,
    382       CreateQuicConnection(connection_id, server_address, client_address),
    383       this);
    384   session->InitializeSession(crypto_config_);
    385   return session;
    386 }
    387 
    388 QuicConnection* QuicDispatcher::CreateQuicConnection(
    389     QuicConnectionId connection_id,
    390     const IPEndPoint& server_address,
    391     const IPEndPoint& client_address) {
    392   if (FLAGS_enable_quic_stream_flow_control_2 &&
    393       FLAGS_enable_quic_connection_flow_control_2) {
    394     DLOG(INFO) << "Creating QuicDispatcher with all versions.";
    395     return new QuicConnection(connection_id, client_address, helper_.get(),
    396                               writer_.get(), true, supported_versions_);
    397   }
    398 
    399   if (FLAGS_enable_quic_stream_flow_control_2 &&
    400       !FLAGS_enable_quic_connection_flow_control_2) {
    401     DLOG(INFO) << "Connection flow control disabled, creating QuicDispatcher "
    402                << "WITHOUT version 19 or higher.";
    403     return new QuicConnection(connection_id, client_address, helper_.get(),
    404                               writer_.get(), true,
    405                               supported_versions_no_connection_flow_control_);
    406   }
    407 
    408   DLOG(INFO) << "Flow control disabled, creating QuicDispatcher WITHOUT "
    409              << "version 17 or higher.";
    410   return new QuicConnection(connection_id, client_address, helper_.get(),
    411                             writer_.get(), true,
    412                             supported_versions_no_flow_control_);
    413 }
    414 
    415 QuicTimeWaitListManager* QuicDispatcher::CreateQuicTimeWaitListManager() {
    416   return new QuicTimeWaitListManager(
    417       writer_.get(), this, epoll_server(), supported_versions());
    418 }
    419 
    420 bool QuicDispatcher::HandlePacketForTimeWait(
    421     const QuicPacketPublicHeader& header) {
    422   if (header.reset_flag) {
    423     // Public reset packets do not have sequence numbers, so ignore the packet.
    424     return false;
    425   }
    426 
    427   // Switch the framer to the correct version, so that the sequence number can
    428   // be parsed correctly.
    429   framer_.set_version(time_wait_list_manager_->GetQuicVersionFromConnectionId(
    430       header.connection_id));
    431 
    432   // Continue parsing the packet to extract the sequence number.  Then
    433   // send it to the time wait manager in OnUnathenticatedHeader.
    434   return true;
    435 }
    436 
    437 }  // namespace tools
    438 }  // namespace net
    439