Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_session.h"
      6 
      7 #include <algorithm>
      8 #include <map>
      9 
     10 #include "base/basictypes.h"
     11 #include "base/bind.h"
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/metrics/field_trial.h"
     16 #include "base/metrics/histogram.h"
     17 #include "base/metrics/sparse_histogram.h"
     18 #include "base/metrics/stats_counters.h"
     19 #include "base/stl_util.h"
     20 #include "base/strings/string_number_conversions.h"
     21 #include "base/strings/string_util.h"
     22 #include "base/strings/stringprintf.h"
     23 #include "base/strings/utf_string_conversions.h"
     24 #include "base/time/time.h"
     25 #include "base/values.h"
     26 #include "crypto/ec_private_key.h"
     27 #include "crypto/ec_signature_creator.h"
     28 #include "net/base/connection_type_histograms.h"
     29 #include "net/base/net_log.h"
     30 #include "net/base/net_util.h"
     31 #include "net/cert/asn1_util.h"
     32 #include "net/http/http_log_util.h"
     33 #include "net/http/http_network_session.h"
     34 #include "net/http/http_server_properties.h"
     35 #include "net/http/http_util.h"
     36 #include "net/spdy/spdy_buffer_producer.h"
     37 #include "net/spdy/spdy_frame_builder.h"
     38 #include "net/spdy/spdy_http_utils.h"
     39 #include "net/spdy/spdy_protocol.h"
     40 #include "net/spdy/spdy_session_pool.h"
     41 #include "net/spdy/spdy_stream.h"
     42 #include "net/ssl/server_bound_cert_service.h"
     43 #include "net/ssl/ssl_cipher_suite_names.h"
     44 #include "net/ssl/ssl_connection_status_flags.h"
     45 
     46 namespace net {
     47 
     48 namespace {
     49 
// Size passed to the IOBuffer allocated for |read_buffer_| in the
// SpdySession constructor.
const int kReadBufferSize = 8 * 1024;
// Number of seconds used to initialize |connection_at_risk_of_loss_time_|.
const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Number of seconds used to initialize |hung_interval_|.
const int kHungIntervalSeconds = 10;

// Minimum seconds that unclaimed pushed streams will be kept in memory.
// The SpdySession constructor adds this to the current time to compute
// |next_unclaimed_push_stream_sweep_time_|.
const int kMinPushedStreamLifetimeSeconds = 300;
     56 
     57 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
     58     const SpdyHeaderBlock& headers,
     59     net::NetLog::LogLevel log_level) {
     60   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
     61   for (SpdyHeaderBlock::const_iterator it = headers.begin();
     62        it != headers.end(); ++it) {
     63     headers_list->AppendString(
     64         it->first + ": " +
     65         ElideHeaderValueForNetLog(log_level, it->first, it->second));
     66   }
     67   return headers_list.Pass();
     68 }
     69 
     70 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
     71                                              bool fin,
     72                                              bool unidirectional,
     73                                              SpdyPriority spdy_priority,
     74                                              SpdyStreamId stream_id,
     75                                              NetLog::LogLevel log_level) {
     76   base::DictionaryValue* dict = new base::DictionaryValue();
     77   dict->Set("headers",
     78             SpdyHeaderBlockToListValue(*headers, log_level).release());
     79   dict->SetBoolean("fin", fin);
     80   dict->SetBoolean("unidirectional", unidirectional);
     81   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
     82   dict->SetInteger("stream_id", stream_id);
     83   return dict;
     84 }
     85 
     86 base::Value* NetLogSpdySynStreamReceivedCallback(
     87     const SpdyHeaderBlock* headers,
     88     bool fin,
     89     bool unidirectional,
     90     SpdyPriority spdy_priority,
     91     SpdyStreamId stream_id,
     92     SpdyStreamId associated_stream,
     93     NetLog::LogLevel log_level) {
     94   base::DictionaryValue* dict = new base::DictionaryValue();
     95   dict->Set("headers",
     96             SpdyHeaderBlockToListValue(*headers, log_level).release());
     97   dict->SetBoolean("fin", fin);
     98   dict->SetBoolean("unidirectional", unidirectional);
     99   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
    100   dict->SetInteger("stream_id", stream_id);
    101   dict->SetInteger("associated_stream", associated_stream);
    102   return dict;
    103 }
    104 
    105 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
    106     const SpdyHeaderBlock* headers,
    107     bool fin,
    108     SpdyStreamId stream_id,
    109     NetLog::LogLevel log_level) {
    110   base::DictionaryValue* dict = new base::DictionaryValue();
    111   dict->Set("headers",
    112             SpdyHeaderBlockToListValue(*headers, log_level).release());
    113   dict->SetBoolean("fin", fin);
    114   dict->SetInteger("stream_id", stream_id);
    115   return dict;
    116 }
    117 
    118 base::Value* NetLogSpdySessionCloseCallback(int net_error,
    119                                             const std::string* description,
    120                                             NetLog::LogLevel /* log_level */) {
    121   base::DictionaryValue* dict = new base::DictionaryValue();
    122   dict->SetInteger("net_error", net_error);
    123   dict->SetString("description", *description);
    124   return dict;
    125 }
    126 
    127 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
    128                                        NetLog::LogLevel /* log_level */) {
    129   base::DictionaryValue* dict = new base::DictionaryValue();
    130   dict->SetString("host", host_pair->first.ToString());
    131   dict->SetString("proxy", host_pair->second.ToPacString());
    132   return dict;
    133 }
    134 
    135 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
    136                                         bool clear_persisted,
    137                                         NetLog::LogLevel /* log_level */) {
    138   base::DictionaryValue* dict = new base::DictionaryValue();
    139   dict->SetString("host", host_port_pair.ToString());
    140   dict->SetBoolean("clear_persisted", clear_persisted);
    141   return dict;
    142 }
    143 
    144 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
    145                                        SpdySettingsFlags flags,
    146                                        uint32 value,
    147                                        NetLog::LogLevel /* log_level */) {
    148   base::DictionaryValue* dict = new base::DictionaryValue();
    149   dict->SetInteger("id", id);
    150   dict->SetInteger("flags", flags);
    151   dict->SetInteger("value", value);
    152   return dict;
    153 }
    154 
    155 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
    156                                             NetLog::LogLevel /* log_level */) {
    157   base::DictionaryValue* dict = new base::DictionaryValue();
    158   base::ListValue* settings_list = new base::ListValue();
    159   for (SettingsMap::const_iterator it = settings->begin();
    160        it != settings->end(); ++it) {
    161     const SpdySettingsIds id = it->first;
    162     const SpdySettingsFlags flags = it->second.first;
    163     const uint32 value = it->second.second;
    164     settings_list->Append(new base::StringValue(
    165         base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
    166   }
    167   dict->Set("settings", settings_list);
    168   return dict;
    169 }
    170 
    171 base::Value* NetLogSpdyWindowUpdateFrameCallback(
    172     SpdyStreamId stream_id,
    173     uint32 delta,
    174     NetLog::LogLevel /* log_level */) {
    175   base::DictionaryValue* dict = new base::DictionaryValue();
    176   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    177   dict->SetInteger("delta", delta);
    178   return dict;
    179 }
    180 
    181 base::Value* NetLogSpdySessionWindowUpdateCallback(
    182     int32 delta,
    183     int32 window_size,
    184     NetLog::LogLevel /* log_level */) {
    185   base::DictionaryValue* dict = new base::DictionaryValue();
    186   dict->SetInteger("delta", delta);
    187   dict->SetInteger("window_size", window_size);
    188   return dict;
    189 }
    190 
    191 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
    192                                     int size,
    193                                     bool fin,
    194                                     NetLog::LogLevel /* log_level */) {
    195   base::DictionaryValue* dict = new base::DictionaryValue();
    196   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    197   dict->SetInteger("size", size);
    198   dict->SetBoolean("fin", fin);
    199   return dict;
    200 }
    201 
    202 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
    203                                    int status,
    204                                    const std::string* description,
    205                                    NetLog::LogLevel /* log_level */) {
    206   base::DictionaryValue* dict = new base::DictionaryValue();
    207   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    208   dict->SetInteger("status", status);
    209   dict->SetString("description", *description);
    210   return dict;
    211 }
    212 
    213 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
    214                                     bool is_ack,
    215                                     const char* type,
    216                                     NetLog::LogLevel /* log_level */) {
    217   base::DictionaryValue* dict = new base::DictionaryValue();
    218   dict->SetInteger("unique_id", unique_id);
    219   dict->SetString("type", type);
    220   dict->SetBoolean("is_ack", is_ack);
    221   return dict;
    222 }
    223 
    224 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
    225                                       int active_streams,
    226                                       int unclaimed_streams,
    227                                       SpdyGoAwayStatus status,
    228                                       NetLog::LogLevel /* log_level */) {
    229   base::DictionaryValue* dict = new base::DictionaryValue();
    230   dict->SetInteger("last_accepted_stream_id",
    231                    static_cast<int>(last_stream_id));
    232   dict->SetInteger("active_streams", active_streams);
    233   dict->SetInteger("unclaimed_streams", unclaimed_streams);
    234   dict->SetInteger("status", static_cast<int>(status));
    235   return dict;
    236 }
    237 
    238 base::Value* NetLogSpdyPushPromiseReceivedCallback(
    239     const SpdyHeaderBlock* headers,
    240     SpdyStreamId stream_id,
    241     SpdyStreamId promised_stream_id,
    242     NetLog::LogLevel log_level) {
    243   base::DictionaryValue* dict = new base::DictionaryValue();
    244   dict->Set("headers",
    245             SpdyHeaderBlockToListValue(*headers, log_level).release());
    246   dict->SetInteger("id", stream_id);
    247   dict->SetInteger("promised_stream_id", promised_stream_id);
    248   return dict;
    249 }
    250 
    251 // Helper function to return the total size of an array of objects
    252 // with .size() member functions.
    253 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
    254   size_t total_size = 0;
    255   for (size_t i = 0; i < N; ++i) {
    256     total_size += arr[i].size();
    257   }
    258   return total_size;
    259 }
    260 
    261 // Helper class for std:find_if on STL container containing
    262 // SpdyStreamRequest weak pointers.
    263 class RequestEquals {
    264  public:
    265   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
    266       : request_(request) {}
    267 
    268   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
    269     return request_.get() == request.get();
    270   }
    271 
    272  private:
    273   const base::WeakPtr<SpdyStreamRequest> request_;
    274 };
    275 
// The maximum number of concurrent streams we will ever create.  Even if
// the server permits more, we will never exceed this limit.
// The SpdySession constructor uses this value for
// |max_concurrent_streams_limit_| when it is passed a limit of 0.
const size_t kMaxConcurrentStreamLimit = 256;
    279 
    280 }  // namespace
    281 
    282 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
    283     SpdyFramer::SpdyError err) {
    284   switch(err) {
    285     case SpdyFramer::SPDY_NO_ERROR:
    286       return SPDY_ERROR_NO_ERROR;
    287     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
    288       return SPDY_ERROR_INVALID_CONTROL_FRAME;
    289     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
    290       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
    291     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
    292       return SPDY_ERROR_ZLIB_INIT_FAILURE;
    293     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
    294       return SPDY_ERROR_UNSUPPORTED_VERSION;
    295     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
    296       return SPDY_ERROR_DECOMPRESS_FAILURE;
    297     case SpdyFramer::SPDY_COMPRESS_FAILURE:
    298       return SPDY_ERROR_COMPRESS_FAILURE;
    299     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
    300       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
    301     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
    302       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
    303     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
    304       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
    305     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
    306       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
    307     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
    308       return SPDY_ERROR_UNEXPECTED_FRAME;
    309     default:
    310       NOTREACHED();
    311       return static_cast<SpdyProtocolErrorDetails>(-1);
    312   }
    313 }
    314 
    315 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
    316   switch (err) {
    317     case SpdyFramer::SPDY_NO_ERROR:
    318       return OK;
    319     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
    320       return ERR_SPDY_PROTOCOL_ERROR;
    321     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
    322       return ERR_SPDY_FRAME_SIZE_ERROR;
    323     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
    324       return ERR_SPDY_COMPRESSION_ERROR;
    325     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
    326       return ERR_SPDY_PROTOCOL_ERROR;
    327     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
    328       return ERR_SPDY_COMPRESSION_ERROR;
    329     case SpdyFramer::SPDY_COMPRESS_FAILURE:
    330       return ERR_SPDY_COMPRESSION_ERROR;
    331     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
    332       return ERR_SPDY_PROTOCOL_ERROR;
    333     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
    334       return ERR_SPDY_PROTOCOL_ERROR;
    335     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
    336       return ERR_SPDY_PROTOCOL_ERROR;
    337     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
    338       return ERR_SPDY_PROTOCOL_ERROR;
    339     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
    340       return ERR_SPDY_PROTOCOL_ERROR;
    341     default:
    342       NOTREACHED();
    343       return ERR_SPDY_PROTOCOL_ERROR;
    344   }
    345 }
    346 
    347 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
    348     SpdyRstStreamStatus status) {
    349   switch(status) {
    350     case RST_STREAM_PROTOCOL_ERROR:
    351       return STATUS_CODE_PROTOCOL_ERROR;
    352     case RST_STREAM_INVALID_STREAM:
    353       return STATUS_CODE_INVALID_STREAM;
    354     case RST_STREAM_REFUSED_STREAM:
    355       return STATUS_CODE_REFUSED_STREAM;
    356     case RST_STREAM_UNSUPPORTED_VERSION:
    357       return STATUS_CODE_UNSUPPORTED_VERSION;
    358     case RST_STREAM_CANCEL:
    359       return STATUS_CODE_CANCEL;
    360     case RST_STREAM_INTERNAL_ERROR:
    361       return STATUS_CODE_INTERNAL_ERROR;
    362     case RST_STREAM_FLOW_CONTROL_ERROR:
    363       return STATUS_CODE_FLOW_CONTROL_ERROR;
    364     case RST_STREAM_STREAM_IN_USE:
    365       return STATUS_CODE_STREAM_IN_USE;
    366     case RST_STREAM_STREAM_ALREADY_CLOSED:
    367       return STATUS_CODE_STREAM_ALREADY_CLOSED;
    368     case RST_STREAM_INVALID_CREDENTIALS:
    369       return STATUS_CODE_INVALID_CREDENTIALS;
    370     case RST_STREAM_FRAME_SIZE_ERROR:
    371       return STATUS_CODE_FRAME_SIZE_ERROR;
    372     case RST_STREAM_SETTINGS_TIMEOUT:
    373       return STATUS_CODE_SETTINGS_TIMEOUT;
    374     case RST_STREAM_CONNECT_ERROR:
    375       return STATUS_CODE_CONNECT_ERROR;
    376     case RST_STREAM_ENHANCE_YOUR_CALM:
    377       return STATUS_CODE_ENHANCE_YOUR_CALM;
    378     default:
    379       NOTREACHED();
    380       return static_cast<SpdyProtocolErrorDetails>(-1);
    381   }
    382 }
    383 
    384 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
    385   switch (err) {
    386     case OK:
    387       return GOAWAY_NO_ERROR;
    388     case ERR_SPDY_PROTOCOL_ERROR:
    389       return GOAWAY_PROTOCOL_ERROR;
    390     case ERR_SPDY_FLOW_CONTROL_ERROR:
    391       return GOAWAY_FLOW_CONTROL_ERROR;
    392     case ERR_SPDY_FRAME_SIZE_ERROR:
    393       return GOAWAY_FRAME_SIZE_ERROR;
    394     case ERR_SPDY_COMPRESSION_ERROR:
    395       return GOAWAY_COMPRESSION_ERROR;
    396     case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
    397       return GOAWAY_INADEQUATE_SECURITY;
    398     default:
    399       return GOAWAY_PROTOCOL_ERROR;
    400   }
    401 }
    402 
    403 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
    404                                             SpdyMajorVersion protocol_version,
    405                                             SpdyHeaderBlock* request_headers,
    406                                             SpdyHeaderBlock* response_headers) {
    407   DCHECK(response_headers);
    408   DCHECK(request_headers);
    409   for (SpdyHeaderBlock::const_iterator it = headers.begin();
    410        it != headers.end();
    411        ++it) {
    412     SpdyHeaderBlock* to_insert = response_headers;
    413     if (protocol_version == SPDY2) {
    414       if (it->first == "url")
    415         to_insert = request_headers;
    416     } else {
    417       const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
    418       static const char* scheme = ":scheme";
    419       static const char* path = ":path";
    420       if (it->first == host || it->first == scheme || it->first == path)
    421         to_insert = request_headers;
    422     }
    423     to_insert->insert(*it);
    424   }
    425 }
    426 
// Binds the weak pointer factory to this object and puts every other field
// into its default state via Reset().
SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
  Reset();
}

// Cancels the request on destruction; see CancelRequest().
SpdyStreamRequest::~SpdyStreamRequest() {
  CancelRequest();
}
    434 
    435 int SpdyStreamRequest::StartRequest(
    436     SpdyStreamType type,
    437     const base::WeakPtr<SpdySession>& session,
    438     const GURL& url,
    439     RequestPriority priority,
    440     const BoundNetLog& net_log,
    441     const CompletionCallback& callback) {
    442   DCHECK(session);
    443   DCHECK(!session_);
    444   DCHECK(!stream_);
    445   DCHECK(callback_.is_null());
    446 
    447   type_ = type;
    448   session_ = session;
    449   url_ = url;
    450   priority_ = priority;
    451   net_log_ = net_log;
    452   callback_ = callback;
    453 
    454   base::WeakPtr<SpdyStream> stream;
    455   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
    456   if (rv == OK) {
    457     Reset();
    458     stream_ = stream;
    459   }
    460   return rv;
    461 }
    462 
    463 void SpdyStreamRequest::CancelRequest() {
    464   if (session_)
    465     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
    466   Reset();
    467   // Do this to cancel any pending CompleteStreamRequest() tasks.
    468   weak_ptr_factory_.InvalidateWeakPtrs();
    469 }
    470 
    471 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
    472   DCHECK(!session_);
    473   base::WeakPtr<SpdyStream> stream = stream_;
    474   DCHECK(stream);
    475   Reset();
    476   return stream;
    477 }
    478 
    479 void SpdyStreamRequest::OnRequestCompleteSuccess(
    480     const base::WeakPtr<SpdyStream>& stream) {
    481   DCHECK(session_);
    482   DCHECK(!stream_);
    483   DCHECK(!callback_.is_null());
    484   CompletionCallback callback = callback_;
    485   Reset();
    486   DCHECK(stream);
    487   stream_ = stream;
    488   callback.Run(OK);
    489 }
    490 
    491 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
    492   DCHECK(session_);
    493   DCHECK(!stream_);
    494   DCHECK(!callback_.is_null());
    495   CompletionCallback callback = callback_;
    496   Reset();
    497   DCHECK_NE(rv, OK);
    498   callback.Run(rv);
    499 }
    500 
    501 void SpdyStreamRequest::Reset() {
    502   type_ = SPDY_BIDIRECTIONAL_STREAM;
    503   session_.reset();
    504   stream_.reset();
    505   url_ = GURL();
    506   priority_ = MINIMUM_PRIORITY;
    507   net_log_ = BoundNetLog();
    508   callback_.Reset();
    509 }
    510 
// Default constructor: no stream, and not waiting for a SYN_REPLY.
SpdySession::ActiveStreamInfo::ActiveStreamInfo()
    : stream(NULL),
      waiting_for_syn_reply(false) {}

// Wraps |stream|.  |waiting_for_syn_reply| starts out true for every stream
// type except SPDY_PUSH_STREAM.  |stream| is dereferenced here, so it must
// not be NULL.
SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
    : stream(stream),
      waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
}

SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
    521 
// Default constructor: stream id 0 and a default-constructed creation time.
SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}

// Records the id of a pushed stream together with the time it was created.
SpdySession::PushedStreamInfo::PushedStreamInfo(
    SpdyStreamId stream_id,
    base::TimeTicks creation_time)
    : stream_id(stream_id),
      creation_time(creation_time) {}

SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
    531 
    532 SpdySession::SpdySession(
    533     const SpdySessionKey& spdy_session_key,
    534     const base::WeakPtr<HttpServerProperties>& http_server_properties,
    535     bool verify_domain_authentication,
    536     bool enable_sending_initial_data,
    537     bool enable_compression,
    538     bool enable_ping_based_connection_checking,
    539     NextProto default_protocol,
    540     size_t stream_initial_recv_window_size,
    541     size_t initial_max_concurrent_streams,
    542     size_t max_concurrent_streams_limit,
    543     TimeFunc time_func,
    544     const HostPortPair& trusted_spdy_proxy,
    545     NetLog* net_log)
    546     : in_io_loop_(false),
    547       spdy_session_key_(spdy_session_key),
    548       pool_(NULL),
    549       http_server_properties_(http_server_properties),
    550       read_buffer_(new IOBuffer(kReadBufferSize)),
    551       stream_hi_water_mark_(kFirstStreamId),
    552       in_flight_write_frame_type_(DATA),
    553       in_flight_write_frame_size_(0),
    554       is_secure_(false),
    555       certificate_error_code_(OK),
    556       availability_state_(STATE_AVAILABLE),
    557       read_state_(READ_STATE_DO_READ),
    558       write_state_(WRITE_STATE_IDLE),
    559       error_on_close_(OK),
    560       max_concurrent_streams_(initial_max_concurrent_streams == 0
    561                                   ? kInitialMaxConcurrentStreams
    562                                   : initial_max_concurrent_streams),
    563       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
    564                                         ? kMaxConcurrentStreamLimit
    565                                         : max_concurrent_streams_limit),
    566       streams_initiated_count_(0),
    567       streams_pushed_count_(0),
    568       streams_pushed_and_claimed_count_(0),
    569       streams_abandoned_count_(0),
    570       total_bytes_received_(0),
    571       sent_settings_(false),
    572       received_settings_(false),
    573       stalled_streams_(0),
    574       pings_in_flight_(0),
    575       next_ping_id_(1),
    576       last_activity_time_(time_func()),
    577       last_compressed_frame_len_(0),
    578       check_ping_status_pending_(false),
    579       send_connection_header_prefix_(false),
    580       flow_control_state_(FLOW_CONTROL_NONE),
    581       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
    582       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
    583                                            ? kDefaultInitialRecvWindowSize
    584                                            : stream_initial_recv_window_size),
    585       session_send_window_size_(0),
    586       session_recv_window_size_(0),
    587       session_unacked_recv_window_bytes_(0),
    588       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
    589       verify_domain_authentication_(verify_domain_authentication),
    590       enable_sending_initial_data_(enable_sending_initial_data),
    591       enable_compression_(enable_compression),
    592       enable_ping_based_connection_checking_(
    593           enable_ping_based_connection_checking),
    594       protocol_(default_protocol),
    595       connection_at_risk_of_loss_time_(
    596           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
    597       hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
    598       trusted_spdy_proxy_(trusted_spdy_proxy),
    599       time_func_(time_func),
    600       weak_factory_(this) {
    601   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
    602   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    603   DCHECK(HttpStreamFactory::spdy_enabled());
    604   net_log_.BeginEvent(
    605       NetLog::TYPE_SPDY_SESSION,
    606       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
    607   next_unclaimed_push_stream_sweep_time_ = time_func_() +
    608       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
    609   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    610 }
    611 
// Tears the session down: disconnects the underlying socket, records
// histograms and ends the session's net-log event.  Must not run while
// |in_io_loop_| is set.
SpdySession::~SpdySession() {
  CHECK(!in_io_loop_);
  DcheckDraining();

  // TODO(akalin): Check connection->is_initialized() instead. This
  // requires re-working CreateFakeSpdySession(), though.
  DCHECK(connection_->socket());
  // With SPDY we can't recycle sockets.
  connection_->socket()->Disconnect();

  RecordHistograms();

  // Matches the BeginEvent(TYPE_SPDY_SESSION) issued by the constructor.
  net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
}
    626 
    627 void SpdySession::InitializeWithSocket(
    628     scoped_ptr<ClientSocketHandle> connection,
    629     SpdySessionPool* pool,
    630     bool is_secure,
    631     int certificate_error_code) {
    632   CHECK(!in_io_loop_);
    633   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
    634   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
    635   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
    636   DCHECK(!connection_);
    637 
    638   DCHECK(certificate_error_code == OK ||
    639          certificate_error_code < ERR_IO_PENDING);
    640   // TODO(akalin): Check connection->is_initialized() instead. This
    641   // requires re-working CreateFakeSpdySession(), though.
    642   DCHECK(connection->socket());
    643 
    644   base::StatsCounter spdy_sessions("spdy.sessions");
    645   spdy_sessions.Increment();
    646 
    647   connection_ = connection.Pass();
    648   is_secure_ = is_secure;
    649   certificate_error_code_ = certificate_error_code;
    650 
    651   NextProto protocol_negotiated =
    652       connection_->socket()->GetNegotiatedProtocol();
    653   if (protocol_negotiated != kProtoUnknown) {
    654     protocol_ = protocol_negotiated;
    655   }
    656   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
    657   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    658 
    659   if (protocol_ == kProtoSPDY4)
    660     send_connection_header_prefix_ = true;
    661 
    662   if (protocol_ >= kProtoSPDY31) {
    663     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
    664     session_send_window_size_ = kSpdySessionInitialWindowSize;
    665     session_recv_window_size_ = kSpdySessionInitialWindowSize;
    666   } else if (protocol_ >= kProtoSPDY3) {
    667     flow_control_state_ = FLOW_CONTROL_STREAM;
    668   } else {
    669     flow_control_state_ = FLOW_CONTROL_NONE;
    670   }
    671 
    672   buffered_spdy_framer_.reset(
    673       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
    674                              enable_compression_));
    675   buffered_spdy_framer_->set_visitor(this);
    676   buffered_spdy_framer_->set_debug_visitor(this);
    677   UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
    678 #if defined(SPDY_PROXY_AUTH_ORIGIN)
    679   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
    680                         host_port_pair().Equals(HostPortPair::FromURL(
    681                             GURL(SPDY_PROXY_AUTH_ORIGIN))));
    682 #endif
    683 
    684   net_log_.AddEvent(
    685       NetLog::TYPE_SPDY_SESSION_INITIALIZED,
    686       connection_->socket()->NetLog().source().ToEventParametersCallback());
    687 
    688   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
    689   connection_->AddHigherLayeredPool(this);
    690   if (enable_sending_initial_data_)
    691     SendInitialData();
    692   pool_ = pool;
    693 
    694   // Bootstrap the read loop.
    695   base::MessageLoop::current()->PostTask(
    696       FROM_HERE,
    697       base::Bind(&SpdySession::PumpReadLoop,
    698                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
    699 }
    700 
    701 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
    702   if (!verify_domain_authentication_)
    703     return true;
    704 
    705   if (availability_state_ == STATE_DRAINING)
    706     return false;
    707 
    708   SSLInfo ssl_info;
    709   bool was_npn_negotiated;
    710   NextProto protocol_negotiated = kProtoUnknown;
    711   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
    712     return true;   // This is not a secure session, so all domains are okay.
    713 
    714   // Disable pooling for secure sessions.
    715   // TODO(rch): re-enable this.
    716   return false;
    717 #if 0
    718   bool unused = false;
    719   return
    720       !ssl_info.client_cert_sent &&
    721       (!ssl_info.channel_id_sent ||
    722        (ServerBoundCertService::GetDomainForHost(domain) ==
    723         ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
    724       ssl_info.cert->VerifyNameMatch(domain, &unused);
    725 #endif
    726 }
    727 
    728 int SpdySession::GetPushStream(
    729     const GURL& url,
    730     base::WeakPtr<SpdyStream>* stream,
    731     const BoundNetLog& stream_net_log) {
    732   CHECK(!in_io_loop_);
    733 
    734   stream->reset();
    735 
    736   if (availability_state_ == STATE_DRAINING)
    737     return ERR_CONNECTION_CLOSED;
    738 
    739   Error err = TryAccessStream(url);
    740   if (err != OK)
    741     return err;
    742 
    743   *stream = GetActivePushStream(url);
    744   if (*stream) {
    745     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
    746     streams_pushed_and_claimed_count_++;
    747   }
    748   return OK;
    749 }
    750 
    751 // {,Try}CreateStream() and TryAccessStream() can be called with
    752 // |in_io_loop_| set if a stream is being created in response to
    753 // another being closed due to received data.
    754 
    755 Error SpdySession::TryAccessStream(const GURL& url) {
    756   if (is_secure_ && certificate_error_code_ != OK &&
    757       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    758     RecordProtocolErrorHistogram(
    759         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
    760     DoDrainSession(
    761         static_cast<Error>(certificate_error_code_),
    762         "Tried to get SPDY stream for secure content over an unauthenticated "
    763         "session.");
    764     return ERR_SPDY_PROTOCOL_ERROR;
    765   }
    766   return OK;
    767 }
    768 
    769 int SpdySession::TryCreateStream(
    770     const base::WeakPtr<SpdyStreamRequest>& request,
    771     base::WeakPtr<SpdyStream>* stream) {
    772   DCHECK(request);
    773 
    774   if (availability_state_ == STATE_GOING_AWAY)
    775     return ERR_FAILED;
    776 
    777   if (availability_state_ == STATE_DRAINING)
    778     return ERR_CONNECTION_CLOSED;
    779 
    780   Error err = TryAccessStream(request->url());
    781   if (err != OK)
    782     return err;
    783 
    784   if (!max_concurrent_streams_ ||
    785       (active_streams_.size() + created_streams_.size() <
    786        max_concurrent_streams_)) {
    787     return CreateStream(*request, stream);
    788   }
    789 
    790   stalled_streams_++;
    791   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
    792   RequestPriority priority = request->priority();
    793   CHECK_GE(priority, MINIMUM_PRIORITY);
    794   CHECK_LE(priority, MAXIMUM_PRIORITY);
    795   pending_create_stream_queues_[priority].push_back(request);
    796   return ERR_IO_PENDING;
    797 }
    798 
    799 int SpdySession::CreateStream(const SpdyStreamRequest& request,
    800                               base::WeakPtr<SpdyStream>* stream) {
    801   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
    802   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
    803 
    804   if (availability_state_ == STATE_GOING_AWAY)
    805     return ERR_FAILED;
    806 
    807   if (availability_state_ == STATE_DRAINING)
    808     return ERR_CONNECTION_CLOSED;
    809 
    810   Error err = TryAccessStream(request.url());
    811   if (err != OK) {
    812     // This should have been caught in TryCreateStream().
    813     NOTREACHED();
    814     return err;
    815   }
    816 
    817   DCHECK(connection_->socket());
    818   DCHECK(connection_->socket()->IsConnected());
    819   if (connection_->socket()) {
    820     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
    821                           connection_->socket()->IsConnected());
    822     if (!connection_->socket()->IsConnected()) {
    823       DoDrainSession(
    824           ERR_CONNECTION_CLOSED,
    825           "Tried to create SPDY stream for a closed socket connection.");
    826       return ERR_CONNECTION_CLOSED;
    827     }
    828   }
    829 
    830   scoped_ptr<SpdyStream> new_stream(
    831       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
    832                      request.priority(),
    833                      stream_initial_send_window_size_,
    834                      stream_initial_recv_window_size_,
    835                      request.net_log()));
    836   *stream = new_stream->GetWeakPtr();
    837   InsertCreatedStream(new_stream.Pass());
    838 
    839   UMA_HISTOGRAM_CUSTOM_COUNTS(
    840       "Net.SpdyPriorityCount",
    841       static_cast<int>(request.priority()), 0, 10, 11);
    842 
    843   return OK;
    844 }
    845 
    846 void SpdySession::CancelStreamRequest(
    847     const base::WeakPtr<SpdyStreamRequest>& request) {
    848   DCHECK(request);
    849   RequestPriority priority = request->priority();
    850   CHECK_GE(priority, MINIMUM_PRIORITY);
    851   CHECK_LE(priority, MAXIMUM_PRIORITY);
    852 
    853 #if DCHECK_IS_ON
    854   // |request| should not be in a queue not matching its priority.
    855   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
    856     if (priority == i)
    857       continue;
    858     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
    859     DCHECK(std::find_if(queue->begin(),
    860                         queue->end(),
    861                         RequestEquals(request)) == queue->end());
    862   }
    863 #endif
    864 
    865   PendingStreamRequestQueue* queue =
    866       &pending_create_stream_queues_[priority];
    867   // Remove |request| from |queue| while preserving the order of the
    868   // other elements.
    869   PendingStreamRequestQueue::iterator it =
    870       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
    871   // The request may already be removed if there's a
    872   // CompleteStreamRequest() in flight.
    873   if (it != queue->end()) {
    874     it = queue->erase(it);
    875     // |request| should be in the queue at most once, and if it is
    876     // present, should not be pending completion.
    877     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
    878            queue->end());
    879   }
    880 }
    881 
    882 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
    883   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
    884     if (pending_create_stream_queues_[j].empty())
    885       continue;
    886 
    887     base::WeakPtr<SpdyStreamRequest> pending_request =
    888         pending_create_stream_queues_[j].front();
    889     DCHECK(pending_request);
    890     pending_create_stream_queues_[j].pop_front();
    891     return pending_request;
    892   }
    893   return base::WeakPtr<SpdyStreamRequest>();
    894 }
    895 
    896 void SpdySession::ProcessPendingStreamRequests() {
    897   // Like |max_concurrent_streams_|, 0 means infinite for
    898   // |max_requests_to_process|.
    899   size_t max_requests_to_process = 0;
    900   if (max_concurrent_streams_ != 0) {
    901     max_requests_to_process =
    902         max_concurrent_streams_ -
    903         (active_streams_.size() + created_streams_.size());
    904   }
    905   for (size_t i = 0;
    906        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
    907     base::WeakPtr<SpdyStreamRequest> pending_request =
    908         GetNextPendingStreamRequest();
    909     if (!pending_request)
    910       break;
    911 
    912     // Note that this post can race with other stream creations, and it's
    913     // possible that the un-stalled stream will be stalled again if it loses.
    914     // TODO(jgraettinger): Provide stronger ordering guarantees.
    915     base::MessageLoop::current()->PostTask(
    916         FROM_HERE,
    917         base::Bind(&SpdySession::CompleteStreamRequest,
    918                    weak_factory_.GetWeakPtr(),
    919                    pending_request));
    920   }
    921 }
    922 
    923 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
    924   pooled_aliases_.insert(alias_key);
    925 }
    926 
    927 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
    928   DCHECK(buffered_spdy_framer_.get());
    929   return buffered_spdy_framer_->protocol_version();
    930 }
    931 
    932 bool SpdySession::HasAcceptableTransportSecurity() const {
    933   // If we're not even using TLS, we have no standards to meet.
    934   if (!is_secure_) {
    935     return true;
    936   }
    937 
    938   // We don't enforce transport security standards for older SPDY versions.
    939   if (GetProtocolVersion() < SPDY4) {
    940     return true;
    941   }
    942 
    943   SSLInfo ssl_info;
    944   CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
    945 
    946   // HTTP/2 requires TLS 1.2+
    947   if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
    948       SSL_CONNECTION_VERSION_TLS1_2) {
    949     return false;
    950   }
    951 
    952   if (!IsSecureTLSCipherSuite(
    953           SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
    954     return false;
    955   }
    956 
    957   return true;
    958 }
    959 
    960 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
    961   return weak_factory_.GetWeakPtr();
    962 }
    963 
    964 bool SpdySession::CloseOneIdleConnection() {
    965   CHECK(!in_io_loop_);
    966   DCHECK(pool_);
    967   if (active_streams_.empty()) {
    968     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
    969   }
    970   // Return false as the socket wasn't immediately closed.
    971   return false;
    972 }
    973 
    974 void SpdySession::EnqueueStreamWrite(
    975     const base::WeakPtr<SpdyStream>& stream,
    976     SpdyFrameType frame_type,
    977     scoped_ptr<SpdyBufferProducer> producer) {
    978   DCHECK(frame_type == HEADERS ||
    979          frame_type == DATA ||
    980          frame_type == CREDENTIAL ||
    981          frame_type == SYN_STREAM);
    982   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
    983 }
    984 
    985 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
    986     SpdyStreamId stream_id,
    987     RequestPriority priority,
    988     SpdyControlFlags flags,
    989     const SpdyHeaderBlock& headers) {
    990   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
    991   CHECK(it != active_streams_.end());
    992   CHECK_EQ(it->second.stream->stream_id(), stream_id);
    993 
    994   SendPrefacePingIfNoneInFlight();
    995 
    996   DCHECK(buffered_spdy_framer_.get());
    997   SpdyPriority spdy_priority =
    998       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
    999   scoped_ptr<SpdyFrame> syn_frame(
   1000       buffered_spdy_framer_->CreateSynStream(stream_id, 0, spdy_priority, flags,
   1001                                              &headers));
   1002 
   1003   base::StatsCounter spdy_requests("spdy.requests");
   1004   spdy_requests.Increment();
   1005   streams_initiated_count_++;
   1006 
   1007   if (net_log().IsLogging()) {
   1008     net_log().AddEvent(
   1009         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
   1010         base::Bind(&NetLogSpdySynStreamSentCallback, &headers,
   1011                    (flags & CONTROL_FLAG_FIN) != 0,
   1012                    (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
   1013                    spdy_priority,
   1014                    stream_id));
   1015   }
   1016 
   1017   return syn_frame.Pass();
   1018 }
   1019 
   1020 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
   1021                                                      IOBuffer* data,
   1022                                                      int len,
   1023                                                      SpdyDataFlags flags) {
   1024   if (availability_state_ == STATE_DRAINING) {
   1025     return scoped_ptr<SpdyBuffer>();
   1026   }
   1027 
   1028   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   1029   CHECK(it != active_streams_.end());
   1030   SpdyStream* stream = it->second.stream;
   1031   CHECK_EQ(stream->stream_id(), stream_id);
   1032 
   1033   if (len < 0) {
   1034     NOTREACHED();
   1035     return scoped_ptr<SpdyBuffer>();
   1036   }
   1037 
   1038   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
   1039 
   1040   bool send_stalled_by_stream =
   1041       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
   1042       (stream->send_window_size() <= 0);
   1043   bool send_stalled_by_session = IsSendStalled();
   1044 
   1045   // NOTE: There's an enum of the same name in histograms.xml.
   1046   enum SpdyFrameFlowControlState {
   1047     SEND_NOT_STALLED,
   1048     SEND_STALLED_BY_STREAM,
   1049     SEND_STALLED_BY_SESSION,
   1050     SEND_STALLED_BY_STREAM_AND_SESSION,
   1051   };
   1052 
   1053   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
   1054   if (send_stalled_by_stream) {
   1055     if (send_stalled_by_session) {
   1056       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
   1057     } else {
   1058       frame_flow_control_state = SEND_STALLED_BY_STREAM;
   1059     }
   1060   } else if (send_stalled_by_session) {
   1061     frame_flow_control_state = SEND_STALLED_BY_SESSION;
   1062   }
   1063 
   1064   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
   1065     UMA_HISTOGRAM_ENUMERATION(
   1066         "Net.SpdyFrameStreamFlowControlState",
   1067         frame_flow_control_state,
   1068         SEND_STALLED_BY_STREAM + 1);
   1069   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1070     UMA_HISTOGRAM_ENUMERATION(
   1071         "Net.SpdyFrameStreamAndSessionFlowControlState",
   1072         frame_flow_control_state,
   1073         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
   1074   }
   1075 
   1076   // Obey send window size of the stream if stream flow control is
   1077   // enabled.
   1078   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
   1079     if (send_stalled_by_stream) {
   1080       stream->set_send_stalled_by_flow_control(true);
   1081       // Even though we're currently stalled only by the stream, we
   1082       // might end up being stalled by the session also.
   1083       QueueSendStalledStream(*stream);
   1084       net_log().AddEvent(
   1085           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
   1086           NetLog::IntegerCallback("stream_id", stream_id));
   1087       return scoped_ptr<SpdyBuffer>();
   1088     }
   1089 
   1090     effective_len = std::min(effective_len, stream->send_window_size());
   1091   }
   1092 
   1093   // Obey send window size of the session if session flow control is
   1094   // enabled.
   1095   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1096     if (send_stalled_by_session) {
   1097       stream->set_send_stalled_by_flow_control(true);
   1098       QueueSendStalledStream(*stream);
   1099       net_log().AddEvent(
   1100           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
   1101           NetLog::IntegerCallback("stream_id", stream_id));
   1102       return scoped_ptr<SpdyBuffer>();
   1103     }
   1104 
   1105     effective_len = std::min(effective_len, session_send_window_size_);
   1106   }
   1107 
   1108   DCHECK_GE(effective_len, 0);
   1109 
   1110   // Clear FIN flag if only some of the data will be in the data
   1111   // frame.
   1112   if (effective_len < len)
   1113     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
   1114 
   1115   if (net_log().IsLogging()) {
   1116     net_log().AddEvent(
   1117         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
   1118         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
   1119                    (flags & DATA_FLAG_FIN) != 0));
   1120   }
   1121 
   1122   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
   1123   if (effective_len > 0)
   1124     SendPrefacePingIfNoneInFlight();
   1125 
   1126   // TODO(mbelshe): reduce memory copies here.
   1127   DCHECK(buffered_spdy_framer_.get());
   1128   scoped_ptr<SpdyFrame> frame(
   1129       buffered_spdy_framer_->CreateDataFrame(
   1130           stream_id, data->data(),
   1131           static_cast<uint32>(effective_len), flags));
   1132 
   1133   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
   1134 
   1135   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1136     DecreaseSendWindowSize(static_cast<int32>(effective_len));
   1137     data_buffer->AddConsumeCallback(
   1138         base::Bind(&SpdySession::OnWriteBufferConsumed,
   1139                    weak_factory_.GetWeakPtr(),
   1140                    static_cast<size_t>(effective_len)));
   1141   }
   1142 
   1143   return data_buffer.Pass();
   1144 }
   1145 
   1146 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
   1147   DCHECK_NE(stream_id, 0u);
   1148 
   1149   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1150   if (it == active_streams_.end()) {
   1151     NOTREACHED();
   1152     return;
   1153   }
   1154 
   1155   CloseActiveStreamIterator(it, status);
   1156 }
   1157 
   1158 void SpdySession::CloseCreatedStream(
   1159     const base::WeakPtr<SpdyStream>& stream, int status) {
   1160   DCHECK_EQ(stream->stream_id(), 0u);
   1161 
   1162   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
   1163   if (it == created_streams_.end()) {
   1164     NOTREACHED();
   1165     return;
   1166   }
   1167 
   1168   CloseCreatedStreamIterator(it, status);
   1169 }
   1170 
   1171 void SpdySession::ResetStream(SpdyStreamId stream_id,
   1172                               SpdyRstStreamStatus status,
   1173                               const std::string& description) {
   1174   DCHECK_NE(stream_id, 0u);
   1175 
   1176   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1177   if (it == active_streams_.end()) {
   1178     NOTREACHED();
   1179     return;
   1180   }
   1181 
   1182   ResetStreamIterator(it, status, description);
   1183 }
   1184 
   1185 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
   1186   return ContainsKey(active_streams_, stream_id);
   1187 }
   1188 
   1189 LoadState SpdySession::GetLoadState() const {
   1190   // Just report that we're idle since the session could be doing
   1191   // many things concurrently.
   1192   return LOAD_STATE_IDLE;
   1193 }
   1194 
   1195 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
   1196                                             int status) {
   1197   // TODO(mbelshe): We should send a RST_STREAM control frame here
   1198   //                so that the server can cancel a large send.
   1199 
   1200   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
   1201   active_streams_.erase(it);
   1202 
   1203   // TODO(akalin): When SpdyStream was ref-counted (and
   1204   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
   1205   // was only done when status was not OK. This meant that pushed
   1206   // streams can still be claimed after they're closed. This is
   1207   // probably something that we still want to support, although server
   1208   // push is hardly used. Write tests for this and fix this. (See
   1209   // http://crbug.com/261712 .)
   1210   if (owned_stream->type() == SPDY_PUSH_STREAM)
   1211     unclaimed_pushed_streams_.erase(owned_stream->url());
   1212 
   1213   DeleteStream(owned_stream.Pass(), status);
   1214   MaybeFinishGoingAway();
   1215 
   1216   // If there are no active streams and the socket pool is stalled, close the
   1217   // session to free up a socket slot.
   1218   if (active_streams_.empty() && connection_->IsPoolStalled()) {
   1219     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
   1220   }
   1221 }
   1222 
   1223 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
   1224                                              int status) {
   1225   scoped_ptr<SpdyStream> owned_stream(*it);
   1226   created_streams_.erase(it);
   1227   DeleteStream(owned_stream.Pass(), status);
   1228 }
   1229 
   1230 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
   1231                                       SpdyRstStreamStatus status,
   1232                                       const std::string& description) {
   1233   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
   1234   // may close us.
   1235   SpdyStreamId stream_id = it->first;
   1236   RequestPriority priority = it->second.stream->priority();
   1237   EnqueueResetStreamFrame(stream_id, priority, status, description);
   1238 
   1239   // Removes any pending writes for the stream except for possibly an
   1240   // in-flight one.
   1241   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   1242 }
   1243 
   1244 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
   1245                                           RequestPriority priority,
   1246                                           SpdyRstStreamStatus status,
   1247                                           const std::string& description) {
   1248   DCHECK_NE(stream_id, 0u);
   1249 
   1250   net_log().AddEvent(
   1251       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
   1252       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
   1253 
   1254   DCHECK(buffered_spdy_framer_.get());
   1255   scoped_ptr<SpdyFrame> rst_frame(
   1256       buffered_spdy_framer_->CreateRstStream(stream_id, status));
   1257 
   1258   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
   1259   RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
   1260 }
   1261 
   1262 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
   1263   CHECK(!in_io_loop_);
   1264   if (availability_state_ == STATE_DRAINING) {
   1265     return;
   1266   }
   1267   ignore_result(DoReadLoop(expected_read_state, result));
   1268 }
   1269 
   1270 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
   1271   CHECK(!in_io_loop_);
   1272   CHECK_EQ(read_state_, expected_read_state);
   1273 
   1274   in_io_loop_ = true;
   1275 
   1276   int bytes_read_without_yielding = 0;
   1277 
   1278   // Loop until the session is draining, the read becomes blocked, or
   1279   // the read limit is exceeded.
   1280   while (true) {
   1281     switch (read_state_) {
   1282       case READ_STATE_DO_READ:
   1283         CHECK_EQ(result, OK);
   1284         result = DoRead();
   1285         break;
   1286       case READ_STATE_DO_READ_COMPLETE:
   1287         if (result > 0)
   1288           bytes_read_without_yielding += result;
   1289         result = DoReadComplete(result);
   1290         break;
   1291       default:
   1292         NOTREACHED() << "read_state_: " << read_state_;
   1293         break;
   1294     }
   1295 
   1296     if (availability_state_ == STATE_DRAINING)
   1297       break;
   1298 
   1299     if (result == ERR_IO_PENDING)
   1300       break;
   1301 
   1302     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
   1303       read_state_ = READ_STATE_DO_READ;
   1304       base::MessageLoop::current()->PostTask(
   1305           FROM_HERE,
   1306           base::Bind(&SpdySession::PumpReadLoop,
   1307                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
   1308       result = ERR_IO_PENDING;
   1309       break;
   1310     }
   1311   }
   1312 
   1313   CHECK(in_io_loop_);
   1314   in_io_loop_ = false;
   1315 
   1316   return result;
   1317 }
   1318 
   1319 int SpdySession::DoRead() {
   1320   CHECK(in_io_loop_);
   1321 
   1322   CHECK(connection_);
   1323   CHECK(connection_->socket());
   1324   read_state_ = READ_STATE_DO_READ_COMPLETE;
   1325   return connection_->socket()->Read(
   1326       read_buffer_.get(),
   1327       kReadBufferSize,
   1328       base::Bind(&SpdySession::PumpReadLoop,
   1329                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
   1330 }
   1331 
   1332 int SpdySession::DoReadComplete(int result) {
   1333   CHECK(in_io_loop_);
   1334 
   1335   // Parse a frame.  For now this code requires that the frame fit into our
   1336   // buffer (kReadBufferSize).
   1337   // TODO(mbelshe): support arbitrarily large frames!
   1338 
   1339   if (result == 0) {
   1340     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
   1341                                 total_bytes_received_, 1, 100000000, 50);
   1342     DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
   1343 
   1344     return ERR_CONNECTION_CLOSED;
   1345   }
   1346 
   1347   if (result < 0) {
   1348     DoDrainSession(static_cast<Error>(result), "result is < 0.");
   1349     return result;
   1350   }
   1351   CHECK_LE(result, kReadBufferSize);
   1352   total_bytes_received_ += result;
   1353 
   1354   last_activity_time_ = time_func_();
   1355 
   1356   DCHECK(buffered_spdy_framer_.get());
   1357   char* data = read_buffer_->data();
   1358   while (result > 0) {
   1359     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
   1360     result -= bytes_processed;
   1361     data += bytes_processed;
   1362 
   1363     if (availability_state_ == STATE_DRAINING) {
   1364       return ERR_CONNECTION_CLOSED;
   1365     }
   1366 
   1367     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
   1368   }
   1369 
   1370   read_state_ = READ_STATE_DO_READ;
   1371   return OK;
   1372 }
   1373 
   1374 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
   1375   CHECK(!in_io_loop_);
   1376   DCHECK_EQ(write_state_, expected_write_state);
   1377 
   1378   DoWriteLoop(expected_write_state, result);
   1379 
   1380   if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
   1381       write_queue_.IsEmpty()) {
   1382     pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
   1383     return;
   1384   }
   1385 }
   1386 
   1387 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
   1388   CHECK(!in_io_loop_);
   1389   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
   1390   DCHECK_EQ(write_state_, expected_write_state);
   1391 
   1392   in_io_loop_ = true;
   1393 
   1394   // Loop until the session is closed or the write becomes blocked.
   1395   while (true) {
   1396     switch (write_state_) {
   1397       case WRITE_STATE_DO_WRITE:
   1398         DCHECK_EQ(result, OK);
   1399         result = DoWrite();
   1400         break;
   1401       case WRITE_STATE_DO_WRITE_COMPLETE:
   1402         result = DoWriteComplete(result);
   1403         break;
   1404       case WRITE_STATE_IDLE:
   1405       default:
   1406         NOTREACHED() << "write_state_: " << write_state_;
   1407         break;
   1408     }
   1409 
   1410     if (write_state_ == WRITE_STATE_IDLE) {
   1411       DCHECK_EQ(result, ERR_IO_PENDING);
   1412       break;
   1413     }
   1414 
   1415     if (result == ERR_IO_PENDING)
   1416       break;
   1417   }
   1418 
   1419   CHECK(in_io_loop_);
   1420   in_io_loop_ = false;
   1421 
   1422   return result;
   1423 }
   1424 
   1425 int SpdySession::DoWrite() {
   1426   CHECK(in_io_loop_);
   1427 
   1428   DCHECK(buffered_spdy_framer_);
   1429   if (in_flight_write_) {
   1430     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1431   } else {
   1432     // Grab the next frame to send.
   1433     SpdyFrameType frame_type = DATA;
   1434     scoped_ptr<SpdyBufferProducer> producer;
   1435     base::WeakPtr<SpdyStream> stream;
   1436     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
   1437       write_state_ = WRITE_STATE_IDLE;
   1438       return ERR_IO_PENDING;
   1439     }
   1440 
   1441     if (stream.get())
   1442       CHECK(!stream->IsClosed());
   1443 
   1444     // Activate the stream only when sending the SYN_STREAM frame to
   1445     // guarantee monotonically-increasing stream IDs.
   1446     if (frame_type == SYN_STREAM) {
   1447       CHECK(stream.get());
   1448       CHECK_EQ(stream->stream_id(), 0u);
   1449       scoped_ptr<SpdyStream> owned_stream =
   1450           ActivateCreatedStream(stream.get());
   1451       InsertActivatedStream(owned_stream.Pass());
   1452 
   1453       if (stream_hi_water_mark_ > kLastStreamId) {
   1454         CHECK_EQ(stream->stream_id(), kLastStreamId);
   1455         // We've exhausted the stream ID space, and no new streams may be
   1456         // created after this one.
   1457         MakeUnavailable();
   1458         StartGoingAway(kLastStreamId, ERR_ABORTED);
   1459       }
   1460     }
   1461 
   1462     in_flight_write_ = producer->ProduceBuffer();
   1463     if (!in_flight_write_) {
   1464       NOTREACHED();
   1465       return ERR_UNEXPECTED;
   1466     }
   1467     in_flight_write_frame_type_ = frame_type;
   1468     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
   1469     DCHECK_GE(in_flight_write_frame_size_,
   1470               buffered_spdy_framer_->GetFrameMinimumSize());
   1471     in_flight_write_stream_ = stream;
   1472   }
   1473 
   1474   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
   1475 
   1476   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
   1477   // with Socket implementations that don't store their IOBuffer
   1478   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
   1479   scoped_refptr<IOBuffer> write_io_buffer =
   1480       in_flight_write_->GetIOBufferForRemainingData();
   1481   return connection_->socket()->Write(
   1482       write_io_buffer.get(),
   1483       in_flight_write_->GetRemainingSize(),
   1484       base::Bind(&SpdySession::PumpWriteLoop,
   1485                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
   1486 }
   1487 
   1488 int SpdySession::DoWriteComplete(int result) {
   1489   CHECK(in_io_loop_);
   1490   DCHECK_NE(result, ERR_IO_PENDING);
   1491   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1492 
   1493   last_activity_time_ = time_func_();
   1494 
   1495   if (result < 0) {
   1496     DCHECK_NE(result, ERR_IO_PENDING);
   1497     in_flight_write_.reset();
   1498     in_flight_write_frame_type_ = DATA;
   1499     in_flight_write_frame_size_ = 0;
   1500     in_flight_write_stream_.reset();
   1501     write_state_ = WRITE_STATE_DO_WRITE;
   1502     DoDrainSession(static_cast<Error>(result), "Write error");
   1503     return OK;
   1504   }
   1505 
   1506   // It should not be possible to have written more bytes than our
   1507   // in_flight_write_.
   1508   DCHECK_LE(static_cast<size_t>(result),
   1509             in_flight_write_->GetRemainingSize());
   1510 
   1511   if (result > 0) {
   1512     in_flight_write_->Consume(static_cast<size_t>(result));
   1513 
   1514     // We only notify the stream when we've fully written the pending frame.
   1515     if (in_flight_write_->GetRemainingSize() == 0) {
   1516       // It is possible that the stream was cancelled while we were
   1517       // writing to the socket.
   1518       if (in_flight_write_stream_.get()) {
   1519         DCHECK_GT(in_flight_write_frame_size_, 0u);
   1520         in_flight_write_stream_->OnFrameWriteComplete(
   1521             in_flight_write_frame_type_,
   1522             in_flight_write_frame_size_);
   1523       }
   1524 
   1525       // Cleanup the write which just completed.
   1526       in_flight_write_.reset();
   1527       in_flight_write_frame_type_ = DATA;
   1528       in_flight_write_frame_size_ = 0;
   1529       in_flight_write_stream_.reset();
   1530     }
   1531   }
   1532 
   1533   write_state_ = WRITE_STATE_DO_WRITE;
   1534   return OK;
   1535 }
   1536 
   1537 void SpdySession::DcheckGoingAway() const {
   1538 #if DCHECK_IS_ON
   1539   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1540   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
   1541     DCHECK(pending_create_stream_queues_[i].empty());
   1542   }
   1543   DCHECK(created_streams_.empty());
   1544 #endif
   1545 }
   1546 
   1547 void SpdySession::DcheckDraining() const {
   1548   DcheckGoingAway();
   1549   DCHECK_EQ(availability_state_, STATE_DRAINING);
   1550   DCHECK(active_streams_.empty());
   1551   DCHECK(unclaimed_pushed_streams_.empty());
   1552 }
   1553 
   1554 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
   1555                                  Error status) {
   1556   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1557 
   1558   // The loops below are carefully written to avoid reentrancy problems.
   1559 
   1560   while (true) {
   1561     size_t old_size = GetTotalSize(pending_create_stream_queues_);
   1562     base::WeakPtr<SpdyStreamRequest> pending_request =
   1563         GetNextPendingStreamRequest();
   1564     if (!pending_request)
   1565       break;
   1566     // No new stream requests should be added while the session is
   1567     // going away.
   1568     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
   1569     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
   1570   }
   1571 
   1572   while (true) {
   1573     size_t old_size = active_streams_.size();
   1574     ActiveStreamMap::iterator it =
   1575         active_streams_.lower_bound(last_good_stream_id + 1);
   1576     if (it == active_streams_.end())
   1577       break;
   1578     LogAbandonedActiveStream(it, status);
   1579     CloseActiveStreamIterator(it, status);
   1580     // No new streams should be activated while the session is going
   1581     // away.
   1582     DCHECK_GT(old_size, active_streams_.size());
   1583   }
   1584 
   1585   while (!created_streams_.empty()) {
   1586     size_t old_size = created_streams_.size();
   1587     CreatedStreamSet::iterator it = created_streams_.begin();
   1588     LogAbandonedStream(*it, status);
   1589     CloseCreatedStreamIterator(it, status);
   1590     // No new streams should be created while the session is going
   1591     // away.
   1592     DCHECK_GT(old_size, created_streams_.size());
   1593   }
   1594 
   1595   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
   1596 
   1597   DcheckGoingAway();
   1598 }
   1599 
   1600 void SpdySession::MaybeFinishGoingAway() {
   1601   if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
   1602     DoDrainSession(OK, "Finished going away");
   1603   }
   1604 }
   1605 
   1606 void SpdySession::DoDrainSession(Error err, const std::string& description) {
   1607   if (availability_state_ == STATE_DRAINING) {
   1608     return;
   1609   }
   1610   MakeUnavailable();
   1611 
   1612   // If |err| indicates an error occurred, inform the peer that we're closing
   1613   // and why. Don't GOAWAY on a graceful or idle close, as that may
   1614   // unnecessarily wake the radio. We could technically GOAWAY on network errors
   1615   // (we'll probably fail to actually write it, but that's okay), however many
   1616   // unit-tests would need to be updated.
   1617   if (err != OK &&
   1618       err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
   1619       err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
   1620       err != ERR_SOCKET_NOT_CONNECTED &&
   1621       err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
   1622     // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
   1623     SpdyGoAwayIR goaway_ir(0,  // Last accepted stream ID.
   1624                            MapNetErrorToGoAwayStatus(err),
   1625                            description);
   1626     EnqueueSessionWrite(HIGHEST,
   1627                         GOAWAY,
   1628                         scoped_ptr<SpdyFrame>(
   1629                             buffered_spdy_framer_->SerializeFrame(goaway_ir)));
   1630   }
   1631 
   1632   availability_state_ = STATE_DRAINING;
   1633   error_on_close_ = err;
   1634 
   1635   net_log_.AddEvent(
   1636       NetLog::TYPE_SPDY_SESSION_CLOSE,
   1637       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
   1638 
   1639   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
   1640   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
   1641                               total_bytes_received_, 1, 100000000, 50);
   1642 
   1643   if (err == OK) {
   1644     // We ought to be going away already, as this is a graceful close.
   1645     DcheckGoingAway();
   1646   } else {
   1647     StartGoingAway(0, err);
   1648   }
   1649   DcheckDraining();
   1650   MaybePostWriteLoop();
   1651 }
   1652 
   1653 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
   1654   DCHECK(stream);
   1655   std::string description = base::StringPrintf(
   1656       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
   1657       stream->url().spec();
   1658   stream->LogStreamError(status, description);
   1659   // We don't increment the streams abandoned counter here. If the
   1660   // stream isn't active (i.e., it hasn't written anything to the wire
   1661   // yet) then it's as if it never existed. If it is active, then
   1662   // LogAbandonedActiveStream() will increment the counters.
   1663 }
   1664 
   1665 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
   1666                                            Error status) {
   1667   DCHECK_GT(it->first, 0u);
   1668   LogAbandonedStream(it->second.stream, status);
   1669   ++streams_abandoned_count_;
   1670   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
   1671   abandoned_streams.Increment();
   1672   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
   1673       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
   1674       unclaimed_pushed_streams_.end()) {
   1675     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
   1676     abandoned_push_streams.Increment();
   1677   }
   1678 }
   1679 
   1680 SpdyStreamId SpdySession::GetNewStreamId() {
   1681   CHECK_LE(stream_hi_water_mark_, kLastStreamId);
   1682   SpdyStreamId id = stream_hi_water_mark_;
   1683   stream_hi_water_mark_ += 2;
   1684   return id;
   1685 }
   1686 
   1687 void SpdySession::CloseSessionOnError(Error err,
   1688                                       const std::string& description) {
   1689   DCHECK_LT(err, ERR_IO_PENDING);
   1690   DoDrainSession(err, description);
   1691 }
   1692 
   1693 void SpdySession::MakeUnavailable() {
   1694   if (availability_state_ == STATE_AVAILABLE) {
   1695     availability_state_ = STATE_GOING_AWAY;
   1696     pool_->MakeSessionUnavailable(GetWeakPtr());
   1697   }
   1698 }
   1699 
   1700 base::Value* SpdySession::GetInfoAsValue() const {
   1701   base::DictionaryValue* dict = new base::DictionaryValue();
   1702 
   1703   dict->SetInteger("source_id", net_log_.source().id);
   1704 
   1705   dict->SetString("host_port_pair", host_port_pair().ToString());
   1706   if (!pooled_aliases_.empty()) {
   1707     base::ListValue* alias_list = new base::ListValue();
   1708     for (std::set<SpdySessionKey>::const_iterator it =
   1709              pooled_aliases_.begin();
   1710          it != pooled_aliases_.end(); it++) {
   1711       alias_list->Append(new base::StringValue(
   1712           it->host_port_pair().ToString()));
   1713     }
   1714     dict->Set("aliases", alias_list);
   1715   }
   1716   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
   1717 
   1718   dict->SetInteger("active_streams", active_streams_.size());
   1719 
   1720   dict->SetInteger("unclaimed_pushed_streams",
   1721                    unclaimed_pushed_streams_.size());
   1722 
   1723   dict->SetBoolean("is_secure", is_secure_);
   1724 
   1725   dict->SetString("protocol_negotiated",
   1726                   SSLClientSocket::NextProtoToString(
   1727                       connection_->socket()->GetNegotiatedProtocol()));
   1728 
   1729   dict->SetInteger("error", error_on_close_);
   1730   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
   1731 
   1732   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
   1733   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
   1734   dict->SetInteger("streams_pushed_and_claimed_count",
   1735       streams_pushed_and_claimed_count_);
   1736   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
   1737   DCHECK(buffered_spdy_framer_.get());
   1738   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
   1739 
   1740   dict->SetBoolean("sent_settings", sent_settings_);
   1741   dict->SetBoolean("received_settings", received_settings_);
   1742 
   1743   dict->SetInteger("send_window_size", session_send_window_size_);
   1744   dict->SetInteger("recv_window_size", session_recv_window_size_);
   1745   dict->SetInteger("unacked_recv_window_bytes",
   1746                    session_unacked_recv_window_bytes_);
   1747   return dict;
   1748 }
   1749 
   1750 bool SpdySession::IsReused() const {
   1751   return buffered_spdy_framer_->frames_received() > 0 ||
   1752       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
   1753 }
   1754 
   1755 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
   1756                                     LoadTimingInfo* load_timing_info) const {
   1757   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
   1758                                         load_timing_info);
   1759 }
   1760 
   1761 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
   1762   int rv = ERR_SOCKET_NOT_CONNECTED;
   1763   if (connection_->socket()) {
   1764     rv = connection_->socket()->GetPeerAddress(address);
   1765   }
   1766 
   1767   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
   1768                         rv == ERR_SOCKET_NOT_CONNECTED);
   1769 
   1770   return rv;
   1771 }
   1772 
   1773 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
   1774   int rv = ERR_SOCKET_NOT_CONNECTED;
   1775   if (connection_->socket()) {
   1776     rv = connection_->socket()->GetLocalAddress(address);
   1777   }
   1778 
   1779   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
   1780                         rv == ERR_SOCKET_NOT_CONNECTED);
   1781 
   1782   return rv;
   1783 }
   1784 
   1785 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
   1786                                       SpdyFrameType frame_type,
   1787                                       scoped_ptr<SpdyFrame> frame) {
   1788   DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
   1789          frame_type == WINDOW_UPDATE || frame_type == PING ||
   1790          frame_type == GOAWAY);
   1791   EnqueueWrite(
   1792       priority, frame_type,
   1793       scoped_ptr<SpdyBufferProducer>(
   1794           new SimpleBufferProducer(
   1795               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
   1796       base::WeakPtr<SpdyStream>());
   1797 }
   1798 
   1799 void SpdySession::EnqueueWrite(RequestPriority priority,
   1800                                SpdyFrameType frame_type,
   1801                                scoped_ptr<SpdyBufferProducer> producer,
   1802                                const base::WeakPtr<SpdyStream>& stream) {
   1803   if (availability_state_ == STATE_DRAINING)
   1804     return;
   1805 
   1806   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
   1807   MaybePostWriteLoop();
   1808 }
   1809 
   1810 void SpdySession::MaybePostWriteLoop() {
   1811   if (write_state_ == WRITE_STATE_IDLE) {
   1812     CHECK(!in_flight_write_);
   1813     write_state_ = WRITE_STATE_DO_WRITE;
   1814     base::MessageLoop::current()->PostTask(
   1815         FROM_HERE,
   1816         base::Bind(&SpdySession::PumpWriteLoop,
   1817                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
   1818   }
   1819 }
   1820 
   1821 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
   1822   CHECK_EQ(stream->stream_id(), 0u);
   1823   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
   1824   created_streams_.insert(stream.release());
   1825 }
   1826 
   1827 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
   1828   CHECK_EQ(stream->stream_id(), 0u);
   1829   CHECK(created_streams_.find(stream) != created_streams_.end());
   1830   stream->set_stream_id(GetNewStreamId());
   1831   scoped_ptr<SpdyStream> owned_stream(stream);
   1832   created_streams_.erase(stream);
   1833   return owned_stream.Pass();
   1834 }
   1835 
   1836 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
   1837   SpdyStreamId stream_id = stream->stream_id();
   1838   CHECK_NE(stream_id, 0u);
   1839   std::pair<ActiveStreamMap::iterator, bool> result =
   1840       active_streams_.insert(
   1841           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
   1842   CHECK(result.second);
   1843   ignore_result(stream.release());
   1844 }
   1845 
   1846 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
   1847   if (in_flight_write_stream_.get() == stream.get()) {
   1848     // If we're deleting the stream for the in-flight write, we still
   1849     // need to let the write complete, so we clear
   1850     // |in_flight_write_stream_| and let the write finish on its own
   1851     // without notifying |in_flight_write_stream_|.
   1852     in_flight_write_stream_.reset();
   1853   }
   1854 
   1855   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
   1856   stream->OnClose(status);
   1857 
   1858   if (availability_state_ == STATE_AVAILABLE) {
   1859     ProcessPendingStreamRequests();
   1860   }
   1861 }
   1862 
   1863 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
   1864   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
   1865 
   1866   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
   1867   if (unclaimed_it == unclaimed_pushed_streams_.end())
   1868     return base::WeakPtr<SpdyStream>();
   1869 
   1870   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
   1871   unclaimed_pushed_streams_.erase(unclaimed_it);
   1872 
   1873   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   1874   if (active_it == active_streams_.end()) {
   1875     NOTREACHED();
   1876     return base::WeakPtr<SpdyStream>();
   1877   }
   1878 
   1879   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
   1880   used_push_streams.Increment();
   1881   return active_it->second.stream->GetWeakPtr();
   1882 }
   1883 
   1884 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
   1885                              bool* was_npn_negotiated,
   1886                              NextProto* protocol_negotiated) {
   1887   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
   1888   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
   1889   return connection_->socket()->GetSSLInfo(ssl_info);
   1890 }
   1891 
   1892 bool SpdySession::GetSSLCertRequestInfo(
   1893     SSLCertRequestInfo* cert_request_info) {
   1894   if (!is_secure_)
   1895     return false;
   1896   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
   1897   return true;
   1898 }
   1899 
   1900 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
   1901   CHECK(in_io_loop_);
   1902 
   1903   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
   1904   std::string description =
   1905       base::StringPrintf("Framer error: %d (%s).",
   1906                          error_code,
   1907                          SpdyFramer::ErrorCodeToString(error_code));
   1908   DoDrainSession(MapFramerErrorToNetError(error_code), description);
   1909 }
   1910 
   1911 void SpdySession::OnStreamError(SpdyStreamId stream_id,
   1912                                 const std::string& description) {
   1913   CHECK(in_io_loop_);
   1914 
   1915   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1916   if (it == active_streams_.end()) {
   1917     // We still want to send a frame to reset the stream even if we
   1918     // don't know anything about it.
   1919     EnqueueResetStreamFrame(
   1920         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
   1921     return;
   1922   }
   1923 
   1924   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
   1925 }
   1926 
   1927 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
   1928                                     size_t length,
   1929                                     bool fin) {
   1930   CHECK(in_io_loop_);
   1931 
   1932   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1933 
   1934   // By the time data comes in, the stream may already be inactive.
   1935   if (it == active_streams_.end())
   1936     return;
   1937 
   1938   SpdyStream* stream = it->second.stream;
   1939   CHECK_EQ(stream->stream_id(), stream_id);
   1940 
   1941   DCHECK(buffered_spdy_framer_);
   1942   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
   1943   stream->IncrementRawReceivedBytes(header_len);
   1944 }
   1945 
   1946 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
   1947                                     const char* data,
   1948                                     size_t len,
   1949                                     bool fin) {
   1950   CHECK(in_io_loop_);
   1951 
   1952   if (data == NULL && len != 0) {
   1953     // This is notification of consumed data padding.
   1954     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
   1955     // See crbug.com/353012.
   1956     return;
   1957   }
   1958 
   1959   DCHECK_LT(len, 1u << 24);
   1960   if (net_log().IsLogging()) {
   1961     net_log().AddEvent(
   1962         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
   1963         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
   1964   }
   1965 
   1966   // Build the buffer as early as possible so that we go through the
   1967   // session flow control checks and update
   1968   // |unacked_recv_window_bytes_| properly even when the stream is
   1969   // inactive (since the other side has still reduced its session send
   1970   // window).
   1971   scoped_ptr<SpdyBuffer> buffer;
   1972   if (data) {
   1973     DCHECK_GT(len, 0u);
   1974     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
   1975     buffer.reset(new SpdyBuffer(data, len));
   1976 
   1977     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1978       DecreaseRecvWindowSize(static_cast<int32>(len));
   1979       buffer->AddConsumeCallback(
   1980           base::Bind(&SpdySession::OnReadBufferConsumed,
   1981                      weak_factory_.GetWeakPtr()));
   1982     }
   1983   } else {
   1984     DCHECK_EQ(len, 0u);
   1985   }
   1986 
   1987   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1988 
   1989   // By the time data comes in, the stream may already be inactive.
   1990   if (it == active_streams_.end())
   1991     return;
   1992 
   1993   SpdyStream* stream = it->second.stream;
   1994   CHECK_EQ(stream->stream_id(), stream_id);
   1995 
   1996   stream->IncrementRawReceivedBytes(len);
   1997 
   1998   if (it->second.waiting_for_syn_reply) {
   1999     const std::string& error = "Data received before SYN_REPLY.";
   2000     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2001     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2002     return;
   2003   }
   2004 
   2005   stream->OnDataReceived(buffer.Pass());
   2006 }
   2007 
   2008 void SpdySession::OnSettings(bool clear_persisted) {
   2009   CHECK(in_io_loop_);
   2010 
   2011   if (clear_persisted)
   2012     http_server_properties_->ClearSpdySettings(host_port_pair());
   2013 
   2014   if (net_log_.IsLogging()) {
   2015     net_log_.AddEvent(
   2016         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
   2017         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
   2018                    clear_persisted));
   2019   }
   2020 
   2021   if (GetProtocolVersion() >= SPDY4) {
   2022     // Send an acknowledgment of the setting.
   2023     SpdySettingsIR settings_ir;
   2024     settings_ir.set_is_ack(true);
   2025     EnqueueSessionWrite(
   2026         HIGHEST,
   2027         SETTINGS,
   2028         scoped_ptr<SpdyFrame>(
   2029             buffered_spdy_framer_->SerializeFrame(settings_ir)));
   2030   }
   2031 }
   2032 
   2033 void SpdySession::OnSetting(SpdySettingsIds id,
   2034                             uint8 flags,
   2035                             uint32 value) {
   2036   CHECK(in_io_loop_);
   2037 
   2038   HandleSetting(id, value);
   2039   http_server_properties_->SetSpdySetting(
   2040       host_port_pair(),
   2041       id,
   2042       static_cast<SpdySettingsFlags>(flags),
   2043       value);
   2044   received_settings_ = true;
   2045 
   2046   // Log the setting.
   2047   net_log_.AddEvent(
   2048       NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
   2049       base::Bind(&NetLogSpdySettingCallback,
   2050                  id, static_cast<SpdySettingsFlags>(flags), value));
   2051 }
   2052 
   2053 void SpdySession::OnSendCompressedFrame(
   2054     SpdyStreamId stream_id,
   2055     SpdyFrameType type,
   2056     size_t payload_len,
   2057     size_t frame_len) {
   2058   if (type != SYN_STREAM)
   2059     return;
   2060 
   2061   DCHECK(buffered_spdy_framer_.get());
   2062   size_t compressed_len =
   2063       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
   2064 
   2065   if (payload_len) {
   2066     // Make sure we avoid early decimal truncation.
   2067     int compression_pct = 100 - (100 * compressed_len) / payload_len;
   2068     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
   2069                              compression_pct);
   2070   }
   2071 }
   2072 
   2073 void SpdySession::OnReceiveCompressedFrame(
   2074     SpdyStreamId stream_id,
   2075     SpdyFrameType type,
   2076     size_t frame_len) {
   2077   last_compressed_frame_len_ = frame_len;
   2078 }
   2079 
   2080 int SpdySession::OnInitialResponseHeadersReceived(
   2081     const SpdyHeaderBlock& response_headers,
   2082     base::Time response_time,
   2083     base::TimeTicks recv_first_byte_time,
   2084     SpdyStream* stream) {
   2085   CHECK(in_io_loop_);
   2086   SpdyStreamId stream_id = stream->stream_id();
   2087   // May invalidate |stream|.
   2088   int rv = stream->OnInitialResponseHeadersReceived(
   2089       response_headers, response_time, recv_first_byte_time);
   2090   if (rv < 0) {
   2091     DCHECK_NE(rv, ERR_IO_PENDING);
   2092     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   2093   }
   2094   return rv;
   2095 }
   2096 
   2097 void SpdySession::OnSynStream(SpdyStreamId stream_id,
   2098                               SpdyStreamId associated_stream_id,
   2099                               SpdyPriority priority,
   2100                               bool fin,
   2101                               bool unidirectional,
   2102                               const SpdyHeaderBlock& headers) {
   2103   CHECK(in_io_loop_);
   2104 
   2105   if (GetProtocolVersion() >= SPDY4) {
   2106     DCHECK_EQ(0u, associated_stream_id);
   2107     OnHeaders(stream_id, fin, headers);
   2108     return;
   2109   }
   2110 
   2111   base::Time response_time = base::Time::Now();
   2112   base::TimeTicks recv_first_byte_time = time_func_();
   2113 
   2114   if (net_log_.IsLogging()) {
   2115     net_log_.AddEvent(
   2116         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
   2117         base::Bind(&NetLogSpdySynStreamReceivedCallback,
   2118                    &headers, fin, unidirectional, priority,
   2119                    stream_id, associated_stream_id));
   2120   }
   2121 
   2122   // Split headers to simulate push promise and response.
   2123   SpdyHeaderBlock request_headers;
   2124   SpdyHeaderBlock response_headers;
   2125   SplitPushedHeadersToRequestAndResponse(
   2126       headers, GetProtocolVersion(), &request_headers, &response_headers);
   2127 
   2128   if (!TryCreatePushStream(
   2129           stream_id, associated_stream_id, priority, request_headers))
   2130     return;
   2131 
   2132   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   2133   if (active_it == active_streams_.end()) {
   2134     NOTREACHED();
   2135     return;
   2136   }
   2137 
   2138   if (OnInitialResponseHeadersReceived(response_headers,
   2139                                        response_time,
   2140                                        recv_first_byte_time,
   2141                                        active_it->second.stream) != OK)
   2142     return;
   2143 
   2144   base::StatsCounter push_requests("spdy.pushed_streams");
   2145   push_requests.Increment();
   2146 }
   2147 
   2148 void SpdySession::DeleteExpiredPushedStreams() {
   2149   if (unclaimed_pushed_streams_.empty())
   2150     return;
   2151 
   2152   // Check that adequate time has elapsed since the last sweep.
   2153   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
   2154     return;
   2155 
   2156   // Gather old streams to delete.
   2157   base::TimeTicks minimum_freshness = time_func_() -
   2158       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2159   std::vector<SpdyStreamId> streams_to_close;
   2160   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
   2161        it != unclaimed_pushed_streams_.end(); ++it) {
   2162     if (minimum_freshness > it->second.creation_time)
   2163       streams_to_close.push_back(it->second.stream_id);
   2164   }
   2165 
   2166   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
   2167            streams_to_close.begin();
   2168        to_close_it != streams_to_close.end(); ++to_close_it) {
   2169     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
   2170     if (active_it == active_streams_.end())
   2171       continue;
   2172 
   2173     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
   2174     // CloseActiveStreamIterator() will remove the stream from
   2175     // |unclaimed_pushed_streams_|.
   2176     ResetStreamIterator(
   2177         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
   2178   }
   2179 
   2180   next_unclaimed_push_stream_sweep_time_ = time_func_() +
   2181       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2182 }
   2183 
   2184 void SpdySession::OnSynReply(SpdyStreamId stream_id,
   2185                              bool fin,
   2186                              const SpdyHeaderBlock& headers) {
   2187   CHECK(in_io_loop_);
   2188 
   2189   base::Time response_time = base::Time::Now();
   2190   base::TimeTicks recv_first_byte_time = time_func_();
   2191 
   2192   if (net_log().IsLogging()) {
   2193     net_log().AddEvent(
   2194         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
   2195         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
   2196                    &headers, fin, stream_id));
   2197   }
   2198 
   2199   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2200   if (it == active_streams_.end()) {
   2201     // NOTE:  it may just be that the stream was cancelled.
   2202     return;
   2203   }
   2204 
   2205   SpdyStream* stream = it->second.stream;
   2206   CHECK_EQ(stream->stream_id(), stream_id);
   2207 
   2208   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2209   last_compressed_frame_len_ = 0;
   2210 
   2211   if (GetProtocolVersion() >= SPDY4) {
   2212     const std::string& error =
   2213         "SPDY4 wasn't expecting SYN_REPLY.";
   2214     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2215     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2216     return;
   2217   }
   2218   if (!it->second.waiting_for_syn_reply) {
   2219     const std::string& error =
   2220         "Received duplicate SYN_REPLY for stream.";
   2221     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2222     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2223     return;
   2224   }
   2225   it->second.waiting_for_syn_reply = false;
   2226 
   2227   ignore_result(OnInitialResponseHeadersReceived(
   2228       headers, response_time, recv_first_byte_time, stream));
   2229 }
   2230 
   2231 void SpdySession::OnHeaders(SpdyStreamId stream_id,
   2232                             bool fin,
   2233                             const SpdyHeaderBlock& headers) {
   2234   CHECK(in_io_loop_);
   2235 
   2236   if (net_log().IsLogging()) {
   2237     net_log().AddEvent(
   2238         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
   2239         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
   2240                    &headers, fin, stream_id));
   2241   }
   2242 
   2243   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2244   if (it == active_streams_.end()) {
   2245     // NOTE:  it may just be that the stream was cancelled.
   2246     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
   2247     return;
   2248   }
   2249 
   2250   SpdyStream* stream = it->second.stream;
   2251   CHECK_EQ(stream->stream_id(), stream_id);
   2252 
   2253   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2254   last_compressed_frame_len_ = 0;
   2255 
   2256   base::Time response_time = base::Time::Now();
   2257   base::TimeTicks recv_first_byte_time = time_func_();
   2258 
   2259   if (it->second.waiting_for_syn_reply) {
   2260     if (GetProtocolVersion() < SPDY4) {
   2261       const std::string& error =
   2262           "Was expecting SYN_REPLY, not HEADERS.";
   2263       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2264       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2265       return;
   2266     }
   2267 
   2268     it->second.waiting_for_syn_reply = false;
   2269     ignore_result(OnInitialResponseHeadersReceived(
   2270         headers, response_time, recv_first_byte_time, stream));
   2271   } else if (it->second.stream->IsReservedRemote()) {
   2272     ignore_result(OnInitialResponseHeadersReceived(
   2273         headers, response_time, recv_first_byte_time, stream));
   2274   } else {
   2275     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
   2276     if (rv < 0) {
   2277       DCHECK_NE(rv, ERR_IO_PENDING);
   2278       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   2279     }
   2280   }
   2281 }
   2282 
   2283 void SpdySession::OnRstStream(SpdyStreamId stream_id,
   2284                               SpdyRstStreamStatus status) {
   2285   CHECK(in_io_loop_);
   2286 
   2287   std::string description;
   2288   net_log().AddEvent(
   2289       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
   2290       base::Bind(&NetLogSpdyRstCallback,
   2291                  stream_id, status, &description));
   2292 
   2293   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2294   if (it == active_streams_.end()) {
   2295     // NOTE:  it may just be that the stream was cancelled.
   2296     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
   2297     return;
   2298   }
   2299 
   2300   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2301 
   2302   if (status == 0) {
   2303     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
   2304   } else if (status == RST_STREAM_REFUSED_STREAM) {
   2305     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
   2306   } else {
   2307     RecordProtocolErrorHistogram(
   2308         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
   2309     it->second.stream->LogStreamError(
   2310         ERR_SPDY_PROTOCOL_ERROR,
   2311         base::StringPrintf("SPDY stream closed with status: %d", status));
   2312     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
   2313     //                For now, it doesn't matter much - it is a protocol error.
   2314     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   2315   }
   2316 }
   2317 
   2318 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
   2319                            SpdyGoAwayStatus status) {
   2320   CHECK(in_io_loop_);
   2321 
   2322   // TODO(jgraettinger): UMA histogram on |status|.
   2323 
   2324   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
   2325       base::Bind(&NetLogSpdyGoAwayCallback,
   2326                  last_accepted_stream_id,
   2327                  active_streams_.size(),
   2328                  unclaimed_pushed_streams_.size(),
   2329                  status));
   2330   MakeUnavailable();
   2331   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
   2332   // This is to handle the case when we already don't have any active
   2333   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
   2334   // active streams and so the last one being closed will finish the
   2335   // going away process (see DeleteStream()).
   2336   MaybeFinishGoingAway();
   2337 }
   2338 
   2339 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
   2340   CHECK(in_io_loop_);
   2341 
   2342   net_log_.AddEvent(
   2343       NetLog::TYPE_SPDY_SESSION_PING,
   2344       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
   2345 
   2346   // Send response to a PING from server.
   2347   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
   2348       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
   2349     WritePingFrame(unique_id, true);
   2350     return;
   2351   }
   2352 
   2353   --pings_in_flight_;
   2354   if (pings_in_flight_ < 0) {
   2355     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
   2356     DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
   2357     pings_in_flight_ = 0;
   2358     return;
   2359   }
   2360 
   2361   if (pings_in_flight_ > 0)
   2362     return;
   2363 
   2364   // We will record RTT in histogram when there are no more client sent
   2365   // pings_in_flight_.
   2366   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
   2367 }
   2368 
   2369 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
   2370                                  uint32 delta_window_size) {
   2371   CHECK(in_io_loop_);
   2372 
   2373   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
   2374   net_log_.AddEvent(
   2375       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
   2376       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2377                  stream_id, delta_window_size));
   2378 
   2379   if (stream_id == kSessionFlowControlStreamId) {
   2380     // WINDOW_UPDATE for the session.
   2381     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
   2382       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
   2383                    << "session flow control is not turned on";
   2384       // TODO(akalin): Record an error and close the session.
   2385       return;
   2386     }
   2387 
   2388     if (delta_window_size < 1u) {
   2389       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2390       DoDrainSession(
   2391           ERR_SPDY_PROTOCOL_ERROR,
   2392           "Received WINDOW_UPDATE with an invalid delta_window_size " +
   2393               base::UintToString(delta_window_size));
   2394       return;
   2395     }
   2396 
   2397     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
   2398   } else {
   2399     // WINDOW_UPDATE for a stream.
   2400     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2401       // TODO(akalin): Record an error and close the session.
   2402       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
   2403                    << " when flow control is not turned on";
   2404       return;
   2405     }
   2406 
   2407     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2408 
   2409     if (it == active_streams_.end()) {
   2410       // NOTE:  it may just be that the stream was cancelled.
   2411       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
   2412       return;
   2413     }
   2414 
   2415     SpdyStream* stream = it->second.stream;
   2416     CHECK_EQ(stream->stream_id(), stream_id);
   2417 
   2418     if (delta_window_size < 1u) {
   2419       ResetStreamIterator(it,
   2420                           RST_STREAM_FLOW_CONTROL_ERROR,
   2421                           base::StringPrintf(
   2422                               "Received WINDOW_UPDATE with an invalid "
   2423                               "delta_window_size %ud", delta_window_size));
   2424       return;
   2425     }
   2426 
   2427     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2428     it->second.stream->IncreaseSendWindowSize(
   2429         static_cast<int32>(delta_window_size));
   2430   }
   2431 }
   2432 
   2433 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
   2434                                       SpdyStreamId associated_stream_id,
   2435                                       SpdyPriority priority,
   2436                                       const SpdyHeaderBlock& headers) {
   2437   // Server-initiated streams should have even sequence numbers.
   2438   if ((stream_id & 0x1) != 0) {
   2439     LOG(WARNING) << "Received invalid push stream id " << stream_id;
   2440     return false;
   2441   }
   2442 
   2443   if (IsStreamActive(stream_id)) {
   2444     LOG(WARNING) << "Received push for active stream " << stream_id;
   2445     return false;
   2446   }
   2447 
   2448   RequestPriority request_priority =
   2449       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
   2450 
   2451   if (availability_state_ == STATE_GOING_AWAY) {
   2452     // TODO(akalin): This behavior isn't in the SPDY spec, although it
   2453     // probably should be.
   2454     EnqueueResetStreamFrame(stream_id,
   2455                             request_priority,
   2456                             RST_STREAM_REFUSED_STREAM,
   2457                             "push stream request received when going away");
   2458     return false;
   2459   }
   2460 
   2461   if (associated_stream_id == 0) {
   2462     // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
   2463     // session going away. We should never get here.
   2464     CHECK_GT(SPDY4, GetProtocolVersion());
   2465     std::string description = base::StringPrintf(
   2466         "Received invalid associated stream id %d for pushed stream %d",
   2467         associated_stream_id,
   2468         stream_id);
   2469     EnqueueResetStreamFrame(
   2470         stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
   2471     return false;
   2472   }
   2473 
   2474   streams_pushed_count_++;
   2475 
   2476   // TODO(mbelshe): DCHECK that this is a GET method?
   2477 
   2478   // Verify that the response had a URL for us.
   2479   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
   2480   if (!gurl.is_valid()) {
   2481     EnqueueResetStreamFrame(stream_id,
   2482                             request_priority,
   2483                             RST_STREAM_PROTOCOL_ERROR,
   2484                             "Pushed stream url was invalid: " + gurl.spec());
   2485     return false;
   2486   }
   2487 
   2488   // Verify we have a valid stream association.
   2489   ActiveStreamMap::iterator associated_it =
   2490       active_streams_.find(associated_stream_id);
   2491   if (associated_it == active_streams_.end()) {
   2492     EnqueueResetStreamFrame(
   2493         stream_id,
   2494         request_priority,
   2495         RST_STREAM_INVALID_STREAM,
   2496         base::StringPrintf("Received push for inactive associated stream %d",
   2497                            associated_stream_id));
   2498     return false;
   2499   }
   2500 
   2501   // Check that the pushed stream advertises the same origin as its associated
   2502   // stream. Bypass this check if and only if this session is with a SPDY proxy
   2503   // that is trusted explicitly via the --trusted-spdy-proxy switch.
   2504   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
   2505     // Disallow pushing of HTTPS content.
   2506     if (gurl.SchemeIs("https")) {
   2507       EnqueueResetStreamFrame(
   2508           stream_id,
   2509           request_priority,
   2510           RST_STREAM_REFUSED_STREAM,
   2511           base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
   2512                              associated_stream_id));
   2513     }
   2514   } else {
   2515     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
   2516     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
   2517       EnqueueResetStreamFrame(
   2518           stream_id,
   2519           request_priority,
   2520           RST_STREAM_REFUSED_STREAM,
   2521           base::StringPrintf("Rejected Cross Origin Push Stream %d",
   2522                              associated_stream_id));
   2523       return false;
   2524     }
   2525   }
   2526 
   2527   // There should not be an existing pushed stream with the same path.
   2528   PushedStreamMap::iterator pushed_it =
   2529       unclaimed_pushed_streams_.lower_bound(gurl);
   2530   if (pushed_it != unclaimed_pushed_streams_.end() &&
   2531       pushed_it->first == gurl) {
   2532     EnqueueResetStreamFrame(
   2533         stream_id,
   2534         request_priority,
   2535         RST_STREAM_PROTOCOL_ERROR,
   2536         "Received duplicate pushed stream with url: " + gurl.spec());
   2537     return false;
   2538   }
   2539 
   2540   scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
   2541                                                GetWeakPtr(),
   2542                                                gurl,
   2543                                                request_priority,
   2544                                                stream_initial_send_window_size_,
   2545                                                stream_initial_recv_window_size_,
   2546                                                net_log_));
   2547   stream->set_stream_id(stream_id);
   2548 
   2549   // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
   2550   if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
   2551     associated_it->second.stream->IncrementRawReceivedBytes(
   2552         last_compressed_frame_len_);
   2553   } else {
   2554     stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2555   }
   2556 
   2557   last_compressed_frame_len_ = 0;
   2558 
   2559   DeleteExpiredPushedStreams();
   2560   PushedStreamMap::iterator inserted_pushed_it =
   2561       unclaimed_pushed_streams_.insert(
   2562           pushed_it,
   2563           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
   2564   DCHECK(inserted_pushed_it != pushed_it);
   2565 
   2566   InsertActivatedStream(stream.Pass());
   2567 
   2568   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   2569   if (active_it == active_streams_.end()) {
   2570     NOTREACHED();
   2571     return false;
   2572   }
   2573 
   2574   active_it->second.stream->OnPushPromiseHeadersReceived(headers);
   2575   DCHECK(active_it->second.stream->IsReservedRemote());
   2576   return true;
   2577 }
   2578 
   2579 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
   2580                                 SpdyStreamId promised_stream_id,
   2581                                 const SpdyHeaderBlock& headers) {
   2582   CHECK(in_io_loop_);
   2583 
   2584   if (net_log_.IsLogging()) {
   2585     net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
   2586                       base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
   2587                                  &headers,
   2588                                  stream_id,
   2589                                  promised_stream_id));
   2590   }
   2591 
   2592   // Any priority will do.
   2593   // TODO(baranovich): pass parent stream id priority?
   2594   if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
   2595     return;
   2596 
   2597   base::StatsCounter push_requests("spdy.pushed_streams");
   2598   push_requests.Increment();
   2599 }
   2600 
   2601 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
   2602                                          uint32 delta_window_size) {
   2603   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2604   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2605   CHECK(it != active_streams_.end());
   2606   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2607   SendWindowUpdateFrame(
   2608       stream_id, delta_window_size, it->second.stream->priority());
   2609 }
   2610 
   2611 void SpdySession::SendInitialData() {
   2612   DCHECK(enable_sending_initial_data_);
   2613 
   2614   if (send_connection_header_prefix_) {
   2615     DCHECK_EQ(protocol_, kProtoSPDY4);
   2616     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
   2617         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
   2618                       kHttp2ConnectionHeaderPrefixSize,
   2619                       false /* take_ownership */));
   2620     // Count the prefix as part of the subsequent SETTINGS frame.
   2621     EnqueueSessionWrite(HIGHEST, SETTINGS,
   2622                         connection_header_prefix_frame.Pass());
   2623   }
   2624 
   2625   // First, notify the server about the settings they should use when
   2626   // communicating with us.
   2627   SettingsMap settings_map;
   2628   // Create a new settings frame notifying the server of our
   2629   // max concurrent streams and initial window size.
   2630   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
   2631       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
   2632   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
   2633       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
   2634     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
   2635         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
   2636                               stream_initial_recv_window_size_);
   2637   }
   2638   SendSettings(settings_map);
   2639 
   2640   // Next, notify the server about our initial recv window size.
   2641   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   2642     // Bump up the receive window size to the real initial value. This
   2643     // has to go here since the WINDOW_UPDATE frame sent by
   2644     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
   2645     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
   2646     // This condition implies that |kDefaultInitialRecvWindowSize| -
   2647     // |session_recv_window_size_| doesn't overflow.
   2648     DCHECK_GT(session_recv_window_size_, 0);
   2649     IncreaseRecvWindowSize(
   2650         kDefaultInitialRecvWindowSize - session_recv_window_size_);
   2651   }
   2652 
   2653   // Finally, notify the server about the settings they have
   2654   // previously told us to use when communicating with them (after
   2655   // applying them).
   2656   const SettingsMap& server_settings_map =
   2657       http_server_properties_->GetSpdySettings(host_port_pair());
   2658   if (server_settings_map.empty())
   2659     return;
   2660 
   2661   SettingsMap::const_iterator it =
   2662       server_settings_map.find(SETTINGS_CURRENT_CWND);
   2663   uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
   2664   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
   2665 
   2666   for (SettingsMap::const_iterator it = server_settings_map.begin();
   2667        it != server_settings_map.end(); ++it) {
   2668     const SpdySettingsIds new_id = it->first;
   2669     const uint32 new_val = it->second.second;
   2670     HandleSetting(new_id, new_val);
   2671   }
   2672 
   2673   SendSettings(server_settings_map);
   2674 }
   2675 
   2676 
   2677 void SpdySession::SendSettings(const SettingsMap& settings) {
   2678   net_log_.AddEvent(
   2679       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
   2680       base::Bind(&NetLogSpdySendSettingsCallback, &settings));
   2681 
   2682   // Create the SETTINGS frame and send it.
   2683   DCHECK(buffered_spdy_framer_.get());
   2684   scoped_ptr<SpdyFrame> settings_frame(
   2685       buffered_spdy_framer_->CreateSettings(settings));
   2686   sent_settings_ = true;
   2687   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
   2688 }
   2689 
   2690 void SpdySession::HandleSetting(uint32 id, uint32 value) {
   2691   switch (id) {
   2692     case SETTINGS_MAX_CONCURRENT_STREAMS:
   2693       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
   2694                                          kMaxConcurrentStreamLimit);
   2695       ProcessPendingStreamRequests();
   2696       break;
   2697     case SETTINGS_INITIAL_WINDOW_SIZE: {
   2698       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2699         net_log().AddEvent(
   2700             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
   2701         return;
   2702       }
   2703 
   2704       if (value > static_cast<uint32>(kint32max)) {
   2705         net_log().AddEvent(
   2706             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
   2707             NetLog::IntegerCallback("initial_window_size", value));
   2708         return;
   2709       }
   2710 
   2711       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
   2712       int32 delta_window_size =
   2713           static_cast<int32>(value) - stream_initial_send_window_size_;
   2714       stream_initial_send_window_size_ = static_cast<int32>(value);
   2715       UpdateStreamsSendWindowSize(delta_window_size);
   2716       net_log().AddEvent(
   2717           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
   2718           NetLog::IntegerCallback("delta_window_size", delta_window_size));
   2719       break;
   2720     }
   2721   }
   2722 }
   2723 
   2724 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
   2725   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2726   for (ActiveStreamMap::iterator it = active_streams_.begin();
   2727        it != active_streams_.end(); ++it) {
   2728     it->second.stream->AdjustSendWindowSize(delta_window_size);
   2729   }
   2730 
   2731   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
   2732        it != created_streams_.end(); it++) {
   2733     (*it)->AdjustSendWindowSize(delta_window_size);
   2734   }
   2735 }
   2736 
   2737 void SpdySession::SendPrefacePingIfNoneInFlight() {
   2738   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
   2739     return;
   2740 
   2741   base::TimeTicks now = time_func_();
   2742   // If there is no activity in the session, then send a preface-PING.
   2743   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
   2744     SendPrefacePing();
   2745 }
   2746 
   2747 void SpdySession::SendPrefacePing() {
   2748   WritePingFrame(next_ping_id_, false);
   2749 }
   2750 
   2751 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
   2752                                         uint32 delta_window_size,
   2753                                         RequestPriority priority) {
   2754   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2755   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2756   if (it != active_streams_.end()) {
   2757     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2758   } else {
   2759     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2760     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
   2761   }
   2762 
   2763   net_log_.AddEvent(
   2764       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
   2765       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2766                  stream_id, delta_window_size));
   2767 
   2768   DCHECK(buffered_spdy_framer_.get());
   2769   scoped_ptr<SpdyFrame> window_update_frame(
   2770       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
   2771   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
   2772 }
   2773 
   2774 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
   2775   DCHECK(buffered_spdy_framer_.get());
   2776   scoped_ptr<SpdyFrame> ping_frame(
   2777       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
   2778   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
   2779 
   2780   if (net_log().IsLogging()) {
   2781     net_log().AddEvent(
   2782         NetLog::TYPE_SPDY_SESSION_PING,
   2783         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
   2784   }
   2785   if (!is_ack) {
   2786     next_ping_id_ += 2;
   2787     ++pings_in_flight_;
   2788     PlanToCheckPingStatus();
   2789     last_ping_sent_time_ = time_func_();
   2790   }
   2791 }
   2792 
   2793 void SpdySession::PlanToCheckPingStatus() {
   2794   if (check_ping_status_pending_)
   2795     return;
   2796 
   2797   check_ping_status_pending_ = true;
   2798   base::MessageLoop::current()->PostDelayedTask(
   2799       FROM_HERE,
   2800       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2801                  time_func_()), hung_interval_);
   2802 }
   2803 
   2804 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
   2805   CHECK(!in_io_loop_);
   2806 
   2807   // Check if we got a response back for all PINGs we had sent.
   2808   if (pings_in_flight_ == 0) {
   2809     check_ping_status_pending_ = false;
   2810     return;
   2811   }
   2812 
   2813   DCHECK(check_ping_status_pending_);
   2814 
   2815   base::TimeTicks now = time_func_();
   2816   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
   2817 
   2818   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
   2819     // Track all failed PING messages in a separate bucket.
   2820     RecordPingRTTHistogram(base::TimeDelta::Max());
   2821     DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
   2822     return;
   2823   }
   2824 
   2825   // Check the status of connection after a delay.
   2826   base::MessageLoop::current()->PostDelayedTask(
   2827       FROM_HERE,
   2828       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2829                  now),
   2830       delay);
   2831 }
   2832 
   2833 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
   2834   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
   2835 }
   2836 
   2837 void SpdySession::RecordProtocolErrorHistogram(
   2838     SpdyProtocolErrorDetails details) {
   2839   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
   2840                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2841   if (EndsWith(host_port_pair().host(), "google.com", false)) {
   2842     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
   2843                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2844   }
   2845 }
   2846 
   2847 void SpdySession::RecordHistograms() {
   2848   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
   2849                               streams_initiated_count_,
   2850                               0, 300, 50);
   2851   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
   2852                               streams_pushed_count_,
   2853                               0, 300, 50);
   2854   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
   2855                               streams_pushed_and_claimed_count_,
   2856                               0, 300, 50);
   2857   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
   2858                               streams_abandoned_count_,
   2859                               0, 300, 50);
   2860   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
   2861                             sent_settings_ ? 1 : 0, 2);
   2862   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
   2863                             received_settings_ ? 1 : 0, 2);
   2864   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
   2865                               stalled_streams_,
   2866                               0, 300, 50);
   2867   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
   2868                             stalled_streams_ > 0 ? 1 : 0, 2);
   2869 
   2870   if (received_settings_) {
   2871     // Enumerate the saved settings, and set histograms for it.
   2872     const SettingsMap& settings_map =
   2873         http_server_properties_->GetSpdySettings(host_port_pair());
   2874 
   2875     SettingsMap::const_iterator it;
   2876     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
   2877       const SpdySettingsIds id = it->first;
   2878       const uint32 val = it->second.second;
   2879       switch (id) {
   2880         case SETTINGS_CURRENT_CWND:
   2881           // Record several different histograms to see if cwnd converges
   2882           // for larger volumes of data being sent.
   2883           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
   2884                                       val, 1, 200, 100);
   2885           if (total_bytes_received_ > 10 * 1024) {
   2886             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
   2887                                         val, 1, 200, 100);
   2888             if (total_bytes_received_ > 25 * 1024) {
   2889               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
   2890                                           val, 1, 200, 100);
   2891               if (total_bytes_received_ > 50 * 1024) {
   2892                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
   2893                                             val, 1, 200, 100);
   2894                 if (total_bytes_received_ > 100 * 1024) {
   2895                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
   2896                                               val, 1, 200, 100);
   2897                 }
   2898               }
   2899             }
   2900           }
   2901           break;
   2902         case SETTINGS_ROUND_TRIP_TIME:
   2903           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
   2904                                       val, 1, 1200, 100);
   2905           break;
   2906         case SETTINGS_DOWNLOAD_RETRANS_RATE:
   2907           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
   2908                                       val, 1, 100, 50);
   2909           break;
   2910         default:
   2911           break;
   2912       }
   2913     }
   2914   }
   2915 }
   2916 
   2917 void SpdySession::CompleteStreamRequest(
   2918     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
   2919   // Abort if the request has already been cancelled.
   2920   if (!pending_request)
   2921     return;
   2922 
   2923   base::WeakPtr<SpdyStream> stream;
   2924   int rv = TryCreateStream(pending_request, &stream);
   2925 
   2926   if (rv == OK) {
   2927     DCHECK(stream);
   2928     pending_request->OnRequestCompleteSuccess(stream);
   2929     return;
   2930   }
   2931   DCHECK(!stream);
   2932 
   2933   if (rv != ERR_IO_PENDING) {
   2934     pending_request->OnRequestCompleteFailure(rv);
   2935   }
   2936 }
   2937 
   2938 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
   2939   if (!is_secure_)
   2940     return NULL;
   2941   SSLClientSocket* ssl_socket =
   2942       reinterpret_cast<SSLClientSocket*>(connection_->socket());
   2943   DCHECK(ssl_socket);
   2944   return ssl_socket;
   2945 }
   2946 
   2947 void SpdySession::OnWriteBufferConsumed(
   2948     size_t frame_payload_size,
   2949     size_t consume_size,
   2950     SpdyBuffer::ConsumeSource consume_source) {
   2951   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
   2952   // deleted (e.g., a stream is closed due to incoming data).
   2953 
   2954   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2955 
   2956   if (consume_source == SpdyBuffer::DISCARD) {
   2957     // If we're discarding a frame or part of it, increase the send
   2958     // window by the number of discarded bytes. (Although if we're
   2959     // discarding part of a frame, it's probably because of a write
   2960     // error and we'll be tearing down the session soon.)
   2961     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
   2962     DCHECK_GT(remaining_payload_bytes, 0u);
   2963     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
   2964   }
   2965   // For consumed bytes, the send window is increased when we receive
   2966   // a WINDOW_UPDATE frame.
   2967 }
   2968 
   2969 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
   2970   // We can be called with |in_io_loop_| set if a SpdyBuffer is
   2971   // deleted (e.g., a stream is closed due to incoming data).
   2972 
   2973   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2974   DCHECK_GE(delta_window_size, 1);
   2975 
   2976   // Check for overflow.
   2977   int32 max_delta_window_size = kint32max - session_send_window_size_;
   2978   if (delta_window_size > max_delta_window_size) {
   2979     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2980     DoDrainSession(
   2981         ERR_SPDY_PROTOCOL_ERROR,
   2982         "Received WINDOW_UPDATE [delta: " +
   2983             base::IntToString(delta_window_size) +
   2984             "] for session overflows session_send_window_size_ [current: " +
   2985             base::IntToString(session_send_window_size_) + "]");
   2986     return;
   2987   }
   2988 
   2989   session_send_window_size_ += delta_window_size;
   2990 
   2991   net_log_.AddEvent(
   2992       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   2993       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   2994                  delta_window_size, session_send_window_size_));
   2995 
   2996   DCHECK(!IsSendStalled());
   2997   ResumeSendStalledStreams();
   2998 }
   2999 
   3000 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
   3001   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3002 
   3003   // We only call this method when sending a frame. Therefore,
   3004   // |delta_window_size| should be within the valid frame size range.
   3005   DCHECK_GE(delta_window_size, 1);
   3006   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
   3007 
   3008   // |send_window_size_| should have been at least |delta_window_size| for
   3009   // this call to happen.
   3010   DCHECK_GE(session_send_window_size_, delta_window_size);
   3011 
   3012   session_send_window_size_ -= delta_window_size;
   3013 
   3014   net_log_.AddEvent(
   3015       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   3016       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3017                  -delta_window_size, session_send_window_size_));
   3018 }
   3019 
   3020 void SpdySession::OnReadBufferConsumed(
   3021     size_t consume_size,
   3022     SpdyBuffer::ConsumeSource consume_source) {
   3023   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
   3024   // deleted (e.g., discarded by a SpdyReadQueue).
   3025 
   3026   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3027   DCHECK_GE(consume_size, 1u);
   3028   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
   3029 
   3030   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
   3031 }
   3032 
   3033 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
   3034   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3035   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
   3036   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
   3037   DCHECK_GE(delta_window_size, 1);
   3038   // Check for overflow.
   3039   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
   3040 
   3041   session_recv_window_size_ += delta_window_size;
   3042   net_log_.AddEvent(
   3043       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
   3044       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3045                  delta_window_size, session_recv_window_size_));
   3046 
   3047   session_unacked_recv_window_bytes_ += delta_window_size;
   3048   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
   3049     SendWindowUpdateFrame(kSessionFlowControlStreamId,
   3050                           session_unacked_recv_window_bytes_,
   3051                           HIGHEST);
   3052     session_unacked_recv_window_bytes_ = 0;
   3053   }
   3054 }
   3055 
   3056 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
   3057   CHECK(in_io_loop_);
   3058   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3059   DCHECK_GE(delta_window_size, 1);
   3060 
   3061   // Since we never decrease the initial receive window size,
   3062   // |delta_window_size| should never cause |recv_window_size_| to go
   3063   // negative. If we do, the receive window isn't being respected.
   3064   if (delta_window_size > session_recv_window_size_) {
   3065     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
   3066     DoDrainSession(
   3067         ERR_SPDY_FLOW_CONTROL_ERROR,
   3068         "delta_window_size is " + base::IntToString(delta_window_size) +
   3069             " in DecreaseRecvWindowSize, which is larger than the receive " +
   3070             "window size of " + base::IntToString(session_recv_window_size_));
   3071     return;
   3072   }
   3073 
   3074   session_recv_window_size_ -= delta_window_size;
   3075   net_log_.AddEvent(
   3076       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
   3077       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3078                  -delta_window_size, session_recv_window_size_));
   3079 }
   3080 
   3081 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
   3082   DCHECK(stream.send_stalled_by_flow_control());
   3083   RequestPriority priority = stream.priority();
   3084   CHECK_GE(priority, MINIMUM_PRIORITY);
   3085   CHECK_LE(priority, MAXIMUM_PRIORITY);
   3086   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
   3087 }
   3088 
   3089 void SpdySession::ResumeSendStalledStreams() {
   3090   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3091 
   3092   // We don't have to worry about new streams being queued, since
   3093   // doing so would cause IsSendStalled() to return true. But we do
   3094   // have to worry about streams being closed, as well as ourselves
   3095   // being closed.
   3096 
   3097   while (!IsSendStalled()) {
   3098     size_t old_size = 0;
   3099 #if DCHECK_IS_ON
   3100     old_size = GetTotalSize(stream_send_unstall_queue_);
   3101 #endif
   3102 
   3103     SpdyStreamId stream_id = PopStreamToPossiblyResume();
   3104     if (stream_id == 0)
   3105       break;
   3106     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   3107     // The stream may actually still be send-stalled after this (due
   3108     // to its own send window) but that's okay -- it'll then be
   3109     // resumed once its send window increases.
   3110     if (it != active_streams_.end())
   3111       it->second.stream->PossiblyResumeIfSendStalled();
   3112 
   3113     // The size should decrease unless we got send-stalled again.
   3114     if (!IsSendStalled())
   3115       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
   3116   }
   3117 }
   3118 
   3119 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
   3120   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
   3121     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
   3122     if (!queue->empty()) {
   3123       SpdyStreamId stream_id = queue->front();
   3124       queue->pop_front();
   3125       return stream_id;
   3126     }
   3127   }
   3128   return 0;
   3129 }
   3130 
   3131 }  // namespace net
   3132