Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/tools/quic/quic_dispatcher.h"
      6 
      7 #include <errno.h>
      8 
      9 #include "base/debug/stack_trace.h"
     10 #include "base/logging.h"
     11 #include "base/stl_util.h"
     12 #include "net/quic/quic_blocked_writer_interface.h"
     13 #include "net/quic/quic_flags.h"
     14 #include "net/quic/quic_utils.h"
     15 #include "net/tools/epoll_server/epoll_server.h"
     16 #include "net/tools/quic/quic_default_packet_writer.h"
     17 #include "net/tools/quic/quic_epoll_connection_helper.h"
     18 #include "net/tools/quic/quic_per_connection_packet_writer.h"
     19 #include "net/tools/quic/quic_socket_utils.h"
     20 #include "net/tools/quic/quic_time_wait_list_manager.h"
     21 
     22 namespace net {
     23 
     24 namespace tools {
     25 
     26 using base::StringPiece;
     27 using std::make_pair;
     28 
     29 class DeleteSessionsAlarm : public EpollAlarm {
     30  public:
     31   explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
     32       : dispatcher_(dispatcher) {
     33   }
     34 
     35   virtual int64 OnAlarm() OVERRIDE {
     36     EpollAlarm::OnAlarm();
     37     dispatcher_->DeleteSessions();
     38     return 0;
     39   }
     40 
     41  private:
     42   QuicDispatcher* dispatcher_;
     43 };
     44 
     45 class QuicDispatcher::QuicFramerVisitor : public QuicFramerVisitorInterface {
     46  public:
     47   explicit QuicFramerVisitor(QuicDispatcher* dispatcher)
     48       : dispatcher_(dispatcher),
     49         connection_id_(0) {}
     50 
     51   // QuicFramerVisitorInterface implementation
     52   virtual void OnPacket() OVERRIDE {}
     53   virtual bool OnUnauthenticatedPublicHeader(
     54       const QuicPacketPublicHeader& header) OVERRIDE {
     55     connection_id_ = header.connection_id;
     56     return dispatcher_->OnUnauthenticatedPublicHeader(header);
     57   }
     58   virtual bool OnUnauthenticatedHeader(
     59       const QuicPacketHeader& header) OVERRIDE {
     60     dispatcher_->OnUnauthenticatedHeader(header);
     61     return false;
     62   }
     63   virtual void OnError(QuicFramer* framer) OVERRIDE {
     64     DVLOG(1) << QuicUtils::ErrorToString(framer->error());
     65   }
     66 
     67   virtual bool OnProtocolVersionMismatch(
     68       QuicVersion /*received_version*/) OVERRIDE {
     69     if (dispatcher_->time_wait_list_manager()->IsConnectionIdInTimeWait(
     70             connection_id_)) {
     71       // Keep processing after protocol mismatch - this will be dealt with by
     72       // the TimeWaitListManager.
     73       return true;
     74     } else {
     75       DLOG(DFATAL) << "Version mismatch, connection ID (" << connection_id_
     76                    << ") not in time wait list.";
     77       return false;
     78     }
     79   }
     80 
     81   // The following methods should never get called because we always return
     82   // false from OnUnauthenticatedHeader().  As a result, we never process the
     83   // payload of the packet.
     84   virtual void OnPublicResetPacket(
     85       const QuicPublicResetPacket& /*packet*/) OVERRIDE {
     86     DCHECK(false);
     87   }
     88   virtual void OnVersionNegotiationPacket(
     89       const QuicVersionNegotiationPacket& /*packet*/) OVERRIDE {
     90     DCHECK(false);
     91   }
     92   virtual void OnDecryptedPacket(EncryptionLevel level) OVERRIDE {
     93     DCHECK(false);
     94   }
     95   virtual bool OnPacketHeader(const QuicPacketHeader& /*header*/) OVERRIDE {
     96     DCHECK(false);
     97     return false;
     98   }
     99   virtual void OnRevivedPacket() OVERRIDE {
    100     DCHECK(false);
    101   }
    102   virtual void OnFecProtectedPayload(StringPiece /*payload*/) OVERRIDE {
    103     DCHECK(false);
    104   }
    105   virtual bool OnStreamFrame(const QuicStreamFrame& /*frame*/) OVERRIDE {
    106     DCHECK(false);
    107     return false;
    108   }
    109   virtual bool OnAckFrame(const QuicAckFrame& /*frame*/) OVERRIDE {
    110     DCHECK(false);
    111     return false;
    112   }
    113   virtual bool OnCongestionFeedbackFrame(
    114       const QuicCongestionFeedbackFrame& /*frame*/) OVERRIDE {
    115     DCHECK(false);
    116     return false;
    117   }
    118   virtual bool OnStopWaitingFrame(
    119       const QuicStopWaitingFrame& /*frame*/) OVERRIDE {
    120     DCHECK(false);
    121     return false;
    122   }
    123   virtual bool OnPingFrame(const QuicPingFrame& /*frame*/) OVERRIDE {
    124     DCHECK(false);
    125     return false;
    126   }
    127   virtual bool OnRstStreamFrame(const QuicRstStreamFrame& /*frame*/) OVERRIDE {
    128     DCHECK(false);
    129     return false;
    130   }
    131   virtual bool OnConnectionCloseFrame(
    132       const QuicConnectionCloseFrame & /*frame*/) OVERRIDE {
    133     DCHECK(false);
    134     return false;
    135   }
    136   virtual bool OnGoAwayFrame(const QuicGoAwayFrame& /*frame*/) OVERRIDE {
    137     DCHECK(false);
    138     return false;
    139   }
    140   virtual bool OnWindowUpdateFrame(const QuicWindowUpdateFrame& /*frame*/)
    141       OVERRIDE {
    142     DCHECK(false);
    143     return false;
    144   }
    145   virtual bool OnBlockedFrame(const QuicBlockedFrame& frame) OVERRIDE {
    146     DCHECK(false);
    147     return false;
    148   }
    149   virtual void OnFecData(const QuicFecData& /*fec*/) OVERRIDE {
    150     DCHECK(false);
    151   }
    152   virtual void OnPacketComplete() OVERRIDE {
    153     DCHECK(false);
    154   }
    155 
    156  private:
    157   QuicDispatcher* dispatcher_;
    158 
    159   // Latched in OnUnauthenticatedPublicHeader for use later.
    160   QuicConnectionId connection_id_;
    161 };
    162 
    163 QuicPacketWriter* QuicDispatcher::DefaultPacketWriterFactory::Create(
    164     QuicPacketWriter* writer,
    165     QuicConnection* connection) {
    166   return new QuicPerConnectionPacketWriter(writer, connection);
    167 }
    168 
    169 QuicDispatcher::PacketWriterFactoryAdapter::PacketWriterFactoryAdapter(
    170     QuicDispatcher* dispatcher)
    171     : dispatcher_(dispatcher) {}
    172 
    173 QuicDispatcher::PacketWriterFactoryAdapter::~PacketWriterFactoryAdapter() {}
    174 
    175 QuicPacketWriter* QuicDispatcher::PacketWriterFactoryAdapter::Create(
    176     QuicConnection* connection) const {
    177   return dispatcher_->packet_writer_factory_->Create(
    178       dispatcher_->writer_.get(),
    179       connection);
    180 }
    181 
    182 QuicDispatcher::QuicDispatcher(const QuicConfig& config,
    183                                const QuicCryptoServerConfig& crypto_config,
    184                                const QuicVersionVector& supported_versions,
    185                                PacketWriterFactory* packet_writer_factory,
    186                                EpollServer* epoll_server)
    187     : config_(config),
    188       crypto_config_(crypto_config),
    189       delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
    190       epoll_server_(epoll_server),
    191       helper_(new QuicEpollConnectionHelper(epoll_server_)),
    192       packet_writer_factory_(packet_writer_factory),
    193       connection_writer_factory_(this),
    194       supported_versions_(supported_versions),
    195       current_packet_(NULL),
    196       framer_(supported_versions, /*unused*/ QuicTime::Zero(), true),
    197       framer_visitor_(new QuicFramerVisitor(this)) {
    198   framer_.set_visitor(framer_visitor_.get());
    199 }
    200 
    201 QuicDispatcher::~QuicDispatcher() {
    202   STLDeleteValues(&session_map_);
    203   STLDeleteElements(&closed_session_list_);
    204 }
    205 
    206 void QuicDispatcher::Initialize(int fd) {
    207   DCHECK(writer_ == NULL);
    208   writer_.reset(CreateWriter(fd));
    209   time_wait_list_manager_.reset(CreateQuicTimeWaitListManager());
    210 }
    211 
    212 void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
    213                                    const IPEndPoint& client_address,
    214                                    const QuicEncryptedPacket& packet) {
    215   current_server_address_ = server_address;
    216   current_client_address_ = client_address;
    217   current_packet_ = &packet;
    218   // ProcessPacket will cause the packet to be dispatched in
    219   // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
    220   // in OnAuthenticatedHeader.
    221   framer_.ProcessPacket(packet);
    222   // TODO(rjshade): Return a status describing if/why a packet was dropped,
    223   //                and log somehow.  Maybe expose as a varz.
    224 }
    225 
    226 bool QuicDispatcher::OnUnauthenticatedPublicHeader(
    227     const QuicPacketPublicHeader& header) {
    228   QuicSession* session = NULL;
    229 
    230   QuicConnectionId connection_id = header.connection_id;
    231   SessionMap::iterator it = session_map_.find(connection_id);
    232   if (it == session_map_.end()) {
    233     if (header.reset_flag) {
    234       return false;
    235     }
    236     if (time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id)) {
    237       return HandlePacketForTimeWait(header);
    238     }
    239 
    240     // Ensure the packet has a version negotiation bit set before creating a new
    241     // session for it.  All initial packets for a new connection are required to
    242     // have the flag set.  Otherwise it may be a stray packet.
    243     if (header.version_flag) {
    244       session = CreateQuicSession(connection_id, current_server_address_,
    245                                   current_client_address_);
    246     }
    247 
    248     if (session == NULL) {
    249       DVLOG(1) << "Failed to create session for " << connection_id;
    250       // Add this connection_id fo the time-wait state, to safely reject future
    251       // packets.
    252 
    253       if (header.version_flag &&
    254           !framer_.IsSupportedVersion(header.versions.front())) {
    255         // TODO(ianswett): Produce a no-version version negotiation packet.
    256         return false;
    257       }
    258 
    259       // Use the version in the packet if possible, otherwise assume the latest.
    260       QuicVersion version = header.version_flag ? header.versions.front() :
    261           supported_versions_.front();
    262       time_wait_list_manager_->AddConnectionIdToTimeWait(
    263           connection_id, version, NULL);
    264       DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id));
    265       return HandlePacketForTimeWait(header);
    266     }
    267     DVLOG(1) << "Created new session for " << connection_id;
    268     session_map_.insert(make_pair(connection_id, session));
    269   } else {
    270     session = it->second;
    271   }
    272 
    273   session->connection()->ProcessUdpPacket(
    274       current_server_address_, current_client_address_, *current_packet_);
    275 
    276   // Do not parse the packet further.  The session will process it completely.
    277   return false;
    278 }
    279 
    280 void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader& header) {
    281   DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(
    282       header.public_header.connection_id));
    283   time_wait_list_manager_->ProcessPacket(current_server_address_,
    284                                          current_client_address_,
    285                                          header.public_header.connection_id,
    286                                          header.packet_sequence_number,
    287                                          *current_packet_);
    288 }
    289 
    290 void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
    291   QuicConnection* connection = it->second->connection();
    292   QuicEncryptedPacket* connection_close_packet =
    293       connection->ReleaseConnectionClosePacket();
    294   write_blocked_list_.erase(connection);
    295   time_wait_list_manager_->AddConnectionIdToTimeWait(it->first,
    296                                                      connection->version(),
    297                                                      connection_close_packet);
    298   session_map_.erase(it);
    299 }
    300 
    301 void QuicDispatcher::DeleteSessions() {
    302   STLDeleteElements(&closed_session_list_);
    303 }
    304 
    305 void QuicDispatcher::OnCanWrite() {
    306   // We got an EPOLLOUT: the socket should not be blocked.
    307   writer_->SetWritable();
    308 
    309   // Give all the blocked writers one chance to write, until we're blocked again
    310   // or there's no work left.
    311   while (!write_blocked_list_.empty() && !writer_->IsWriteBlocked()) {
    312     QuicBlockedWriterInterface* blocked_writer =
    313         write_blocked_list_.begin()->first;
    314     write_blocked_list_.erase(write_blocked_list_.begin());
    315     blocked_writer->OnCanWrite();
    316   }
    317 }
    318 
    319 bool QuicDispatcher::HasPendingWrites() const {
    320   return !write_blocked_list_.empty();
    321 }
    322 
    323 void QuicDispatcher::Shutdown() {
    324   while (!session_map_.empty()) {
    325     QuicSession* session = session_map_.begin()->second;
    326     session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
    327     // Validate that the session removes itself from the session map on close.
    328     DCHECK(session_map_.empty() || session_map_.begin()->second != session);
    329   }
    330   DeleteSessions();
    331 }
    332 
    333 void QuicDispatcher::OnConnectionClosed(QuicConnectionId connection_id,
    334                                         QuicErrorCode error) {
    335   SessionMap::iterator it = session_map_.find(connection_id);
    336   if (it == session_map_.end()) {
    337     LOG(DFATAL) << "ConnectionId " << connection_id
    338                 << " does not exist in the session map.  "
    339                 << "Error: " << QuicUtils::ErrorToString(error);
    340     LOG(DFATAL) << base::debug::StackTrace().ToString();
    341     return;
    342   }
    343 
    344   DLOG_IF(INFO, error != QUIC_NO_ERROR) << "Closing connection ("
    345                                         << connection_id
    346                                         << ") due to error: "
    347                                         << QuicUtils::ErrorToString(error);
    348 
    349   if (closed_session_list_.empty()) {
    350     epoll_server_->RegisterAlarmApproximateDelta(
    351         0, delete_sessions_alarm_.get());
    352   }
    353   closed_session_list_.push_back(it->second);
    354   CleanUpSession(it);
    355 }
    356 
    357 void QuicDispatcher::OnWriteBlocked(
    358     QuicBlockedWriterInterface* blocked_writer) {
    359   if (!writer_->IsWriteBlocked()) {
    360     LOG(DFATAL) <<
    361         "QuicDispatcher::OnWriteBlocked called when the writer is not blocked.";
    362     // Return without adding the connection to the blocked list, to avoid
    363     // infinite loops in OnCanWrite.
    364     return;
    365   }
    366   write_blocked_list_.insert(make_pair(blocked_writer, true));
    367 }
    368 
    369 QuicPacketWriter* QuicDispatcher::CreateWriter(int fd) {
    370   return new QuicDefaultPacketWriter(fd);
    371 }
    372 
    373 QuicSession* QuicDispatcher::CreateQuicSession(
    374     QuicConnectionId connection_id,
    375     const IPEndPoint& server_address,
    376     const IPEndPoint& client_address) {
    377   QuicServerSession* session = new QuicServerSession(
    378       config_,
    379       CreateQuicConnection(connection_id, server_address, client_address),
    380       this);
    381   session->InitializeSession(crypto_config_);
    382   return session;
    383 }
    384 
    385 QuicConnection* QuicDispatcher::CreateQuicConnection(
    386     QuicConnectionId connection_id,
    387     const IPEndPoint& server_address,
    388     const IPEndPoint& client_address) {
    389   return new QuicConnection(connection_id,
    390                             client_address,
    391                             helper_.get(),
    392                             connection_writer_factory_,
    393                             /* owns_writer= */ true,
    394                             /* is_server= */ true,
    395                             supported_versions_);
    396 }
    397 
    398 QuicTimeWaitListManager* QuicDispatcher::CreateQuicTimeWaitListManager() {
    399   return new QuicTimeWaitListManager(
    400       writer_.get(), this, epoll_server(), supported_versions());
    401 }
    402 
    403 bool QuicDispatcher::HandlePacketForTimeWait(
    404     const QuicPacketPublicHeader& header) {
    405   if (header.reset_flag) {
    406     // Public reset packets do not have sequence numbers, so ignore the packet.
    407     return false;
    408   }
    409 
    410   // Switch the framer to the correct version, so that the sequence number can
    411   // be parsed correctly.
    412   framer_.set_version(time_wait_list_manager_->GetQuicVersionFromConnectionId(
    413       header.connection_id));
    414 
    415   // Continue parsing the packet to extract the sequence number.  Then
    416   // send it to the time wait manager in OnUnathenticatedHeader.
    417   return true;
    418 }
    419 
    420 }  // namespace tools
    421 }  // namespace net
    422