Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_session.h"
      6 
      7 #include <algorithm>
      8 #include <map>
      9 
     10 #include "base/basictypes.h"
     11 #include "base/bind.h"
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/metrics/field_trial.h"
     16 #include "base/metrics/histogram.h"
     17 #include "base/metrics/sparse_histogram.h"
     18 #include "base/metrics/stats_counters.h"
     19 #include "base/stl_util.h"
     20 #include "base/strings/string_number_conversions.h"
     21 #include "base/strings/string_util.h"
     22 #include "base/strings/stringprintf.h"
     23 #include "base/strings/utf_string_conversions.h"
     24 #include "base/time/time.h"
     25 #include "base/values.h"
     26 #include "crypto/ec_private_key.h"
     27 #include "crypto/ec_signature_creator.h"
     28 #include "net/base/connection_type_histograms.h"
     29 #include "net/base/net_log.h"
     30 #include "net/base/net_util.h"
     31 #include "net/cert/asn1_util.h"
     32 #include "net/http/http_network_session.h"
     33 #include "net/http/http_server_properties.h"
     34 #include "net/spdy/spdy_buffer_producer.h"
     35 #include "net/spdy/spdy_frame_builder.h"
     36 #include "net/spdy/spdy_http_utils.h"
     37 #include "net/spdy/spdy_protocol.h"
     38 #include "net/spdy/spdy_session_pool.h"
     39 #include "net/spdy/spdy_stream.h"
     40 #include "net/ssl/server_bound_cert_service.h"
     41 
     42 namespace net {
     43 
     44 namespace {
     45 
     46 const int kReadBufferSize = 8 * 1024;
     47 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
     48 const int kHungIntervalSeconds = 10;
     49 
     50 // Always start at 1 for the first stream id.
     51 const SpdyStreamId kFirstStreamId = 1;
     52 
     53 // Minimum seconds that unclaimed pushed streams will be kept in memory.
     54 const int kMinPushedStreamLifetimeSeconds = 300;
     55 
     56 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
     57     const SpdyHeaderBlock& headers) {
     58   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
     59   for (SpdyHeaderBlock::const_iterator it = headers.begin();
     60        it != headers.end(); ++it) {
     61     headers_list->AppendString(
     62         it->first + ": " +
     63         (ShouldShowHttpHeaderValue(it->first) ? it->second : "[elided]"));
     64   }
     65   return headers_list.Pass();
     66 }
     67 
     68 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
     69                                              bool fin,
     70                                              bool unidirectional,
     71                                              SpdyPriority spdy_priority,
     72                                              SpdyStreamId stream_id,
     73                                              NetLog::LogLevel /* log_level */) {
     74   base::DictionaryValue* dict = new base::DictionaryValue();
     75   dict->Set("headers", SpdyHeaderBlockToListValue(*headers).release());
     76   dict->SetBoolean("fin", fin);
     77   dict->SetBoolean("unidirectional", unidirectional);
     78   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
     79   dict->SetInteger("stream_id", stream_id);
     80   return dict;
     81 }
     82 
     83 base::Value* NetLogSpdySynStreamReceivedCallback(
     84     const SpdyHeaderBlock* headers,
     85     bool fin,
     86     bool unidirectional,
     87     SpdyPriority spdy_priority,
     88     SpdyStreamId stream_id,
     89     SpdyStreamId associated_stream,
     90     NetLog::LogLevel /* log_level */) {
     91   base::DictionaryValue* dict = new base::DictionaryValue();
     92   dict->Set("headers", SpdyHeaderBlockToListValue(*headers).release());
     93   dict->SetBoolean("fin", fin);
     94   dict->SetBoolean("unidirectional", unidirectional);
     95   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
     96   dict->SetInteger("stream_id", stream_id);
     97   dict->SetInteger("associated_stream", associated_stream);
     98   return dict;
     99 }
    100 
    101 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
    102     const SpdyHeaderBlock* headers,
    103     bool fin,
    104     SpdyStreamId stream_id,
    105     NetLog::LogLevel /* log_level */) {
    106   base::DictionaryValue* dict = new base::DictionaryValue();
    107   dict->Set("headers", SpdyHeaderBlockToListValue(*headers).release());
    108   dict->SetBoolean("fin", fin);
    109   dict->SetInteger("stream_id", stream_id);
    110   return dict;
    111 }
    112 
    113 base::Value* NetLogSpdySessionCloseCallback(int net_error,
    114                                             const std::string* description,
    115                                             NetLog::LogLevel /* log_level */) {
    116   base::DictionaryValue* dict = new base::DictionaryValue();
    117   dict->SetInteger("net_error", net_error);
    118   dict->SetString("description", *description);
    119   return dict;
    120 }
    121 
    122 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
    123                                        NetLog::LogLevel /* log_level */) {
    124   base::DictionaryValue* dict = new base::DictionaryValue();
    125   dict->SetString("host", host_pair->first.ToString());
    126   dict->SetString("proxy", host_pair->second.ToPacString());
    127   return dict;
    128 }
    129 
    130 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
    131                                         bool clear_persisted,
    132                                         NetLog::LogLevel /* log_level */) {
    133   base::DictionaryValue* dict = new base::DictionaryValue();
    134   dict->SetString("host", host_port_pair.ToString());
    135   dict->SetBoolean("clear_persisted", clear_persisted);
    136   return dict;
    137 }
    138 
    139 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
    140                                        SpdySettingsFlags flags,
    141                                        uint32 value,
    142                                        NetLog::LogLevel /* log_level */) {
    143   base::DictionaryValue* dict = new base::DictionaryValue();
    144   dict->SetInteger("id", id);
    145   dict->SetInteger("flags", flags);
    146   dict->SetInteger("value", value);
    147   return dict;
    148 }
    149 
    150 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
    151                                             NetLog::LogLevel /* log_level */) {
    152   base::DictionaryValue* dict = new base::DictionaryValue();
    153   base::ListValue* settings_list = new base::ListValue();
    154   for (SettingsMap::const_iterator it = settings->begin();
    155        it != settings->end(); ++it) {
    156     const SpdySettingsIds id = it->first;
    157     const SpdySettingsFlags flags = it->second.first;
    158     const uint32 value = it->second.second;
    159     settings_list->Append(new base::StringValue(
    160         base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
    161   }
    162   dict->Set("settings", settings_list);
    163   return dict;
    164 }
    165 
    166 base::Value* NetLogSpdyWindowUpdateFrameCallback(
    167     SpdyStreamId stream_id,
    168     uint32 delta,
    169     NetLog::LogLevel /* log_level */) {
    170   base::DictionaryValue* dict = new base::DictionaryValue();
    171   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    172   dict->SetInteger("delta", delta);
    173   return dict;
    174 }
    175 
    176 base::Value* NetLogSpdySessionWindowUpdateCallback(
    177     int32 delta,
    178     int32 window_size,
    179     NetLog::LogLevel /* log_level */) {
    180   base::DictionaryValue* dict = new base::DictionaryValue();
    181   dict->SetInteger("delta", delta);
    182   dict->SetInteger("window_size", window_size);
    183   return dict;
    184 }
    185 
    186 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
    187                                     int size,
    188                                     bool fin,
    189                                     NetLog::LogLevel /* log_level */) {
    190   base::DictionaryValue* dict = new base::DictionaryValue();
    191   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    192   dict->SetInteger("size", size);
    193   dict->SetBoolean("fin", fin);
    194   return dict;
    195 }
    196 
    197 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
    198                                    int status,
    199                                    const std::string* description,
    200                                    NetLog::LogLevel /* log_level */) {
    201   base::DictionaryValue* dict = new base::DictionaryValue();
    202   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    203   dict->SetInteger("status", status);
    204   dict->SetString("description", *description);
    205   return dict;
    206 }
    207 
    208 base::Value* NetLogSpdyPingCallback(uint32 unique_id,
    209                                     const char* type,
    210                                     NetLog::LogLevel /* log_level */) {
    211   base::DictionaryValue* dict = new base::DictionaryValue();
    212   dict->SetInteger("unique_id", unique_id);
    213   dict->SetString("type", type);
    214   return dict;
    215 }
    216 
    217 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
    218                                       int active_streams,
    219                                       int unclaimed_streams,
    220                                       SpdyGoAwayStatus status,
    221                                       NetLog::LogLevel /* log_level */) {
    222   base::DictionaryValue* dict = new base::DictionaryValue();
    223   dict->SetInteger("last_accepted_stream_id",
    224                    static_cast<int>(last_stream_id));
    225   dict->SetInteger("active_streams", active_streams);
    226   dict->SetInteger("unclaimed_streams", unclaimed_streams);
    227   dict->SetInteger("status", static_cast<int>(status));
    228   return dict;
    229 }
    230 
    231 // Helper function to return the total size of an array of objects
    232 // with .size() member functions.
    233 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
    234   size_t total_size = 0;
    235   for (size_t i = 0; i < N; ++i) {
    236     total_size += arr[i].size();
    237   }
    238   return total_size;
    239 }
    240 
    241 // Helper class for std:find_if on STL container containing
    242 // SpdyStreamRequest weak pointers.
    243 class RequestEquals {
    244  public:
    245   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
    246       : request_(request) {}
    247 
    248   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
    249     return request_.get() == request.get();
    250   }
    251 
    252  private:
    253   const base::WeakPtr<SpdyStreamRequest> request_;
    254 };
    255 
    256 // The maximum number of concurrent streams we will ever create.  Even if
    257 // the server permits more, we will never exceed this limit.
    258 const size_t kMaxConcurrentStreamLimit = 256;
    259 
    260 }  // namespace
    261 
    262 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
    263   Reset();
    264 }
    265 
    266 SpdyStreamRequest::~SpdyStreamRequest() {
    267   CancelRequest();
    268 }
    269 
    270 int SpdyStreamRequest::StartRequest(
    271     SpdyStreamType type,
    272     const base::WeakPtr<SpdySession>& session,
    273     const GURL& url,
    274     RequestPriority priority,
    275     const BoundNetLog& net_log,
    276     const CompletionCallback& callback) {
    277   DCHECK(session);
    278   DCHECK(!session_);
    279   DCHECK(!stream_);
    280   DCHECK(callback_.is_null());
    281 
    282   type_ = type;
    283   session_ = session;
    284   url_ = url;
    285   priority_ = priority;
    286   net_log_ = net_log;
    287   callback_ = callback;
    288 
    289   base::WeakPtr<SpdyStream> stream;
    290   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
    291   if (rv == OK) {
    292     Reset();
    293     stream_ = stream;
    294   }
    295   return rv;
    296 }
    297 
    298 void SpdyStreamRequest::CancelRequest() {
    299   if (session_)
    300     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
    301   Reset();
    302   // Do this to cancel any pending CompleteStreamRequest() tasks.
    303   weak_ptr_factory_.InvalidateWeakPtrs();
    304 }
    305 
    306 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
    307   DCHECK(!session_);
    308   base::WeakPtr<SpdyStream> stream = stream_;
    309   DCHECK(stream);
    310   Reset();
    311   return stream;
    312 }
    313 
    314 void SpdyStreamRequest::OnRequestCompleteSuccess(
    315     const base::WeakPtr<SpdyStream>& stream) {
    316   DCHECK(session_);
    317   DCHECK(!stream_);
    318   DCHECK(!callback_.is_null());
    319   CompletionCallback callback = callback_;
    320   Reset();
    321   DCHECK(stream);
    322   stream_ = stream;
    323   callback.Run(OK);
    324 }
    325 
    326 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
    327   DCHECK(session_);
    328   DCHECK(!stream_);
    329   DCHECK(!callback_.is_null());
    330   CompletionCallback callback = callback_;
    331   Reset();
    332   DCHECK_NE(rv, OK);
    333   callback.Run(rv);
    334 }
    335 
    336 void SpdyStreamRequest::Reset() {
    337   type_ = SPDY_BIDIRECTIONAL_STREAM;
    338   session_.reset();
    339   stream_.reset();
    340   url_ = GURL();
    341   priority_ = MINIMUM_PRIORITY;
    342   net_log_ = BoundNetLog();
    343   callback_.Reset();
    344 }
    345 
    346 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
    347     : stream(NULL),
    348       waiting_for_syn_reply(false) {}
    349 
    350 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
    351     : stream(stream),
    352       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {}
    353 
    354 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
    355 
    356 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
    357 
    358 SpdySession::PushedStreamInfo::PushedStreamInfo(
    359     SpdyStreamId stream_id,
    360     base::TimeTicks creation_time)
    361     : stream_id(stream_id),
    362       creation_time(creation_time) {}
    363 
    364 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
    365 
    366 SpdySession::SpdySession(
    367     const SpdySessionKey& spdy_session_key,
    368     const base::WeakPtr<HttpServerProperties>& http_server_properties,
    369     bool verify_domain_authentication,
    370     bool enable_sending_initial_data,
    371     bool enable_compression,
    372     bool enable_ping_based_connection_checking,
    373     NextProto default_protocol,
    374     size_t stream_initial_recv_window_size,
    375     size_t initial_max_concurrent_streams,
    376     size_t max_concurrent_streams_limit,
    377     TimeFunc time_func,
    378     const HostPortPair& trusted_spdy_proxy,
    379     NetLog* net_log)
    380     : weak_factory_(this),
    381       in_io_loop_(false),
    382       spdy_session_key_(spdy_session_key),
    383       pool_(NULL),
    384       http_server_properties_(http_server_properties),
    385       read_buffer_(new IOBuffer(kReadBufferSize)),
    386       stream_hi_water_mark_(kFirstStreamId),
    387       in_flight_write_frame_type_(DATA),
    388       in_flight_write_frame_size_(0),
    389       is_secure_(false),
    390       certificate_error_code_(OK),
    391       availability_state_(STATE_AVAILABLE),
    392       read_state_(READ_STATE_DO_READ),
    393       write_state_(WRITE_STATE_IDLE),
    394       error_on_close_(OK),
    395       max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
    396                               kInitialMaxConcurrentStreams :
    397                               initial_max_concurrent_streams),
    398       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
    399                                     kMaxConcurrentStreamLimit :
    400                                     max_concurrent_streams_limit),
    401       streams_initiated_count_(0),
    402       streams_pushed_count_(0),
    403       streams_pushed_and_claimed_count_(0),
    404       streams_abandoned_count_(0),
    405       total_bytes_received_(0),
    406       sent_settings_(false),
    407       received_settings_(false),
    408       stalled_streams_(0),
    409       pings_in_flight_(0),
    410       next_ping_id_(1),
    411       last_activity_time_(time_func()),
    412       last_compressed_frame_len_(0),
    413       check_ping_status_pending_(false),
    414       send_connection_header_prefix_(false),
    415       flow_control_state_(FLOW_CONTROL_NONE),
    416       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
    417       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ?
    418                                        kDefaultInitialRecvWindowSize :
    419                                        stream_initial_recv_window_size),
    420       session_send_window_size_(0),
    421       session_recv_window_size_(0),
    422       session_unacked_recv_window_bytes_(0),
    423       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
    424       verify_domain_authentication_(verify_domain_authentication),
    425       enable_sending_initial_data_(enable_sending_initial_data),
    426       enable_compression_(enable_compression),
    427       enable_ping_based_connection_checking_(
    428           enable_ping_based_connection_checking),
    429       protocol_(default_protocol),
    430       connection_at_risk_of_loss_time_(
    431           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
    432       hung_interval_(
    433           base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
    434       trusted_spdy_proxy_(trusted_spdy_proxy),
    435       time_func_(time_func) {
    436   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
    437   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    438   DCHECK(HttpStreamFactory::spdy_enabled());
    439   net_log_.BeginEvent(
    440       NetLog::TYPE_SPDY_SESSION,
    441       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
    442   next_unclaimed_push_stream_sweep_time_ = time_func_() +
    443       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
    444   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    445 }
    446 
    447 SpdySession::~SpdySession() {
    448   CHECK(!in_io_loop_);
    449   DCHECK(!pool_);
    450   DcheckClosed();
    451 
    452   // TODO(akalin): Check connection->is_initialized() instead. This
    453   // requires re-working CreateFakeSpdySession(), though.
    454   DCHECK(connection_->socket());
    455   // With SPDY we can't recycle sockets.
    456   connection_->socket()->Disconnect();
    457 
    458   RecordHistograms();
    459 
    460   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
    461 }
    462 
    463 Error SpdySession::InitializeWithSocket(
    464     scoped_ptr<ClientSocketHandle> connection,
    465     SpdySessionPool* pool,
    466     bool is_secure,
    467     int certificate_error_code) {
    468   CHECK(!in_io_loop_);
    469   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
    470   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
    471   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
    472   DCHECK(!connection_);
    473 
    474   DCHECK(certificate_error_code == OK ||
    475          certificate_error_code < ERR_IO_PENDING);
    476   // TODO(akalin): Check connection->is_initialized() instead. This
    477   // requires re-working CreateFakeSpdySession(), though.
    478   DCHECK(connection->socket());
    479 
    480   base::StatsCounter spdy_sessions("spdy.sessions");
    481   spdy_sessions.Increment();
    482 
    483   connection_ = connection.Pass();
    484   is_secure_ = is_secure;
    485   certificate_error_code_ = certificate_error_code;
    486 
    487   NextProto protocol_negotiated =
    488       connection_->socket()->GetNegotiatedProtocol();
    489   if (protocol_negotiated != kProtoUnknown) {
    490     protocol_ = protocol_negotiated;
    491   }
    492   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
    493   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    494 
    495   if (protocol_ == kProtoHTTP2Draft04)
    496     send_connection_header_prefix_ = true;
    497 
    498   if (protocol_ >= kProtoSPDY31) {
    499     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
    500     session_send_window_size_ = kSpdySessionInitialWindowSize;
    501     session_recv_window_size_ = kSpdySessionInitialWindowSize;
    502   } else if (protocol_ >= kProtoSPDY3) {
    503     flow_control_state_ = FLOW_CONTROL_STREAM;
    504   } else {
    505     flow_control_state_ = FLOW_CONTROL_NONE;
    506   }
    507 
    508   buffered_spdy_framer_.reset(
    509       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
    510                              enable_compression_));
    511   buffered_spdy_framer_->set_visitor(this);
    512   buffered_spdy_framer_->set_debug_visitor(this);
    513   UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
    514 #if defined(SPDY_PROXY_AUTH_ORIGIN)
    515   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
    516                         host_port_pair().Equals(HostPortPair::FromURL(
    517                             GURL(SPDY_PROXY_AUTH_ORIGIN))));
    518 #endif
    519 
    520   net_log_.AddEvent(
    521       NetLog::TYPE_SPDY_SESSION_INITIALIZED,
    522       connection_->socket()->NetLog().source().ToEventParametersCallback());
    523 
    524   int error = DoReadLoop(READ_STATE_DO_READ, OK);
    525   if (error == ERR_IO_PENDING)
    526     error = OK;
    527   if (error == OK) {
    528     DCHECK_NE(availability_state_, STATE_CLOSED);
    529     connection_->AddHigherLayeredPool(this);
    530     if (enable_sending_initial_data_)
    531       SendInitialData();
    532     pool_ = pool;
    533   } else {
    534     DcheckClosed();
    535   }
    536   return static_cast<Error>(error);
    537 }
    538 
    539 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
    540   if (!verify_domain_authentication_)
    541     return true;
    542 
    543   if (availability_state_ == STATE_CLOSED)
    544     return false;
    545 
    546   SSLInfo ssl_info;
    547   bool was_npn_negotiated;
    548   NextProto protocol_negotiated = kProtoUnknown;
    549   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
    550     return true;   // This is not a secure session, so all domains are okay.
    551 
    552   bool unused = false;
    553   return
    554       !ssl_info.client_cert_sent &&
    555       (!ssl_info.channel_id_sent ||
    556        (ServerBoundCertService::GetDomainForHost(domain) ==
    557         ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
    558       ssl_info.cert->VerifyNameMatch(domain, &unused);
    559 }
    560 
    561 int SpdySession::GetPushStream(
    562     const GURL& url,
    563     base::WeakPtr<SpdyStream>* stream,
    564     const BoundNetLog& stream_net_log) {
    565   CHECK(!in_io_loop_);
    566 
    567   stream->reset();
    568 
    569   // TODO(akalin): Add unit test exercising this code path.
    570   if (availability_state_ == STATE_CLOSED)
    571     return ERR_CONNECTION_CLOSED;
    572 
    573   Error err = TryAccessStream(url);
    574   if (err != OK)
    575     return err;
    576 
    577   *stream = GetActivePushStream(url);
    578   if (*stream) {
    579     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
    580     streams_pushed_and_claimed_count_++;
    581   }
    582   return OK;
    583 }
    584 
    585 // {,Try}CreateStream() and TryAccessStream() can be called with
    586 // |in_io_loop_| set if a stream is being created in response to
    587 // another being closed due to received data.
    588 
    589 Error SpdySession::TryAccessStream(const GURL& url) {
    590   DCHECK_NE(availability_state_, STATE_CLOSED);
    591 
    592   if (is_secure_ && certificate_error_code_ != OK &&
    593       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    594     RecordProtocolErrorHistogram(
    595         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
    596     CloseSessionResult result = DoCloseSession(
    597         static_cast<Error>(certificate_error_code_),
    598         "Tried to get SPDY stream for secure content over an unauthenticated "
    599         "session.");
    600     DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
    601     return ERR_SPDY_PROTOCOL_ERROR;
    602   }
    603   return OK;
    604 }
    605 
    606 int SpdySession::TryCreateStream(
    607     const base::WeakPtr<SpdyStreamRequest>& request,
    608     base::WeakPtr<SpdyStream>* stream) {
    609   DCHECK(request);
    610 
    611   if (availability_state_ == STATE_GOING_AWAY)
    612     return ERR_FAILED;
    613 
    614   // TODO(akalin): Add unit test exercising this code path.
    615   if (availability_state_ == STATE_CLOSED)
    616     return ERR_CONNECTION_CLOSED;
    617 
    618   Error err = TryAccessStream(request->url());
    619   if (err != OK)
    620     return err;
    621 
    622   if (!max_concurrent_streams_ ||
    623       (active_streams_.size() + created_streams_.size() <
    624        max_concurrent_streams_)) {
    625     return CreateStream(*request, stream);
    626   }
    627 
    628   stalled_streams_++;
    629   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
    630   RequestPriority priority = request->priority();
    631   CHECK_GE(priority, MINIMUM_PRIORITY);
    632   CHECK_LE(priority, MAXIMUM_PRIORITY);
    633   pending_create_stream_queues_[priority].push_back(request);
    634   return ERR_IO_PENDING;
    635 }
    636 
    637 int SpdySession::CreateStream(const SpdyStreamRequest& request,
    638                               base::WeakPtr<SpdyStream>* stream) {
    639   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
    640   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
    641 
    642   if (availability_state_ == STATE_GOING_AWAY)
    643     return ERR_FAILED;
    644 
    645   // TODO(akalin): Add unit test exercising this code path.
    646   if (availability_state_ == STATE_CLOSED)
    647     return ERR_CONNECTION_CLOSED;
    648 
    649   Error err = TryAccessStream(request.url());
    650   if (err != OK) {
    651     // This should have been caught in TryCreateStream().
    652     NOTREACHED();
    653     return err;
    654   }
    655 
    656   DCHECK(connection_->socket());
    657   DCHECK(connection_->socket()->IsConnected());
    658   if (connection_->socket()) {
    659     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
    660                           connection_->socket()->IsConnected());
    661     if (!connection_->socket()->IsConnected()) {
    662       CloseSessionResult result = DoCloseSession(
    663           ERR_CONNECTION_CLOSED,
    664           "Tried to create SPDY stream for a closed socket connection.");
    665       DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
    666       return ERR_CONNECTION_CLOSED;
    667     }
    668   }
    669 
    670   scoped_ptr<SpdyStream> new_stream(
    671       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
    672                      request.priority(),
    673                      stream_initial_send_window_size_,
    674                      stream_initial_recv_window_size_,
    675                      request.net_log()));
    676   *stream = new_stream->GetWeakPtr();
    677   InsertCreatedStream(new_stream.Pass());
    678 
    679   UMA_HISTOGRAM_CUSTOM_COUNTS(
    680       "Net.SpdyPriorityCount",
    681       static_cast<int>(request.priority()), 0, 10, 11);
    682 
    683   return OK;
    684 }
    685 
    686 void SpdySession::CancelStreamRequest(
    687     const base::WeakPtr<SpdyStreamRequest>& request) {
    688   DCHECK(request);
    689   RequestPriority priority = request->priority();
    690   CHECK_GE(priority, MINIMUM_PRIORITY);
    691   CHECK_LE(priority, MAXIMUM_PRIORITY);
    692 
    693   if (DCHECK_IS_ON()) {
    694     // |request| should not be in a queue not matching its priority.
    695     for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
    696       if (priority == i)
    697         continue;
    698       PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
    699       DCHECK(std::find_if(queue->begin(),
    700                           queue->end(),
    701                           RequestEquals(request)) == queue->end());
    702     }
    703   }
    704 
    705   PendingStreamRequestQueue* queue =
    706       &pending_create_stream_queues_[priority];
    707   // Remove |request| from |queue| while preserving the order of the
    708   // other elements.
    709   PendingStreamRequestQueue::iterator it =
    710       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
    711   // The request may already be removed if there's a
    712   // CompleteStreamRequest() in flight.
    713   if (it != queue->end()) {
    714     it = queue->erase(it);
    715     // |request| should be in the queue at most once, and if it is
    716     // present, should not be pending completion.
    717     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
    718            queue->end());
    719   }
    720 }
    721 
    722 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
    723   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
    724     if (pending_create_stream_queues_[j].empty())
    725       continue;
    726 
    727     base::WeakPtr<SpdyStreamRequest> pending_request =
    728         pending_create_stream_queues_[j].front();
    729     DCHECK(pending_request);
    730     pending_create_stream_queues_[j].pop_front();
    731     return pending_request;
    732   }
    733   return base::WeakPtr<SpdyStreamRequest>();
    734 }
    735 
    736 void SpdySession::ProcessPendingStreamRequests() {
    737   // Like |max_concurrent_streams_|, 0 means infinite for
    738   // |max_requests_to_process|.
    739   size_t max_requests_to_process = 0;
    740   if (max_concurrent_streams_ != 0) {
    741     max_requests_to_process =
    742         max_concurrent_streams_ -
    743         (active_streams_.size() + created_streams_.size());
    744   }
    745   for (size_t i = 0;
    746        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
    747     base::WeakPtr<SpdyStreamRequest> pending_request =
    748         GetNextPendingStreamRequest();
    749     if (!pending_request)
    750       break;
    751 
    752     base::MessageLoop::current()->PostTask(
    753         FROM_HERE,
    754         base::Bind(&SpdySession::CompleteStreamRequest,
    755                    weak_factory_.GetWeakPtr(),
    756                    pending_request));
    757   }
    758 }
    759 
    760 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
    761   pooled_aliases_.insert(alias_key);
    762 }
    763 
    764 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
    765   DCHECK(buffered_spdy_framer_.get());
    766   return buffered_spdy_framer_->protocol_version();
    767 }
    768 
    769 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
    770   return weak_factory_.GetWeakPtr();
    771 }
    772 
    773 bool SpdySession::CloseOneIdleConnection() {
    774   CHECK(!in_io_loop_);
    775   DCHECK_NE(availability_state_, STATE_CLOSED);
    776   DCHECK(pool_);
    777   if (!active_streams_.empty())
    778     return false;
    779   CloseSessionResult result =
    780       DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
    781   if (result != SESSION_CLOSED_AND_REMOVED) {
    782     NOTREACHED();
    783     return false;
    784   }
    785   return true;
    786 }
    787 
    788 void SpdySession::EnqueueStreamWrite(
    789     const base::WeakPtr<SpdyStream>& stream,
    790     SpdyFrameType frame_type,
    791     scoped_ptr<SpdyBufferProducer> producer) {
    792   DCHECK(frame_type == HEADERS ||
    793          frame_type == DATA ||
    794          frame_type == CREDENTIAL ||
    795          frame_type == SYN_STREAM);
    796   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
    797 }
    798 
    799 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
    800     SpdyStreamId stream_id,
    801     RequestPriority priority,
    802     uint8 credential_slot,
    803     SpdyControlFlags flags,
    804     const SpdyHeaderBlock& headers) {
    805   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
    806   CHECK(it != active_streams_.end());
    807   CHECK_EQ(it->second.stream->stream_id(), stream_id);
    808 
    809   SendPrefacePingIfNoneInFlight();
    810 
    811   DCHECK(buffered_spdy_framer_.get());
    812   SpdyPriority spdy_priority =
    813       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
    814   scoped_ptr<SpdyFrame> syn_frame(
    815       buffered_spdy_framer_->CreateSynStream(
    816           stream_id, 0, spdy_priority,
    817           credential_slot, flags, &headers));
    818 
    819   base::StatsCounter spdy_requests("spdy.requests");
    820   spdy_requests.Increment();
    821   streams_initiated_count_++;
    822 
    823   if (net_log().IsLoggingAllEvents()) {
    824     net_log().AddEvent(
    825         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
    826         base::Bind(&NetLogSpdySynStreamSentCallback, &headers,
    827                    (flags & CONTROL_FLAG_FIN) != 0,
    828                    (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
    829                    spdy_priority,
    830                    stream_id));
    831   }
    832 
    833   return syn_frame.Pass();
    834 }
    835 
    836 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
    837                                                      IOBuffer* data,
    838                                                      int len,
    839                                                      SpdyDataFlags flags) {
    840   if (availability_state_ == STATE_CLOSED) {
    841     NOTREACHED();
    842     return scoped_ptr<SpdyBuffer>();
    843   }
    844 
    845   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
    846   CHECK(it != active_streams_.end());
    847   SpdyStream* stream = it->second.stream;
    848   CHECK_EQ(stream->stream_id(), stream_id);
    849 
    850   if (len < 0) {
    851     NOTREACHED();
    852     return scoped_ptr<SpdyBuffer>();
    853   }
    854 
    855   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
    856 
    857   bool send_stalled_by_stream =
    858       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
    859       (stream->send_window_size() <= 0);
    860   bool send_stalled_by_session = IsSendStalled();
    861 
    862   // NOTE: There's an enum of the same name in histograms.xml.
    863   enum SpdyFrameFlowControlState {
    864     SEND_NOT_STALLED,
    865     SEND_STALLED_BY_STREAM,
    866     SEND_STALLED_BY_SESSION,
    867     SEND_STALLED_BY_STREAM_AND_SESSION,
    868   };
    869 
    870   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
    871   if (send_stalled_by_stream) {
    872     if (send_stalled_by_session) {
    873       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
    874     } else {
    875       frame_flow_control_state = SEND_STALLED_BY_STREAM;
    876     }
    877   } else if (send_stalled_by_session) {
    878     frame_flow_control_state = SEND_STALLED_BY_SESSION;
    879   }
    880 
    881   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
    882     UMA_HISTOGRAM_ENUMERATION(
    883         "Net.SpdyFrameStreamFlowControlState",
    884         frame_flow_control_state,
    885         SEND_STALLED_BY_STREAM + 1);
    886   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    887     UMA_HISTOGRAM_ENUMERATION(
    888         "Net.SpdyFrameStreamAndSessionFlowControlState",
    889         frame_flow_control_state,
    890         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
    891   }
    892 
    893   // Obey send window size of the stream if stream flow control is
    894   // enabled.
    895   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
    896     if (send_stalled_by_stream) {
    897       stream->set_send_stalled_by_flow_control(true);
    898       // Even though we're currently stalled only by the stream, we
    899       // might end up being stalled by the session also.
    900       QueueSendStalledStream(*stream);
    901       net_log().AddEvent(
    902           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
    903           NetLog::IntegerCallback("stream_id", stream_id));
    904       return scoped_ptr<SpdyBuffer>();
    905     }
    906 
    907     effective_len = std::min(effective_len, stream->send_window_size());
    908   }
    909 
    910   // Obey send window size of the session if session flow control is
    911   // enabled.
    912   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    913     if (send_stalled_by_session) {
    914       stream->set_send_stalled_by_flow_control(true);
    915       QueueSendStalledStream(*stream);
    916       net_log().AddEvent(
    917           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
    918           NetLog::IntegerCallback("stream_id", stream_id));
    919       return scoped_ptr<SpdyBuffer>();
    920     }
    921 
    922     effective_len = std::min(effective_len, session_send_window_size_);
    923   }
    924 
    925   DCHECK_GE(effective_len, 0);
    926 
    927   // Clear FIN flag if only some of the data will be in the data
    928   // frame.
    929   if (effective_len < len)
    930     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
    931 
    932   if (net_log().IsLoggingAllEvents()) {
    933     net_log().AddEvent(
    934         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
    935         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
    936                    (flags & DATA_FLAG_FIN) != 0));
    937   }
    938 
    939   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
    940   if (effective_len > 0)
    941     SendPrefacePingIfNoneInFlight();
    942 
    943   // TODO(mbelshe): reduce memory copies here.
    944   DCHECK(buffered_spdy_framer_.get());
    945   scoped_ptr<SpdyFrame> frame(
    946       buffered_spdy_framer_->CreateDataFrame(
    947           stream_id, data->data(),
    948           static_cast<uint32>(effective_len), flags));
    949 
    950   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
    951 
    952   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    953     DecreaseSendWindowSize(static_cast<int32>(effective_len));
    954     data_buffer->AddConsumeCallback(
    955         base::Bind(&SpdySession::OnWriteBufferConsumed,
    956                    weak_factory_.GetWeakPtr(),
    957                    static_cast<size_t>(effective_len)));
    958   }
    959 
    960   return data_buffer.Pass();
    961 }
    962 
    963 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
    964   DCHECK_NE(stream_id, 0u);
    965 
    966   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
    967   if (it == active_streams_.end()) {
    968     NOTREACHED();
    969     return;
    970   }
    971 
    972   CloseActiveStreamIterator(it, status);
    973 }
    974 
    975 void SpdySession::CloseCreatedStream(
    976     const base::WeakPtr<SpdyStream>& stream, int status) {
    977   DCHECK_EQ(stream->stream_id(), 0u);
    978 
    979   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
    980   if (it == created_streams_.end()) {
    981     NOTREACHED();
    982     return;
    983   }
    984 
    985   CloseCreatedStreamIterator(it, status);
    986 }
    987 
    988 void SpdySession::ResetStream(SpdyStreamId stream_id,
    989                               SpdyRstStreamStatus status,
    990                               const std::string& description) {
    991   DCHECK_NE(stream_id, 0u);
    992 
    993   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
    994   if (it == active_streams_.end()) {
    995     NOTREACHED();
    996     return;
    997   }
    998 
    999   ResetStreamIterator(it, status, description);
   1000 }
   1001 
   1002 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
   1003   return ContainsKey(active_streams_, stream_id);
   1004 }
   1005 
   1006 LoadState SpdySession::GetLoadState() const {
   1007   // Just report that we're idle since the session could be doing
   1008   // many things concurrently.
   1009   return LOAD_STATE_IDLE;
   1010 }
   1011 
   1012 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
   1013                                             int status) {
   1014   // TODO(mbelshe): We should send a RST_STREAM control frame here
   1015   //                so that the server can cancel a large send.
   1016 
   1017   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
   1018   active_streams_.erase(it);
   1019 
   1020   // TODO(akalin): When SpdyStream was ref-counted (and
   1021   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
   1022   // was only done when status was not OK. This meant that pushed
   1023   // streams can still be claimed after they're closed. This is
   1024   // probably something that we still want to support, although server
   1025   // push is hardly used. Write tests for this and fix this. (See
   1026   // http://crbug.com/261712 .)
   1027   if (owned_stream->type() == SPDY_PUSH_STREAM)
   1028     unclaimed_pushed_streams_.erase(owned_stream->url());
   1029 
   1030   base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
   1031 
   1032   DeleteStream(owned_stream.Pass(), status);
   1033 
   1034   if (!weak_this)
   1035     return;
   1036 
   1037   if (availability_state_ == STATE_CLOSED)
   1038     return;
   1039 
   1040   // If there are no active streams and the socket pool is stalled, close the
   1041   // session to free up a socket slot.
   1042   if (active_streams_.empty() && connection_->IsPoolStalled()) {
   1043     CloseSessionResult result =
   1044         DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
   1045     DCHECK_NE(result, SESSION_ALREADY_CLOSED);
   1046   }
   1047 }
   1048 
   1049 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
   1050                                              int status) {
   1051   scoped_ptr<SpdyStream> owned_stream(*it);
   1052   created_streams_.erase(it);
   1053   DeleteStream(owned_stream.Pass(), status);
   1054 }
   1055 
   1056 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
   1057                                       SpdyRstStreamStatus status,
   1058                                       const std::string& description) {
   1059   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
   1060   // may close us.
   1061   SpdyStreamId stream_id = it->first;
   1062   RequestPriority priority = it->second.stream->priority();
   1063   EnqueueResetStreamFrame(stream_id, priority, status, description);
   1064 
   1065   // Removes any pending writes for the stream except for possibly an
   1066   // in-flight one.
   1067   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   1068 }
   1069 
   1070 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
   1071                                           RequestPriority priority,
   1072                                           SpdyRstStreamStatus status,
   1073                                           const std::string& description) {
   1074   DCHECK_NE(stream_id, 0u);
   1075 
   1076   net_log().AddEvent(
   1077       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
   1078       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
   1079 
   1080   DCHECK(buffered_spdy_framer_.get());
   1081   scoped_ptr<SpdyFrame> rst_frame(
   1082       buffered_spdy_framer_->CreateRstStream(stream_id, status));
   1083 
   1084   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
   1085   RecordProtocolErrorHistogram(
   1086       static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
   1087 }
   1088 
   1089 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
   1090   CHECK(!in_io_loop_);
   1091   DCHECK_NE(availability_state_, STATE_CLOSED);
   1092   DCHECK_EQ(read_state_, expected_read_state);
   1093 
   1094   result = DoReadLoop(expected_read_state, result);
   1095 
   1096   if (availability_state_ == STATE_CLOSED) {
   1097     DCHECK_EQ(result, error_on_close_);
   1098     DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1099     RemoveFromPool();
   1100     return;
   1101   }
   1102 
   1103   DCHECK(result == OK || result == ERR_IO_PENDING);
   1104 }
   1105 
   1106 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
   1107   CHECK(!in_io_loop_);
   1108   DCHECK_NE(availability_state_, STATE_CLOSED);
   1109   DCHECK_EQ(read_state_, expected_read_state);
   1110 
   1111   in_io_loop_ = true;
   1112 
   1113   int bytes_read_without_yielding = 0;
   1114 
   1115   // Loop until the session is closed, the read becomes blocked, or
   1116   // the read limit is exceeded.
   1117   while (true) {
   1118     switch (read_state_) {
   1119       case READ_STATE_DO_READ:
   1120         DCHECK_EQ(result, OK);
   1121         result = DoRead();
   1122         break;
   1123       case READ_STATE_DO_READ_COMPLETE:
   1124         if (result > 0)
   1125           bytes_read_without_yielding += result;
   1126         result = DoReadComplete(result);
   1127         break;
   1128       default:
   1129         NOTREACHED() << "read_state_: " << read_state_;
   1130         break;
   1131     }
   1132 
   1133     if (availability_state_ == STATE_CLOSED) {
   1134       DCHECK_EQ(result, error_on_close_);
   1135       DCHECK_LT(result, ERR_IO_PENDING);
   1136       break;
   1137     }
   1138 
   1139     if (result == ERR_IO_PENDING)
   1140       break;
   1141 
   1142     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
   1143       read_state_ = READ_STATE_DO_READ;
   1144       base::MessageLoop::current()->PostTask(
   1145           FROM_HERE,
   1146           base::Bind(&SpdySession::PumpReadLoop,
   1147                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
   1148       result = ERR_IO_PENDING;
   1149       break;
   1150     }
   1151   }
   1152 
   1153   CHECK(in_io_loop_);
   1154   in_io_loop_ = false;
   1155 
   1156   return result;
   1157 }
   1158 
   1159 int SpdySession::DoRead() {
   1160   CHECK(in_io_loop_);
   1161   DCHECK_NE(availability_state_, STATE_CLOSED);
   1162 
   1163   CHECK(connection_);
   1164   CHECK(connection_->socket());
   1165   read_state_ = READ_STATE_DO_READ_COMPLETE;
   1166   return connection_->socket()->Read(
   1167       read_buffer_.get(),
   1168       kReadBufferSize,
   1169       base::Bind(&SpdySession::PumpReadLoop,
   1170                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
   1171 }
   1172 
   1173 int SpdySession::DoReadComplete(int result) {
   1174   CHECK(in_io_loop_);
   1175   DCHECK_NE(availability_state_, STATE_CLOSED);
   1176 
   1177   // Parse a frame.  For now this code requires that the frame fit into our
   1178   // buffer (kReadBufferSize).
   1179   // TODO(mbelshe): support arbitrarily large frames!
   1180 
   1181   if (result == 0) {
   1182     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
   1183                                 total_bytes_received_, 1, 100000000, 50);
   1184     CloseSessionResult close_session_result =
   1185         DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
   1186     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1187     DCHECK_EQ(availability_state_, STATE_CLOSED);
   1188     DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
   1189     return ERR_CONNECTION_CLOSED;
   1190   }
   1191 
   1192   if (result < 0) {
   1193     CloseSessionResult close_session_result =
   1194         DoCloseSession(static_cast<Error>(result), "result is < 0.");
   1195     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1196     DCHECK_EQ(availability_state_, STATE_CLOSED);
   1197     DCHECK_EQ(error_on_close_, result);
   1198     return result;
   1199   }
   1200 
   1201   total_bytes_received_ += result;
   1202 
   1203   last_activity_time_ = time_func_();
   1204 
   1205   DCHECK(buffered_spdy_framer_.get());
   1206   char* data = read_buffer_->data();
   1207   while (result > 0) {
   1208     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
   1209     result -= bytes_processed;
   1210     data += bytes_processed;
   1211 
   1212     if (availability_state_ == STATE_CLOSED) {
   1213       DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1214       return error_on_close_;
   1215     }
   1216 
   1217     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
   1218   }
   1219 
   1220   read_state_ = READ_STATE_DO_READ;
   1221   return OK;
   1222 }
   1223 
   1224 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
   1225   CHECK(!in_io_loop_);
   1226   DCHECK_NE(availability_state_, STATE_CLOSED);
   1227   DCHECK_EQ(write_state_, expected_write_state);
   1228 
   1229   result = DoWriteLoop(expected_write_state, result);
   1230 
   1231   if (availability_state_ == STATE_CLOSED) {
   1232     DCHECK_EQ(result, error_on_close_);
   1233     DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1234     RemoveFromPool();
   1235     return;
   1236   }
   1237 
   1238   DCHECK(result == OK || result == ERR_IO_PENDING);
   1239 }
   1240 
   1241 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
   1242   CHECK(!in_io_loop_);
   1243   DCHECK_NE(availability_state_, STATE_CLOSED);
   1244   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
   1245   DCHECK_EQ(write_state_, expected_write_state);
   1246 
   1247   in_io_loop_ = true;
   1248 
   1249   // Loop until the session is closed or the write becomes blocked.
   1250   while (true) {
   1251     switch (write_state_) {
   1252       case WRITE_STATE_DO_WRITE:
   1253         DCHECK_EQ(result, OK);
   1254         result = DoWrite();
   1255         break;
   1256       case WRITE_STATE_DO_WRITE_COMPLETE:
   1257         result = DoWriteComplete(result);
   1258         break;
   1259       case WRITE_STATE_IDLE:
   1260       default:
   1261         NOTREACHED() << "write_state_: " << write_state_;
   1262         break;
   1263     }
   1264 
   1265     if (availability_state_ == STATE_CLOSED) {
   1266       DCHECK_EQ(result, error_on_close_);
   1267       DCHECK_LT(result, ERR_IO_PENDING);
   1268       break;
   1269     }
   1270 
   1271     if (write_state_ == WRITE_STATE_IDLE) {
   1272       DCHECK_EQ(result, ERR_IO_PENDING);
   1273       break;
   1274     }
   1275 
   1276     if (result == ERR_IO_PENDING)
   1277       break;
   1278   }
   1279 
   1280   CHECK(in_io_loop_);
   1281   in_io_loop_ = false;
   1282 
   1283   return result;
   1284 }
   1285 
   1286 int SpdySession::DoWrite() {
   1287   CHECK(in_io_loop_);
   1288   DCHECK_NE(availability_state_, STATE_CLOSED);
   1289 
   1290   DCHECK(buffered_spdy_framer_);
   1291   if (in_flight_write_) {
   1292     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1293   } else {
   1294     // Grab the next frame to send.
   1295     SpdyFrameType frame_type = DATA;
   1296     scoped_ptr<SpdyBufferProducer> producer;
   1297     base::WeakPtr<SpdyStream> stream;
   1298     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
   1299       write_state_ = WRITE_STATE_IDLE;
   1300       return ERR_IO_PENDING;
   1301     }
   1302 
   1303     if (stream.get())
   1304       DCHECK(!stream->IsClosed());
   1305 
   1306     // Activate the stream only when sending the SYN_STREAM frame to
   1307     // guarantee monotonically-increasing stream IDs.
   1308     if (frame_type == SYN_STREAM) {
   1309       if (stream.get() && stream->stream_id() == 0) {
   1310         scoped_ptr<SpdyStream> owned_stream =
   1311             ActivateCreatedStream(stream.get());
   1312         InsertActivatedStream(owned_stream.Pass());
   1313       } else {
   1314         NOTREACHED();
   1315         return ERR_UNEXPECTED;
   1316       }
   1317     }
   1318 
   1319     in_flight_write_ = producer->ProduceBuffer();
   1320     if (!in_flight_write_) {
   1321       NOTREACHED();
   1322       return ERR_UNEXPECTED;
   1323     }
   1324     in_flight_write_frame_type_ = frame_type;
   1325     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
   1326     DCHECK_GE(in_flight_write_frame_size_,
   1327               buffered_spdy_framer_->GetFrameMinimumSize());
   1328     in_flight_write_stream_ = stream;
   1329   }
   1330 
   1331   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
   1332 
   1333   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
   1334   // with Socket implementations that don't store their IOBuffer
   1335   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
   1336   scoped_refptr<IOBuffer> write_io_buffer =
   1337       in_flight_write_->GetIOBufferForRemainingData();
   1338   return connection_->socket()->Write(
   1339       write_io_buffer.get(),
   1340       in_flight_write_->GetRemainingSize(),
   1341       base::Bind(&SpdySession::PumpWriteLoop,
   1342                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
   1343 }
   1344 
   1345 int SpdySession::DoWriteComplete(int result) {
   1346   CHECK(in_io_loop_);
   1347   DCHECK_NE(availability_state_, STATE_CLOSED);
   1348   DCHECK_NE(result, ERR_IO_PENDING);
   1349   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1350 
   1351   last_activity_time_ = time_func_();
   1352 
   1353   if (result < 0) {
   1354     DCHECK_NE(result, ERR_IO_PENDING);
   1355     in_flight_write_.reset();
   1356     in_flight_write_frame_type_ = DATA;
   1357     in_flight_write_frame_size_ = 0;
   1358     in_flight_write_stream_.reset();
   1359     CloseSessionResult close_session_result =
   1360         DoCloseSession(static_cast<Error>(result), "Write error");
   1361     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1362     DCHECK_EQ(availability_state_, STATE_CLOSED);
   1363     DCHECK_EQ(error_on_close_, result);
   1364     return result;
   1365   }
   1366 
   1367   // It should not be possible to have written more bytes than our
   1368   // in_flight_write_.
   1369   DCHECK_LE(static_cast<size_t>(result),
   1370             in_flight_write_->GetRemainingSize());
   1371 
   1372   if (result > 0) {
   1373     in_flight_write_->Consume(static_cast<size_t>(result));
   1374 
   1375     // We only notify the stream when we've fully written the pending frame.
   1376     if (in_flight_write_->GetRemainingSize() == 0) {
   1377       // It is possible that the stream was cancelled while we were
   1378       // writing to the socket.
   1379       if (in_flight_write_stream_.get()) {
   1380         DCHECK_GT(in_flight_write_frame_size_, 0u);
   1381         in_flight_write_stream_->OnFrameWriteComplete(
   1382             in_flight_write_frame_type_,
   1383             in_flight_write_frame_size_);
   1384       }
   1385 
   1386       // Cleanup the write which just completed.
   1387       in_flight_write_.reset();
   1388       in_flight_write_frame_type_ = DATA;
   1389       in_flight_write_frame_size_ = 0;
   1390       in_flight_write_stream_.reset();
   1391     }
   1392   }
   1393 
   1394   write_state_ = WRITE_STATE_DO_WRITE;
   1395   return OK;
   1396 }
   1397 
   1398 void SpdySession::DcheckGoingAway() const {
   1399   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1400   if (DCHECK_IS_ON()) {
   1401     for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
   1402       DCHECK(pending_create_stream_queues_[i].empty());
   1403     }
   1404   }
   1405   DCHECK(created_streams_.empty());
   1406 }
   1407 
   1408 void SpdySession::DcheckClosed() const {
   1409   DcheckGoingAway();
   1410   DCHECK_EQ(availability_state_, STATE_CLOSED);
   1411   DCHECK_LT(error_on_close_, ERR_IO_PENDING);
   1412   DCHECK(active_streams_.empty());
   1413   DCHECK(unclaimed_pushed_streams_.empty());
   1414   DCHECK(write_queue_.IsEmpty());
   1415 }
   1416 
   1417 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
   1418                                  Error status) {
   1419   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1420 
   1421   // The loops below are carefully written to avoid reentrancy problems.
   1422 
   1423   while (true) {
   1424     size_t old_size = GetTotalSize(pending_create_stream_queues_);
   1425     base::WeakPtr<SpdyStreamRequest> pending_request =
   1426         GetNextPendingStreamRequest();
   1427     if (!pending_request)
   1428       break;
   1429     // No new stream requests should be added while the session is
   1430     // going away.
   1431     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
   1432     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
   1433   }
   1434 
   1435   while (true) {
   1436     size_t old_size = active_streams_.size();
   1437     ActiveStreamMap::iterator it =
   1438         active_streams_.lower_bound(last_good_stream_id + 1);
   1439     if (it == active_streams_.end())
   1440       break;
   1441     LogAbandonedActiveStream(it, status);
   1442     CloseActiveStreamIterator(it, status);
   1443     // No new streams should be activated while the session is going
   1444     // away.
   1445     DCHECK_GT(old_size, active_streams_.size());
   1446   }
   1447 
   1448   while (!created_streams_.empty()) {
   1449     size_t old_size = created_streams_.size();
   1450     CreatedStreamSet::iterator it = created_streams_.begin();
   1451     LogAbandonedStream(*it, status);
   1452     CloseCreatedStreamIterator(it, status);
   1453     // No new streams should be created while the session is going
   1454     // away.
   1455     DCHECK_GT(old_size, created_streams_.size());
   1456   }
   1457 
   1458   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
   1459 
   1460   DcheckGoingAway();
   1461 }
   1462 
   1463 void SpdySession::MaybeFinishGoingAway() {
   1464   DcheckGoingAway();
   1465   if (active_streams_.empty() && availability_state_ != STATE_CLOSED) {
   1466     CloseSessionResult result =
   1467         DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
   1468     DCHECK_NE(result, SESSION_ALREADY_CLOSED);
   1469   }
   1470 }
   1471 
   1472 SpdySession::CloseSessionResult SpdySession::DoCloseSession(
   1473     Error err,
   1474     const std::string& description) {
   1475   DCHECK_LT(err, ERR_IO_PENDING);
   1476 
   1477   if (availability_state_ == STATE_CLOSED)
   1478     return SESSION_ALREADY_CLOSED;
   1479 
   1480   net_log_.AddEvent(
   1481       NetLog::TYPE_SPDY_SESSION_CLOSE,
   1482       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
   1483 
   1484   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
   1485   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
   1486                               total_bytes_received_, 1, 100000000, 50);
   1487 
   1488   // |pool_| will be NULL when |InitializeWithSocket()| is in the
   1489   // call stack.
   1490   if (pool_ && availability_state_ != STATE_GOING_AWAY)
   1491     pool_->MakeSessionUnavailable(GetWeakPtr());
   1492 
   1493   availability_state_ = STATE_CLOSED;
   1494   error_on_close_ = err;
   1495 
   1496   StartGoingAway(0, err);
   1497   write_queue_.Clear();
   1498 
   1499   DcheckClosed();
   1500 
   1501   if (in_io_loop_)
   1502     return SESSION_CLOSED_BUT_NOT_REMOVED;
   1503 
   1504   RemoveFromPool();
   1505   return SESSION_CLOSED_AND_REMOVED;
   1506 }
   1507 
   1508 void SpdySession::RemoveFromPool() {
   1509   DcheckClosed();
   1510   CHECK(pool_);
   1511 
   1512   SpdySessionPool* pool = pool_;
   1513   pool_ = NULL;
   1514   pool->RemoveUnavailableSession(GetWeakPtr());
   1515 }
   1516 
   1517 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
   1518   DCHECK(stream);
   1519   std::string description = base::StringPrintf(
   1520       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
   1521       stream->url().spec();
   1522   stream->LogStreamError(status, description);
   1523   // We don't increment the streams abandoned counter here. If the
   1524   // stream isn't active (i.e., it hasn't written anything to the wire
   1525   // yet) then it's as if it never existed. If it is active, then
   1526   // LogAbandonedActiveStream() will increment the counters.
   1527 }
   1528 
   1529 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
   1530                                            Error status) {
   1531   DCHECK_GT(it->first, 0u);
   1532   LogAbandonedStream(it->second.stream, status);
   1533   ++streams_abandoned_count_;
   1534   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
   1535   abandoned_streams.Increment();
   1536   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
   1537       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
   1538       unclaimed_pushed_streams_.end()) {
   1539     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
   1540     abandoned_push_streams.Increment();
   1541   }
   1542 }
   1543 
   1544 int SpdySession::GetNewStreamId() {
   1545   int id = stream_hi_water_mark_;
   1546   stream_hi_water_mark_ += 2;
   1547   if (stream_hi_water_mark_ > 0x7fff)
   1548     stream_hi_water_mark_ = 1;
   1549   return id;
   1550 }
   1551 
   1552 void SpdySession::CloseSessionOnError(Error err,
   1553                                       const std::string& description) {
   1554   // We may be called from anywhere, so we can't expect a particular
   1555   // return value.
   1556   ignore_result(DoCloseSession(err, description));
   1557 }
   1558 
   1559 base::Value* SpdySession::GetInfoAsValue() const {
   1560   base::DictionaryValue* dict = new base::DictionaryValue();
   1561 
   1562   dict->SetInteger("source_id", net_log_.source().id);
   1563 
   1564   dict->SetString("host_port_pair", host_port_pair().ToString());
   1565   if (!pooled_aliases_.empty()) {
   1566     base::ListValue* alias_list = new base::ListValue();
   1567     for (std::set<SpdySessionKey>::const_iterator it =
   1568              pooled_aliases_.begin();
   1569          it != pooled_aliases_.end(); it++) {
   1570       alias_list->Append(new base::StringValue(
   1571           it->host_port_pair().ToString()));
   1572     }
   1573     dict->Set("aliases", alias_list);
   1574   }
   1575   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
   1576 
   1577   dict->SetInteger("active_streams", active_streams_.size());
   1578 
   1579   dict->SetInteger("unclaimed_pushed_streams",
   1580                    unclaimed_pushed_streams_.size());
   1581 
   1582   dict->SetBoolean("is_secure", is_secure_);
   1583 
   1584   dict->SetString("protocol_negotiated",
   1585                   SSLClientSocket::NextProtoToString(
   1586                       connection_->socket()->GetNegotiatedProtocol()));
   1587 
   1588   dict->SetInteger("error", error_on_close_);
   1589   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
   1590 
   1591   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
   1592   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
   1593   dict->SetInteger("streams_pushed_and_claimed_count",
   1594       streams_pushed_and_claimed_count_);
   1595   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
   1596   DCHECK(buffered_spdy_framer_.get());
   1597   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
   1598 
   1599   dict->SetBoolean("sent_settings", sent_settings_);
   1600   dict->SetBoolean("received_settings", received_settings_);
   1601 
   1602   dict->SetInteger("send_window_size", session_send_window_size_);
   1603   dict->SetInteger("recv_window_size", session_recv_window_size_);
   1604   dict->SetInteger("unacked_recv_window_bytes",
   1605                    session_unacked_recv_window_bytes_);
   1606   return dict;
   1607 }
   1608 
   1609 bool SpdySession::IsReused() const {
   1610   return buffered_spdy_framer_->frames_received() > 0;
   1611 }
   1612 
   1613 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
   1614                                     LoadTimingInfo* load_timing_info) const {
   1615   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
   1616                                         load_timing_info);
   1617 }
   1618 
   1619 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
   1620   int rv = ERR_SOCKET_NOT_CONNECTED;
   1621   if (connection_->socket()) {
   1622     rv = connection_->socket()->GetPeerAddress(address);
   1623   }
   1624 
   1625   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
   1626                         rv == ERR_SOCKET_NOT_CONNECTED);
   1627 
   1628   return rv;
   1629 }
   1630 
   1631 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
   1632   int rv = ERR_SOCKET_NOT_CONNECTED;
   1633   if (connection_->socket()) {
   1634     rv = connection_->socket()->GetLocalAddress(address);
   1635   }
   1636 
   1637   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
   1638                         rv == ERR_SOCKET_NOT_CONNECTED);
   1639 
   1640   return rv;
   1641 }
   1642 
   1643 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
   1644                                       SpdyFrameType frame_type,
   1645                                       scoped_ptr<SpdyFrame> frame) {
   1646   DCHECK(frame_type == RST_STREAM ||
   1647          frame_type == SETTINGS ||
   1648          frame_type == WINDOW_UPDATE ||
   1649          frame_type == PING);
   1650   EnqueueWrite(
   1651       priority, frame_type,
   1652       scoped_ptr<SpdyBufferProducer>(
   1653           new SimpleBufferProducer(
   1654               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
   1655       base::WeakPtr<SpdyStream>());
   1656 }
   1657 
   1658 void SpdySession::EnqueueWrite(RequestPriority priority,
   1659                                SpdyFrameType frame_type,
   1660                                scoped_ptr<SpdyBufferProducer> producer,
   1661                                const base::WeakPtr<SpdyStream>& stream) {
   1662   if (availability_state_ == STATE_CLOSED)
   1663     return;
   1664 
   1665   bool was_idle = write_queue_.IsEmpty();
   1666   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
   1667   if (write_state_ == WRITE_STATE_IDLE) {
   1668     DCHECK(was_idle);
   1669     DCHECK(!in_flight_write_);
   1670     write_state_ = WRITE_STATE_DO_WRITE;
   1671     base::MessageLoop::current()->PostTask(
   1672         FROM_HERE,
   1673         base::Bind(&SpdySession::PumpWriteLoop,
   1674                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
   1675   }
   1676 }
   1677 
   1678 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
   1679   DCHECK_EQ(stream->stream_id(), 0u);
   1680   DCHECK(created_streams_.find(stream.get()) == created_streams_.end());
   1681   created_streams_.insert(stream.release());
   1682 }
   1683 
   1684 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
   1685   DCHECK_EQ(stream->stream_id(), 0u);
   1686   DCHECK(created_streams_.find(stream) != created_streams_.end());
   1687   stream->set_stream_id(GetNewStreamId());
   1688   scoped_ptr<SpdyStream> owned_stream(stream);
   1689   created_streams_.erase(stream);
   1690   return owned_stream.Pass();
   1691 }
   1692 
   1693 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
   1694   SpdyStreamId stream_id = stream->stream_id();
   1695   DCHECK_NE(stream_id, 0u);
   1696   std::pair<ActiveStreamMap::iterator, bool> result =
   1697       active_streams_.insert(
   1698           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
   1699   if (result.second) {
   1700     ignore_result(stream.release());
   1701   } else {
   1702     NOTREACHED();
   1703   }
   1704 }
   1705 
   1706 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
   1707   if (in_flight_write_stream_.get() == stream.get()) {
   1708     // If we're deleting the stream for the in-flight write, we still
   1709     // need to let the write complete, so we clear
   1710     // |in_flight_write_stream_| and let the write finish on its own
   1711     // without notifying |in_flight_write_stream_|.
   1712     in_flight_write_stream_.reset();
   1713   }
   1714 
   1715   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
   1716 
   1717   // |stream->OnClose()| may end up closing |this|, so detect that.
   1718   base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
   1719 
   1720   stream->OnClose(status);
   1721 
   1722   if (!weak_this)
   1723     return;
   1724 
   1725   switch (availability_state_) {
   1726     case STATE_AVAILABLE:
   1727       ProcessPendingStreamRequests();
   1728       break;
   1729     case STATE_GOING_AWAY:
   1730       DcheckGoingAway();
   1731       MaybeFinishGoingAway();
   1732       break;
   1733     case STATE_CLOSED:
   1734       // Do nothing.
   1735       break;
   1736   }
   1737 }
   1738 
   1739 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
   1740   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
   1741 
   1742   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
   1743   if (unclaimed_it == unclaimed_pushed_streams_.end())
   1744     return base::WeakPtr<SpdyStream>();
   1745 
   1746   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
   1747   unclaimed_pushed_streams_.erase(unclaimed_it);
   1748 
   1749   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   1750   if (active_it == active_streams_.end()) {
   1751     NOTREACHED();
   1752     return base::WeakPtr<SpdyStream>();
   1753   }
   1754 
   1755   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
   1756   used_push_streams.Increment();
   1757   return active_it->second.stream->GetWeakPtr();
   1758 }
   1759 
   1760 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
   1761                              bool* was_npn_negotiated,
   1762                              NextProto* protocol_negotiated) {
   1763   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
   1764   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
   1765   return connection_->socket()->GetSSLInfo(ssl_info);
   1766 }
   1767 
   1768 bool SpdySession::GetSSLCertRequestInfo(
   1769     SSLCertRequestInfo* cert_request_info) {
   1770   if (!is_secure_)
   1771     return false;
   1772   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
   1773   return true;
   1774 }
   1775 
   1776 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
   1777   CHECK(in_io_loop_);
   1778 
   1779   if (availability_state_ == STATE_CLOSED)
   1780     return;
   1781 
   1782   RecordProtocolErrorHistogram(
   1783       static_cast<SpdyProtocolErrorDetails>(error_code));
   1784   std::string description = base::StringPrintf(
   1785       "SPDY_ERROR error_code: %d.", error_code);
   1786   CloseSessionResult result =
   1787       DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
   1788   DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   1789 }
   1790 
   1791 void SpdySession::OnStreamError(SpdyStreamId stream_id,
   1792                                 const std::string& description) {
   1793   CHECK(in_io_loop_);
   1794 
   1795   if (availability_state_ == STATE_CLOSED)
   1796     return;
   1797 
   1798   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1799   if (it == active_streams_.end()) {
   1800     // We still want to send a frame to reset the stream even if we
   1801     // don't know anything about it.
   1802     EnqueueResetStreamFrame(
   1803         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
   1804     return;
   1805   }
   1806 
   1807   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
   1808 }
   1809 
   1810 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
   1811                                     size_t length,
   1812                                     bool fin) {
   1813   CHECK(in_io_loop_);
   1814 
   1815   if (availability_state_ == STATE_CLOSED)
   1816     return;
   1817 
   1818   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1819 
   1820   // By the time data comes in, the stream may already be inactive.
   1821   if (it == active_streams_.end())
   1822     return;
   1823 
   1824   SpdyStream* stream = it->second.stream;
   1825   CHECK_EQ(stream->stream_id(), stream_id);
   1826 
   1827   DCHECK(buffered_spdy_framer_);
   1828   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
   1829   stream->IncrementRawReceivedBytes(header_len);
   1830 }
   1831 
   1832 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
   1833                                     const char* data,
   1834                                     size_t len,
   1835                                     bool fin) {
   1836   CHECK(in_io_loop_);
   1837 
   1838   if (availability_state_ == STATE_CLOSED)
   1839     return;
   1840 
   1841   DCHECK_LT(len, 1u << 24);
   1842   if (net_log().IsLoggingAllEvents()) {
   1843     net_log().AddEvent(
   1844         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
   1845         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
   1846   }
   1847 
   1848   // Build the buffer as early as possible so that we go through the
   1849   // session flow control checks and update
   1850   // |unacked_recv_window_bytes_| properly even when the stream is
   1851   // inactive (since the other side has still reduced its session send
   1852   // window).
   1853   scoped_ptr<SpdyBuffer> buffer;
   1854   if (data) {
   1855     DCHECK_GT(len, 0u);
   1856     buffer.reset(new SpdyBuffer(data, len));
   1857 
   1858     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1859       DecreaseRecvWindowSize(static_cast<int32>(len));
   1860       buffer->AddConsumeCallback(
   1861           base::Bind(&SpdySession::OnReadBufferConsumed,
   1862                      weak_factory_.GetWeakPtr()));
   1863     }
   1864   } else {
   1865     DCHECK_EQ(len, 0u);
   1866   }
   1867 
   1868   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1869 
   1870   // By the time data comes in, the stream may already be inactive.
   1871   if (it == active_streams_.end())
   1872     return;
   1873 
   1874   SpdyStream* stream = it->second.stream;
   1875   CHECK_EQ(stream->stream_id(), stream_id);
   1876 
   1877   stream->IncrementRawReceivedBytes(len);
   1878 
   1879   if (it->second.waiting_for_syn_reply) {
   1880     const std::string& error = "Data received before SYN_REPLY.";
   1881     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   1882     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   1883     return;
   1884   }
   1885 
   1886   stream->OnDataReceived(buffer.Pass());
   1887 }
   1888 
   1889 void SpdySession::OnSettings(bool clear_persisted) {
   1890   CHECK(in_io_loop_);
   1891 
   1892   if (availability_state_ == STATE_CLOSED)
   1893     return;
   1894 
   1895   if (clear_persisted)
   1896     http_server_properties_->ClearSpdySettings(host_port_pair());
   1897 
   1898   if (net_log_.IsLoggingAllEvents()) {
   1899     net_log_.AddEvent(
   1900         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
   1901         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
   1902                    clear_persisted));
   1903   }
   1904 }
   1905 
   1906 void SpdySession::OnSetting(SpdySettingsIds id,
   1907                             uint8 flags,
   1908                             uint32 value) {
   1909   CHECK(in_io_loop_);
   1910 
   1911   if (availability_state_ == STATE_CLOSED)
   1912     return;
   1913 
   1914   HandleSetting(id, value);
   1915   http_server_properties_->SetSpdySetting(
   1916       host_port_pair(),
   1917       id,
   1918       static_cast<SpdySettingsFlags>(flags),
   1919       value);
   1920   received_settings_ = true;
   1921 
   1922   // Log the setting.
   1923   net_log_.AddEvent(
   1924       NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
   1925       base::Bind(&NetLogSpdySettingCallback,
   1926                  id, static_cast<SpdySettingsFlags>(flags), value));
   1927 }
   1928 
   1929 void SpdySession::OnSendCompressedFrame(
   1930     SpdyStreamId stream_id,
   1931     SpdyFrameType type,
   1932     size_t payload_len,
   1933     size_t frame_len) {
   1934   if (type != SYN_STREAM)
   1935     return;
   1936 
   1937   DCHECK(buffered_spdy_framer_.get());
   1938   size_t compressed_len =
   1939       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
   1940 
   1941   if (payload_len) {
   1942     // Make sure we avoid early decimal truncation.
   1943     int compression_pct = 100 - (100 * compressed_len) / payload_len;
   1944     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
   1945                              compression_pct);
   1946   }
   1947 }
   1948 
   1949 void SpdySession::OnReceiveCompressedFrame(
   1950     SpdyStreamId stream_id,
   1951     SpdyFrameType type,
   1952     size_t frame_len) {
   1953   last_compressed_frame_len_ = frame_len;
   1954 }
   1955 
   1956 int SpdySession::OnInitialResponseHeadersReceived(
   1957     const SpdyHeaderBlock& response_headers,
   1958     base::Time response_time,
   1959     base::TimeTicks recv_first_byte_time,
   1960     SpdyStream* stream) {
   1961   CHECK(in_io_loop_);
   1962   SpdyStreamId stream_id = stream->stream_id();
   1963   // May invalidate |stream|.
   1964   int rv = stream->OnInitialResponseHeadersReceived(
   1965       response_headers, response_time, recv_first_byte_time);
   1966   if (rv < 0) {
   1967     DCHECK_NE(rv, ERR_IO_PENDING);
   1968     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   1969   }
   1970   return rv;
   1971 }
   1972 
   1973 void SpdySession::OnSynStream(SpdyStreamId stream_id,
   1974                               SpdyStreamId associated_stream_id,
   1975                               SpdyPriority priority,
   1976                               uint8 credential_slot,
   1977                               bool fin,
   1978                               bool unidirectional,
   1979                               const SpdyHeaderBlock& headers) {
   1980   CHECK(in_io_loop_);
   1981 
   1982   if (availability_state_ == STATE_CLOSED)
   1983     return;
   1984 
   1985   base::Time response_time = base::Time::Now();
   1986   base::TimeTicks recv_first_byte_time = time_func_();
   1987 
   1988   if (net_log_.IsLoggingAllEvents()) {
   1989     net_log_.AddEvent(
   1990         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
   1991         base::Bind(&NetLogSpdySynStreamReceivedCallback,
   1992                    &headers, fin, unidirectional, priority,
   1993                    stream_id, associated_stream_id));
   1994   }
   1995 
   1996   // Server-initiated streams should have even sequence numbers.
   1997   if ((stream_id & 0x1) != 0) {
   1998     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
   1999     return;
   2000   }
   2001 
   2002   if (IsStreamActive(stream_id)) {
   2003     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
   2004     return;
   2005   }
   2006 
   2007   RequestPriority request_priority =
   2008       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
   2009 
   2010   if (availability_state_ == STATE_GOING_AWAY) {
   2011     // TODO(akalin): This behavior isn't in the SPDY spec, although it
   2012     // probably should be.
   2013     EnqueueResetStreamFrame(stream_id, request_priority,
   2014                             RST_STREAM_REFUSED_STREAM,
   2015                             "OnSyn received when going away");
   2016     return;
   2017   }
   2018 
   2019   if (associated_stream_id == 0) {
   2020     std::string description = base::StringPrintf(
   2021         "Received invalid OnSyn associated stream id %d for stream %d",
   2022         associated_stream_id, stream_id);
   2023     EnqueueResetStreamFrame(stream_id, request_priority,
   2024                             RST_STREAM_REFUSED_STREAM, description);
   2025     return;
   2026   }
   2027 
   2028   streams_pushed_count_++;
   2029 
   2030   // TODO(mbelshe): DCHECK that this is a GET method?
   2031 
   2032   // Verify that the response had a URL for us.
   2033   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
   2034   if (!gurl.is_valid()) {
   2035     EnqueueResetStreamFrame(
   2036         stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
   2037         "Pushed stream url was invalid: " + gurl.spec());
   2038     return;
   2039   }
   2040 
   2041   // Verify we have a valid stream association.
   2042   ActiveStreamMap::iterator associated_it =
   2043       active_streams_.find(associated_stream_id);
   2044   if (associated_it == active_streams_.end()) {
   2045     EnqueueResetStreamFrame(
   2046         stream_id, request_priority, RST_STREAM_INVALID_STREAM,
   2047         base::StringPrintf(
   2048             "Received OnSyn with inactive associated stream %d",
   2049             associated_stream_id));
   2050     return;
   2051   }
   2052 
   2053   // Check that the SYN advertises the same origin as its associated stream.
   2054   // Bypass this check if and only if this session is with a SPDY proxy that
   2055   // is trusted explicitly via the --trusted-spdy-proxy switch.
   2056   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
   2057     // Disallow pushing of HTTPS content.
   2058     if (gurl.SchemeIs("https")) {
   2059       EnqueueResetStreamFrame(
   2060           stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
   2061           base::StringPrintf(
   2062               "Rejected push of Cross Origin HTTPS content %d",
   2063               associated_stream_id));
   2064     }
   2065   } else {
   2066     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
   2067     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
   2068       EnqueueResetStreamFrame(
   2069           stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
   2070           base::StringPrintf(
   2071               "Rejected Cross Origin Push Stream %d",
   2072               associated_stream_id));
   2073       return;
   2074     }
   2075   }
   2076 
   2077   // There should not be an existing pushed stream with the same path.
   2078   PushedStreamMap::iterator pushed_it =
   2079       unclaimed_pushed_streams_.lower_bound(gurl);
   2080   if (pushed_it != unclaimed_pushed_streams_.end() &&
   2081       pushed_it->first == gurl) {
   2082     EnqueueResetStreamFrame(
   2083         stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
   2084         "Received duplicate pushed stream with url: " +
   2085         gurl.spec());
   2086     return;
   2087   }
   2088 
   2089   scoped_ptr<SpdyStream> stream(
   2090       new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl,
   2091                      request_priority,
   2092                      stream_initial_send_window_size_,
   2093                      stream_initial_recv_window_size_,
   2094                      net_log_));
   2095   stream->set_stream_id(stream_id);
   2096   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2097   last_compressed_frame_len_ = 0;
   2098 
   2099   DeleteExpiredPushedStreams();
   2100   PushedStreamMap::iterator inserted_pushed_it =
   2101       unclaimed_pushed_streams_.insert(
   2102           pushed_it,
   2103           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
   2104   DCHECK(inserted_pushed_it != pushed_it);
   2105 
   2106   InsertActivatedStream(stream.Pass());
   2107 
   2108   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   2109   if (active_it == active_streams_.end()) {
   2110     NOTREACHED();
   2111     return;
   2112   }
   2113 
   2114   // Parse the headers.
   2115   if (OnInitialResponseHeadersReceived(
   2116           headers, response_time,
   2117           recv_first_byte_time, active_it->second.stream) != OK)
   2118     return;
   2119 
   2120   base::StatsCounter push_requests("spdy.pushed_streams");
   2121   push_requests.Increment();
   2122 }
   2123 
   2124 void SpdySession::DeleteExpiredPushedStreams() {
   2125   if (unclaimed_pushed_streams_.empty())
   2126     return;
   2127 
   2128   // Check that adequate time has elapsed since the last sweep.
   2129   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
   2130     return;
   2131 
   2132   // Gather old streams to delete.
   2133   base::TimeTicks minimum_freshness = time_func_() -
   2134       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2135   std::vector<SpdyStreamId> streams_to_close;
   2136   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
   2137        it != unclaimed_pushed_streams_.end(); ++it) {
   2138     if (minimum_freshness > it->second.creation_time)
   2139       streams_to_close.push_back(it->second.stream_id);
   2140   }
   2141 
   2142   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
   2143            streams_to_close.begin();
   2144        to_close_it != streams_to_close.end(); ++to_close_it) {
   2145     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
   2146     if (active_it == active_streams_.end())
   2147       continue;
   2148 
   2149     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
   2150     // CloseActiveStreamIterator() will remove the stream from
   2151     // |unclaimed_pushed_streams_|.
   2152     CloseActiveStreamIterator(active_it, ERR_INVALID_SPDY_STREAM);
   2153   }
   2154 
   2155   next_unclaimed_push_stream_sweep_time_ = time_func_() +
   2156       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2157 }
   2158 
   2159 void SpdySession::OnSynReply(SpdyStreamId stream_id,
   2160                              bool fin,
   2161                              const SpdyHeaderBlock& headers) {
   2162   CHECK(in_io_loop_);
   2163 
   2164   if (availability_state_ == STATE_CLOSED)
   2165     return;
   2166 
   2167   base::Time response_time = base::Time::Now();
   2168   base::TimeTicks recv_first_byte_time = time_func_();
   2169 
   2170   if (net_log().IsLoggingAllEvents()) {
   2171     net_log().AddEvent(
   2172         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
   2173         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
   2174                    &headers, fin, stream_id));
   2175   }
   2176 
   2177   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2178   if (it == active_streams_.end()) {
   2179     // NOTE:  it may just be that the stream was cancelled.
   2180     return;
   2181   }
   2182 
   2183   SpdyStream* stream = it->second.stream;
   2184   CHECK_EQ(stream->stream_id(), stream_id);
   2185 
   2186   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2187   last_compressed_frame_len_ = 0;
   2188 
   2189   if (!it->second.waiting_for_syn_reply) {
   2190     const std::string& error =
   2191         "Received duplicate SYN_REPLY for stream.";
   2192     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2193     ResetStreamIterator(it, RST_STREAM_STREAM_IN_USE, error);
   2194     return;
   2195   }
   2196   it->second.waiting_for_syn_reply = false;
   2197 
   2198   ignore_result(OnInitialResponseHeadersReceived(
   2199       headers, response_time, recv_first_byte_time, stream));
   2200 }
   2201 
   2202 void SpdySession::OnHeaders(SpdyStreamId stream_id,
   2203                             bool fin,
   2204                             const SpdyHeaderBlock& headers) {
   2205   CHECK(in_io_loop_);
   2206 
   2207   if (availability_state_ == STATE_CLOSED)
   2208     return;
   2209 
   2210   if (net_log().IsLoggingAllEvents()) {
   2211     net_log().AddEvent(
   2212         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
   2213         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
   2214                    &headers, fin, stream_id));
   2215   }
   2216 
   2217   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2218   if (it == active_streams_.end()) {
   2219     // NOTE:  it may just be that the stream was cancelled.
   2220     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
   2221     return;
   2222   }
   2223 
   2224   SpdyStream* stream = it->second.stream;
   2225   CHECK_EQ(stream->stream_id(), stream_id);
   2226 
   2227   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2228   last_compressed_frame_len_ = 0;
   2229 
   2230   int rv = stream->OnAdditionalResponseHeadersReceived(headers);
   2231   if (rv < 0) {
   2232     DCHECK_NE(rv, ERR_IO_PENDING);
   2233     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   2234   }
   2235 }
   2236 
   2237 void SpdySession::OnRstStream(SpdyStreamId stream_id,
   2238                               SpdyRstStreamStatus status) {
   2239   CHECK(in_io_loop_);
   2240 
   2241   if (availability_state_ == STATE_CLOSED)
   2242     return;
   2243 
   2244   std::string description;
   2245   net_log().AddEvent(
   2246       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
   2247       base::Bind(&NetLogSpdyRstCallback,
   2248                  stream_id, status, &description));
   2249 
   2250   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2251   if (it == active_streams_.end()) {
   2252     // NOTE:  it may just be that the stream was cancelled.
   2253     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
   2254     return;
   2255   }
   2256 
   2257   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2258 
   2259   if (status == 0) {
   2260     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
   2261   } else if (status == RST_STREAM_REFUSED_STREAM) {
   2262     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
   2263   } else {
   2264     RecordProtocolErrorHistogram(
   2265         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
   2266     it->second.stream->LogStreamError(
   2267         ERR_SPDY_PROTOCOL_ERROR,
   2268         base::StringPrintf("SPDY stream closed with status: %d", status));
   2269     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
   2270     //                For now, it doesn't matter much - it is a protocol error.
   2271     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   2272   }
   2273 }
   2274 
   2275 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
   2276                            SpdyGoAwayStatus status) {
   2277   CHECK(in_io_loop_);
   2278 
   2279   if (availability_state_ == STATE_CLOSED)
   2280     return;
   2281 
   2282   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
   2283       base::Bind(&NetLogSpdyGoAwayCallback,
   2284                  last_accepted_stream_id,
   2285                  active_streams_.size(),
   2286                  unclaimed_pushed_streams_.size(),
   2287                  status));
   2288   if (availability_state_ < STATE_GOING_AWAY) {
   2289     availability_state_ = STATE_GOING_AWAY;
   2290     // |pool_| will be NULL when |InitializeWithSocket()| is in the
   2291     // call stack.
   2292     if (pool_)
   2293       pool_->MakeSessionUnavailable(GetWeakPtr());
   2294   }
   2295   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
   2296   // This is to handle the case when we already don't have any active
   2297   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
   2298   // active streams and so the last one being closed will finish the
   2299   // going away process (see DeleteStream()).
   2300   MaybeFinishGoingAway();
   2301 }
   2302 
   2303 void SpdySession::OnPing(uint32 unique_id) {
   2304   CHECK(in_io_loop_);
   2305 
   2306   if (availability_state_ == STATE_CLOSED)
   2307     return;
   2308 
   2309   net_log_.AddEvent(
   2310       NetLog::TYPE_SPDY_SESSION_PING,
   2311       base::Bind(&NetLogSpdyPingCallback, unique_id, "received"));
   2312 
   2313   // Send response to a PING from server.
   2314   if (unique_id % 2 == 0) {
   2315     WritePingFrame(unique_id);
   2316     return;
   2317   }
   2318 
   2319   --pings_in_flight_;
   2320   if (pings_in_flight_ < 0) {
   2321     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
   2322     CloseSessionResult result =
   2323         DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
   2324     DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   2325     pings_in_flight_ = 0;
   2326     return;
   2327   }
   2328 
   2329   if (pings_in_flight_ > 0)
   2330     return;
   2331 
   2332   // We will record RTT in histogram when there are no more client sent
   2333   // pings_in_flight_.
   2334   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
   2335 }
   2336 
   2337 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
   2338                                  uint32 delta_window_size) {
   2339   CHECK(in_io_loop_);
   2340 
   2341   if (availability_state_ == STATE_CLOSED)
   2342     return;
   2343 
   2344   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
   2345   net_log_.AddEvent(
   2346       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
   2347       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2348                  stream_id, delta_window_size));
   2349 
   2350   if (stream_id == kSessionFlowControlStreamId) {
   2351     // WINDOW_UPDATE for the session.
   2352     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
   2353       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
   2354                    << "session flow control is not turned on";
   2355       // TODO(akalin): Record an error and close the session.
   2356       return;
   2357     }
   2358 
   2359     if (delta_window_size < 1u) {
   2360       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2361       CloseSessionResult result = DoCloseSession(
   2362           ERR_SPDY_PROTOCOL_ERROR,
   2363           "Received WINDOW_UPDATE with an invalid delta_window_size " +
   2364           base::UintToString(delta_window_size));
   2365       DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   2366       return;
   2367     }
   2368 
   2369     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
   2370   } else {
   2371     // WINDOW_UPDATE for a stream.
   2372     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2373       // TODO(akalin): Record an error and close the session.
   2374       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
   2375                    << " when flow control is not turned on";
   2376       return;
   2377     }
   2378 
   2379     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2380 
   2381     if (it == active_streams_.end()) {
   2382       // NOTE:  it may just be that the stream was cancelled.
   2383       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
   2384       return;
   2385     }
   2386 
   2387     SpdyStream* stream = it->second.stream;
   2388     CHECK_EQ(stream->stream_id(), stream_id);
   2389 
   2390     if (delta_window_size < 1u) {
   2391       ResetStreamIterator(it,
   2392                           RST_STREAM_FLOW_CONTROL_ERROR,
   2393                           base::StringPrintf(
   2394                               "Received WINDOW_UPDATE with an invalid "
   2395                               "delta_window_size %ud", delta_window_size));
   2396       return;
   2397     }
   2398 
   2399     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2400     it->second.stream->IncreaseSendWindowSize(
   2401         static_cast<int32>(delta_window_size));
   2402   }
   2403 }
   2404 
   2405 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
   2406                                 SpdyStreamId promised_stream_id) {
   2407   // TODO(akalin): Handle PUSH_PROMISE frames.
   2408 }
   2409 
   2410 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
   2411                                          uint32 delta_window_size) {
   2412   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2413   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2414   CHECK(it != active_streams_.end());
   2415   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2416   SendWindowUpdateFrame(
   2417       stream_id, delta_window_size, it->second.stream->priority());
   2418 }
   2419 
   2420 void SpdySession::SendInitialData() {
   2421   DCHECK(enable_sending_initial_data_);
   2422   DCHECK_NE(availability_state_, STATE_CLOSED);
   2423 
   2424   if (send_connection_header_prefix_) {
   2425     DCHECK_EQ(protocol_, kProtoHTTP2Draft04);
   2426     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
   2427         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
   2428                       kHttp2ConnectionHeaderPrefixSize,
   2429                       false /* take_ownership */));
   2430     // Count the prefix as part of the subsequent SETTINGS frame.
   2431     EnqueueSessionWrite(HIGHEST, SETTINGS,
   2432                         connection_header_prefix_frame.Pass());
   2433   }
   2434 
   2435   // First, notify the server about the settings they should use when
   2436   // communicating with us.
   2437   SettingsMap settings_map;
   2438   // Create a new settings frame notifying the server of our
   2439   // max concurrent streams and initial window size.
   2440   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
   2441       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
   2442   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
   2443       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
   2444     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
   2445         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
   2446                               stream_initial_recv_window_size_);
   2447   }
   2448   SendSettings(settings_map);
   2449 
   2450   // Next, notify the server about our initial recv window size.
   2451   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   2452     // Bump up the receive window size to the real initial value. This
   2453     // has to go here since the WINDOW_UPDATE frame sent by
   2454     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
   2455     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
   2456     // This condition implies that |kDefaultInitialRecvWindowSize| -
   2457     // |session_recv_window_size_| doesn't overflow.
   2458     DCHECK_GT(session_recv_window_size_, 0);
   2459     IncreaseRecvWindowSize(
   2460         kDefaultInitialRecvWindowSize - session_recv_window_size_);
   2461   }
   2462 
   2463   // Finally, notify the server about the settings they have
   2464   // previously told us to use when communicating with them (after
   2465   // applying them).
   2466   const SettingsMap& server_settings_map =
   2467       http_server_properties_->GetSpdySettings(host_port_pair());
   2468   if (server_settings_map.empty())
   2469     return;
   2470 
   2471   SettingsMap::const_iterator it =
   2472       server_settings_map.find(SETTINGS_CURRENT_CWND);
   2473   uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
   2474   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
   2475 
   2476   for (SettingsMap::const_iterator it = server_settings_map.begin();
   2477        it != server_settings_map.end(); ++it) {
   2478     const SpdySettingsIds new_id = it->first;
   2479     const uint32 new_val = it->second.second;
   2480     HandleSetting(new_id, new_val);
   2481   }
   2482 
   2483   SendSettings(server_settings_map);
   2484 }
   2485 
   2486 
   2487 void SpdySession::SendSettings(const SettingsMap& settings) {
   2488   DCHECK_NE(availability_state_, STATE_CLOSED);
   2489 
   2490   net_log_.AddEvent(
   2491       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
   2492       base::Bind(&NetLogSpdySendSettingsCallback, &settings));
   2493 
   2494   // Create the SETTINGS frame and send it.
   2495   DCHECK(buffered_spdy_framer_.get());
   2496   scoped_ptr<SpdyFrame> settings_frame(
   2497       buffered_spdy_framer_->CreateSettings(settings));
   2498   sent_settings_ = true;
   2499   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
   2500 }
   2501 
   2502 void SpdySession::HandleSetting(uint32 id, uint32 value) {
   2503   switch (id) {
   2504     case SETTINGS_MAX_CONCURRENT_STREAMS:
   2505       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
   2506                                          kMaxConcurrentStreamLimit);
   2507       ProcessPendingStreamRequests();
   2508       break;
   2509     case SETTINGS_INITIAL_WINDOW_SIZE: {
   2510       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2511         net_log().AddEvent(
   2512             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
   2513         return;
   2514       }
   2515 
   2516       if (value > static_cast<uint32>(kint32max)) {
   2517         net_log().AddEvent(
   2518             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
   2519             NetLog::IntegerCallback("initial_window_size", value));
   2520         return;
   2521       }
   2522 
   2523       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
   2524       int32 delta_window_size =
   2525           static_cast<int32>(value) - stream_initial_send_window_size_;
   2526       stream_initial_send_window_size_ = static_cast<int32>(value);
   2527       UpdateStreamsSendWindowSize(delta_window_size);
   2528       net_log().AddEvent(
   2529           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
   2530           NetLog::IntegerCallback("delta_window_size", delta_window_size));
   2531       break;
   2532     }
   2533   }
   2534 }
   2535 
   2536 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
   2537   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2538   for (ActiveStreamMap::iterator it = active_streams_.begin();
   2539        it != active_streams_.end(); ++it) {
   2540     it->second.stream->AdjustSendWindowSize(delta_window_size);
   2541   }
   2542 
   2543   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
   2544        it != created_streams_.end(); it++) {
   2545     (*it)->AdjustSendWindowSize(delta_window_size);
   2546   }
   2547 }
   2548 
   2549 void SpdySession::SendPrefacePingIfNoneInFlight() {
   2550   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
   2551     return;
   2552 
   2553   base::TimeTicks now = time_func_();
   2554   // If there is no activity in the session, then send a preface-PING.
   2555   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
   2556     SendPrefacePing();
   2557 }
   2558 
   2559 void SpdySession::SendPrefacePing() {
   2560   WritePingFrame(next_ping_id_);
   2561 }
   2562 
   2563 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
   2564                                         uint32 delta_window_size,
   2565                                         RequestPriority priority) {
   2566   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2567   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2568   if (it != active_streams_.end()) {
   2569     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2570   } else {
   2571     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2572     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
   2573   }
   2574 
   2575   net_log_.AddEvent(
   2576       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
   2577       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2578                  stream_id, delta_window_size));
   2579 
   2580   DCHECK(buffered_spdy_framer_.get());
   2581   scoped_ptr<SpdyFrame> window_update_frame(
   2582       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
   2583   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
   2584 }
   2585 
   2586 void SpdySession::WritePingFrame(uint32 unique_id) {
   2587   DCHECK(buffered_spdy_framer_.get());
   2588   scoped_ptr<SpdyFrame> ping_frame(
   2589       buffered_spdy_framer_->CreatePingFrame(unique_id));
   2590   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
   2591 
   2592   if (net_log().IsLoggingAllEvents()) {
   2593     net_log().AddEvent(
   2594         NetLog::TYPE_SPDY_SESSION_PING,
   2595         base::Bind(&NetLogSpdyPingCallback, unique_id, "sent"));
   2596   }
   2597   if (unique_id % 2 != 0) {
   2598     next_ping_id_ += 2;
   2599     ++pings_in_flight_;
   2600     PlanToCheckPingStatus();
   2601     last_ping_sent_time_ = time_func_();
   2602   }
   2603 }
   2604 
   2605 void SpdySession::PlanToCheckPingStatus() {
   2606   if (check_ping_status_pending_)
   2607     return;
   2608 
   2609   check_ping_status_pending_ = true;
   2610   base::MessageLoop::current()->PostDelayedTask(
   2611       FROM_HERE,
   2612       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2613                  time_func_()), hung_interval_);
   2614 }
   2615 
   2616 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
   2617   CHECK(!in_io_loop_);
   2618   DCHECK_NE(availability_state_, STATE_CLOSED);
   2619 
   2620   // Check if we got a response back for all PINGs we had sent.
   2621   if (pings_in_flight_ == 0) {
   2622     check_ping_status_pending_ = false;
   2623     return;
   2624   }
   2625 
   2626   DCHECK(check_ping_status_pending_);
   2627 
   2628   base::TimeTicks now = time_func_();
   2629   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
   2630 
   2631   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
   2632     // Track all failed PING messages in a separate bucket.
   2633     const base::TimeDelta kFailedPing =
   2634         base::TimeDelta::FromInternalValue(INT_MAX);
   2635     RecordPingRTTHistogram(kFailedPing);
   2636     CloseSessionResult result =
   2637         DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
   2638     DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
   2639     return;
   2640   }
   2641 
   2642   // Check the status of connection after a delay.
   2643   base::MessageLoop::current()->PostDelayedTask(
   2644       FROM_HERE,
   2645       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2646                  now),
   2647       delay);
   2648 }
   2649 
   2650 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
   2651   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
   2652 }
   2653 
   2654 void SpdySession::RecordProtocolErrorHistogram(
   2655     SpdyProtocolErrorDetails details) {
   2656   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
   2657                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2658   if (EndsWith(host_port_pair().host(), "google.com", false)) {
   2659     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
   2660                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2661   }
   2662 }
   2663 
   2664 void SpdySession::RecordHistograms() {
   2665   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
   2666                               streams_initiated_count_,
   2667                               0, 300, 50);
   2668   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
   2669                               streams_pushed_count_,
   2670                               0, 300, 50);
   2671   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
   2672                               streams_pushed_and_claimed_count_,
   2673                               0, 300, 50);
   2674   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
   2675                               streams_abandoned_count_,
   2676                               0, 300, 50);
   2677   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
   2678                             sent_settings_ ? 1 : 0, 2);
   2679   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
   2680                             received_settings_ ? 1 : 0, 2);
   2681   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
   2682                               stalled_streams_,
   2683                               0, 300, 50);
   2684   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
   2685                             stalled_streams_ > 0 ? 1 : 0, 2);
   2686 
   2687   if (received_settings_) {
   2688     // Enumerate the saved settings, and set histograms for it.
   2689     const SettingsMap& settings_map =
   2690         http_server_properties_->GetSpdySettings(host_port_pair());
   2691 
   2692     SettingsMap::const_iterator it;
   2693     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
   2694       const SpdySettingsIds id = it->first;
   2695       const uint32 val = it->second.second;
   2696       switch (id) {
   2697         case SETTINGS_CURRENT_CWND:
   2698           // Record several different histograms to see if cwnd converges
   2699           // for larger volumes of data being sent.
   2700           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
   2701                                       val, 1, 200, 100);
   2702           if (total_bytes_received_ > 10 * 1024) {
   2703             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
   2704                                         val, 1, 200, 100);
   2705             if (total_bytes_received_ > 25 * 1024) {
   2706               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
   2707                                           val, 1, 200, 100);
   2708               if (total_bytes_received_ > 50 * 1024) {
   2709                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
   2710                                             val, 1, 200, 100);
   2711                 if (total_bytes_received_ > 100 * 1024) {
   2712                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
   2713                                               val, 1, 200, 100);
   2714                 }
   2715               }
   2716             }
   2717           }
   2718           break;
   2719         case SETTINGS_ROUND_TRIP_TIME:
   2720           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
   2721                                       val, 1, 1200, 100);
   2722           break;
   2723         case SETTINGS_DOWNLOAD_RETRANS_RATE:
   2724           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
   2725                                       val, 1, 100, 50);
   2726           break;
   2727         default:
   2728           break;
   2729       }
   2730     }
   2731   }
   2732 }
   2733 
   2734 void SpdySession::CompleteStreamRequest(
   2735     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
   2736   // Abort if the request has already been cancelled.
   2737   if (!pending_request)
   2738     return;
   2739 
   2740   base::WeakPtr<SpdyStream> stream;
   2741   int rv = CreateStream(*pending_request, &stream);
   2742 
   2743   if (rv == OK) {
   2744     DCHECK(stream);
   2745     pending_request->OnRequestCompleteSuccess(stream);
   2746   } else {
   2747     DCHECK(!stream);
   2748     pending_request->OnRequestCompleteFailure(rv);
   2749   }
   2750 }
   2751 
   2752 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
   2753   if (!is_secure_)
   2754     return NULL;
   2755   SSLClientSocket* ssl_socket =
   2756       reinterpret_cast<SSLClientSocket*>(connection_->socket());
   2757   DCHECK(ssl_socket);
   2758   return ssl_socket;
   2759 }
   2760 
   2761 void SpdySession::OnWriteBufferConsumed(
   2762     size_t frame_payload_size,
   2763     size_t consume_size,
   2764     SpdyBuffer::ConsumeSource consume_source) {
   2765   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
   2766   // deleted (e.g., a stream is closed due to incoming data).
   2767 
   2768   if (availability_state_ == STATE_CLOSED)
   2769     return;
   2770 
   2771   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2772 
   2773   if (consume_source == SpdyBuffer::DISCARD) {
   2774     // If we're discarding a frame or part of it, increase the send
   2775     // window by the number of discarded bytes. (Although if we're
   2776     // discarding part of a frame, it's probably because of a write
   2777     // error and we'll be tearing down the session soon.)
   2778     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
   2779     DCHECK_GT(remaining_payload_bytes, 0u);
   2780     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
   2781   }
   2782   // For consumed bytes, the send window is increased when we receive
   2783   // a WINDOW_UPDATE frame.
   2784 }
   2785 
   2786 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
   2787   // We can be called with |in_io_loop_| set if a SpdyBuffer is
   2788   // deleted (e.g., a stream is closed due to incoming data).
   2789 
   2790   DCHECK_NE(availability_state_, STATE_CLOSED);
   2791   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2792   DCHECK_GE(delta_window_size, 1);
   2793 
   2794   // Check for overflow.
   2795   int32 max_delta_window_size = kint32max - session_send_window_size_;
   2796   if (delta_window_size > max_delta_window_size) {
   2797     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2798     CloseSessionResult result = DoCloseSession(
   2799         ERR_SPDY_PROTOCOL_ERROR,
   2800         "Received WINDOW_UPDATE [delta: " +
   2801         base::IntToString(delta_window_size) +
   2802         "] for session overflows session_send_window_size_ [current: " +
   2803         base::IntToString(session_send_window_size_) + "]");
   2804     DCHECK_NE(result, SESSION_ALREADY_CLOSED);
   2805     return;
   2806   }
   2807 
   2808   session_send_window_size_ += delta_window_size;
   2809 
   2810   net_log_.AddEvent(
   2811       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   2812       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2813                  delta_window_size, session_send_window_size_));
   2814 
   2815   DCHECK(!IsSendStalled());
   2816   ResumeSendStalledStreams();
   2817 }
   2818 
   2819 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
   2820   DCHECK_NE(availability_state_, STATE_CLOSED);
   2821   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2822 
   2823   // We only call this method when sending a frame. Therefore,
   2824   // |delta_window_size| should be within the valid frame size range.
   2825   DCHECK_GE(delta_window_size, 1);
   2826   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
   2827 
   2828   // |send_window_size_| should have been at least |delta_window_size| for
   2829   // this call to happen.
   2830   DCHECK_GE(session_send_window_size_, delta_window_size);
   2831 
   2832   session_send_window_size_ -= delta_window_size;
   2833 
   2834   net_log_.AddEvent(
   2835       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   2836       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2837                  -delta_window_size, session_send_window_size_));
   2838 }
   2839 
   2840 void SpdySession::OnReadBufferConsumed(
   2841     size_t consume_size,
   2842     SpdyBuffer::ConsumeSource consume_source) {
   2843   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
   2844   // deleted (e.g., discarded by a SpdyReadQueue).
   2845 
   2846   if (availability_state_ == STATE_CLOSED)
   2847     return;
   2848 
   2849   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2850   DCHECK_GE(consume_size, 1u);
   2851   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
   2852 
   2853   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
   2854 }
   2855 
   2856 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
   2857   DCHECK_NE(availability_state_, STATE_CLOSED);
   2858   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2859   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
   2860   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
   2861   DCHECK_GE(delta_window_size, 1);
   2862   // Check for overflow.
   2863   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
   2864 
   2865   session_recv_window_size_ += delta_window_size;
   2866   net_log_.AddEvent(
   2867       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
   2868       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2869                  delta_window_size, session_recv_window_size_));
   2870 
   2871   session_unacked_recv_window_bytes_ += delta_window_size;
   2872   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
   2873     SendWindowUpdateFrame(kSessionFlowControlStreamId,
   2874                           session_unacked_recv_window_bytes_,
   2875                           HIGHEST);
   2876     session_unacked_recv_window_bytes_ = 0;
   2877   }
   2878 }
   2879 
   2880 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
   2881   CHECK(in_io_loop_);
   2882   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2883   DCHECK_GE(delta_window_size, 1);
   2884 
   2885   // Since we never decrease the initial receive window size,
   2886   // |delta_window_size| should never cause |recv_window_size_| to go
   2887   // negative. If we do, the receive window isn't being respected.
   2888   if (delta_window_size > session_recv_window_size_) {
   2889     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
   2890     CloseSessionResult result = DoCloseSession(
   2891         ERR_SPDY_PROTOCOL_ERROR,
   2892         "delta_window_size is " + base::IntToString(delta_window_size) +
   2893             " in DecreaseRecvWindowSize, which is larger than the receive " +
   2894             "window size of " + base::IntToString(session_recv_window_size_));
   2895     DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
   2896     return;
   2897   }
   2898 
   2899   session_recv_window_size_ -= delta_window_size;
   2900   net_log_.AddEvent(
   2901       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
   2902       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2903                  -delta_window_size, session_recv_window_size_));
   2904 }
   2905 
   2906 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
   2907   DCHECK(stream.send_stalled_by_flow_control());
   2908   RequestPriority priority = stream.priority();
   2909   CHECK_GE(priority, MINIMUM_PRIORITY);
   2910   CHECK_LE(priority, MAXIMUM_PRIORITY);
   2911   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
   2912 }
   2913 
   2914 void SpdySession::ResumeSendStalledStreams() {
   2915   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2916 
   2917   // We don't have to worry about new streams being queued, since
   2918   // doing so would cause IsSendStalled() to return true. But we do
   2919   // have to worry about streams being closed, as well as ourselves
   2920   // being closed.
   2921 
   2922   while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
   2923     size_t old_size = 0;
   2924     if (DCHECK_IS_ON())
   2925       old_size = GetTotalSize(stream_send_unstall_queue_);
   2926 
   2927     SpdyStreamId stream_id = PopStreamToPossiblyResume();
   2928     if (stream_id == 0)
   2929       break;
   2930     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2931     // The stream may actually still be send-stalled after this (due
   2932     // to its own send window) but that's okay -- it'll then be
   2933     // resumed once its send window increases.
   2934     if (it != active_streams_.end())
   2935       it->second.stream->PossiblyResumeIfSendStalled();
   2936 
   2937     // The size should decrease unless we got send-stalled again.
   2938     if (!IsSendStalled())
   2939       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
   2940   }
   2941 }
   2942 
   2943 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
   2944   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
   2945     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
   2946     if (!queue->empty()) {
   2947       SpdyStreamId stream_id = queue->front();
   2948       queue->pop_front();
   2949       return stream_id;
   2950     }
   2951   }
   2952   return 0;
   2953 }
   2954 
   2955 }  // namespace net
   2956