Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_session.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/logging.h"
      9 #include "base/memory/linked_ptr.h"
     10 #include "base/message_loop.h"
     11 #include "base/metrics/field_trial.h"
     12 #include "base/metrics/stats_counters.h"
     13 #include "base/stl_util-inl.h"
     14 #include "base/string_number_conversions.h"
     15 #include "base/string_util.h"
     16 #include "base/stringprintf.h"
     17 #include "base/time.h"
     18 #include "base/utf_string_conversions.h"
     19 #include "base/values.h"
     20 #include "net/base/connection_type_histograms.h"
     21 #include "net/base/net_log.h"
     22 #include "net/base/net_util.h"
     23 #include "net/http/http_network_session.h"
     24 #include "net/socket/ssl_client_socket.h"
     25 #include "net/spdy/spdy_frame_builder.h"
     26 #include "net/spdy/spdy_http_utils.h"
     27 #include "net/spdy/spdy_protocol.h"
     28 #include "net/spdy/spdy_session_pool.h"
     29 #include "net/spdy/spdy_settings_storage.h"
     30 #include "net/spdy/spdy_stream.h"
     31 
     32 namespace net {
     33 
     34 NetLogSpdySynParameter::NetLogSpdySynParameter(
     35     const linked_ptr<spdy::SpdyHeaderBlock>& headers,
     36     spdy::SpdyControlFlags flags,
     37     spdy::SpdyStreamId id,
     38     spdy::SpdyStreamId associated_stream)
     39     : headers_(headers),
     40       flags_(flags),
     41       id_(id),
     42       associated_stream_(associated_stream) {
     43 }
     44 
     45 NetLogSpdySynParameter::~NetLogSpdySynParameter() {
     46 }
     47 
     48 Value* NetLogSpdySynParameter::ToValue() const {
     49   DictionaryValue* dict = new DictionaryValue();
     50   ListValue* headers_list = new ListValue();
     51   for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
     52       it != headers_->end(); ++it) {
     53     headers_list->Append(new StringValue(base::StringPrintf(
     54         "%s: %s", it->first.c_str(), it->second.c_str())));
     55   }
     56   dict->SetInteger("flags", flags_);
     57   dict->Set("headers", headers_list);
     58   dict->SetInteger("id", id_);
     59   if (associated_stream_)
     60     dict->SetInteger("associated_stream", associated_stream_);
     61   return dict;
     62 }
     63 
     64 namespace {
     65 
     66 const int kReadBufferSize = 8 * 1024;
     67 
     68 class NetLogSpdySessionParameter : public NetLog::EventParameters {
     69  public:
     70   NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
     71       : host_pair_(host_pair) {}
     72   virtual Value* ToValue() const {
     73     DictionaryValue* dict = new DictionaryValue();
     74     dict->Set("host", new StringValue(host_pair_.first.ToString()));
     75     dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
     76     return dict;
     77   }
     78  private:
     79   const HostPortProxyPair host_pair_;
     80   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
     81 };
     82 
     83 class NetLogSpdySettingsParameter : public NetLog::EventParameters {
     84  public:
     85   explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
     86       : settings_(settings) {}
     87 
     88   virtual Value* ToValue() const {
     89     DictionaryValue* dict = new DictionaryValue();
     90     ListValue* settings = new ListValue();
     91     for (spdy::SpdySettings::const_iterator it = settings_.begin();
     92          it != settings_.end(); ++it) {
     93       settings->Append(new StringValue(
     94           base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
     95     }
     96     dict->Set("settings", settings);
     97     return dict;
     98   }
     99 
    100  private:
    101   ~NetLogSpdySettingsParameter() {}
    102   const spdy::SpdySettings settings_;
    103 
    104   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
    105 };
    106 
    107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
    108  public:
    109   NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
    110                                   int delta,
    111                                   int window_size)
    112       : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
    113 
    114   virtual Value* ToValue() const {
    115     DictionaryValue* dict = new DictionaryValue();
    116     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
    117     dict->SetInteger("delta", delta_);
    118     dict->SetInteger("window_size", window_size_);
    119     return dict;
    120   }
    121 
    122  private:
    123   ~NetLogSpdyWindowUpdateParameter() {}
    124   const spdy::SpdyStreamId stream_id_;
    125   const int delta_;
    126   const int window_size_;
    127 
    128   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
    129 };
    130 
    131 class NetLogSpdyDataParameter : public NetLog::EventParameters {
    132  public:
    133   NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
    134                           int size,
    135                           spdy::SpdyDataFlags flags)
    136       : stream_id_(stream_id), size_(size), flags_(flags) {}
    137 
    138   virtual Value* ToValue() const {
    139     DictionaryValue* dict = new DictionaryValue();
    140     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
    141     dict->SetInteger("size", size_);
    142     dict->SetInteger("flags", static_cast<int>(flags_));
    143     return dict;
    144   }
    145 
    146  private:
    147   ~NetLogSpdyDataParameter() {}
    148   const spdy::SpdyStreamId stream_id_;
    149   const int size_;
    150   const spdy::SpdyDataFlags flags_;
    151 
    152   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
    153 };
    154 
    155 class NetLogSpdyRstParameter : public NetLog::EventParameters {
    156  public:
    157   NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
    158       : stream_id_(stream_id), status_(status) {}
    159 
    160   virtual Value* ToValue() const {
    161     DictionaryValue* dict = new DictionaryValue();
    162     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
    163     dict->SetInteger("status", status_);
    164     return dict;
    165   }
    166 
    167  private:
    168   ~NetLogSpdyRstParameter() {}
    169   const spdy::SpdyStreamId stream_id_;
    170   const int status_;
    171 
    172   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
    173 };
    174 
    175 class NetLogSpdyPingParameter : public NetLog::EventParameters {
    176  public:
    177   explicit NetLogSpdyPingParameter(uint32 unique_id) : unique_id_(unique_id) {}
    178 
    179   virtual Value* ToValue() const {
    180     DictionaryValue* dict = new DictionaryValue();
    181     dict->SetInteger("unique_id", unique_id_);
    182     return dict;
    183   }
    184 
    185  private:
    186   ~NetLogSpdyPingParameter() {}
    187   const uint32 unique_id_;
    188 
    189   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyPingParameter);
    190 };
    191 
    192 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
    193  public:
    194   NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
    195                             int active_streams,
    196                             int unclaimed_streams)
    197       : last_stream_id_(last_stream_id),
    198         active_streams_(active_streams),
    199         unclaimed_streams_(unclaimed_streams) {}
    200 
    201   virtual Value* ToValue() const {
    202     DictionaryValue* dict = new DictionaryValue();
    203     dict->SetInteger("last_accepted_stream_id",
    204                      static_cast<int>(last_stream_id_));
    205     dict->SetInteger("active_streams", active_streams_);
    206     dict->SetInteger("unclaimed_streams", unclaimed_streams_);
    207     return dict;
    208   }
    209 
    210  private:
    211   ~NetLogSpdyGoAwayParameter() {}
    212   const spdy::SpdyStreamId last_stream_id_;
    213   const int active_streams_;
    214   const int unclaimed_streams_;
    215 
    216   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
    217 };
    218 
    219 }  // namespace
    220 
    221 // static
    222 bool SpdySession::use_ssl_ = true;
    223 
    224 // static
    225 bool SpdySession::use_flow_control_ = false;
    226 
    227 // static
    228 size_t SpdySession::max_concurrent_stream_limit_ = 256;
    229 
    230 // static
    231 bool SpdySession::enable_ping_based_connection_checking_ = true;
    232 
    233 // static
    234 int SpdySession::connection_at_risk_of_loss_ms_ = 0;
    235 
    236 // static
    237 int SpdySession::trailing_ping_delay_time_ms_ = 1000;
    238 
    239 // static
    240 int SpdySession::hung_interval_ms_ = 10000;
    241 
    242 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
    243                          SpdySessionPool* spdy_session_pool,
    244                          SpdySettingsStorage* spdy_settings,
    245                          NetLog* net_log)
    246     : ALLOW_THIS_IN_INITIALIZER_LIST(
    247           read_callback_(this, &SpdySession::OnReadComplete)),
    248       ALLOW_THIS_IN_INITIALIZER_LIST(
    249           write_callback_(this, &SpdySession::OnWriteComplete)),
    250       ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
    251       host_port_proxy_pair_(host_port_proxy_pair),
    252       spdy_session_pool_(spdy_session_pool),
    253       spdy_settings_(spdy_settings),
    254       connection_(new ClientSocketHandle),
    255       read_buffer_(new IOBuffer(kReadBufferSize)),
    256       read_pending_(false),
    257       stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
    258       write_pending_(false),
    259       delayed_write_pending_(false),
    260       is_secure_(false),
    261       certificate_error_code_(OK),
    262       error_(OK),
    263       state_(IDLE),
    264       max_concurrent_streams_(kDefaultMaxConcurrentStreams),
    265       streams_initiated_count_(0),
    266       streams_pushed_count_(0),
    267       streams_pushed_and_claimed_count_(0),
    268       streams_abandoned_count_(0),
    269       frames_received_(0),
    270       bytes_received_(0),
    271       sent_settings_(false),
    272       received_settings_(false),
    273       stalled_streams_(0),
    274       pings_in_flight_(0),
    275       next_ping_id_(1),
    276       received_data_time_(base::TimeTicks::Now()),
    277       trailing_ping_pending_(false),
    278       check_ping_status_pending_(false),
    279       need_to_send_ping_(false),
    280       initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
    281       initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
    282       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
    283   DCHECK(HttpStreamFactory::spdy_enabled());
    284   net_log_.BeginEvent(
    285       NetLog::TYPE_SPDY_SESSION,
    286       make_scoped_refptr(
    287           new NetLogSpdySessionParameter(host_port_proxy_pair_)));
    288 
    289   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    290 
    291   spdy_framer_.set_visitor(this);
    292 
    293   SendSettings();
    294 }
    295 
    296 SpdySession::~SpdySession() {
    297   if (state_ != CLOSED) {
    298     state_ = CLOSED;
    299 
    300     // Cleanup all the streams.
    301     CloseAllStreams(net::ERR_ABORTED);
    302   }
    303 
    304   if (connection_->is_initialized()) {
    305     // With Spdy we can't recycle sockets.
    306     connection_->socket()->Disconnect();
    307   }
    308 
    309   // Streams should all be gone now.
    310   DCHECK_EQ(0u, num_active_streams());
    311   DCHECK_EQ(0u, num_unclaimed_pushed_streams());
    312 
    313   DCHECK(pending_callback_map_.empty());
    314 
    315   RecordHistograms();
    316 
    317   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
    318 }
    319 
    320 net::Error SpdySession::InitializeWithSocket(
    321     ClientSocketHandle* connection,
    322     bool is_secure,
    323     int certificate_error_code) {
    324   base::StatsCounter spdy_sessions("spdy.sessions");
    325   spdy_sessions.Increment();
    326 
    327   state_ = CONNECTED;
    328   connection_.reset(connection);
    329   is_secure_ = is_secure;
    330   certificate_error_code_ = certificate_error_code;
    331 
    332   // Write out any data that we might have to send, such as the settings frame.
    333   WriteSocketLater();
    334   net::Error error = ReadSocket();
    335   if (error == ERR_IO_PENDING)
    336     return OK;
    337   return error;
    338 }
    339 
    340 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
    341   if (state_ != CONNECTED)
    342     return false;
    343 
    344   SSLInfo ssl_info;
    345   bool was_npn_negotiated;
    346   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
    347     return true;   // This is not a secure session, so all domains are okay.
    348 
    349   return ssl_info.cert->VerifyNameMatch(domain);
    350 }
    351 
    352 int SpdySession::GetPushStream(
    353     const GURL& url,
    354     scoped_refptr<SpdyStream>* stream,
    355     const BoundNetLog& stream_net_log) {
    356   CHECK_NE(state_, CLOSED);
    357 
    358   *stream = NULL;
    359 
    360   // Don't allow access to secure push streams over an unauthenticated, but
    361   // encrypted SSL socket.
    362   if (is_secure_ && certificate_error_code_ != OK &&
    363       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    364     LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
    365                << "unauthenticated session.";
    366     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
    367     return ERR_SPDY_PROTOCOL_ERROR;
    368   }
    369 
    370   *stream = GetActivePushStream(url.spec());
    371   if (stream->get()) {
    372     DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
    373     streams_pushed_and_claimed_count_++;
    374     return OK;
    375   }
    376   return 0;
    377 }
    378 
    379 int SpdySession::CreateStream(
    380     const GURL& url,
    381     RequestPriority priority,
    382     scoped_refptr<SpdyStream>* spdy_stream,
    383     const BoundNetLog& stream_net_log,
    384     CompletionCallback* callback) {
    385   if (!max_concurrent_streams_ ||
    386       active_streams_.size() < max_concurrent_streams_) {
    387     return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
    388   }
    389 
    390   stalled_streams_++;
    391   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
    392   create_stream_queues_[priority].push(
    393       PendingCreateStream(url, priority, spdy_stream,
    394                           stream_net_log, callback));
    395   return ERR_IO_PENDING;
    396 }
    397 
    398 void SpdySession::ProcessPendingCreateStreams() {
    399   while (!max_concurrent_streams_ ||
    400          active_streams_.size() < max_concurrent_streams_) {
    401     bool no_pending_create_streams = true;
    402     for (int i = 0;i < NUM_PRIORITIES;++i) {
    403       if (!create_stream_queues_[i].empty()) {
    404         PendingCreateStream pending_create = create_stream_queues_[i].front();
    405         create_stream_queues_[i].pop();
    406         no_pending_create_streams = false;
    407         int error = CreateStreamImpl(*pending_create.url,
    408                                      pending_create.priority,
    409                                      pending_create.spdy_stream,
    410                                      *pending_create.stream_net_log);
    411         scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
    412         DCHECK(!ContainsKey(pending_callback_map_, stream));
    413         pending_callback_map_[stream] =
    414             CallbackResultPair(pending_create.callback, error);
    415         MessageLoop::current()->PostTask(
    416             FROM_HERE,
    417             method_factory_.NewRunnableMethod(
    418                 &SpdySession::InvokeUserStreamCreationCallback, stream));
    419         break;
    420       }
    421     }
    422     if (no_pending_create_streams)
    423       return;  // there were no streams in any queue
    424   }
    425 }
    426 
    427 void SpdySession::CancelPendingCreateStreams(
    428     const scoped_refptr<SpdyStream>* spdy_stream) {
    429   PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
    430   if (it != pending_callback_map_.end()) {
    431     pending_callback_map_.erase(it);
    432     return;
    433   }
    434 
    435   for (int i = 0;i < NUM_PRIORITIES;++i) {
    436     PendingCreateStreamQueue tmp;
    437     // Make a copy removing this trans
    438     while (!create_stream_queues_[i].empty()) {
    439       PendingCreateStream pending_create = create_stream_queues_[i].front();
    440       create_stream_queues_[i].pop();
    441       if (pending_create.spdy_stream != spdy_stream)
    442         tmp.push(pending_create);
    443     }
    444     // Now copy it back
    445     while (!tmp.empty()) {
    446       create_stream_queues_[i].push(tmp.front());
    447       tmp.pop();
    448     }
    449   }
    450 }
    451 
    452 int SpdySession::CreateStreamImpl(
    453     const GURL& url,
    454     RequestPriority priority,
    455     scoped_refptr<SpdyStream>* spdy_stream,
    456     const BoundNetLog& stream_net_log) {
    457   // Make sure that we don't try to send https/wss over an unauthenticated, but
    458   // encrypted SSL socket.
    459   if (is_secure_ && certificate_error_code_ != OK &&
    460       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    461     LOG(ERROR) << "Tried to create spdy stream for secure content over an "
    462                << "unauthenticated session.";
    463     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
    464     return ERR_SPDY_PROTOCOL_ERROR;
    465   }
    466 
    467   const std::string& path = url.PathForRequest();
    468 
    469   const spdy::SpdyStreamId stream_id = GetNewStreamId();
    470 
    471   *spdy_stream = new SpdyStream(this,
    472                                 stream_id,
    473                                 false,
    474                                 stream_net_log);
    475   const scoped_refptr<SpdyStream>& stream = *spdy_stream;
    476 
    477   stream->set_priority(priority);
    478   stream->set_path(path);
    479   stream->set_send_window_size(initial_send_window_size_);
    480   stream->set_recv_window_size(initial_recv_window_size_);
    481   ActivateStream(stream);
    482 
    483   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
    484       static_cast<int>(priority), 0, 10, 11);
    485 
    486   // TODO(mbelshe): Optimize memory allocations
    487   DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
    488 
    489   DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
    490   return OK;
    491 }
    492 
    493 int SpdySession::WriteSynStream(
    494     spdy::SpdyStreamId stream_id,
    495     RequestPriority priority,
    496     spdy::SpdyControlFlags flags,
    497     const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
    498   // Find our stream
    499   if (!IsStreamActive(stream_id))
    500     return ERR_INVALID_SPDY_STREAM;
    501   const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
    502   CHECK_EQ(stream->stream_id(), stream_id);
    503 
    504   SendPrefacePingIfNoneInFlight();
    505 
    506   scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
    507       spdy_framer_.CreateSynStream(
    508           stream_id, 0,
    509           ConvertRequestPriorityToSpdyPriority(priority),
    510           flags, false, headers.get()));
    511   QueueFrame(syn_frame.get(), priority, stream);
    512 
    513   base::StatsCounter spdy_requests("spdy.requests");
    514   spdy_requests.Increment();
    515   streams_initiated_count_++;
    516 
    517   if (net_log().IsLoggingAllEvents()) {
    518     net_log().AddEvent(
    519         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
    520         make_scoped_refptr(
    521             new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
    522   }
    523 
    524   // Some servers don't like too many pings, so we limit our current sending to
    525   // no more than one ping for any syn sent.  To do this, we avoid ever setting
    526   // this to true unless we send a syn (which we have just done).  This approach
    527   // may change over time as servers change their responses to pings.
    528   need_to_send_ping_ = true;
    529 
    530   return ERR_IO_PENDING;
    531 }
    532 
    533 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
    534                                  net::IOBuffer* data, int len,
    535                                  spdy::SpdyDataFlags flags) {
    536   // Find our stream
    537   DCHECK(IsStreamActive(stream_id));
    538   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
    539   CHECK_EQ(stream->stream_id(), stream_id);
    540   if (!stream)
    541     return ERR_INVALID_SPDY_STREAM;
    542 
    543   SendPrefacePingIfNoneInFlight();
    544 
    545   if (len > kMaxSpdyFrameChunkSize) {
    546     len = kMaxSpdyFrameChunkSize;
    547     flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
    548   }
    549 
    550   // Obey send window size of the stream if flow control is enabled.
    551   if (use_flow_control_) {
    552     if (stream->send_window_size() <= 0) {
    553       // Because we queue frames onto the session, it is possible that
    554       // a stream was not flow controlled at the time it attempted the
    555       // write, but when we go to fulfill the write, it is now flow
    556       // controlled.  This is why we need the session to mark the stream
    557       // as stalled - because only the session knows for sure when the
    558       // stall occurs.
    559       stream->set_stalled_by_flow_control(true);
    560       net_log().AddEvent(
    561           NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
    562           make_scoped_refptr(
    563               new NetLogIntegerParameter("stream_id", stream_id)));
    564       return ERR_IO_PENDING;
    565     }
    566     int new_len = std::min(len, stream->send_window_size());
    567     if (new_len < len) {
    568       len = new_len;
    569       flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
    570     }
    571     stream->DecreaseSendWindowSize(len);
    572   }
    573 
    574   if (net_log().IsLoggingAllEvents()) {
    575     net_log().AddEvent(
    576         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
    577         make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
    578   }
    579 
    580   // TODO(mbelshe): reduce memory copies here.
    581   scoped_ptr<spdy::SpdyDataFrame> frame(
    582       spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
    583   QueueFrame(frame.get(), stream->priority(), stream);
    584   return ERR_IO_PENDING;
    585 }
    586 
    587 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
    588   // TODO(mbelshe): We should send a RST_STREAM control frame here
    589   //                so that the server can cancel a large send.
    590 
    591   DeleteStream(stream_id, status);
    592 }
    593 
    594 void SpdySession::ResetStream(
    595     spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
    596 
    597   net_log().AddEvent(
    598       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
    599       make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
    600 
    601   scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
    602       spdy_framer_.CreateRstStream(stream_id, status));
    603 
    604   // Default to lowest priority unless we know otherwise.
    605   int priority = 3;
    606   if(IsStreamActive(stream_id)) {
    607     scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
    608     priority = stream->priority();
    609   }
    610   QueueFrame(rst_frame.get(), priority, NULL);
    611   DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
    612 }
    613 
    614 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
    615   return ContainsKey(active_streams_, stream_id);
    616 }
    617 
    618 LoadState SpdySession::GetLoadState() const {
    619   // NOTE: The application only queries the LoadState via the
    620   //       SpdyNetworkTransaction, and details are only needed when
    621   //       we're in the process of connecting.
    622 
    623   // If we're connecting, defer to the connection to give us the actual
    624   // LoadState.
    625   if (state_ == CONNECTING)
    626     return connection_->GetLoadState();
    627 
    628   // Just report that we're idle since the session could be doing
    629   // many things concurrently.
    630   return LOAD_STATE_IDLE;
    631 }
    632 
    633 void SpdySession::OnReadComplete(int bytes_read) {
    634   // Parse a frame.  For now this code requires that the frame fit into our
    635   // buffer (32KB).
    636   // TODO(mbelshe): support arbitrarily large frames!
    637 
    638   read_pending_ = false;
    639 
    640   if (bytes_read <= 0) {
    641     // Session is tearing down.
    642     net::Error error = static_cast<net::Error>(bytes_read);
    643     if (bytes_read == 0)
    644       error = ERR_CONNECTION_CLOSED;
    645     CloseSessionOnError(error, true);
    646     return;
    647   }
    648 
    649   bytes_received_ += bytes_read;
    650 
    651   received_data_time_ = base::TimeTicks::Now();
    652 
    653   // The SpdyFramer will use callbacks onto |this| as it parses frames.
    654   // When errors occur, those callbacks can lead to teardown of all references
    655   // to |this|, so maintain a reference to self during this call for safe
    656   // cleanup.
    657   scoped_refptr<SpdySession> self(this);
    658 
    659   char *data = read_buffer_->data();
    660   while (bytes_read &&
    661          spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
    662     uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
    663     bytes_read -= bytes_processed;
    664     data += bytes_processed;
    665     if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
    666       spdy_framer_.Reset();
    667   }
    668 
    669   if (state_ != CLOSED)
    670     ReadSocket();
    671 }
    672 
    673 void SpdySession::OnWriteComplete(int result) {
    674   DCHECK(write_pending_);
    675   DCHECK(in_flight_write_.size());
    676 
    677   write_pending_ = false;
    678 
    679   scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
    680 
    681   if (result >= 0) {
    682     // It should not be possible to have written more bytes than our
    683     // in_flight_write_.
    684     DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
    685 
    686     in_flight_write_.buffer()->DidConsume(result);
    687 
    688     // We only notify the stream when we've fully written the pending frame.
    689     if (!in_flight_write_.buffer()->BytesRemaining()) {
    690       if (stream) {
    691         // Report the number of bytes written to the caller, but exclude the
    692         // frame size overhead.  NOTE: if this frame was compressed the
    693         // reported bytes written is the compressed size, not the original
    694         // size.
    695         if (result > 0) {
    696           result = in_flight_write_.buffer()->size();
    697           DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
    698           result -= static_cast<int>(spdy::SpdyFrame::size());
    699         }
    700 
    701         // It is possible that the stream was cancelled while we were writing
    702         // to the socket.
    703         if (!stream->cancelled())
    704           stream->OnWriteComplete(result);
    705       }
    706 
    707       // Cleanup the write which just completed.
    708       in_flight_write_.release();
    709     }
    710 
    711     // Write more data.  We're already in a continuation, so we can
    712     // go ahead and write it immediately (without going back to the
    713     // message loop).
    714     WriteSocketLater();
    715   } else {
    716     in_flight_write_.release();
    717 
    718     // The stream is now errored.  Close it down.
    719     CloseSessionOnError(static_cast<net::Error>(result), true);
    720   }
    721 }
    722 
    723 net::Error SpdySession::ReadSocket() {
    724   if (read_pending_)
    725     return OK;
    726 
    727   if (state_ == CLOSED) {
    728     NOTREACHED();
    729     return ERR_UNEXPECTED;
    730   }
    731 
    732   CHECK(connection_.get());
    733   CHECK(connection_->socket());
    734   int bytes_read = connection_->socket()->Read(read_buffer_.get(),
    735                                                kReadBufferSize,
    736                                                &read_callback_);
    737   switch (bytes_read) {
    738     case 0:
    739       // Socket is closed!
    740       CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
    741       return ERR_CONNECTION_CLOSED;
    742     case net::ERR_IO_PENDING:
    743       // Waiting for data.  Nothing to do now.
    744       read_pending_ = true;
    745       return ERR_IO_PENDING;
    746     default:
    747       // Data was read, process it.
    748       // Schedule the work through the message loop to avoid recursive
    749       // callbacks.
    750       read_pending_ = true;
    751       MessageLoop::current()->PostTask(
    752           FROM_HERE,
    753           method_factory_.NewRunnableMethod(
    754               &SpdySession::OnReadComplete, bytes_read));
    755       break;
    756   }
    757   return OK;
    758 }
    759 
    760 void SpdySession::WriteSocketLater() {
    761   if (delayed_write_pending_)
    762     return;
    763 
    764   if (state_ < CONNECTED)
    765     return;
    766 
    767   delayed_write_pending_ = true;
    768   MessageLoop::current()->PostTask(
    769       FROM_HERE,
    770       method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
    771 }
    772 
    773 void SpdySession::WriteSocket() {
    774   // This function should only be called via WriteSocketLater.
    775   DCHECK(delayed_write_pending_);
    776   delayed_write_pending_ = false;
    777 
    778   // If the socket isn't connected yet, just wait; we'll get called
    779   // again when the socket connection completes.  If the socket is
    780   // closed, just return.
    781   if (state_ < CONNECTED || state_ == CLOSED)
    782     return;
    783 
    784   if (write_pending_)   // Another write is in progress still.
    785     return;
    786 
    787   // Loop sending frames until we've sent everything or until the write
    788   // returns error (or ERR_IO_PENDING).
    789   while (in_flight_write_.buffer() || !queue_.empty()) {
    790     if (!in_flight_write_.buffer()) {
    791       // Grab the next SpdyFrame to send.
    792       SpdyIOBuffer next_buffer = queue_.top();
    793       queue_.pop();
    794 
    795       // We've deferred compression until just before we write it to the socket,
    796       // which is now.  At this time, we don't compress our data frames.
    797       spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
    798       size_t size;
    799       if (spdy_framer_.IsCompressible(uncompressed_frame)) {
    800         scoped_ptr<spdy::SpdyFrame> compressed_frame(
    801             spdy_framer_.CompressFrame(uncompressed_frame));
    802         if (!compressed_frame.get()) {
    803           LOG(ERROR) << "SPDY Compression failure";
    804           CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
    805           return;
    806         }
    807 
    808         size = compressed_frame->length() + spdy::SpdyFrame::size();
    809 
    810         DCHECK_GT(size, 0u);
    811 
    812         // TODO(mbelshe): We have too much copying of data here.
    813         IOBufferWithSize* buffer = new IOBufferWithSize(size);
    814         memcpy(buffer->data(), compressed_frame->data(), size);
    815 
    816         // Attempt to send the frame.
    817         in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
    818       } else {
    819         size = uncompressed_frame.length() + spdy::SpdyFrame::size();
    820         in_flight_write_ = next_buffer;
    821       }
    822     } else {
    823       DCHECK(in_flight_write_.buffer()->BytesRemaining());
    824     }
    825 
    826     write_pending_ = true;
    827     int rv = connection_->socket()->Write(in_flight_write_.buffer(),
    828         in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
    829     if (rv == net::ERR_IO_PENDING)
    830       break;
    831 
    832     // We sent the frame successfully.
    833     OnWriteComplete(rv);
    834 
    835     // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
    836     //                 as in an error state.
    837     if (rv < 0)
    838       break;
    839   }
    840 }
    841 
    842 void SpdySession::CloseAllStreams(net::Error status) {
    843   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
    844   base::StatsCounter abandoned_push_streams(
    845       "spdy.abandoned_push_streams");
    846 
    847   if (!active_streams_.empty())
    848     abandoned_streams.Add(active_streams_.size());
    849   if (!unclaimed_pushed_streams_.empty()) {
    850     streams_abandoned_count_ += unclaimed_pushed_streams_.size();
    851     abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
    852     unclaimed_pushed_streams_.clear();
    853   }
    854 
    855   for (int i = 0;i < NUM_PRIORITIES;++i) {
    856     while (!create_stream_queues_[i].empty()) {
    857       PendingCreateStream pending_create = create_stream_queues_[i].front();
    858       create_stream_queues_[i].pop();
    859       pending_create.callback->Run(ERR_ABORTED);
    860     }
    861   }
    862 
    863   while (!active_streams_.empty()) {
    864     ActiveStreamMap::iterator it = active_streams_.begin();
    865     const scoped_refptr<SpdyStream>& stream = it->second;
    866     DCHECK(stream);
    867     LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
    868                  << "): " << stream->path();
    869     DeleteStream(stream->stream_id(), status);
    870   }
    871 
    872   // We also need to drain the queue.
    873   while (queue_.size())
    874     queue_.pop();
    875 }
    876 
    877 int SpdySession::GetNewStreamId() {
    878   int id = stream_hi_water_mark_;
    879   stream_hi_water_mark_ += 2;
    880   if (stream_hi_water_mark_ > 0x7fff)
    881     stream_hi_water_mark_ = 1;
    882   return id;
    883 }
    884 
    885 void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
    886                              spdy::SpdyPriority priority,
    887                              SpdyStream* stream) {
    888   int length = spdy::SpdyFrame::size() + frame->length();
    889   IOBuffer* buffer = new IOBuffer(length);
    890   memcpy(buffer->data(), frame->data(), length);
    891   queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
    892 
    893   WriteSocketLater();
    894 }
    895 
    896 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
    897   // Closing all streams can have a side-effect of dropping the last reference
    898   // to |this|.  Hold a reference through this function.
    899   scoped_refptr<SpdySession> self(this);
    900 
    901   DCHECK_LT(err, OK);
    902   net_log_.AddEvent(
    903       NetLog::TYPE_SPDY_SESSION_CLOSE,
    904       make_scoped_refptr(new NetLogIntegerParameter("status", err)));
    905 
    906   // Don't close twice.  This can occur because we can have both
    907   // a read and a write outstanding, and each can complete with
    908   // an error.
    909   if (state_ != CLOSED) {
    910     state_ = CLOSED;
    911     error_ = err;
    912     if (remove_from_pool)
    913       RemoveFromPool();
    914     CloseAllStreams(err);
    915   }
    916 }
    917 
    918 Value* SpdySession::GetInfoAsValue() const {
    919   DictionaryValue* dict = new DictionaryValue();
    920 
    921   dict->SetInteger("source_id", net_log_.source().id);
    922 
    923   dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
    924   dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
    925 
    926   dict->SetInteger("active_streams", active_streams_.size());
    927 
    928   dict->SetInteger("unclaimed_pushed_streams",
    929       unclaimed_pushed_streams_.size());
    930 
    931   dict->SetBoolean("is_secure", is_secure_);
    932 
    933   dict->SetInteger("error", error_);
    934   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
    935 
    936   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
    937   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
    938   dict->SetInteger("streams_pushed_and_claimed_count",
    939       streams_pushed_and_claimed_count_);
    940   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
    941   dict->SetInteger("frames_received", frames_received_);
    942 
    943   dict->SetBoolean("sent_settings", sent_settings_);
    944   dict->SetBoolean("received_settings", received_settings_);
    945   return dict;
    946 }
    947 
    948 int SpdySession::GetPeerAddress(AddressList* address) const {
    949   if (!connection_->socket())
    950     return ERR_SOCKET_NOT_CONNECTED;
    951 
    952   return connection_->socket()->GetPeerAddress(address);
    953 }
    954 
    955 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
    956   if (!connection_->socket())
    957     return ERR_SOCKET_NOT_CONNECTED;
    958 
    959   return connection_->socket()->GetLocalAddress(address);
    960 }
    961 
    962 void SpdySession::ActivateStream(SpdyStream* stream) {
    963   const spdy::SpdyStreamId id = stream->stream_id();
    964   DCHECK(!IsStreamActive(id));
    965 
    966   active_streams_[id] = stream;
    967 }
    968 
    969 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
    970   // For push streams, if they are being deleted normally, we leave
    971   // the stream in the unclaimed_pushed_streams_ list.  However, if
    972   // the stream is errored out, clean it up entirely.
    973   if (status != OK) {
    974     PushedStreamMap::iterator it;
    975     for (it = unclaimed_pushed_streams_.begin();
    976          it != unclaimed_pushed_streams_.end(); ++it) {
    977       scoped_refptr<SpdyStream> curr = it->second;
    978       if (id == curr->stream_id()) {
    979         unclaimed_pushed_streams_.erase(it);
    980         break;
    981       }
    982     }
    983   }
    984 
    985   // The stream might have been deleted.
    986   ActiveStreamMap::iterator it2 = active_streams_.find(id);
    987   if (it2 == active_streams_.end())
    988     return;
    989 
    990   // If this is an active stream, call the callback.
    991   const scoped_refptr<SpdyStream> stream(it2->second);
    992   active_streams_.erase(it2);
    993   if (stream)
    994     stream->OnClose(status);
    995   ProcessPendingCreateStreams();
    996 }
    997 
    998 void SpdySession::RemoveFromPool() {
    999   if (spdy_session_pool_) {
   1000     spdy_session_pool_->Remove(make_scoped_refptr(this));
   1001     spdy_session_pool_ = NULL;
   1002   }
   1003 }
   1004 
   1005 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
   1006     const std::string& path) {
   1007   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
   1008 
   1009   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
   1010   if (it != unclaimed_pushed_streams_.end()) {
   1011     net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
   1012     scoped_refptr<SpdyStream> stream = it->second;
   1013     unclaimed_pushed_streams_.erase(it);
   1014     used_push_streams.Increment();
   1015     return stream;
   1016   }
   1017   return NULL;
   1018 }
   1019 
   1020 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
   1021   if (is_secure_) {
   1022     SSLClientSocket* ssl_socket =
   1023         reinterpret_cast<SSLClientSocket*>(connection_->socket());
   1024     ssl_socket->GetSSLInfo(ssl_info);
   1025     *was_npn_negotiated = ssl_socket->was_npn_negotiated();
   1026     return true;
   1027   }
   1028   return false;
   1029 }
   1030 
   1031 bool SpdySession::GetSSLCertRequestInfo(
   1032     SSLCertRequestInfo* cert_request_info) {
   1033   if (is_secure_) {
   1034     SSLClientSocket* ssl_socket =
   1035         reinterpret_cast<SSLClientSocket*>(connection_->socket());
   1036     ssl_socket->GetSSLCertRequestInfo(cert_request_info);
   1037     return true;
   1038   }
   1039   return false;
   1040 }
   1041 
   1042 void SpdySession::OnError(spdy::SpdyFramer* framer) {
   1043   CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
   1044 }
   1045 
   1046 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
   1047                                     const char* data,
   1048                                     size_t len) {
   1049   if (net_log().IsLoggingAllEvents()) {
   1050     net_log().AddEvent(
   1051         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
   1052         make_scoped_refptr(new NetLogSpdyDataParameter(
   1053             stream_id, len, spdy::SpdyDataFlags())));
   1054   }
   1055 
   1056   if (!IsStreamActive(stream_id)) {
   1057     // NOTE:  it may just be that the stream was cancelled.
   1058     LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
   1059     return;
   1060   }
   1061 
   1062   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1063   stream->OnDataReceived(data, len);
   1064 }
   1065 
   1066 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
   1067                           const scoped_refptr<SpdyStream> stream) {
   1068   int rv = OK;
   1069   rv = stream->OnResponseReceived(headers);
   1070   if (rv < 0) {
   1071     DCHECK_NE(rv, ERR_IO_PENDING);
   1072     const spdy::SpdyStreamId stream_id = stream->stream_id();
   1073     DeleteStream(stream_id, rv);
   1074     return false;
   1075   }
   1076   return true;
   1077 }
   1078 
   1079 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
   1080                         const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
   1081   spdy::SpdyStreamId stream_id = frame.stream_id();
   1082   spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
   1083 
   1084   if (net_log_.IsLoggingAllEvents()) {
   1085     net_log_.AddEvent(
   1086         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
   1087         make_scoped_refptr(new NetLogSpdySynParameter(
   1088             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
   1089             stream_id, associated_stream_id)));
   1090   }
   1091 
   1092   // Server-initiated streams should have even sequence numbers.
   1093   if ((stream_id & 0x1) != 0) {
   1094     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
   1095     return;
   1096   }
   1097 
   1098   if (IsStreamActive(stream_id)) {
   1099     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
   1100     return;
   1101   }
   1102 
   1103   if (associated_stream_id == 0) {
   1104     LOG(WARNING) << "Received invalid OnSyn associated stream id "
   1105                  << associated_stream_id
   1106                  << " for stream " << stream_id;
   1107     ResetStream(stream_id, spdy::INVALID_STREAM);
   1108     return;
   1109   }
   1110 
   1111   streams_pushed_count_++;
   1112 
   1113   // TODO(mbelshe): DCHECK that this is a GET method?
   1114 
   1115   // Verify that the response had a URL for us.
   1116   const std::string& url = ContainsKey(*headers, "url") ?
   1117       headers->find("url")->second : "";
   1118   if (url.empty()) {
   1119     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1120     LOG(WARNING) << "Pushed stream did not contain a url.";
   1121     return;
   1122   }
   1123 
   1124   GURL gurl(url);
   1125   if (!gurl.is_valid()) {
   1126     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1127     LOG(WARNING) << "Pushed stream url was invalid: " << url;
   1128     return;
   1129   }
   1130 
   1131   // Verify we have a valid stream association.
   1132   if (!IsStreamActive(associated_stream_id)) {
   1133     LOG(WARNING) << "Received OnSyn with inactive associated stream "
   1134                << associated_stream_id;
   1135     ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
   1136     return;
   1137   }
   1138 
   1139   scoped_refptr<SpdyStream> associated_stream =
   1140       active_streams_[associated_stream_id];
   1141   GURL associated_url(associated_stream->GetUrl());
   1142   if (associated_url.GetOrigin() != gurl.GetOrigin()) {
   1143     LOG(WARNING) << "Rejected Cross Origin Push Stream "
   1144                  << associated_stream_id;
   1145     ResetStream(stream_id, spdy::REFUSED_STREAM);
   1146     return;
   1147   }
   1148 
   1149   // There should not be an existing pushed stream with the same path.
   1150   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
   1151   if (it != unclaimed_pushed_streams_.end()) {
   1152     LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
   1153     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1154     return;
   1155   }
   1156 
   1157   scoped_refptr<SpdyStream> stream(
   1158       new SpdyStream(this, stream_id, true, net_log_));
   1159 
   1160   stream->set_path(gurl.PathForRequest());
   1161 
   1162   unclaimed_pushed_streams_[url] = stream;
   1163 
   1164   ActivateStream(stream);
   1165   stream->set_response_received();
   1166 
   1167   // Parse the headers.
   1168   if (!Respond(*headers, stream))
   1169     return;
   1170 
   1171   base::StatsCounter push_requests("spdy.pushed_streams");
   1172   push_requests.Increment();
   1173 }
   1174 
   1175 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
   1176                              const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
   1177   spdy::SpdyStreamId stream_id = frame.stream_id();
   1178 
   1179   bool valid_stream = IsStreamActive(stream_id);
   1180   if (!valid_stream) {
   1181     // NOTE:  it may just be that the stream was cancelled.
   1182     LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
   1183     return;
   1184   }
   1185 
   1186   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1187   CHECK_EQ(stream->stream_id(), stream_id);
   1188   CHECK(!stream->cancelled());
   1189 
   1190   if (stream->response_received()) {
   1191     LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
   1192     CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
   1193     return;
   1194   }
   1195   stream->set_response_received();
   1196 
   1197   if (net_log().IsLoggingAllEvents()) {
   1198     net_log().AddEvent(
   1199         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
   1200         make_scoped_refptr(new NetLogSpdySynParameter(
   1201             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
   1202             stream_id, 0)));
   1203   }
   1204 
   1205   Respond(*headers, stream);
   1206 }
   1207 
   1208 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
   1209                             const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
   1210   spdy::SpdyStreamId stream_id = frame.stream_id();
   1211 
   1212   bool valid_stream = IsStreamActive(stream_id);
   1213   if (!valid_stream) {
   1214     // NOTE:  it may just be that the stream was cancelled.
   1215     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
   1216     return;
   1217   }
   1218 
   1219   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1220   CHECK_EQ(stream->stream_id(), stream_id);
   1221   CHECK(!stream->cancelled());
   1222 
   1223   if (net_log().IsLoggingAllEvents()) {
   1224     net_log().AddEvent(
   1225         NetLog::TYPE_SPDY_SESSION_HEADERS,
   1226         make_scoped_refptr(new NetLogSpdySynParameter(
   1227             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
   1228             stream_id, 0)));
   1229   }
   1230 
   1231   int rv = stream->OnHeaders(*headers);
   1232   if (rv < 0) {
   1233     DCHECK_NE(rv, ERR_IO_PENDING);
   1234     const spdy::SpdyStreamId stream_id = stream->stream_id();
   1235     DeleteStream(stream_id, rv);
   1236   }
   1237 }
   1238 
   1239 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
   1240   const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
   1241   uint32 type = frame->type();
   1242   if (type == spdy::SYN_STREAM ||
   1243       type == spdy::SYN_REPLY ||
   1244       type == spdy::HEADERS) {
   1245     if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
   1246       LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
   1247       int stream_id = 0;
   1248       if (type == spdy::SYN_STREAM) {
   1249         stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
   1250                      (frame))->stream_id();
   1251       } else if (type == spdy::SYN_REPLY) {
   1252         stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
   1253                      (frame))->stream_id();
   1254       } else if (type == spdy::HEADERS) {
   1255         stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
   1256                      (frame))->stream_id();
   1257       }
   1258       if(IsStreamActive(stream_id))
   1259         ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1260       return;
   1261     }
   1262   }
   1263 
   1264   frames_received_++;
   1265 
   1266   switch (type) {
   1267     case spdy::GOAWAY:
   1268       OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
   1269       break;
   1270     case spdy::PING:
   1271       OnPing(*reinterpret_cast<const spdy::SpdyPingControlFrame*>(frame));
   1272       break;
   1273     case spdy::SETTINGS:
   1274       OnSettings(
   1275           *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
   1276       break;
   1277     case spdy::RST_STREAM:
   1278       OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
   1279       break;
   1280     case spdy::SYN_STREAM:
   1281       OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
   1282             headers);
   1283       break;
   1284     case spdy::HEADERS:
   1285       OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
   1286                 headers);
   1287       break;
   1288     case spdy::SYN_REPLY:
   1289       OnSynReply(
   1290           *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
   1291           headers);
   1292       break;
   1293     case spdy::WINDOW_UPDATE:
   1294       OnWindowUpdate(
   1295           *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
   1296       break;
   1297     default:
   1298       DCHECK(false);  // Error!
   1299   }
   1300 }
   1301 
   1302 bool SpdySession::OnControlFrameHeaderData(spdy::SpdyStreamId stream_id,
   1303                                            const char* header_data,
   1304                                            size_t len) {
   1305   DCHECK(false);
   1306   return false;
   1307 }
   1308 
   1309 void SpdySession::OnDataFrameHeader(const spdy::SpdyDataFrame* frame) {
   1310   DCHECK(false);
   1311 }
   1312 
   1313 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
   1314   spdy::SpdyStreamId stream_id = frame.stream_id();
   1315 
   1316   net_log().AddEvent(
   1317       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
   1318       make_scoped_refptr(
   1319           new NetLogSpdyRstParameter(stream_id, frame.status())));
   1320 
   1321   bool valid_stream = IsStreamActive(stream_id);
   1322   if (!valid_stream) {
   1323     // NOTE:  it may just be that the stream was cancelled.
   1324     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
   1325     return;
   1326   }
   1327   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1328   CHECK_EQ(stream->stream_id(), stream_id);
   1329   CHECK(!stream->cancelled());
   1330 
   1331   if (frame.status() == 0) {
   1332     stream->OnDataReceived(NULL, 0);
   1333   } else {
   1334     LOG(ERROR) << "Spdy stream closed: " << frame.status();
   1335     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
   1336     //                For now, it doesn't matter much - it is a protocol error.
   1337     DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
   1338   }
   1339 }
   1340 
   1341 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
   1342   net_log_.AddEvent(
   1343       NetLog::TYPE_SPDY_SESSION_GOAWAY,
   1344       make_scoped_refptr(
   1345           new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
   1346                                         active_streams_.size(),
   1347                                         unclaimed_pushed_streams_.size())));
   1348   RemoveFromPool();
   1349   CloseAllStreams(net::ERR_ABORTED);
   1350 
   1351   // TODO(willchan): Cancel any streams that are past the GoAway frame's
   1352   // |last_accepted_stream_id|.
   1353 
   1354   // Don't bother killing any streams that are still reading.  They'll either
   1355   // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
   1356   // closed.
   1357 }
   1358 
   1359 void SpdySession::OnPing(const spdy::SpdyPingControlFrame& frame) {
   1360   net_log_.AddEvent(
   1361       NetLog::TYPE_SPDY_SESSION_PING,
   1362       make_scoped_refptr(new NetLogSpdyPingParameter(frame.unique_id())));
   1363 
   1364   // Send response to a PING from server.
   1365   if (frame.unique_id() % 2 == 0) {
   1366     WritePingFrame(frame.unique_id());
   1367     return;
   1368   }
   1369 
   1370   --pings_in_flight_;
   1371   if (pings_in_flight_ < 0) {
   1372     CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
   1373     return;
   1374   }
   1375 
   1376   if (pings_in_flight_ > 0)
   1377     return;
   1378 
   1379   if (!need_to_send_ping_)
   1380     return;
   1381 
   1382   PlanToSendTrailingPing();
   1383 }
   1384 
   1385 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
   1386   spdy::SpdySettings settings;
   1387   if (spdy_framer_.ParseSettings(&frame, &settings)) {
   1388     HandleSettings(settings);
   1389     spdy_settings_->Set(host_port_pair(), settings);
   1390   }
   1391 
   1392   received_settings_ = true;
   1393 
   1394   net_log_.AddEvent(
   1395       NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
   1396       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
   1397 }
   1398 
   1399 void SpdySession::OnWindowUpdate(
   1400     const spdy::SpdyWindowUpdateControlFrame& frame) {
   1401   spdy::SpdyStreamId stream_id = frame.stream_id();
   1402   if (!IsStreamActive(stream_id)) {
   1403     LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
   1404     return;
   1405   }
   1406 
   1407   int delta_window_size = static_cast<int>(frame.delta_window_size());
   1408   if (delta_window_size < 1) {
   1409     LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
   1410                  << delta_window_size;
   1411     ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
   1412     return;
   1413   }
   1414 
   1415   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1416   CHECK_EQ(stream->stream_id(), stream_id);
   1417   CHECK(!stream->cancelled());
   1418 
   1419   if (use_flow_control_)
   1420     stream->IncreaseSendWindowSize(delta_window_size);
   1421 
   1422   net_log_.AddEvent(
   1423       NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
   1424       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
   1425           stream_id, delta_window_size, stream->send_window_size())));
   1426 }
   1427 
   1428 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
   1429                                    int delta_window_size) {
   1430   DCHECK(IsStreamActive(stream_id));
   1431   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1432   CHECK_EQ(stream->stream_id(), stream_id);
   1433 
   1434   net_log_.AddEvent(
   1435       NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
   1436       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
   1437           stream_id, delta_window_size, stream->recv_window_size())));
   1438 
   1439   scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
   1440       spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
   1441   QueueFrame(window_update_frame.get(), stream->priority(), stream);
   1442 }
   1443 
   1444 // Given a cwnd that we would have sent to the server, modify it based on the
   1445 // field trial policy.
   1446 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
   1447   base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
   1448   if (!trial) {
   1449       LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
   1450       return cwnd;
   1451   }
   1452   if (trial->group_name() == "cwnd10")
   1453     return 10;
   1454   else if (trial->group_name() == "cwnd16")
   1455     return 16;
   1456   else if (trial->group_name() == "cwndMin16")
   1457     return std::max(cwnd, 16);
   1458   else if (trial->group_name() == "cwndMin10")
   1459     return std::max(cwnd, 10);
   1460   else if (trial->group_name() == "cwndDynamic")
   1461     return cwnd;
   1462   NOTREACHED();
   1463   return cwnd;
   1464 }
   1465 
   1466 void SpdySession::SendSettings() {
   1467   // Note:  we're copying the settings here, so that we can potentially modify
   1468   // the settings for the field trial.  When removing the field trial, make
   1469   // this a reference to the const SpdySettings again.
   1470   spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
   1471   if (settings.empty())
   1472     return;
   1473 
   1474   // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
   1475   for (spdy::SpdySettings::iterator i = settings.begin(),
   1476            end = settings.end(); i != end; ++i) {
   1477     const uint32 id = i->first.id();
   1478     const uint32 val = i->second;
   1479     switch (id) {
   1480       case spdy::SETTINGS_CURRENT_CWND:
   1481         uint32 cwnd = 0;
   1482         cwnd = ApplyCwndFieldTrialPolicy(val);
   1483         UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
   1484                                     cwnd,
   1485                                     1, 200, 100);
   1486         if (cwnd != val) {
   1487           i->second = cwnd;
   1488           i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
   1489           spdy_settings_->Set(host_port_pair(), settings);
   1490         }
   1491         break;
   1492     }
   1493   }
   1494 
   1495   HandleSettings(settings);
   1496 
   1497   net_log_.AddEvent(
   1498       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
   1499       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
   1500 
   1501   // Create the SETTINGS frame and send it.
   1502   scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
   1503       spdy_framer_.CreateSettings(settings));
   1504   sent_settings_ = true;
   1505   QueueFrame(settings_frame.get(), 0, NULL);
   1506 }
   1507 
   1508 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
   1509   for (spdy::SpdySettings::const_iterator i = settings.begin(),
   1510            end = settings.end(); i != end; ++i) {
   1511     const uint32 id = i->first.id();
   1512     const uint32 val = i->second;
   1513     switch (id) {
   1514       case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
   1515         max_concurrent_streams_ = std::min(static_cast<size_t>(val),
   1516                                            max_concurrent_stream_limit_);
   1517         ProcessPendingCreateStreams();
   1518         break;
   1519     }
   1520   }
   1521 }
   1522 
   1523 void SpdySession::SendPrefacePingIfNoneInFlight() {
   1524   if (pings_in_flight_ || trailing_ping_pending_ ||
   1525       !enable_ping_based_connection_checking_)
   1526     return;
   1527 
   1528   const base::TimeDelta kConnectionAtRiskOfLoss =
   1529       base::TimeDelta::FromMilliseconds(connection_at_risk_of_loss_ms_);
   1530 
   1531   base::TimeTicks now = base::TimeTicks::Now();
   1532   // If we haven't heard from server, then send a preface-PING.
   1533   if ((now - received_data_time_) > kConnectionAtRiskOfLoss)
   1534     SendPrefacePing();
   1535 
   1536   PlanToSendTrailingPing();
   1537 }
   1538 
   1539 void SpdySession::SendPrefacePing() {
   1540   // TODO(rtenneti): Send preface pings when more servers support additional
   1541   // pings.
   1542   // WritePingFrame(next_ping_id_);
   1543 }
   1544 
   1545 void SpdySession::PlanToSendTrailingPing() {
   1546   if (trailing_ping_pending_)
   1547     return;
   1548 
   1549   trailing_ping_pending_ = true;
   1550   MessageLoop::current()->PostDelayedTask(
   1551       FROM_HERE,
   1552       method_factory_.NewRunnableMethod(&SpdySession::SendTrailingPing),
   1553       trailing_ping_delay_time_ms_);
   1554 }
   1555 
   1556 void SpdySession::SendTrailingPing() {
   1557   DCHECK(trailing_ping_pending_);
   1558   trailing_ping_pending_ = false;
   1559   WritePingFrame(next_ping_id_);
   1560 }
   1561 
   1562 void SpdySession::WritePingFrame(uint32 unique_id) {
   1563   scoped_ptr<spdy::SpdyPingControlFrame> ping_frame(
   1564       spdy_framer_.CreatePingFrame(next_ping_id_));
   1565   QueueFrame(ping_frame.get(), SPDY_PRIORITY_HIGHEST, NULL);
   1566 
   1567   if (net_log().IsLoggingAllEvents()) {
   1568     net_log().AddEvent(
   1569         NetLog::TYPE_SPDY_SESSION_PING,
   1570         make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_)));
   1571   }
   1572   if (unique_id % 2 != 0) {
   1573     next_ping_id_ += 2;
   1574     ++pings_in_flight_;
   1575     need_to_send_ping_ = false;
   1576     PlanToCheckPingStatus();
   1577   }
   1578 }
   1579 
   1580 void SpdySession::PlanToCheckPingStatus() {
   1581   if (check_ping_status_pending_)
   1582     return;
   1583 
   1584   check_ping_status_pending_ = true;
   1585   MessageLoop::current()->PostDelayedTask(
   1586       FROM_HERE,
   1587       method_factory_.NewRunnableMethod(
   1588           &SpdySession::CheckPingStatus, base::TimeTicks::Now()),
   1589       hung_interval_ms_);
   1590 }
   1591 
   1592 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
   1593   // Check if we got a response back for all PINGs we had sent.
   1594   if (pings_in_flight_ == 0) {
   1595     check_ping_status_pending_ = false;
   1596     return;
   1597   }
   1598 
   1599   DCHECK(check_ping_status_pending_);
   1600 
   1601   const base::TimeDelta kHungInterval =
   1602       base::TimeDelta::FromMilliseconds(hung_interval_ms_);
   1603 
   1604   base::TimeTicks now = base::TimeTicks::Now();
   1605   base::TimeDelta delay = kHungInterval - (now - received_data_time_);
   1606 
   1607   if (delay.InMilliseconds() < 0 || received_data_time_ < last_check_time) {
   1608     DCHECK(now - received_data_time_ > kHungInterval);
   1609     CloseSessionOnError(net::ERR_SPDY_PING_FAILED, true);
   1610     return;
   1611   }
   1612 
   1613   // Check the status of connection after a delay.
   1614   MessageLoop::current()->PostDelayedTask(
   1615       FROM_HERE,
   1616       method_factory_.NewRunnableMethod(&SpdySession::CheckPingStatus, now),
   1617       delay.InMilliseconds());
   1618 }
   1619 
   1620 void SpdySession::RecordHistograms() {
   1621   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
   1622                               streams_initiated_count_,
   1623                               0, 300, 50);
   1624   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
   1625                               streams_pushed_count_,
   1626                               0, 300, 50);
   1627   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
   1628                               streams_pushed_and_claimed_count_,
   1629                               0, 300, 50);
   1630   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
   1631                               streams_abandoned_count_,
   1632                               0, 300, 50);
   1633   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
   1634                             sent_settings_ ? 1 : 0, 2);
   1635   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
   1636                             received_settings_ ? 1 : 0, 2);
   1637   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
   1638                               stalled_streams_,
   1639                               0, 300, 50);
   1640   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
   1641                             stalled_streams_ > 0 ? 1 : 0, 2);
   1642 
   1643   if (received_settings_) {
   1644     // Enumerate the saved settings, and set histograms for it.
   1645     const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
   1646 
   1647     spdy::SpdySettings::const_iterator it;
   1648     for (it = settings.begin(); it != settings.end(); ++it) {
   1649       const spdy::SpdySetting setting = *it;
   1650       switch (setting.first.id()) {
   1651         case spdy::SETTINGS_CURRENT_CWND:
   1652           // Record several different histograms to see if cwnd converges
   1653           // for larger volumes of data being sent.
   1654           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
   1655                                       setting.second,
   1656                                       1, 200, 100);
   1657           if (bytes_received_ > 10 * 1024) {
   1658             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
   1659                                         setting.second,
   1660                                         1, 200, 100);
   1661             if (bytes_received_ > 25 * 1024) {
   1662               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
   1663                                           setting.second,
   1664                                           1, 200, 100);
   1665               if (bytes_received_ > 50 * 1024) {
   1666                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
   1667                                             setting.second,
   1668                                             1, 200, 100);
   1669                 if (bytes_received_ > 100 * 1024) {
   1670                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
   1671                                               setting.second,
   1672                                               1, 200, 100);
   1673                 }
   1674               }
   1675             }
   1676           }
   1677           break;
   1678         case spdy::SETTINGS_ROUND_TRIP_TIME:
   1679           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
   1680                                       setting.second,
   1681                                       1, 1200, 100);
   1682           break;
   1683         case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
   1684           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
   1685                                       setting.second,
   1686                                       1, 100, 50);
   1687           break;
   1688       }
   1689     }
   1690   }
   1691 }
   1692 
   1693 void SpdySession::InvokeUserStreamCreationCallback(
   1694     scoped_refptr<SpdyStream>* stream) {
   1695   PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
   1696 
   1697   // Exit if the request has already been cancelled.
   1698   if (it == pending_callback_map_.end())
   1699     return;
   1700 
   1701   CompletionCallback* callback = it->second.callback;
   1702   int result = it->second.result;
   1703   pending_callback_map_.erase(it);
   1704   callback->Run(result);
   1705 }
   1706 
   1707 }  // namespace net
   1708