Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_session.h"
      6 
      7 #include <algorithm>
      8 #include <map>
      9 
     10 #include "base/basictypes.h"
     11 #include "base/bind.h"
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/metrics/field_trial.h"
     16 #include "base/metrics/histogram.h"
     17 #include "base/metrics/sparse_histogram.h"
     18 #include "base/metrics/stats_counters.h"
     19 #include "base/stl_util.h"
     20 #include "base/strings/string_number_conversions.h"
     21 #include "base/strings/string_util.h"
     22 #include "base/strings/stringprintf.h"
     23 #include "base/strings/utf_string_conversions.h"
     24 #include "base/time/time.h"
     25 #include "base/values.h"
     26 #include "crypto/ec_private_key.h"
     27 #include "crypto/ec_signature_creator.h"
     28 #include "net/base/connection_type_histograms.h"
     29 #include "net/base/net_log.h"
     30 #include "net/base/net_util.h"
     31 #include "net/cert/asn1_util.h"
     32 #include "net/http/http_network_session.h"
     33 #include "net/http/http_server_properties.h"
     34 #include "net/spdy/spdy_buffer_producer.h"
     35 #include "net/spdy/spdy_credential_builder.h"
     36 #include "net/spdy/spdy_frame_builder.h"
     37 #include "net/spdy/spdy_http_utils.h"
     38 #include "net/spdy/spdy_protocol.h"
     39 #include "net/spdy/spdy_session_pool.h"
     40 #include "net/spdy/spdy_stream.h"
     41 #include "net/ssl/server_bound_cert_service.h"
     42 
     43 namespace net {
     44 
     45 namespace {
     46 
     47 const int kReadBufferSize = 8 * 1024;
     48 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
     49 const int kHungIntervalSeconds = 10;
     50 
     51 // Always start at 1 for the first stream id.
     52 const SpdyStreamId kFirstStreamId = 1;
     53 
     54 // Minimum seconds that unclaimed pushed streams will be kept in memory.
     55 const int kMinPushedStreamLifetimeSeconds = 300;
     56 
     57 base::Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers,
     58                                    bool fin,
     59                                    bool unidirectional,
     60                                    SpdyStreamId stream_id,
     61                                    SpdyStreamId associated_stream,
     62                                    NetLog::LogLevel /* log_level */) {
     63   base::DictionaryValue* dict = new base::DictionaryValue();
     64   base::ListValue* headers_list = new base::ListValue();
     65   for (SpdyHeaderBlock::const_iterator it = headers->begin();
     66        it != headers->end(); ++it) {
     67     headers_list->Append(new base::StringValue(base::StringPrintf(
     68         "%s: %s", it->first.c_str(),
     69         (ShouldShowHttpHeaderValue(
     70             it->first) ? it->second : "[elided]").c_str())));
     71   }
     72   dict->SetBoolean("fin", fin);
     73   dict->SetBoolean("unidirectional", unidirectional);
     74   dict->Set("headers", headers_list);
     75   dict->SetInteger("stream_id", stream_id);
     76   if (associated_stream)
     77     dict->SetInteger("associated_stream", associated_stream);
     78   return dict;
     79 }
     80 
     81 base::Value* NetLogSpdyCredentialCallback(size_t slot,
     82                                           const std::string* origin,
     83                                           NetLog::LogLevel /* log_level */) {
     84   base::DictionaryValue* dict = new base::DictionaryValue();
     85   dict->SetInteger("slot", slot);
     86   dict->SetString("origin", *origin);
     87   return dict;
     88 }
     89 
     90 base::Value* NetLogSpdySessionCloseCallback(int net_error,
     91                                             const std::string* description,
     92                                             NetLog::LogLevel /* log_level */) {
     93   base::DictionaryValue* dict = new base::DictionaryValue();
     94   dict->SetInteger("net_error", net_error);
     95   dict->SetString("description", *description);
     96   return dict;
     97 }
     98 
     99 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
    100                                        NetLog::LogLevel /* log_level */) {
    101   base::DictionaryValue* dict = new base::DictionaryValue();
    102   dict->SetString("host", host_pair->first.ToString());
    103   dict->SetString("proxy", host_pair->second.ToPacString());
    104   return dict;
    105 }
    106 
    107 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
    108                                         bool clear_persisted,
    109                                         NetLog::LogLevel /* log_level */) {
    110   base::DictionaryValue* dict = new base::DictionaryValue();
    111   dict->SetString("host", host_port_pair.ToString());
    112   dict->SetBoolean("clear_persisted", clear_persisted);
    113   return dict;
    114 }
    115 
    116 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
    117                                        SpdySettingsFlags flags,
    118                                        uint32 value,
    119                                        NetLog::LogLevel /* log_level */) {
    120   base::DictionaryValue* dict = new base::DictionaryValue();
    121   dict->SetInteger("id", id);
    122   dict->SetInteger("flags", flags);
    123   dict->SetInteger("value", value);
    124   return dict;
    125 }
    126 
    127 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
    128                                             NetLog::LogLevel /* log_level */) {
    129   base::DictionaryValue* dict = new base::DictionaryValue();
    130   base::ListValue* settings_list = new base::ListValue();
    131   for (SettingsMap::const_iterator it = settings->begin();
    132        it != settings->end(); ++it) {
    133     const SpdySettingsIds id = it->first;
    134     const SpdySettingsFlags flags = it->second.first;
    135     const uint32 value = it->second.second;
    136     settings_list->Append(new base::StringValue(
    137         base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
    138   }
    139   dict->Set("settings", settings_list);
    140   return dict;
    141 }
    142 
    143 base::Value* NetLogSpdyWindowUpdateFrameCallback(
    144     SpdyStreamId stream_id,
    145     uint32 delta,
    146     NetLog::LogLevel /* log_level */) {
    147   base::DictionaryValue* dict = new base::DictionaryValue();
    148   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    149   dict->SetInteger("delta", delta);
    150   return dict;
    151 }
    152 
    153 base::Value* NetLogSpdySessionWindowUpdateCallback(
    154     int32 delta,
    155     int32 window_size,
    156     NetLog::LogLevel /* log_level */) {
    157   base::DictionaryValue* dict = new base::DictionaryValue();
    158   dict->SetInteger("delta", delta);
    159   dict->SetInteger("window_size", window_size);
    160   return dict;
    161 }
    162 
    163 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
    164                                     int size,
    165                                     bool fin,
    166                                     NetLog::LogLevel /* log_level */) {
    167   base::DictionaryValue* dict = new base::DictionaryValue();
    168   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    169   dict->SetInteger("size", size);
    170   dict->SetBoolean("fin", fin);
    171   return dict;
    172 }
    173 
    174 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
    175                                    int status,
    176                                    const std::string* description,
    177                                    NetLog::LogLevel /* log_level */) {
    178   base::DictionaryValue* dict = new base::DictionaryValue();
    179   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    180   dict->SetInteger("status", status);
    181   dict->SetString("description", *description);
    182   return dict;
    183 }
    184 
    185 base::Value* NetLogSpdyPingCallback(uint32 unique_id,
    186                                     const char* type,
    187                                     NetLog::LogLevel /* log_level */) {
    188   base::DictionaryValue* dict = new base::DictionaryValue();
    189   dict->SetInteger("unique_id", unique_id);
    190   dict->SetString("type", type);
    191   return dict;
    192 }
    193 
    194 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
    195                                       int active_streams,
    196                                       int unclaimed_streams,
    197                                       SpdyGoAwayStatus status,
    198                                       NetLog::LogLevel /* log_level */) {
    199   base::DictionaryValue* dict = new base::DictionaryValue();
    200   dict->SetInteger("last_accepted_stream_id",
    201                    static_cast<int>(last_stream_id));
    202   dict->SetInteger("active_streams", active_streams);
    203   dict->SetInteger("unclaimed_streams", unclaimed_streams);
    204   dict->SetInteger("status", static_cast<int>(status));
    205   return dict;
    206 }
    207 
    208 // The maximum number of concurrent streams we will ever create.  Even if
    209 // the server permits more, we will never exceed this limit.
    210 const size_t kMaxConcurrentStreamLimit = 256;
    211 
    212 }  // namespace
    213 
    214 SpdyStreamRequest::SpdyStreamRequest() {
    215   Reset();
    216 }
    217 
    218 SpdyStreamRequest::~SpdyStreamRequest() {
    219   CancelRequest();
    220 }
    221 
    222 int SpdyStreamRequest::StartRequest(
    223     SpdyStreamType type,
    224     const base::WeakPtr<SpdySession>& session,
    225     const GURL& url,
    226     RequestPriority priority,
    227     const BoundNetLog& net_log,
    228     const CompletionCallback& callback) {
    229   DCHECK(session.get());
    230   DCHECK(!session_.get());
    231   DCHECK(!stream_.get());
    232   DCHECK(callback_.is_null());
    233 
    234   type_ = type;
    235   session_ = session;
    236   url_ = url;
    237   priority_ = priority;
    238   net_log_ = net_log;
    239   callback_ = callback;
    240 
    241   base::WeakPtr<SpdyStream> stream;
    242   int rv = session->TryCreateStream(this, &stream);
    243   if (rv == OK) {
    244     Reset();
    245     stream_ = stream;
    246   }
    247   return rv;
    248 }
    249 
    250 void SpdyStreamRequest::CancelRequest() {
    251   if (session_.get())
    252     session_->CancelStreamRequest(this);
    253   Reset();
    254 }
    255 
    256 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
    257   DCHECK(!session_.get());
    258   base::WeakPtr<SpdyStream> stream = stream_;
    259   DCHECK(stream.get());
    260   Reset();
    261   return stream;
    262 }
    263 
    264 void SpdyStreamRequest::OnRequestCompleteSuccess(
    265     base::WeakPtr<SpdyStream>* stream) {
    266   DCHECK(session_.get());
    267   DCHECK(!stream_.get());
    268   DCHECK(!callback_.is_null());
    269   CompletionCallback callback = callback_;
    270   Reset();
    271   DCHECK(*stream);
    272   stream_ = *stream;
    273   callback.Run(OK);
    274 }
    275 
    276 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
    277   DCHECK(session_.get());
    278   DCHECK(!stream_.get());
    279   DCHECK(!callback_.is_null());
    280   CompletionCallback callback = callback_;
    281   Reset();
    282   DCHECK_NE(rv, OK);
    283   callback.Run(rv);
    284 }
    285 
    286 void SpdyStreamRequest::Reset() {
    287   type_ = SPDY_BIDIRECTIONAL_STREAM;
    288   session_.reset();
    289   stream_.reset();
    290   url_ = GURL();
    291   priority_ = MINIMUM_PRIORITY;
    292   net_log_ = BoundNetLog();
    293   callback_.Reset();
    294 }
    295 
    296 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
    297     : stream(NULL),
    298       waiting_for_syn_reply(false) {}
    299 
    300 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
    301     : stream(stream),
    302       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {}
    303 
    304 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
    305 
    306 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
    307 
    308 SpdySession::PushedStreamInfo::PushedStreamInfo(
    309     SpdyStreamId stream_id,
    310     base::TimeTicks creation_time)
    311     : stream_id(stream_id),
    312       creation_time(creation_time) {}
    313 
    314 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
    315 
    316 SpdySession::SpdySession(
    317     const SpdySessionKey& spdy_session_key,
    318     const base::WeakPtr<HttpServerProperties>& http_server_properties,
    319     bool verify_domain_authentication,
    320     bool enable_sending_initial_data,
    321     bool enable_credential_frames,
    322     bool enable_compression,
    323     bool enable_ping_based_connection_checking,
    324     NextProto default_protocol,
    325     size_t stream_initial_recv_window_size,
    326     size_t initial_max_concurrent_streams,
    327     size_t max_concurrent_streams_limit,
    328     TimeFunc time_func,
    329     const HostPortPair& trusted_spdy_proxy,
    330     NetLog* net_log)
    331     : weak_factory_(this),
    332       in_io_loop_(false),
    333       spdy_session_key_(spdy_session_key),
    334       pool_(NULL),
    335       http_server_properties_(http_server_properties),
    336       read_buffer_(new IOBuffer(kReadBufferSize)),
    337       stream_hi_water_mark_(kFirstStreamId),
    338       in_flight_write_frame_type_(DATA),
    339       in_flight_write_frame_size_(0),
    340       is_secure_(false),
    341       certificate_error_code_(OK),
    342       availability_state_(STATE_AVAILABLE),
    343       read_state_(READ_STATE_DO_READ),
    344       write_state_(WRITE_STATE_IDLE),
    345       error_on_close_(OK),
    346       max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
    347                               kInitialMaxConcurrentStreams :
    348                               initial_max_concurrent_streams),
    349       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
    350                                     kMaxConcurrentStreamLimit :
    351                                     max_concurrent_streams_limit),
    352       streams_initiated_count_(0),
    353       streams_pushed_count_(0),
    354       streams_pushed_and_claimed_count_(0),
    355       streams_abandoned_count_(0),
    356       total_bytes_received_(0),
    357       sent_settings_(false),
    358       received_settings_(false),
    359       stalled_streams_(0),
    360       pings_in_flight_(0),
    361       next_ping_id_(1),
    362       last_activity_time_(time_func()),
    363       check_ping_status_pending_(false),
    364       send_connection_header_prefix_(false),
    365       flow_control_state_(FLOW_CONTROL_NONE),
    366       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
    367       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ?
    368                                        kDefaultInitialRecvWindowSize :
    369                                        stream_initial_recv_window_size),
    370       session_send_window_size_(0),
    371       session_recv_window_size_(0),
    372       session_unacked_recv_window_bytes_(0),
    373       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
    374       verify_domain_authentication_(verify_domain_authentication),
    375       enable_sending_initial_data_(enable_sending_initial_data),
    376       enable_credential_frames_(enable_credential_frames),
    377       enable_compression_(enable_compression),
    378       enable_ping_based_connection_checking_(
    379           enable_ping_based_connection_checking),
    380       protocol_(default_protocol),
    381       credential_state_(SpdyCredentialState::kDefaultNumSlots),
    382       connection_at_risk_of_loss_time_(
    383           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
    384       hung_interval_(
    385           base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
    386       trusted_spdy_proxy_(trusted_spdy_proxy),
    387       time_func_(time_func) {
    388   // TODO(akalin): Change this to kProtoSPDYMinimumVersion once we
    389   // stop supporting SPDY/1.
    390   DCHECK_GE(protocol_, kProtoSPDY2);
    391   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    392   DCHECK(HttpStreamFactory::spdy_enabled());
    393   net_log_.BeginEvent(
    394       NetLog::TYPE_SPDY_SESSION,
    395       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
    396   next_unclaimed_push_stream_sweep_time_ = time_func_() +
    397       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
    398   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    399 }
    400 
    401 SpdySession::~SpdySession() {
    402   CHECK(!in_io_loop_);
    403   DCHECK(!pool_);
    404   DcheckClosed();
    405 
    406   // TODO(akalin): Check connection->is_initialized() instead. This
    407   // requires re-working CreateFakeSpdySession(), though.
    408   DCHECK(connection_->socket());
    409   // With SPDY we can't recycle sockets.
    410   connection_->socket()->Disconnect();
    411 
    412   RecordHistograms();
    413 
    414   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
    415 }
    416 
    417 Error SpdySession::InitializeWithSocket(
    418     scoped_ptr<ClientSocketHandle> connection,
    419     SpdySessionPool* pool,
    420     bool is_secure,
    421     int certificate_error_code) {
    422   CHECK(!in_io_loop_);
    423   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
    424   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
    425   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
    426   DCHECK(!connection_);
    427 
    428   DCHECK(certificate_error_code == OK ||
    429          certificate_error_code < ERR_IO_PENDING);
    430   // TODO(akalin): Check connection->is_initialized() instead. This
    431   // requires re-working CreateFakeSpdySession(), though.
    432   DCHECK(connection->socket());
    433 
    434   base::StatsCounter spdy_sessions("spdy.sessions");
    435   spdy_sessions.Increment();
    436 
    437   connection_ = connection.Pass();
    438   is_secure_ = is_secure;
    439   certificate_error_code_ = certificate_error_code;
    440 
    441   NextProto protocol_negotiated =
    442       connection_->socket()->GetNegotiatedProtocol();
    443   if (protocol_negotiated != kProtoUnknown) {
    444     protocol_ = protocol_negotiated;
    445   }
    446   // TODO(akalin): Change this to kProtoSPDYMinimumVersion once we
    447   // stop supporting SPDY/1.
    448   DCHECK_GE(protocol_, kProtoSPDY2);
    449   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    450 
    451   SSLClientSocket* ssl_socket = GetSSLClientSocket();
    452   if (ssl_socket && ssl_socket->WasChannelIDSent()) {
    453     // According to the SPDY spec, the credential associated with the TLS
    454     // connection is stored in slot[1].
    455     credential_state_.SetHasCredential(GURL("https://" +
    456                                             host_port_pair().ToString()));
    457   }
    458 
    459   if (protocol_ == kProtoHTTP2Draft04)
    460     send_connection_header_prefix_ = true;
    461 
    462   if (protocol_ >= kProtoSPDY31) {
    463     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
    464     session_send_window_size_ = kSpdySessionInitialWindowSize;
    465     session_recv_window_size_ = kSpdySessionInitialWindowSize;
    466   } else if (protocol_ >= kProtoSPDY3) {
    467     flow_control_state_ = FLOW_CONTROL_STREAM;
    468   } else {
    469     flow_control_state_ = FLOW_CONTROL_NONE;
    470   }
    471 
    472   buffered_spdy_framer_.reset(
    473       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
    474                              enable_compression_));
    475   buffered_spdy_framer_->set_visitor(this);
    476   buffered_spdy_framer_->set_debug_visitor(this);
    477   UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
    478 #if defined(SPDY_PROXY_AUTH_ORIGIN)
    479   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
    480                         host_port_pair().Equals(HostPortPair::FromURL(
    481                             GURL(SPDY_PROXY_AUTH_ORIGIN))));
    482 #endif
    483 
    484   net_log_.AddEvent(
    485       NetLog::TYPE_SPDY_SESSION_INITIALIZED,
    486       connection_->socket()->NetLog().source().ToEventParametersCallback());
    487 
    488   int error = DoReadLoop(READ_STATE_DO_READ, OK);
    489   if (error == ERR_IO_PENDING)
    490     error = OK;
    491   if (error == OK) {
    492     DCHECK_NE(availability_state_, STATE_CLOSED);
    493     connection_->AddLayeredPool(this);
    494     if (enable_sending_initial_data_)
    495       SendInitialData();
    496     pool_ = pool;
    497   } else {
    498     DcheckClosed();
    499   }
    500   return static_cast<Error>(error);
    501 }
    502 
    503 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
    504   if (!verify_domain_authentication_)
    505     return true;
    506 
    507   if (availability_state_ == STATE_CLOSED)
    508     return false;
    509 
    510   SSLInfo ssl_info;
    511   bool was_npn_negotiated;
    512   NextProto protocol_negotiated = kProtoUnknown;
    513   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
    514     return true;   // This is not a secure session, so all domains are okay.
    515 
    516   return !ssl_info.client_cert_sent &&
    517       (enable_credential_frames_ || !ssl_info.channel_id_sent ||
    518        ServerBoundCertService::GetDomainForHost(domain) ==
    519        ServerBoundCertService::GetDomainForHost(host_port_pair().host())) &&
    520        ssl_info.cert->VerifyNameMatch(domain);
    521 }
    522 
    523 int SpdySession::GetPushStream(
    524     const GURL& url,
    525     base::WeakPtr<SpdyStream>* stream,
    526     const BoundNetLog& stream_net_log) {
    527   CHECK(!in_io_loop_);
    528 
    529   stream->reset();
    530 
    531   // TODO(akalin): Add unit test exercising this code path.
    532   if (availability_state_ == STATE_CLOSED)
    533     return ERR_CONNECTION_CLOSED;
    534 
    535   Error err = TryAccessStream(url);
    536   if (err != OK)
    537     return err;
    538 
    539   *stream = GetActivePushStream(url);
    540   if (*stream) {
    541     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
    542     streams_pushed_and_claimed_count_++;
    543   }
    544   return OK;
    545 }
    546 
    547 // {,Try}CreateStream() and TryAccessStream() can be called with
    548 // |in_io_loop_| set if a stream is being created in response to
    549 // another being closed due to received data.
    550 
    551 Error SpdySession::TryAccessStream(const GURL& url) {
    552   DCHECK_NE(availability_state_, STATE_CLOSED);
    553 
    554   if (is_secure_ && certificate_error_code_ != OK &&
    555       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    556     RecordProtocolErrorHistogram(
    557         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
    558     CloseSessionResult result = DoCloseSession(
    559         static_cast<Error>(certificate_error_code_),
    560         "Tried to get SPDY stream for secure content over an unauthenticated "
    561         "session.");
    562     DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
    563     return ERR_SPDY_PROTOCOL_ERROR;
    564   }
    565   return OK;
    566 }
    567 
    568 int SpdySession::TryCreateStream(SpdyStreamRequest* request,
    569                                  base::WeakPtr<SpdyStream>* stream) {
    570   CHECK(request);
    571 
    572   if (availability_state_ == STATE_GOING_AWAY)
    573     return ERR_FAILED;
    574 
    575   // TODO(akalin): Add unit test exercising this code path.
    576   if (availability_state_ == STATE_CLOSED)
    577     return ERR_CONNECTION_CLOSED;
    578 
    579   Error err = TryAccessStream(request->url());
    580   if (err != OK)
    581     return err;
    582 
    583   if (!max_concurrent_streams_ ||
    584       (active_streams_.size() + created_streams_.size() <
    585        max_concurrent_streams_)) {
    586     return CreateStream(*request, stream);
    587   }
    588 
    589   stalled_streams_++;
    590   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
    591   pending_create_stream_queues_[request->priority()].push_back(request);
    592   return ERR_IO_PENDING;
    593 }
    594 
    595 int SpdySession::CreateStream(const SpdyStreamRequest& request,
    596                               base::WeakPtr<SpdyStream>* stream) {
    597   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
    598   DCHECK_LT(request.priority(), NUM_PRIORITIES);
    599 
    600   if (availability_state_ == STATE_GOING_AWAY)
    601     return ERR_FAILED;
    602 
    603   // TODO(akalin): Add unit test exercising this code path.
    604   if (availability_state_ == STATE_CLOSED)
    605     return ERR_CONNECTION_CLOSED;
    606 
    607   Error err = TryAccessStream(request.url());
    608   if (err != OK) {
    609     // This should have been caught in TryCreateStream().
    610     NOTREACHED();
    611     return err;
    612   }
    613 
    614   DCHECK(connection_->socket());
    615   DCHECK(connection_->socket()->IsConnected());
    616   if (connection_->socket()) {
    617     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
    618                           connection_->socket()->IsConnected());
    619     if (!connection_->socket()->IsConnected()) {
    620       CloseSessionResult result = DoCloseSession(
    621           ERR_CONNECTION_CLOSED,
    622           "Tried to create SPDY stream for a closed socket connection.");
    623       DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
    624       return ERR_CONNECTION_CLOSED;
    625     }
    626   }
    627 
    628   scoped_ptr<SpdyStream> new_stream(
    629       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
    630                      request.priority(),
    631                      stream_initial_send_window_size_,
    632                      stream_initial_recv_window_size_,
    633                      request.net_log()));
    634   *stream = new_stream->GetWeakPtr();
    635   InsertCreatedStream(new_stream.Pass());
    636 
    637   UMA_HISTOGRAM_CUSTOM_COUNTS(
    638       "Net.SpdyPriorityCount",
    639       static_cast<int>(request.priority()), 0, 10, 11);
    640 
    641   return OK;
    642 }
    643 
    644 void SpdySession::CancelStreamRequest(SpdyStreamRequest* request) {
    645   CHECK(request);
    646 
    647   if (DCHECK_IS_ON()) {
    648     // |request| should not be in a queue not matching its priority.
    649     for (int i = 0; i < NUM_PRIORITIES; ++i) {
    650       if (request->priority() == i)
    651         continue;
    652       PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
    653       DCHECK(std::find(queue->begin(), queue->end(), request) == queue->end());
    654     }
    655   }
    656 
    657   PendingStreamRequestQueue* queue =
    658       &pending_create_stream_queues_[request->priority()];
    659   // Remove |request| from |queue| while preserving the order of the
    660   // other elements.
    661   PendingStreamRequestQueue::iterator it =
    662       std::find(queue->begin(), queue->end(), request);
    663   if (it != queue->end()) {
    664     it = queue->erase(it);
    665     // |request| should be in the queue at most once, and if it is
    666     // present, should not be pending completion.
    667     DCHECK(std::find(it, queue->end(), request) == queue->end());
    668     DCHECK(!ContainsKey(pending_stream_request_completions_,
    669                         request));
    670     return;
    671   }
    672 
    673   pending_stream_request_completions_.erase(request);
    674 }
    675 
    676 void SpdySession::ProcessPendingStreamRequests() {
    677   // Like |max_concurrent_streams_|, 0 means infinite for
    678   // |max_requests_to_process|.
    679   size_t max_requests_to_process = 0;
    680   if (max_concurrent_streams_ != 0) {
    681     max_requests_to_process =
    682         max_concurrent_streams_ -
    683         (active_streams_.size() + created_streams_.size());
    684   }
    685   for (size_t i = 0;
    686        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
    687     bool processed_request = false;
    688     for (int j = NUM_PRIORITIES - 1; j >= MINIMUM_PRIORITY; --j) {
    689       if (pending_create_stream_queues_[j].empty())
    690         continue;
    691 
    692       SpdyStreamRequest* pending_request =
    693           pending_create_stream_queues_[j].front();
    694       CHECK(pending_request);
    695       pending_create_stream_queues_[j].pop_front();
    696       processed_request = true;
    697       DCHECK(!ContainsKey(pending_stream_request_completions_,
    698                           pending_request));
    699       pending_stream_request_completions_.insert(pending_request);
    700       base::MessageLoop::current()->PostTask(
    701           FROM_HERE,
    702           base::Bind(&SpdySession::CompleteStreamRequest,
    703                      weak_factory_.GetWeakPtr(), pending_request));
    704       break;
    705     }
    706     if (!processed_request)
    707       break;
    708   }
    709 }
    710 
    711 bool SpdySession::NeedsCredentials() const {
    712   if (!is_secure_)
    713     return false;
    714   SSLClientSocket* ssl_socket = GetSSLClientSocket();
    715   if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3)
    716     return false;
    717   return ssl_socket->WasChannelIDSent();
    718 }
    719 
    720 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
    721   pooled_aliases_.insert(alias_key);
    722 }
    723 
    724 int SpdySession::GetProtocolVersion() const {
    725   DCHECK(buffered_spdy_framer_.get());
    726   return buffered_spdy_framer_->protocol_version();
    727 }
    728 
    729 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
    730   return weak_factory_.GetWeakPtr();
    731 }
    732 
    733 bool SpdySession::CloseOneIdleConnection() {
    734   CHECK(!in_io_loop_);
    735   DCHECK_NE(availability_state_, STATE_CLOSED);
    736   DCHECK(pool_);
    737   if (!active_streams_.empty())
    738     return false;
    739   CloseSessionResult result =
    740       DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
    741   if (result != SESSION_CLOSED_AND_REMOVED) {
    742     NOTREACHED();
    743     return false;
    744   }
    745   return true;
    746 }
    747 
    748 void SpdySession::EnqueueStreamWrite(
    749     const base::WeakPtr<SpdyStream>& stream,
    750     SpdyFrameType frame_type,
    751     scoped_ptr<SpdyBufferProducer> producer) {
    752   DCHECK(frame_type == HEADERS ||
    753          frame_type == DATA ||
    754          frame_type == CREDENTIAL ||
    755          frame_type == SYN_STREAM);
    756   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
    757 }
    758 
    759 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
    760     SpdyStreamId stream_id,
    761     RequestPriority priority,
    762     uint8 credential_slot,
    763     SpdyControlFlags flags,
    764     const SpdyHeaderBlock& headers) {
    765   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
    766   CHECK(it != active_streams_.end());
    767   CHECK_EQ(it->second.stream->stream_id(), stream_id);
    768 
    769   SendPrefacePingIfNoneInFlight();
    770 
    771   DCHECK(buffered_spdy_framer_.get());
    772   scoped_ptr<SpdyFrame> syn_frame(
    773       buffered_spdy_framer_->CreateSynStream(
    774           stream_id, 0,
    775           ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
    776           credential_slot, flags, enable_compression_, &headers));
    777 
    778   base::StatsCounter spdy_requests("spdy.requests");
    779   spdy_requests.Increment();
    780   streams_initiated_count_++;
    781 
    782   if (net_log().IsLoggingAllEvents()) {
    783     net_log().AddEvent(
    784         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
    785         base::Bind(&NetLogSpdySynCallback, &headers,
    786                    (flags & CONTROL_FLAG_FIN) != 0,
    787                    (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
    788                    stream_id, 0));
    789   }
    790 
    791   return syn_frame.Pass();
    792 }
    793 
    794 int SpdySession::CreateCredentialFrame(
    795     const std::string& origin,
    796     const std::string& key,
    797     const std::string& cert,
    798     RequestPriority priority,
    799     scoped_ptr<SpdyFrame>* credential_frame) {
    800   DCHECK(is_secure_);
    801   SSLClientSocket* ssl_socket = GetSSLClientSocket();
    802   DCHECK(ssl_socket);
    803   DCHECK(ssl_socket->WasChannelIDSent());
    804 
    805   SpdyCredential credential;
    806   std::string tls_unique;
    807   ssl_socket->GetTLSUniqueChannelBinding(&tls_unique);
    808   size_t slot = credential_state_.SetHasCredential(GURL(origin));
    809   int rv = SpdyCredentialBuilder::Build(tls_unique, key, cert, slot,
    810                                         &credential);
    811   DCHECK_NE(rv, ERR_IO_PENDING);
    812   if (rv != OK)
    813     return rv;
    814 
    815   DCHECK(buffered_spdy_framer_.get());
    816   credential_frame->reset(
    817       buffered_spdy_framer_->CreateCredentialFrame(credential));
    818 
    819   if (net_log().IsLoggingAllEvents()) {
    820     net_log().AddEvent(
    821         NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
    822         base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
    823   }
    824   return OK;
    825 }
    826 
    827 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
    828                                                      IOBuffer* data,
    829                                                      int len,
    830                                                      SpdyDataFlags flags) {
    831   if (availability_state_ == STATE_CLOSED) {
    832     NOTREACHED();
    833     return scoped_ptr<SpdyBuffer>();
    834   }
    835 
    836   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
    837   CHECK(it != active_streams_.end());
    838   SpdyStream* stream = it->second.stream;
    839   CHECK_EQ(stream->stream_id(), stream_id);
    840 
    841   if (len < 0) {
    842     NOTREACHED();
    843     return scoped_ptr<SpdyBuffer>();
    844   }
    845 
    846   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
    847 
    848   bool send_stalled_by_stream =
    849       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
    850       (stream->send_window_size() <= 0);
    851   bool send_stalled_by_session = IsSendStalled();
    852 
    853   // NOTE: There's an enum of the same name in histograms.xml.
    854   enum SpdyFrameFlowControlState {
    855     SEND_NOT_STALLED,
    856     SEND_STALLED_BY_STREAM,
    857     SEND_STALLED_BY_SESSION,
    858     SEND_STALLED_BY_STREAM_AND_SESSION,
    859   };
    860 
    861   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
    862   if (send_stalled_by_stream) {
    863     if (send_stalled_by_session) {
    864       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
    865     } else {
    866       frame_flow_control_state = SEND_STALLED_BY_STREAM;
    867     }
    868   } else if (send_stalled_by_session) {
    869     frame_flow_control_state = SEND_STALLED_BY_SESSION;
    870   }
    871 
    872   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
    873     UMA_HISTOGRAM_ENUMERATION(
    874         "Net.SpdyFrameStreamFlowControlState",
    875         frame_flow_control_state,
    876         SEND_STALLED_BY_STREAM + 1);
    877   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    878     UMA_HISTOGRAM_ENUMERATION(
    879         "Net.SpdyFrameStreamAndSessionFlowControlState",
    880         frame_flow_control_state,
    881         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
    882   }
    883 
    884   // Obey send window size of the stream if stream flow control is
    885   // enabled.
    886   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
    887     if (send_stalled_by_stream) {
    888       stream->set_send_stalled_by_flow_control(true);
    889       // Even though we're currently stalled only by the stream, we
    890       // might end up being stalled by the session also.
    891       QueueSendStalledStream(*stream);
    892       net_log().AddEvent(
    893           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
    894           NetLog::IntegerCallback("stream_id", stream_id));
    895       return scoped_ptr<SpdyBuffer>();
    896     }
    897 
    898     effective_len = std::min(effective_len, stream->send_window_size());
    899   }
    900 
    901   // Obey send window size of the session if session flow control is
    902   // enabled.
    903   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    904     if (send_stalled_by_session) {
    905       stream->set_send_stalled_by_flow_control(true);
    906       QueueSendStalledStream(*stream);
    907       net_log().AddEvent(
    908           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
    909           NetLog::IntegerCallback("stream_id", stream_id));
    910       return scoped_ptr<SpdyBuffer>();
    911     }
    912 
    913     effective_len = std::min(effective_len, session_send_window_size_);
    914   }
    915 
    916   DCHECK_GE(effective_len, 0);
    917 
    918   // Clear FIN flag if only some of the data will be in the data
    919   // frame.
    920   if (effective_len < len)
    921     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
    922 
    923   if (net_log().IsLoggingAllEvents()) {
    924     net_log().AddEvent(
    925         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
    926         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
    927                    (flags & DATA_FLAG_FIN) != 0));
    928   }
    929 
    930   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
    931   if (effective_len > 0)
    932     SendPrefacePingIfNoneInFlight();
    933 
    934   // TODO(mbelshe): reduce memory copies here.
    935   DCHECK(buffered_spdy_framer_.get());
    936   scoped_ptr<SpdyFrame> frame(
    937       buffered_spdy_framer_->CreateDataFrame(
    938           stream_id, data->data(),
    939           static_cast<uint32>(effective_len), flags));
    940 
    941   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
    942 
    943   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    944     DecreaseSendWindowSize(static_cast<int32>(effective_len));
    945     data_buffer->AddConsumeCallback(
    946         base::Bind(&SpdySession::OnWriteBufferConsumed,
    947                    weak_factory_.GetWeakPtr(),
    948                    static_cast<size_t>(effective_len)));
    949   }
    950 
    951   return data_buffer.Pass();
    952 }
    953 
    954 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
    955   DCHECK_NE(stream_id, 0u);
    956 
    957   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
    958   if (it == active_streams_.end()) {
    959     NOTREACHED();
    960     return;
    961   }
    962 
    963   CloseActiveStreamIterator(it, status);
    964 }
    965 
    966 void SpdySession::CloseCreatedStream(
    967     const base::WeakPtr<SpdyStream>& stream, int status) {
    968   DCHECK_EQ(stream->stream_id(), 0u);
    969 
    970   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
    971   if (it == created_streams_.end()) {
    972     NOTREACHED();
    973     return;
    974   }
    975 
    976   CloseCreatedStreamIterator(it, status);
    977 }
    978 
    979 void SpdySession::ResetStream(SpdyStreamId stream_id,
    980                               SpdyRstStreamStatus status,
    981                               const std::string& description) {
    982   DCHECK_NE(stream_id, 0u);
    983 
    984   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
    985   if (it == active_streams_.end()) {
    986     NOTREACHED();
    987     return;
    988   }
    989 
    990   ResetStreamIterator(it, status, description);
    991 }
    992 
    993 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
    994   return ContainsKey(active_streams_, stream_id);
    995 }
    996 
    997 LoadState SpdySession::GetLoadState() const {
    998   // Just report that we're idle since the session could be doing
    999   // many things concurrently.
   1000   return LOAD_STATE_IDLE;
   1001 }
   1002 
   1003 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
   1004                                             int status) {
   1005   // TODO(mbelshe): We should send a RST_STREAM control frame here
   1006   //                so that the server can cancel a large send.
   1007 
   1008   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
   1009   active_streams_.erase(it);
   1010 
   1011   // TODO(akalin): When SpdyStream was ref-counted (and
   1012   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
   1013   // was only done when status was not OK. This meant that pushed
   1014   // streams can still be claimed after they're closed. This is
   1015   // probably something that we still want to support, although server
   1016   // push is hardly used. Write tests for this and fix this. (See
   1017   // http://crbug.com/261712 .)
   1018   if (owned_stream->type() == SPDY_PUSH_STREAM)
   1019       unclaimed_pushed_streams_.erase(owned_stream->url());
   1020 
   1021   DeleteStream(owned_stream.Pass(), status);
   1022 }
   1023 
   1024 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
   1025                                              int status) {
   1026   scoped_ptr<SpdyStream> owned_stream(*it);
   1027   created_streams_.erase(it);
   1028   DeleteStream(owned_stream.Pass(), status);
   1029 }
   1030 
   1031 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
   1032                                       SpdyRstStreamStatus status,
   1033                                       const std::string& description) {
   1034   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
   1035   // may close us.
   1036   SpdyStreamId stream_id = it->first;
   1037   RequestPriority priority = it->second.stream->priority();
   1038   EnqueueResetStreamFrame(stream_id, priority, status, description);
   1039 
   1040   // Removes any pending writes for the stream except for possibly an
   1041   // in-flight one.
   1042   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   1043 }
   1044 
   1045 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
   1046                                           RequestPriority priority,
   1047                                           SpdyRstStreamStatus status,
   1048                                           const std::string& description) {
   1049   DCHECK_NE(stream_id, 0u);
   1050 
   1051   net_log().AddEvent(
   1052       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
   1053       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
   1054 
   1055   DCHECK(buffered_spdy_framer_.get());
   1056   scoped_ptr<SpdyFrame> rst_frame(
   1057       buffered_spdy_framer_->CreateRstStream(stream_id, status));
   1058 
   1059   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
   1060   RecordProtocolErrorHistogram(
   1061       static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
   1062 }
   1063 
   1064 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
   1065   CHECK(!in_io_loop_);
   1066   DCHECK_NE(availability_state_, STATE_CLOSED);
   1067   DCHECK_EQ(read_state_, expected_read_state);
   1068 
   1069   result = DoReadLoop(expected_read_state, result);
   1070 
   1071   if (availability_state_ == STATE_CLOSED) {
   1072     DCHECK_EQ(result, error_on_close_);
   1073     DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1074     RemoveFromPool();
   1075     return;
   1076   }
   1077 
   1078   DCHECK(result == OK || result == ERR_IO_PENDING);
   1079 }
   1080 
   1081 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
   1082   CHECK(!in_io_loop_);
   1083   DCHECK_NE(availability_state_, STATE_CLOSED);
   1084   DCHECK_EQ(read_state_, expected_read_state);
   1085 
   1086   in_io_loop_ = true;
   1087 
   1088   int bytes_read_without_yielding = 0;
   1089 
   1090   // Loop until the session is closed, the read becomes blocked, or
   1091   // the read limit is exceeded.
   1092   while (true) {
   1093     switch (read_state_) {
   1094       case READ_STATE_DO_READ:
   1095         DCHECK_EQ(result, OK);
   1096         result = DoRead();
   1097         break;
   1098       case READ_STATE_DO_READ_COMPLETE:
   1099         if (result > 0)
   1100           bytes_read_without_yielding += result;
   1101         result = DoReadComplete(result);
   1102         break;
   1103       default:
   1104         NOTREACHED() << "read_state_: " << read_state_;
   1105         break;
   1106     }
   1107 
   1108     if (availability_state_ == STATE_CLOSED) {
   1109       DCHECK_EQ(result, error_on_close_);
   1110       DCHECK_LT(result, ERR_IO_PENDING);
   1111       break;
   1112     }
   1113 
   1114     if (result == ERR_IO_PENDING)
   1115       break;
   1116 
   1117     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
   1118       read_state_ = READ_STATE_DO_READ;
   1119       base::MessageLoop::current()->PostTask(
   1120           FROM_HERE,
   1121           base::Bind(&SpdySession::PumpReadLoop,
   1122                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
   1123       result = ERR_IO_PENDING;
   1124       break;
   1125     }
   1126   }
   1127 
   1128   CHECK(in_io_loop_);
   1129   in_io_loop_ = false;
   1130 
   1131   return result;
   1132 }
   1133 
   1134 int SpdySession::DoRead() {
   1135   CHECK(in_io_loop_);
   1136   DCHECK_NE(availability_state_, STATE_CLOSED);
   1137 
   1138   CHECK(connection_);
   1139   CHECK(connection_->socket());
   1140   read_state_ = READ_STATE_DO_READ_COMPLETE;
   1141   return connection_->socket()->Read(
   1142       read_buffer_.get(),
   1143       kReadBufferSize,
   1144       base::Bind(&SpdySession::PumpReadLoop,
   1145                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
   1146 }
   1147 
   1148 int SpdySession::DoReadComplete(int result) {
   1149   CHECK(in_io_loop_);
   1150   DCHECK_NE(availability_state_, STATE_CLOSED);
   1151 
   1152   // Parse a frame.  For now this code requires that the frame fit into our
   1153   // buffer (kReadBufferSize).
   1154   // TODO(mbelshe): support arbitrarily large frames!
   1155 
   1156   if (result == 0) {
   1157     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
   1158                                 total_bytes_received_, 1, 100000000, 50);
   1159     CloseSessionResult close_session_result =
   1160         DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
   1161     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1162     DCHECK_EQ(availability_state_, STATE_CLOSED);
   1163     DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
   1164     return ERR_CONNECTION_CLOSED;
   1165   }
   1166 
   1167   if (result < 0) {
   1168     CloseSessionResult close_session_result =
   1169         DoCloseSession(static_cast<Error>(result), "result is < 0.");
   1170     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1171     DCHECK_EQ(availability_state_, STATE_CLOSED);
   1172     DCHECK_EQ(error_on_close_, result);
   1173     return result;
   1174   }
   1175 
   1176   total_bytes_received_ += result;
   1177 
   1178   last_activity_time_ = time_func_();
   1179 
   1180   DCHECK(buffered_spdy_framer_.get());
   1181   char* data = read_buffer_->data();
   1182   while (result > 0) {
   1183     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
   1184     result -= bytes_processed;
   1185     data += bytes_processed;
   1186 
   1187     if (availability_state_ == STATE_CLOSED) {
   1188       DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1189       return error_on_close_;
   1190     }
   1191 
   1192     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
   1193   }
   1194 
   1195   read_state_ = READ_STATE_DO_READ;
   1196   return OK;
   1197 }
   1198 
   1199 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
   1200   CHECK(!in_io_loop_);
   1201   DCHECK_NE(availability_state_, STATE_CLOSED);
   1202   DCHECK_EQ(write_state_, expected_write_state);
   1203 
   1204   result = DoWriteLoop(expected_write_state, result);
   1205 
   1206   if (availability_state_ == STATE_CLOSED) {
   1207     DCHECK_EQ(result, error_on_close_);
   1208     DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1209     RemoveFromPool();
   1210     return;
   1211   }
   1212 
   1213   DCHECK(result == OK || result == ERR_IO_PENDING);
   1214 }
   1215 
   1216 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
   1217   CHECK(!in_io_loop_);
   1218   DCHECK_NE(availability_state_, STATE_CLOSED);
   1219   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
   1220   DCHECK_EQ(write_state_, expected_write_state);
   1221 
   1222   in_io_loop_ = true;
   1223 
   1224   // Loop until the session is closed or the write becomes blocked.
   1225   while (true) {
   1226     switch (write_state_) {
   1227       case WRITE_STATE_DO_WRITE:
   1228         DCHECK_EQ(result, OK);
   1229         result = DoWrite();
   1230         break;
   1231       case WRITE_STATE_DO_WRITE_COMPLETE:
   1232         result = DoWriteComplete(result);
   1233         break;
   1234       case WRITE_STATE_IDLE:
   1235       default:
   1236         NOTREACHED() << "write_state_: " << write_state_;
   1237         break;
   1238     }
   1239 
   1240     if (availability_state_ == STATE_CLOSED) {
   1241       DCHECK_EQ(result, error_on_close_);
   1242       DCHECK_LT(result, ERR_IO_PENDING);
   1243       break;
   1244     }
   1245 
   1246     if (write_state_ == WRITE_STATE_IDLE) {
   1247       DCHECK_EQ(result, ERR_IO_PENDING);
   1248       break;
   1249     }
   1250 
   1251     if (result == ERR_IO_PENDING)
   1252       break;
   1253   }
   1254 
   1255   CHECK(in_io_loop_);
   1256   in_io_loop_ = false;
   1257 
   1258   return result;
   1259 }
   1260 
   1261 int SpdySession::DoWrite() {
   1262   CHECK(in_io_loop_);
   1263   DCHECK_NE(availability_state_, STATE_CLOSED);
   1264 
   1265   DCHECK(buffered_spdy_framer_);
   1266   if (in_flight_write_) {
   1267     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1268   } else {
   1269     // Grab the next frame to send.
   1270     SpdyFrameType frame_type = DATA;
   1271     scoped_ptr<SpdyBufferProducer> producer;
   1272     base::WeakPtr<SpdyStream> stream;
   1273     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
   1274       write_state_ = WRITE_STATE_IDLE;
   1275       return ERR_IO_PENDING;
   1276     }
   1277 
   1278     if (stream.get())
   1279       DCHECK(!stream->IsClosed());
   1280 
   1281     // Activate the stream only when sending the SYN_STREAM frame to
   1282     // guarantee monotonically-increasing stream IDs.
   1283     if (frame_type == SYN_STREAM) {
   1284       if (stream.get() && stream->stream_id() == 0) {
   1285         scoped_ptr<SpdyStream> owned_stream =
   1286             ActivateCreatedStream(stream.get());
   1287         InsertActivatedStream(owned_stream.Pass());
   1288       } else {
   1289         NOTREACHED();
   1290         return ERR_UNEXPECTED;
   1291       }
   1292     }
   1293 
   1294     in_flight_write_ = producer->ProduceBuffer();
   1295     if (!in_flight_write_) {
   1296       NOTREACHED();
   1297       return ERR_UNEXPECTED;
   1298     }
   1299     in_flight_write_frame_type_ = frame_type;
   1300     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
   1301     DCHECK_GE(in_flight_write_frame_size_,
   1302               buffered_spdy_framer_->GetFrameMinimumSize());
   1303     in_flight_write_stream_ = stream;
   1304   }
   1305 
   1306   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
   1307 
   1308   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
   1309   // with Socket implementations that don't store their IOBuffer
   1310   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
   1311   scoped_refptr<IOBuffer> write_io_buffer =
   1312       in_flight_write_->GetIOBufferForRemainingData();
   1313   return connection_->socket()->Write(
   1314       write_io_buffer.get(),
   1315       in_flight_write_->GetRemainingSize(),
   1316       base::Bind(&SpdySession::PumpWriteLoop,
   1317                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
   1318 }
   1319 
   1320 int SpdySession::DoWriteComplete(int result) {
   1321   CHECK(in_io_loop_);
   1322   DCHECK_NE(availability_state_, STATE_CLOSED);
   1323   DCHECK_NE(result, ERR_IO_PENDING);
   1324   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1325 
   1326   last_activity_time_ = time_func_();
   1327 
   1328   if (result < 0) {
   1329     DCHECK_NE(result, ERR_IO_PENDING);
   1330     in_flight_write_.reset();
   1331     in_flight_write_frame_type_ = DATA;
   1332     in_flight_write_frame_size_ = 0;
   1333     in_flight_write_stream_.reset();
   1334     CloseSessionResult close_session_result =
   1335         DoCloseSession(static_cast<Error>(result), "Write error");
   1336     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1337     DCHECK_EQ(availability_state_, STATE_CLOSED);
   1338     DCHECK_EQ(error_on_close_, result);
   1339     return result;
   1340   }
   1341 
   1342   // It should not be possible to have written more bytes than our
   1343   // in_flight_write_.
   1344   DCHECK_LE(static_cast<size_t>(result),
   1345             in_flight_write_->GetRemainingSize());
   1346 
   1347   if (result > 0) {
   1348     in_flight_write_->Consume(static_cast<size_t>(result));
   1349 
   1350     // We only notify the stream when we've fully written the pending frame.
   1351     if (in_flight_write_->GetRemainingSize() == 0) {
   1352       // It is possible that the stream was cancelled while we were
   1353       // writing to the socket.
   1354       if (in_flight_write_stream_.get()) {
   1355         DCHECK_GT(in_flight_write_frame_size_, 0u);
   1356         in_flight_write_stream_->OnFrameWriteComplete(
   1357             in_flight_write_frame_type_,
   1358             in_flight_write_frame_size_);
   1359       }
   1360 
   1361       // Cleanup the write which just completed.
   1362       in_flight_write_.reset();
   1363       in_flight_write_frame_type_ = DATA;
   1364       in_flight_write_frame_size_ = 0;
   1365       in_flight_write_stream_.reset();
   1366     }
   1367   }
   1368 
   1369   write_state_ = WRITE_STATE_DO_WRITE;
   1370   return OK;
   1371 }
   1372 
   1373 void SpdySession::DcheckGoingAway() const {
   1374   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1375   if (DCHECK_IS_ON()) {
   1376     for (int i = 0; i < NUM_PRIORITIES; ++i) {
   1377       DCHECK(pending_create_stream_queues_[i].empty());
   1378     }
   1379   }
   1380   DCHECK(pending_stream_request_completions_.empty());
   1381   DCHECK(created_streams_.empty());
   1382 }
   1383 
   1384 void SpdySession::DcheckClosed() const {
   1385   DcheckGoingAway();
   1386   DCHECK_EQ(availability_state_, STATE_CLOSED);
   1387   DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1388   DCHECK(active_streams_.empty());
   1389   DCHECK(unclaimed_pushed_streams_.empty());
   1390   DCHECK(write_queue_.IsEmpty());
   1391 }
   1392 
   1393 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
   1394                                  Error status) {
   1395   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1396 
   1397   // The loops below are carefully written to avoid reentrancy problems.
   1398   //
   1399   // TODO(akalin): Any of the functions below can cause |this| to be
   1400   // deleted, so handle that below (and add tests for it).
   1401 
   1402   for (int i = 0; i < NUM_PRIORITIES; ++i) {
   1403     PendingStreamRequestQueue queue;
   1404     queue.swap(pending_create_stream_queues_[i]);
   1405     for (PendingStreamRequestQueue::const_iterator it = queue.begin();
   1406          it != queue.end(); ++it) {
   1407       CHECK(*it);
   1408       (*it)->OnRequestCompleteFailure(ERR_ABORTED);
   1409     }
   1410   }
   1411 
   1412   PendingStreamRequestCompletionSet pending_completions;
   1413   pending_completions.swap(pending_stream_request_completions_);
   1414   for (PendingStreamRequestCompletionSet::const_iterator it =
   1415            pending_completions.begin();
   1416        it != pending_completions.end(); ++it) {
   1417     (*it)->OnRequestCompleteFailure(ERR_ABORTED);
   1418   }
   1419 
   1420   while (true) {
   1421     ActiveStreamMap::iterator it =
   1422         active_streams_.lower_bound(last_good_stream_id + 1);
   1423     if (it == active_streams_.end())
   1424       break;
   1425     LogAbandonedActiveStream(it, status);
   1426     CloseActiveStreamIterator(it, status);
   1427   }
   1428 
   1429   while (!created_streams_.empty()) {
   1430     CreatedStreamSet::iterator it = created_streams_.begin();
   1431     LogAbandonedStream(*it, status);
   1432     CloseCreatedStreamIterator(it, status);
   1433   }
   1434 
   1435   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
   1436 
   1437   DcheckGoingAway();
   1438 }
   1439 
   1440 void SpdySession::MaybeFinishGoingAway() {
   1441   DcheckGoingAway();
   1442   if (active_streams_.empty() && availability_state_ != STATE_CLOSED) {
   1443     CloseSessionResult result =
   1444         DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
   1445     DCHECK_NE(result, SESSION_ALREADY_CLOSED);
   1446   }
   1447 }
   1448 
   1449 SpdySession::CloseSessionResult SpdySession::DoCloseSession(
   1450     Error err,
   1451     const std::string& description) {
   1452   DCHECK_LT(err, ERR_IO_PENDING);
   1453 
   1454   if (availability_state_ == STATE_CLOSED)
   1455     return SESSION_ALREADY_CLOSED;
   1456 
   1457   net_log_.AddEvent(
   1458       NetLog::TYPE_SPDY_SESSION_CLOSE,
   1459       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
   1460 
   1461   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
   1462   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
   1463                               total_bytes_received_, 1, 100000000, 50);
   1464 
   1465   // |pool_| will be NULL when |InitializeWithSocket()| is in the
   1466   // call stack.
   1467   if (pool_ && availability_state_ != STATE_GOING_AWAY)
   1468     pool_->MakeSessionUnavailable(GetWeakPtr());
   1469 
   1470   availability_state_ = STATE_CLOSED;
   1471   error_on_close_ = err;
   1472 
   1473   StartGoingAway(0, err);
   1474   write_queue_.Clear();
   1475 
   1476   DcheckClosed();
   1477 
   1478   if (in_io_loop_)
   1479     return SESSION_CLOSED_BUT_NOT_REMOVED;
   1480 
   1481   RemoveFromPool();
   1482   return SESSION_CLOSED_AND_REMOVED;
   1483 }
   1484 
   1485 void SpdySession::RemoveFromPool() {
   1486   DcheckClosed();
   1487   CHECK(pool_);
   1488 
   1489   SpdySessionPool* pool = pool_;
   1490   pool_ = NULL;
   1491   pool->RemoveUnavailableSession(GetWeakPtr());
   1492 }
   1493 
   1494 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
   1495   DCHECK(stream);
   1496   std::string description = base::StringPrintf(
   1497       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
   1498       stream->url().spec();
   1499   stream->LogStreamError(status, description);
   1500   // We don't increment the streams abandoned counter here. If the
   1501   // stream isn't active (i.e., it hasn't written anything to the wire
   1502   // yet) then it's as if it never existed. If it is active, then
   1503   // LogAbandonedActiveStream() will increment the counters.
   1504 }
   1505 
   1506 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
   1507                                            Error status) {
   1508   DCHECK_GT(it->first, 0u);
   1509   LogAbandonedStream(it->second.stream, status);
   1510   ++streams_abandoned_count_;
   1511   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
   1512   abandoned_streams.Increment();
   1513   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
   1514       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
   1515       unclaimed_pushed_streams_.end()) {
   1516     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
   1517     abandoned_push_streams.Increment();
   1518   }
   1519 }
   1520 
   1521 int SpdySession::GetNewStreamId() {
   1522   int id = stream_hi_water_mark_;
   1523   stream_hi_water_mark_ += 2;
   1524   if (stream_hi_water_mark_ > 0x7fff)
   1525     stream_hi_water_mark_ = 1;
   1526   return id;
   1527 }
   1528 
   1529 void SpdySession::CloseSessionOnError(Error err,
   1530                                       const std::string& description) {
   1531   // We may be called from anywhere, so we can't expect a particular
   1532   // return value.
   1533   ignore_result(DoCloseSession(err, description));
   1534 }
   1535 
   1536 base::Value* SpdySession::GetInfoAsValue() const {
   1537   base::DictionaryValue* dict = new base::DictionaryValue();
   1538 
   1539   dict->SetInteger("source_id", net_log_.source().id);
   1540 
   1541   dict->SetString("host_port_pair", host_port_pair().ToString());
   1542   if (!pooled_aliases_.empty()) {
   1543     base::ListValue* alias_list = new base::ListValue();
   1544     for (std::set<SpdySessionKey>::const_iterator it =
   1545              pooled_aliases_.begin();
   1546          it != pooled_aliases_.end(); it++) {
   1547       alias_list->Append(new base::StringValue(
   1548           it->host_port_pair().ToString()));
   1549     }
   1550     dict->Set("aliases", alias_list);
   1551   }
   1552   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
   1553 
   1554   dict->SetInteger("active_streams", active_streams_.size());
   1555 
   1556   dict->SetInteger("unclaimed_pushed_streams",
   1557                    unclaimed_pushed_streams_.size());
   1558 
   1559   dict->SetBoolean("is_secure", is_secure_);
   1560 
   1561   dict->SetString("protocol_negotiated",
   1562                   SSLClientSocket::NextProtoToString(
   1563                       connection_->socket()->GetNegotiatedProtocol()));
   1564 
   1565   dict->SetInteger("error", error_on_close_);
   1566   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
   1567 
   1568   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
   1569   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
   1570   dict->SetInteger("streams_pushed_and_claimed_count",
   1571       streams_pushed_and_claimed_count_);
   1572   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
   1573   DCHECK(buffered_spdy_framer_.get());
   1574   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
   1575 
   1576   dict->SetBoolean("sent_settings", sent_settings_);
   1577   dict->SetBoolean("received_settings", received_settings_);
   1578 
   1579   dict->SetInteger("send_window_size", session_send_window_size_);
   1580   dict->SetInteger("recv_window_size", session_recv_window_size_);
   1581   dict->SetInteger("unacked_recv_window_bytes",
   1582                    session_unacked_recv_window_bytes_);
   1583   return dict;
   1584 }
   1585 
   1586 bool SpdySession::IsReused() const {
   1587   return buffered_spdy_framer_->frames_received() > 0;
   1588 }
   1589 
   1590 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
   1591                                     LoadTimingInfo* load_timing_info) const {
   1592   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
   1593                                         load_timing_info);
   1594 }
   1595 
   1596 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
   1597   int rv = ERR_SOCKET_NOT_CONNECTED;
   1598   if (connection_->socket()) {
   1599     rv = connection_->socket()->GetPeerAddress(address);
   1600   }
   1601 
   1602   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
   1603                         rv == ERR_SOCKET_NOT_CONNECTED);
   1604 
   1605   return rv;
   1606 }
   1607 
   1608 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
   1609   int rv = ERR_SOCKET_NOT_CONNECTED;
   1610   if (connection_->socket()) {
   1611     rv = connection_->socket()->GetLocalAddress(address);
   1612   }
   1613 
   1614   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
   1615                         rv == ERR_SOCKET_NOT_CONNECTED);
   1616 
   1617   return rv;
   1618 }
   1619 
   1620 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
   1621                                       SpdyFrameType frame_type,
   1622                                       scoped_ptr<SpdyFrame> frame) {
   1623   DCHECK(frame_type == RST_STREAM ||
   1624          frame_type == SETTINGS ||
   1625          frame_type == WINDOW_UPDATE ||
   1626          frame_type == PING);
   1627   EnqueueWrite(
   1628       priority, frame_type,
   1629       scoped_ptr<SpdyBufferProducer>(
   1630           new SimpleBufferProducer(
   1631               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
   1632       base::WeakPtr<SpdyStream>());
   1633 }
   1634 
   1635 void SpdySession::EnqueueWrite(RequestPriority priority,
   1636                                SpdyFrameType frame_type,
   1637                                scoped_ptr<SpdyBufferProducer> producer,
   1638                                const base::WeakPtr<SpdyStream>& stream) {
   1639   if (availability_state_ == STATE_CLOSED)
   1640     return;
   1641 
   1642   bool was_idle = write_queue_.IsEmpty();
   1643   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
   1644   if (write_state_ == WRITE_STATE_IDLE) {
   1645     DCHECK(was_idle);
   1646     DCHECK(!in_flight_write_);
   1647     write_state_ = WRITE_STATE_DO_WRITE;
   1648     base::MessageLoop::current()->PostTask(
   1649         FROM_HERE,
   1650         base::Bind(&SpdySession::PumpWriteLoop,
   1651                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
   1652   }
   1653 }
   1654 
   1655 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
   1656   DCHECK_EQ(stream->stream_id(), 0u);
   1657   DCHECK(created_streams_.find(stream.get()) == created_streams_.end());
   1658   created_streams_.insert(stream.release());
   1659 }
   1660 
   1661 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
   1662   DCHECK_EQ(stream->stream_id(), 0u);
   1663   DCHECK(created_streams_.find(stream) != created_streams_.end());
   1664   stream->set_stream_id(GetNewStreamId());
   1665   scoped_ptr<SpdyStream> owned_stream(stream);
   1666   created_streams_.erase(stream);
   1667   return owned_stream.Pass();
   1668 }
   1669 
   1670 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
   1671   SpdyStreamId stream_id = stream->stream_id();
   1672   DCHECK_NE(stream_id, 0u);
   1673   std::pair<ActiveStreamMap::iterator, bool> result =
   1674       active_streams_.insert(
   1675           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
   1676   if (result.second) {
   1677     ignore_result(stream.release());
   1678   } else {
   1679     NOTREACHED();
   1680   }
   1681 }
   1682 
   1683 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
   1684   if (in_flight_write_stream_.get() == stream.get()) {
   1685     // If we're deleting the stream for the in-flight write, we still
   1686     // need to let the write complete, so we clear
   1687     // |in_flight_write_stream_| and let the write finish on its own
   1688     // without notifying |in_flight_write_stream_|.
   1689     in_flight_write_stream_.reset();
   1690   }
   1691 
   1692   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
   1693 
   1694   // |stream->OnClose()| may end up closing |this|, so detect that.
   1695   base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
   1696 
   1697   stream->OnClose(status);
   1698 
   1699   if (!weak_this)
   1700     return;
   1701 
   1702   switch (availability_state_) {
   1703     case STATE_AVAILABLE:
   1704       ProcessPendingStreamRequests();
   1705       break;
   1706     case STATE_GOING_AWAY:
   1707       DcheckGoingAway();
   1708       MaybeFinishGoingAway();
   1709       break;
   1710     case STATE_CLOSED:
   1711       // Do nothing.
   1712       break;
   1713   }
   1714 }
   1715 
   1716 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
   1717   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
   1718 
   1719   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
   1720   if (unclaimed_it == unclaimed_pushed_streams_.end())
   1721     return base::WeakPtr<SpdyStream>();
   1722 
   1723   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
   1724   unclaimed_pushed_streams_.erase(unclaimed_it);
   1725 
   1726   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   1727   if (active_it == active_streams_.end()) {
   1728     NOTREACHED();
   1729     return base::WeakPtr<SpdyStream>();
   1730   }
   1731 
   1732   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
   1733   used_push_streams.Increment();
   1734   return active_it->second.stream->GetWeakPtr();
   1735 }
   1736 
   1737 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
   1738                              bool* was_npn_negotiated,
   1739                              NextProto* protocol_negotiated) {
   1740   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
   1741   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
   1742   return connection_->socket()->GetSSLInfo(ssl_info);
   1743 }
   1744 
   1745 bool SpdySession::GetSSLCertRequestInfo(
   1746     SSLCertRequestInfo* cert_request_info) {
   1747   if (!is_secure_)
   1748     return false;
   1749   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
   1750   return true;
   1751 }
   1752 
   1753 ServerBoundCertService* SpdySession::GetServerBoundCertService() const {
   1754   if (!is_secure_)
   1755     return NULL;
   1756   return GetSSLClientSocket()->GetServerBoundCertService();
   1757 }
   1758 
   1759 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
   1760   CHECK(in_io_loop_);
   1761 
   1762   if (availability_state_ == STATE_CLOSED)
   1763     return;
   1764 
   1765   RecordProtocolErrorHistogram(
   1766       static_cast<SpdyProtocolErrorDetails>(error_code));
   1767   std::string description = base::StringPrintf(
   1768       "SPDY_ERROR error_code: %d.", error_code);
   1769   CloseSessionResult result =
   1770       DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
   1771   DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1772 }
   1773 
   1774 void SpdySession::OnStreamError(SpdyStreamId stream_id,
   1775                                 const std::string& description) {
   1776   CHECK(in_io_loop_);
   1777 
   1778   if (availability_state_ == STATE_CLOSED)
   1779     return;
   1780 
   1781   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1782   if (it == active_streams_.end()) {
   1783     // We still want to send a frame to reset the stream even if we
   1784     // don't know anything about it.
   1785     EnqueueResetStreamFrame(
   1786         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
   1787     return;
   1788   }
   1789 
   1790   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
   1791 }
   1792 
   1793 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
   1794                                     const char* data,
   1795                                     size_t len,
   1796                                     bool fin) {
   1797   CHECK(in_io_loop_);
   1798 
   1799   if (availability_state_ == STATE_CLOSED)
   1800     return;
   1801 
   1802   DCHECK_LT(len, 1u << 24);
   1803   if (net_log().IsLoggingAllEvents()) {
   1804     net_log().AddEvent(
   1805         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
   1806         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
   1807   }
   1808 
   1809   // Build the buffer as early as possible so that we go through the
   1810   // session flow control checks and update
   1811   // |unacked_recv_window_bytes_| properly even when the stream is
   1812   // inactive (since the other side has still reduced its session send
   1813   // window).
   1814   scoped_ptr<SpdyBuffer> buffer;
   1815   if (data) {
   1816     DCHECK_GT(len, 0u);
   1817     buffer.reset(new SpdyBuffer(data, len));
   1818 
   1819     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1820       DecreaseRecvWindowSize(static_cast<int32>(len));
   1821       buffer->AddConsumeCallback(
   1822           base::Bind(&SpdySession::OnReadBufferConsumed,
   1823                      weak_factory_.GetWeakPtr()));
   1824     }
   1825   } else {
   1826     DCHECK_EQ(len, 0u);
   1827   }
   1828 
   1829   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1830 
   1831   // By the time data comes in, the stream may already be inactive.
   1832   if (it == active_streams_.end())
   1833     return;
   1834 
   1835   SpdyStream* stream = it->second.stream;
   1836   CHECK_EQ(stream->stream_id(), stream_id);
   1837 
   1838   if (it->second.waiting_for_syn_reply) {
   1839     const std::string& error = "Data received before SYN_REPLY.";
   1840     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   1841     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   1842     return;
   1843   }
   1844 
   1845   stream->OnDataReceived(buffer.Pass());
   1846 }
   1847 
   1848 void SpdySession::OnSettings(bool clear_persisted) {
   1849   CHECK(in_io_loop_);
   1850 
   1851   if (availability_state_ == STATE_CLOSED)
   1852     return;
   1853 
   1854   if (clear_persisted)
   1855     http_server_properties_->ClearSpdySettings(host_port_pair());
   1856 
   1857   if (net_log_.IsLoggingAllEvents()) {
   1858     net_log_.AddEvent(
   1859         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
   1860         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
   1861                    clear_persisted));
   1862   }
   1863 }
   1864 
   1865 void SpdySession::OnSetting(SpdySettingsIds id,
   1866                             uint8 flags,
   1867                             uint32 value) {
   1868   CHECK(in_io_loop_);
   1869 
   1870   if (availability_state_ == STATE_CLOSED)
   1871     return;
   1872 
   1873   HandleSetting(id, value);
   1874   http_server_properties_->SetSpdySetting(
   1875       host_port_pair(),
   1876       id,
   1877       static_cast<SpdySettingsFlags>(flags),
   1878       value);
   1879   received_settings_ = true;
   1880 
   1881   // Log the setting.
   1882   net_log_.AddEvent(
   1883       NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
   1884       base::Bind(&NetLogSpdySettingCallback,
   1885                  id, static_cast<SpdySettingsFlags>(flags), value));
   1886 }
   1887 
   1888 void SpdySession::OnSendCompressedFrame(
   1889     SpdyStreamId stream_id,
   1890     SpdyFrameType type,
   1891     size_t payload_len,
   1892     size_t frame_len) {
   1893   if (type != SYN_STREAM)
   1894     return;
   1895 
   1896   DCHECK(buffered_spdy_framer_.get());
   1897   size_t compressed_len =
   1898       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
   1899 
   1900   if (payload_len) {
   1901     // Make sure we avoid early decimal truncation.
   1902     int compression_pct = 100 - (100 * compressed_len) / payload_len;
   1903     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
   1904                              compression_pct);
   1905   }
   1906 }
   1907 
   1908 int SpdySession::OnInitialResponseHeadersReceived(
   1909     const SpdyHeaderBlock& response_headers,
   1910     base::Time response_time,
   1911     base::TimeTicks recv_first_byte_time,
   1912     SpdyStream* stream) {
   1913   CHECK(in_io_loop_);
   1914   SpdyStreamId stream_id = stream->stream_id();
   1915   // May invalidate |stream|.
   1916   int rv = stream->OnInitialResponseHeadersReceived(
   1917       response_headers, response_time, recv_first_byte_time);
   1918   if (rv < 0) {
   1919     DCHECK_NE(rv, ERR_IO_PENDING);
   1920     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   1921   }
   1922   return rv;
   1923 }
   1924 
   1925 void SpdySession::OnSynStream(SpdyStreamId stream_id,
   1926                               SpdyStreamId associated_stream_id,
   1927                               SpdyPriority priority,
   1928                               uint8 credential_slot,
   1929                               bool fin,
   1930                               bool unidirectional,
   1931                               const SpdyHeaderBlock& headers) {
   1932   CHECK(in_io_loop_);
   1933 
   1934   if (availability_state_ == STATE_CLOSED)
   1935     return;
   1936 
   1937   base::Time response_time = base::Time::Now();
   1938   base::TimeTicks recv_first_byte_time = time_func_();
   1939 
   1940   if (net_log_.IsLoggingAllEvents()) {
   1941     net_log_.AddEvent(
   1942         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
   1943         base::Bind(&NetLogSpdySynCallback,
   1944                    &headers, fin, unidirectional,
   1945                    stream_id, associated_stream_id));
   1946   }
   1947 
   1948   // Server-initiated streams should have even sequence numbers.
   1949   if ((stream_id & 0x1) != 0) {
   1950     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
   1951     return;
   1952   }
   1953 
   1954   if (IsStreamActive(stream_id)) {
   1955     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
   1956     return;
   1957   }
   1958 
   1959   RequestPriority request_priority =
   1960       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
   1961 
   1962   if (availability_state_ == STATE_GOING_AWAY) {
   1963     // TODO(akalin): This behavior isn't in the SPDY spec, although it
   1964     // probably should be.
   1965     EnqueueResetStreamFrame(stream_id, request_priority,
   1966                             RST_STREAM_REFUSED_STREAM,
   1967                             "OnSyn received when going away");
   1968     return;
   1969   }
   1970 
   1971   if (associated_stream_id == 0) {
   1972     std::string description = base::StringPrintf(
   1973         "Received invalid OnSyn associated stream id %d for stream %d",
   1974         associated_stream_id, stream_id);
   1975     EnqueueResetStreamFrame(stream_id, request_priority,
   1976                             RST_STREAM_REFUSED_STREAM, description);
   1977     return;
   1978   }
   1979 
   1980   streams_pushed_count_++;
   1981 
   1982   // TODO(mbelshe): DCHECK that this is a GET method?
   1983 
   1984   // Verify that the response had a URL for us.
   1985   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
   1986   if (!gurl.is_valid()) {
   1987     EnqueueResetStreamFrame(
   1988         stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
   1989         "Pushed stream url was invalid: " + gurl.spec());
   1990     return;
   1991   }
   1992 
   1993   // Verify we have a valid stream association.
   1994   ActiveStreamMap::iterator associated_it =
   1995       active_streams_.find(associated_stream_id);
   1996   if (associated_it == active_streams_.end()) {
   1997     EnqueueResetStreamFrame(
   1998         stream_id, request_priority, RST_STREAM_INVALID_STREAM,
   1999         base::StringPrintf(
   2000             "Received OnSyn with inactive associated stream %d",
   2001             associated_stream_id));
   2002     return;
   2003   }
   2004 
   2005   // Check that the SYN advertises the same origin as its associated stream.
   2006   // Bypass this check if and only if this session is with a SPDY proxy that
   2007   // is trusted explicitly via the --trusted-spdy-proxy switch.
   2008   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
   2009     // Disallow pushing of HTTPS content.
   2010     if (gurl.SchemeIs("https")) {
   2011       EnqueueResetStreamFrame(
   2012           stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
   2013           base::StringPrintf(
   2014               "Rejected push of Cross Origin HTTPS content %d",
   2015               associated_stream_id));
   2016     }
   2017   } else {
   2018     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
   2019     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
   2020       EnqueueResetStreamFrame(
   2021           stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
   2022           base::StringPrintf(
   2023               "Rejected Cross Origin Push Stream %d",
   2024               associated_stream_id));
   2025       return;
   2026     }
   2027   }
   2028 
   2029   // There should not be an existing pushed stream with the same path.
   2030   PushedStreamMap::iterator pushed_it =
   2031       unclaimed_pushed_streams_.lower_bound(gurl);
   2032   if (pushed_it != unclaimed_pushed_streams_.end() &&
   2033       pushed_it->first == gurl) {
   2034     EnqueueResetStreamFrame(
   2035         stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
   2036         "Received duplicate pushed stream with url: " +
   2037         gurl.spec());
   2038     return;
   2039   }
   2040 
   2041   scoped_ptr<SpdyStream> stream(
   2042       new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl,
   2043                      request_priority,
   2044                      stream_initial_send_window_size_,
   2045                      stream_initial_recv_window_size_,
   2046                      net_log_));
   2047   stream->set_stream_id(stream_id);
   2048 
   2049   DeleteExpiredPushedStreams();
   2050   PushedStreamMap::iterator inserted_pushed_it =
   2051       unclaimed_pushed_streams_.insert(
   2052           pushed_it,
   2053           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
   2054   DCHECK(inserted_pushed_it != pushed_it);
   2055 
   2056   InsertActivatedStream(stream.Pass());
   2057 
   2058   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   2059   if (active_it == active_streams_.end()) {
   2060     NOTREACHED();
   2061     return;
   2062   }
   2063 
   2064   // Parse the headers.
   2065   if (OnInitialResponseHeadersReceived(
   2066           headers, response_time,
   2067           recv_first_byte_time, active_it->second.stream) != OK)
   2068     return;
   2069 
   2070   base::StatsCounter push_requests("spdy.pushed_streams");
   2071   push_requests.Increment();
   2072 }
   2073 
   2074 void SpdySession::DeleteExpiredPushedStreams() {
   2075   if (unclaimed_pushed_streams_.empty())
   2076     return;
   2077 
   2078   // Check that adequate time has elapsed since the last sweep.
   2079   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
   2080     return;
   2081 
   2082   // Gather old streams to delete.
   2083   base::TimeTicks minimum_freshness = time_func_() -
   2084       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2085   std::vector<SpdyStreamId> streams_to_close;
   2086   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
   2087        it != unclaimed_pushed_streams_.end(); ++it) {
   2088     if (minimum_freshness > it->second.creation_time)
   2089       streams_to_close.push_back(it->second.stream_id);
   2090   }
   2091 
   2092   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
   2093            streams_to_close.begin();
   2094        to_close_it != streams_to_close.end(); ++to_close_it) {
   2095     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
   2096     if (active_it == active_streams_.end())
   2097       continue;
   2098 
   2099     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
   2100     // CloseActiveStreamIterator() will remove the stream from
   2101     // |unclaimed_pushed_streams_|.
   2102     CloseActiveStreamIterator(active_it, ERR_INVALID_SPDY_STREAM);
   2103   }
   2104 
   2105   next_unclaimed_push_stream_sweep_time_ = time_func_() +
   2106       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2107 }
   2108 
   2109 void SpdySession::OnSynReply(SpdyStreamId stream_id,
   2110                              bool fin,
   2111                              const SpdyHeaderBlock& headers) {
   2112   CHECK(in_io_loop_);
   2113 
   2114   if (availability_state_ == STATE_CLOSED)
   2115     return;
   2116 
   2117   base::Time response_time = base::Time::Now();
   2118   base::TimeTicks recv_first_byte_time = time_func_();
   2119 
   2120   if (net_log().IsLoggingAllEvents()) {
   2121     net_log().AddEvent(
   2122         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
   2123         base::Bind(&NetLogSpdySynCallback,
   2124                    &headers, fin, false,  // not unidirectional
   2125                    stream_id, 0));
   2126   }
   2127 
   2128   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2129   if (it == active_streams_.end()) {
   2130     // NOTE:  it may just be that the stream was cancelled.
   2131     return;
   2132   }
   2133 
   2134   SpdyStream* stream = it->second.stream;
   2135   CHECK_EQ(stream->stream_id(), stream_id);
   2136 
   2137   if (!it->second.waiting_for_syn_reply) {
   2138     const std::string& error =
   2139         "Received duplicate SYN_REPLY for stream.";
   2140     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2141     ResetStreamIterator(it, RST_STREAM_STREAM_IN_USE, error);
   2142     return;
   2143   }
   2144   it->second.waiting_for_syn_reply = false;
   2145 
   2146   ignore_result(OnInitialResponseHeadersReceived(
   2147       headers, response_time, recv_first_byte_time, stream));
   2148 }
   2149 
   2150 void SpdySession::OnHeaders(SpdyStreamId stream_id,
   2151                             bool fin,
   2152                             const SpdyHeaderBlock& headers) {
   2153   CHECK(in_io_loop_);
   2154 
   2155   if (availability_state_ == STATE_CLOSED)
   2156     return;
   2157 
   2158   if (net_log().IsLoggingAllEvents()) {
   2159     net_log().AddEvent(
   2160         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
   2161         base::Bind(&NetLogSpdySynCallback,
   2162                    &headers, fin, /*unidirectional=*/false,
   2163                    stream_id, 0));
   2164   }
   2165 
   2166   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2167   if (it == active_streams_.end()) {
   2168     // NOTE:  it may just be that the stream was cancelled.
   2169     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
   2170     return;
   2171   }
   2172 
   2173   SpdyStream* stream = it->second.stream;
   2174   CHECK_EQ(stream->stream_id(), stream_id);
   2175 
   2176   int rv = stream->OnAdditionalResponseHeadersReceived(headers);
   2177   if (rv < 0) {
   2178     DCHECK_NE(rv, ERR_IO_PENDING);
   2179     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   2180   }
   2181 }
   2182 
   2183 void SpdySession::OnRstStream(SpdyStreamId stream_id,
   2184                               SpdyRstStreamStatus status) {
   2185   CHECK(in_io_loop_);
   2186 
   2187   if (availability_state_ == STATE_CLOSED)
   2188     return;
   2189 
   2190   std::string description;
   2191   net_log().AddEvent(
   2192       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
   2193       base::Bind(&NetLogSpdyRstCallback,
   2194                  stream_id, status, &description));
   2195 
   2196   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2197   if (it == active_streams_.end()) {
   2198     // NOTE:  it may just be that the stream was cancelled.
   2199     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
   2200     return;
   2201   }
   2202 
   2203   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2204 
   2205   if (status == 0) {
   2206     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
   2207   } else if (status == RST_STREAM_REFUSED_STREAM) {
   2208     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
   2209   } else {
   2210     RecordProtocolErrorHistogram(
   2211         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
   2212     it->second.stream->LogStreamError(
   2213         ERR_SPDY_PROTOCOL_ERROR,
   2214         base::StringPrintf("SPDY stream closed with status: %d", status));
   2215     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
   2216     //                For now, it doesn't matter much - it is a protocol error.
   2217     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   2218   }
   2219 }
   2220 
   2221 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
   2222                            SpdyGoAwayStatus status) {
   2223   CHECK(in_io_loop_);
   2224 
   2225   if (availability_state_ == STATE_CLOSED)
   2226     return;
   2227 
   2228   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
   2229       base::Bind(&NetLogSpdyGoAwayCallback,
   2230                  last_accepted_stream_id,
   2231                  active_streams_.size(),
   2232                  unclaimed_pushed_streams_.size(),
   2233                  status));
   2234   if (availability_state_ < STATE_GOING_AWAY) {
   2235     availability_state_ = STATE_GOING_AWAY;
   2236     // |pool_| will be NULL when |InitializeWithSocket()| is in the
   2237     // call stack.
   2238     if (pool_)
   2239       pool_->MakeSessionUnavailable(GetWeakPtr());
   2240   }
   2241   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
   2242   // This is to handle the case when we already don't have any active
   2243   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
   2244   // active streams and so the last one being closed will finish the
   2245   // going away process (see DeleteStream()).
   2246   MaybeFinishGoingAway();
   2247 }
   2248 
   2249 void SpdySession::OnPing(uint32 unique_id) {
   2250   CHECK(in_io_loop_);
   2251 
   2252   if (availability_state_ == STATE_CLOSED)
   2253     return;
   2254 
   2255   net_log_.AddEvent(
   2256       NetLog::TYPE_SPDY_SESSION_PING,
   2257       base::Bind(&NetLogSpdyPingCallback, unique_id, "received"));
   2258 
   2259   // Send response to a PING from server.
   2260   if (unique_id % 2 == 0) {
   2261     WritePingFrame(unique_id);
   2262     return;
   2263   }
   2264 
   2265   --pings_in_flight_;
   2266   if (pings_in_flight_ < 0) {
   2267     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
   2268     CloseSessionResult result =
   2269         DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
   2270     DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   2271     pings_in_flight_ = 0;
   2272     return;
   2273   }
   2274 
   2275   if (pings_in_flight_ > 0)
   2276     return;
   2277 
   2278   // We will record RTT in histogram when there are no more client sent
   2279   // pings_in_flight_.
   2280   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
   2281 }
   2282 
   2283 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
   2284                                  uint32 delta_window_size) {
   2285   CHECK(in_io_loop_);
   2286 
   2287   if (availability_state_ == STATE_CLOSED)
   2288     return;
   2289 
   2290   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
   2291   net_log_.AddEvent(
   2292       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
   2293       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2294                  stream_id, delta_window_size));
   2295 
   2296   if (stream_id == kSessionFlowControlStreamId) {
   2297     // WINDOW_UPDATE for the session.
   2298     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
   2299       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
   2300                    << "session flow control is not turned on";
   2301       // TODO(akalin): Record an error and close the session.
   2302       return;
   2303     }
   2304 
   2305     if (delta_window_size < 1u) {
   2306       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2307       CloseSessionResult result = DoCloseSession(
   2308           ERR_SPDY_PROTOCOL_ERROR,
   2309           "Received WINDOW_UPDATE with an invalid delta_window_size " +
   2310           base::UintToString(delta_window_size));
   2311       DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   2312       return;
   2313     }
   2314 
   2315     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
   2316   } else {
   2317     // WINDOW_UPDATE for a stream.
   2318     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2319       // TODO(akalin): Record an error and close the session.
   2320       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
   2321                    << " when flow control is not turned on";
   2322       return;
   2323     }
   2324 
   2325     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2326 
   2327     if (it == active_streams_.end()) {
   2328       // NOTE:  it may just be that the stream was cancelled.
   2329       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
   2330       return;
   2331     }
   2332 
   2333     SpdyStream* stream = it->second.stream;
   2334     CHECK_EQ(stream->stream_id(), stream_id);
   2335 
   2336     if (delta_window_size < 1u) {
   2337       ResetStreamIterator(it,
   2338                           RST_STREAM_FLOW_CONTROL_ERROR,
   2339                           base::StringPrintf(
   2340                               "Received WINDOW_UPDATE with an invalid "
   2341                               "delta_window_size %ud", delta_window_size));
   2342       return;
   2343     }
   2344 
   2345     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2346     it->second.stream->IncreaseSendWindowSize(
   2347         static_cast<int32>(delta_window_size));
   2348   }
   2349 }
   2350 
   2351 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
   2352                                 SpdyStreamId promised_stream_id) {
   2353   // TODO(akalin): Handle PUSH_PROMISE frames.
   2354 }
   2355 
   2356 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
   2357                                          uint32 delta_window_size) {
   2358   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2359   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2360   CHECK(it != active_streams_.end());
   2361   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2362   SendWindowUpdateFrame(
   2363       stream_id, delta_window_size, it->second.stream->priority());
   2364 }
   2365 
   2366 void SpdySession::SendInitialData() {
   2367   DCHECK(enable_sending_initial_data_);
   2368   DCHECK_NE(availability_state_, STATE_CLOSED);
   2369 
   2370   if (send_connection_header_prefix_) {
   2371     DCHECK_EQ(protocol_, kProtoHTTP2Draft04);
   2372     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
   2373         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
   2374                       kHttp2ConnectionHeaderPrefixSize,
   2375                       false /* take_ownership */));
   2376     // Count the prefix as part of the subsequent SETTINGS frame.
   2377     EnqueueSessionWrite(HIGHEST, SETTINGS,
   2378                         connection_header_prefix_frame.Pass());
   2379   }
   2380 
   2381   // First, notify the server about the settings they should use when
   2382   // communicating with us.
   2383   SettingsMap settings_map;
   2384   // Create a new settings frame notifying the server of our
   2385   // max concurrent streams and initial window size.
   2386   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
   2387       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
   2388   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
   2389       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
   2390     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
   2391         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
   2392                               stream_initial_recv_window_size_);
   2393   }
   2394   SendSettings(settings_map);
   2395 
   2396   // Next, notify the server about our initial recv window size.
   2397   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   2398     // Bump up the receive window size to the real initial value. This
   2399     // has to go here since the WINDOW_UPDATE frame sent by
   2400     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
   2401     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
   2402     // This condition implies that |kDefaultInitialRecvWindowSize| -
   2403     // |session_recv_window_size_| doesn't overflow.
   2404     DCHECK_GT(session_recv_window_size_, 0);
   2405     IncreaseRecvWindowSize(
   2406         kDefaultInitialRecvWindowSize - session_recv_window_size_);
   2407   }
   2408 
   2409   // Finally, notify the server about the settings they have
   2410   // previously told us to use when communicating with them (after
   2411   // applying them).
   2412   const SettingsMap& server_settings_map =
   2413       http_server_properties_->GetSpdySettings(host_port_pair());
   2414   if (server_settings_map.empty())
   2415     return;
   2416 
   2417   SettingsMap::const_iterator it =
   2418       server_settings_map.find(SETTINGS_CURRENT_CWND);
   2419   uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
   2420   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
   2421 
   2422   for (SettingsMap::const_iterator it = server_settings_map.begin();
   2423        it != server_settings_map.end(); ++it) {
   2424     const SpdySettingsIds new_id = it->first;
   2425     const uint32 new_val = it->second.second;
   2426     HandleSetting(new_id, new_val);
   2427   }
   2428 
   2429   SendSettings(server_settings_map);
   2430 }
   2431 
   2432 
   2433 void SpdySession::SendSettings(const SettingsMap& settings) {
   2434   DCHECK_NE(availability_state_, STATE_CLOSED);
   2435 
   2436   net_log_.AddEvent(
   2437       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
   2438       base::Bind(&NetLogSpdySendSettingsCallback, &settings));
   2439 
   2440   // Create the SETTINGS frame and send it.
   2441   DCHECK(buffered_spdy_framer_.get());
   2442   scoped_ptr<SpdyFrame> settings_frame(
   2443       buffered_spdy_framer_->CreateSettings(settings));
   2444   sent_settings_ = true;
   2445   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
   2446 }
   2447 
   2448 void SpdySession::HandleSetting(uint32 id, uint32 value) {
   2449   switch (id) {
   2450     case SETTINGS_MAX_CONCURRENT_STREAMS:
   2451       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
   2452                                          kMaxConcurrentStreamLimit);
   2453       ProcessPendingStreamRequests();
   2454       break;
   2455     case SETTINGS_INITIAL_WINDOW_SIZE: {
   2456       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2457         net_log().AddEvent(
   2458             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
   2459         return;
   2460       }
   2461 
   2462       if (value > static_cast<uint32>(kint32max)) {
   2463         net_log().AddEvent(
   2464             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
   2465             NetLog::IntegerCallback("initial_window_size", value));
   2466         return;
   2467       }
   2468 
   2469       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
   2470       int32 delta_window_size =
   2471           static_cast<int32>(value) - stream_initial_send_window_size_;
   2472       stream_initial_send_window_size_ = static_cast<int32>(value);
   2473       UpdateStreamsSendWindowSize(delta_window_size);
   2474       net_log().AddEvent(
   2475           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
   2476           NetLog::IntegerCallback("delta_window_size", delta_window_size));
   2477       break;
   2478     }
   2479   }
   2480 }
   2481 
   2482 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
   2483   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2484   for (ActiveStreamMap::iterator it = active_streams_.begin();
   2485        it != active_streams_.end(); ++it) {
   2486     it->second.stream->AdjustSendWindowSize(delta_window_size);
   2487   }
   2488 
   2489   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
   2490        it != created_streams_.end(); it++) {
   2491     (*it)->AdjustSendWindowSize(delta_window_size);
   2492   }
   2493 }
   2494 
   2495 void SpdySession::SendPrefacePingIfNoneInFlight() {
   2496   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
   2497     return;
   2498 
   2499   base::TimeTicks now = time_func_();
   2500   // If there is no activity in the session, then send a preface-PING.
   2501   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
   2502     SendPrefacePing();
   2503 }
   2504 
   2505 void SpdySession::SendPrefacePing() {
   2506   WritePingFrame(next_ping_id_);
   2507 }
   2508 
   2509 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
   2510                                         uint32 delta_window_size,
   2511                                         RequestPriority priority) {
   2512   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2513   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2514   if (it != active_streams_.end()) {
   2515     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2516   } else {
   2517     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2518     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
   2519   }
   2520 
   2521   net_log_.AddEvent(
   2522       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
   2523       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2524                  stream_id, delta_window_size));
   2525 
   2526   DCHECK(buffered_spdy_framer_.get());
   2527   scoped_ptr<SpdyFrame> window_update_frame(
   2528       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
   2529   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
   2530 }
   2531 
   2532 void SpdySession::WritePingFrame(uint32 unique_id) {
   2533   DCHECK(buffered_spdy_framer_.get());
   2534   scoped_ptr<SpdyFrame> ping_frame(
   2535       buffered_spdy_framer_->CreatePingFrame(unique_id));
   2536   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
   2537 
   2538   if (net_log().IsLoggingAllEvents()) {
   2539     net_log().AddEvent(
   2540         NetLog::TYPE_SPDY_SESSION_PING,
   2541         base::Bind(&NetLogSpdyPingCallback, unique_id, "sent"));
   2542   }
   2543   if (unique_id % 2 != 0) {
   2544     next_ping_id_ += 2;
   2545     ++pings_in_flight_;
   2546     PlanToCheckPingStatus();
   2547     last_ping_sent_time_ = time_func_();
   2548   }
   2549 }
   2550 
   2551 void SpdySession::PlanToCheckPingStatus() {
   2552   if (check_ping_status_pending_)
   2553     return;
   2554 
   2555   check_ping_status_pending_ = true;
   2556   base::MessageLoop::current()->PostDelayedTask(
   2557       FROM_HERE,
   2558       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2559                  time_func_()), hung_interval_);
   2560 }
   2561 
   2562 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
   2563   CHECK(!in_io_loop_);
   2564   DCHECK_NE(availability_state_, STATE_CLOSED);
   2565 
   2566   // Check if we got a response back for all PINGs we had sent.
   2567   if (pings_in_flight_ == 0) {
   2568     check_ping_status_pending_ = false;
   2569     return;
   2570   }
   2571 
   2572   DCHECK(check_ping_status_pending_);
   2573 
   2574   base::TimeTicks now = time_func_();
   2575   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
   2576 
   2577   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
   2578     // Track all failed PING messages in a separate bucket.
   2579     const base::TimeDelta kFailedPing =
   2580         base::TimeDelta::FromInternalValue(INT_MAX);
   2581     RecordPingRTTHistogram(kFailedPing);
   2582     CloseSessionResult result =
   2583         DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
   2584     DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
   2585     return;
   2586   }
   2587 
   2588   // Check the status of connection after a delay.
   2589   base::MessageLoop::current()->PostDelayedTask(
   2590       FROM_HERE,
   2591       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2592                  now),
   2593       delay);
   2594 }
   2595 
   2596 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
   2597   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
   2598 }
   2599 
   2600 void SpdySession::RecordProtocolErrorHistogram(
   2601     SpdyProtocolErrorDetails details) {
   2602   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
   2603                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2604   if (EndsWith(host_port_pair().host(), "google.com", false)) {
   2605     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
   2606                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2607   }
   2608 }
   2609 
   2610 void SpdySession::RecordHistograms() {
   2611   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
   2612                               streams_initiated_count_,
   2613                               0, 300, 50);
   2614   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
   2615                               streams_pushed_count_,
   2616                               0, 300, 50);
   2617   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
   2618                               streams_pushed_and_claimed_count_,
   2619                               0, 300, 50);
   2620   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
   2621                               streams_abandoned_count_,
   2622                               0, 300, 50);
   2623   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
   2624                             sent_settings_ ? 1 : 0, 2);
   2625   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
   2626                             received_settings_ ? 1 : 0, 2);
   2627   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
   2628                               stalled_streams_,
   2629                               0, 300, 50);
   2630   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
   2631                             stalled_streams_ > 0 ? 1 : 0, 2);
   2632 
   2633   if (received_settings_) {
   2634     // Enumerate the saved settings, and set histograms for it.
   2635     const SettingsMap& settings_map =
   2636         http_server_properties_->GetSpdySettings(host_port_pair());
   2637 
   2638     SettingsMap::const_iterator it;
   2639     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
   2640       const SpdySettingsIds id = it->first;
   2641       const uint32 val = it->second.second;
   2642       switch (id) {
   2643         case SETTINGS_CURRENT_CWND:
   2644           // Record several different histograms to see if cwnd converges
   2645           // for larger volumes of data being sent.
   2646           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
   2647                                       val, 1, 200, 100);
   2648           if (total_bytes_received_ > 10 * 1024) {
   2649             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
   2650                                         val, 1, 200, 100);
   2651             if (total_bytes_received_ > 25 * 1024) {
   2652               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
   2653                                           val, 1, 200, 100);
   2654               if (total_bytes_received_ > 50 * 1024) {
   2655                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
   2656                                             val, 1, 200, 100);
   2657                 if (total_bytes_received_ > 100 * 1024) {
   2658                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
   2659                                               val, 1, 200, 100);
   2660                 }
   2661               }
   2662             }
   2663           }
   2664           break;
   2665         case SETTINGS_ROUND_TRIP_TIME:
   2666           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
   2667                                       val, 1, 1200, 100);
   2668           break;
   2669         case SETTINGS_DOWNLOAD_RETRANS_RATE:
   2670           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
   2671                                       val, 1, 100, 50);
   2672           break;
   2673         default:
   2674           break;
   2675       }
   2676     }
   2677   }
   2678 }
   2679 
   2680 void SpdySession::CompleteStreamRequest(SpdyStreamRequest* pending_request) {
   2681   CHECK(pending_request);
   2682 
   2683   PendingStreamRequestCompletionSet::iterator it =
   2684       pending_stream_request_completions_.find(pending_request);
   2685 
   2686   // Abort if the request has already been cancelled.
   2687   if (it == pending_stream_request_completions_.end())
   2688     return;
   2689 
   2690   base::WeakPtr<SpdyStream> stream;
   2691   int rv = CreateStream(*pending_request, &stream);
   2692   pending_stream_request_completions_.erase(it);
   2693 
   2694   if (rv == OK) {
   2695     DCHECK(stream.get());
   2696     pending_request->OnRequestCompleteSuccess(&stream);
   2697   } else {
   2698     DCHECK(!stream.get());
   2699     pending_request->OnRequestCompleteFailure(rv);
   2700   }
   2701 }
   2702 
   2703 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
   2704   if (!is_secure_)
   2705     return NULL;
   2706   SSLClientSocket* ssl_socket =
   2707       reinterpret_cast<SSLClientSocket*>(connection_->socket());
   2708   DCHECK(ssl_socket);
   2709   return ssl_socket;
   2710 }
   2711 
   2712 void SpdySession::OnWriteBufferConsumed(
   2713     size_t frame_payload_size,
   2714     size_t consume_size,
   2715     SpdyBuffer::ConsumeSource consume_source) {
   2716   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
   2717   // deleted (e.g., a stream is closed due to incoming data).
   2718 
   2719   if (availability_state_ == STATE_CLOSED)
   2720     return;
   2721 
   2722   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2723 
   2724   if (consume_source == SpdyBuffer::DISCARD) {
   2725     // If we're discarding a frame or part of it, increase the send
   2726     // window by the number of discarded bytes. (Although if we're
   2727     // discarding part of a frame, it's probably because of a write
   2728     // error and we'll be tearing down the session soon.)
   2729     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
   2730     DCHECK_GT(remaining_payload_bytes, 0u);
   2731     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
   2732   }
   2733   // For consumed bytes, the send window is increased when we receive
   2734   // a WINDOW_UPDATE frame.
   2735 }
   2736 
   2737 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
   2738   // We can be called with |in_io_loop_| set if a SpdyBuffer is
   2739   // deleted (e.g., a stream is closed due to incoming data).
   2740 
   2741   DCHECK_NE(availability_state_, STATE_CLOSED);
   2742   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2743   DCHECK_GE(delta_window_size, 1);
   2744 
   2745   // Check for overflow.
   2746   int32 max_delta_window_size = kint32max - session_send_window_size_;
   2747   if (delta_window_size > max_delta_window_size) {
   2748     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2749     CloseSessionResult result = DoCloseSession(
   2750         ERR_SPDY_PROTOCOL_ERROR,
   2751         "Received WINDOW_UPDATE [delta: " +
   2752         base::IntToString(delta_window_size) +
   2753         "] for session overflows session_send_window_size_ [current: " +
   2754         base::IntToString(session_send_window_size_) + "]");
   2755     DCHECK_NE(result, SESSION_ALREADY_CLOSED);
   2756     return;
   2757   }
   2758 
   2759   session_send_window_size_ += delta_window_size;
   2760 
   2761   net_log_.AddEvent(
   2762       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   2763       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2764                  delta_window_size, session_send_window_size_));
   2765 
   2766   DCHECK(!IsSendStalled());
   2767   ResumeSendStalledStreams();
   2768 }
   2769 
   2770 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
   2771   DCHECK_NE(availability_state_, STATE_CLOSED);
   2772   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2773 
   2774   // We only call this method when sending a frame. Therefore,
   2775   // |delta_window_size| should be within the valid frame size range.
   2776   DCHECK_GE(delta_window_size, 1);
   2777   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
   2778 
   2779   // |send_window_size_| should have been at least |delta_window_size| for
   2780   // this call to happen.
   2781   DCHECK_GE(session_send_window_size_, delta_window_size);
   2782 
   2783   session_send_window_size_ -= delta_window_size;
   2784 
   2785   net_log_.AddEvent(
   2786       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   2787       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2788                  -delta_window_size, session_send_window_size_));
   2789 }
   2790 
   2791 void SpdySession::OnReadBufferConsumed(
   2792     size_t consume_size,
   2793     SpdyBuffer::ConsumeSource consume_source) {
   2794   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
   2795   // deleted (e.g., discarded by a SpdyReadQueue).
   2796 
   2797   if (availability_state_ == STATE_CLOSED)
   2798     return;
   2799 
   2800   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2801   DCHECK_GE(consume_size, 1u);
   2802   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
   2803 
   2804   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
   2805 }
   2806 
   2807 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
   2808   DCHECK_NE(availability_state_, STATE_CLOSED);
   2809   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2810   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
   2811   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
   2812   DCHECK_GE(delta_window_size, 1);
   2813   // Check for overflow.
   2814   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
   2815 
   2816   session_recv_window_size_ += delta_window_size;
   2817   net_log_.AddEvent(
   2818       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
   2819       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2820                  delta_window_size, session_recv_window_size_));
   2821 
   2822   session_unacked_recv_window_bytes_ += delta_window_size;
   2823   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
   2824     SendWindowUpdateFrame(kSessionFlowControlStreamId,
   2825                           session_unacked_recv_window_bytes_,
   2826                           HIGHEST);
   2827     session_unacked_recv_window_bytes_ = 0;
   2828   }
   2829 }
   2830 
   2831 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
   2832   CHECK(in_io_loop_);
   2833   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2834   DCHECK_GE(delta_window_size, 1);
   2835 
   2836   // Since we never decrease the initial receive window size,
   2837   // |delta_window_size| should never cause |recv_window_size_| to go
   2838   // negative. If we do, the receive window isn't being respected.
   2839   if (delta_window_size > session_recv_window_size_) {
   2840     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
   2841     CloseSessionResult result = DoCloseSession(
   2842         ERR_SPDY_PROTOCOL_ERROR,
   2843         "delta_window_size is " + base::IntToString(delta_window_size) +
   2844             " in DecreaseRecvWindowSize, which is larger than the receive " +
   2845             "window size of " + base::IntToString(session_recv_window_size_));
   2846     DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   2847     return;
   2848   }
   2849 
   2850   session_recv_window_size_ -= delta_window_size;
   2851   net_log_.AddEvent(
   2852       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
   2853       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2854                  -delta_window_size, session_recv_window_size_));
   2855 }
   2856 
   2857 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
   2858   DCHECK(stream.send_stalled_by_flow_control());
   2859   stream_send_unstall_queue_[stream.priority()].push_back(stream.stream_id());
   2860 }
   2861 
   namespace {

   // Returns the sum of .size() over every element of the fixed-size array
   // |arr|. T can be any type with a size() member function.
   template <typename T, size_t N>
   size_t GetTotalSize(const T (&arr)[N]) {
     size_t sum = 0;
     for (const T* element = arr; element != arr + N; ++element)
       sum += element->size();
     return sum;
   }

   }  // namespace
   2875 
   2876 void SpdySession::ResumeSendStalledStreams() {
   2877   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2878 
   2879   // We don't have to worry about new streams being queued, since
   2880   // doing so would cause IsSendStalled() to return true. But we do
   2881   // have to worry about streams being closed, as well as ourselves
   2882   // being closed.
   2883 
   2884   while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
   2885     size_t old_size = 0;
   2886     if (DCHECK_IS_ON())
   2887       old_size = GetTotalSize(stream_send_unstall_queue_);
   2888 
   2889     SpdyStreamId stream_id = PopStreamToPossiblyResume();
   2890     if (stream_id == 0)
   2891       break;
   2892     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2893     // The stream may actually still be send-stalled after this (due
   2894     // to its own send window) but that's okay -- it'll then be
   2895     // resumed once its send window increases.
   2896     if (it != active_streams_.end())
   2897       it->second.stream->PossiblyResumeIfSendStalled();
   2898 
   2899     // The size should decrease unless we got send-stalled again.
   2900     if (!IsSendStalled())
   2901       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
   2902   }
   2903 }
   2904 
   2905 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
   2906   for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
   2907     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
   2908     if (!queue->empty()) {
   2909       SpdyStreamId stream_id = queue->front();
   2910       queue->pop_front();
   2911       return stream_id;
   2912     }
   2913   }
   2914   return 0;
   2915 }
   2916 
   2917 }  // namespace net
   2918