Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_session.h"
      6 
      7 #include <algorithm>
      8 #include <map>
      9 
     10 #include "base/basictypes.h"
     11 #include "base/bind.h"
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/metrics/field_trial.h"
     16 #include "base/metrics/histogram.h"
     17 #include "base/metrics/sparse_histogram.h"
     18 #include "base/metrics/stats_counters.h"
     19 #include "base/stl_util.h"
     20 #include "base/strings/string_number_conversions.h"
     21 #include "base/strings/string_util.h"
     22 #include "base/strings/stringprintf.h"
     23 #include "base/strings/utf_string_conversions.h"
     24 #include "base/time/time.h"
     25 #include "base/values.h"
     26 #include "crypto/ec_private_key.h"
     27 #include "crypto/ec_signature_creator.h"
     28 #include "net/base/connection_type_histograms.h"
     29 #include "net/base/net_log.h"
     30 #include "net/base/net_util.h"
     31 #include "net/cert/asn1_util.h"
     32 #include "net/cert/cert_verify_result.h"
     33 #include "net/http/http_log_util.h"
     34 #include "net/http/http_network_session.h"
     35 #include "net/http/http_server_properties.h"
     36 #include "net/http/http_util.h"
     37 #include "net/http/transport_security_state.h"
     38 #include "net/socket/ssl_client_socket.h"
     39 #include "net/spdy/spdy_buffer_producer.h"
     40 #include "net/spdy/spdy_frame_builder.h"
     41 #include "net/spdy/spdy_http_utils.h"
     42 #include "net/spdy/spdy_protocol.h"
     43 #include "net/spdy/spdy_session_pool.h"
     44 #include "net/spdy/spdy_stream.h"
     45 #include "net/ssl/channel_id_service.h"
     46 #include "net/ssl/ssl_cipher_suite_names.h"
     47 #include "net/ssl/ssl_connection_status_flags.h"
     48 
     49 namespace net {
     50 
     51 namespace {
     52 
     53 const int kReadBufferSize = 8 * 1024;
     54 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
     55 const int kHungIntervalSeconds = 10;
     56 
     57 // Minimum seconds that unclaimed pushed streams will be kept in memory.
     58 const int kMinPushedStreamLifetimeSeconds = 300;
     59 
     60 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
     61     const SpdyHeaderBlock& headers,
     62     net::NetLog::LogLevel log_level) {
     63   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
     64   for (SpdyHeaderBlock::const_iterator it = headers.begin();
     65        it != headers.end(); ++it) {
     66     headers_list->AppendString(
     67         it->first + ": " +
     68         ElideHeaderValueForNetLog(log_level, it->first, it->second));
     69   }
     70   return headers_list.Pass();
     71 }
     72 
     73 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
     74                                              bool fin,
     75                                              bool unidirectional,
     76                                              SpdyPriority spdy_priority,
     77                                              SpdyStreamId stream_id,
     78                                              NetLog::LogLevel log_level) {
     79   base::DictionaryValue* dict = new base::DictionaryValue();
     80   dict->Set("headers",
     81             SpdyHeaderBlockToListValue(*headers, log_level).release());
     82   dict->SetBoolean("fin", fin);
     83   dict->SetBoolean("unidirectional", unidirectional);
     84   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
     85   dict->SetInteger("stream_id", stream_id);
     86   return dict;
     87 }
     88 
     89 base::Value* NetLogSpdySynStreamReceivedCallback(
     90     const SpdyHeaderBlock* headers,
     91     bool fin,
     92     bool unidirectional,
     93     SpdyPriority spdy_priority,
     94     SpdyStreamId stream_id,
     95     SpdyStreamId associated_stream,
     96     NetLog::LogLevel log_level) {
     97   base::DictionaryValue* dict = new base::DictionaryValue();
     98   dict->Set("headers",
     99             SpdyHeaderBlockToListValue(*headers, log_level).release());
    100   dict->SetBoolean("fin", fin);
    101   dict->SetBoolean("unidirectional", unidirectional);
    102   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
    103   dict->SetInteger("stream_id", stream_id);
    104   dict->SetInteger("associated_stream", associated_stream);
    105   return dict;
    106 }
    107 
    108 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
    109     const SpdyHeaderBlock* headers,
    110     bool fin,
    111     SpdyStreamId stream_id,
    112     NetLog::LogLevel log_level) {
    113   base::DictionaryValue* dict = new base::DictionaryValue();
    114   dict->Set("headers",
    115             SpdyHeaderBlockToListValue(*headers, log_level).release());
    116   dict->SetBoolean("fin", fin);
    117   dict->SetInteger("stream_id", stream_id);
    118   return dict;
    119 }
    120 
    121 base::Value* NetLogSpdySessionCloseCallback(int net_error,
    122                                             const std::string* description,
    123                                             NetLog::LogLevel /* log_level */) {
    124   base::DictionaryValue* dict = new base::DictionaryValue();
    125   dict->SetInteger("net_error", net_error);
    126   dict->SetString("description", *description);
    127   return dict;
    128 }
    129 
    130 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
    131                                        NetLog::LogLevel /* log_level */) {
    132   base::DictionaryValue* dict = new base::DictionaryValue();
    133   dict->SetString("host", host_pair->first.ToString());
    134   dict->SetString("proxy", host_pair->second.ToPacString());
    135   return dict;
    136 }
    137 
    138 base::Value* NetLogSpdyInitializedCallback(NetLog::Source source,
    139                                            const NextProto protocol_version,
    140                                            NetLog::LogLevel /* log_level */) {
    141   base::DictionaryValue* dict = new base::DictionaryValue();
    142   if (source.IsValid()) {
    143     source.AddToEventParameters(dict);
    144   }
    145   dict->SetString("protocol",
    146                   SSLClientSocket::NextProtoToString(protocol_version));
    147   return dict;
    148 }
    149 
    150 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
    151                                         bool clear_persisted,
    152                                         NetLog::LogLevel /* log_level */) {
    153   base::DictionaryValue* dict = new base::DictionaryValue();
    154   dict->SetString("host", host_port_pair.ToString());
    155   dict->SetBoolean("clear_persisted", clear_persisted);
    156   return dict;
    157 }
    158 
    159 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
    160                                        const SpdyMajorVersion protocol_version,
    161                                        SpdySettingsFlags flags,
    162                                        uint32 value,
    163                                        NetLog::LogLevel /* log_level */) {
    164   base::DictionaryValue* dict = new base::DictionaryValue();
    165   dict->SetInteger("id",
    166                    SpdyConstants::SerializeSettingId(protocol_version, id));
    167   dict->SetInteger("flags", flags);
    168   dict->SetInteger("value", value);
    169   return dict;
    170 }
    171 
    172 base::Value* NetLogSpdySendSettingsCallback(
    173     const SettingsMap* settings,
    174     const SpdyMajorVersion protocol_version,
    175     NetLog::LogLevel /* log_level */) {
    176   base::DictionaryValue* dict = new base::DictionaryValue();
    177   base::ListValue* settings_list = new base::ListValue();
    178   for (SettingsMap::const_iterator it = settings->begin();
    179        it != settings->end(); ++it) {
    180     const SpdySettingsIds id = it->first;
    181     const SpdySettingsFlags flags = it->second.first;
    182     const uint32 value = it->second.second;
    183     settings_list->Append(new base::StringValue(base::StringPrintf(
    184         "[id:%u flags:%u value:%u]",
    185         SpdyConstants::SerializeSettingId(protocol_version, id),
    186         flags,
    187         value)));
    188   }
    189   dict->Set("settings", settings_list);
    190   return dict;
    191 }
    192 
    193 base::Value* NetLogSpdyWindowUpdateFrameCallback(
    194     SpdyStreamId stream_id,
    195     uint32 delta,
    196     NetLog::LogLevel /* log_level */) {
    197   base::DictionaryValue* dict = new base::DictionaryValue();
    198   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    199   dict->SetInteger("delta", delta);
    200   return dict;
    201 }
    202 
    203 base::Value* NetLogSpdySessionWindowUpdateCallback(
    204     int32 delta,
    205     int32 window_size,
    206     NetLog::LogLevel /* log_level */) {
    207   base::DictionaryValue* dict = new base::DictionaryValue();
    208   dict->SetInteger("delta", delta);
    209   dict->SetInteger("window_size", window_size);
    210   return dict;
    211 }
    212 
    213 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
    214                                     int size,
    215                                     bool fin,
    216                                     NetLog::LogLevel /* log_level */) {
    217   base::DictionaryValue* dict = new base::DictionaryValue();
    218   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    219   dict->SetInteger("size", size);
    220   dict->SetBoolean("fin", fin);
    221   return dict;
    222 }
    223 
    224 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
    225                                    int status,
    226                                    const std::string* description,
    227                                    NetLog::LogLevel /* log_level */) {
    228   base::DictionaryValue* dict = new base::DictionaryValue();
    229   dict->SetInteger("stream_id", static_cast<int>(stream_id));
    230   dict->SetInteger("status", status);
    231   dict->SetString("description", *description);
    232   return dict;
    233 }
    234 
    235 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
    236                                     bool is_ack,
    237                                     const char* type,
    238                                     NetLog::LogLevel /* log_level */) {
    239   base::DictionaryValue* dict = new base::DictionaryValue();
    240   dict->SetInteger("unique_id", unique_id);
    241   dict->SetString("type", type);
    242   dict->SetBoolean("is_ack", is_ack);
    243   return dict;
    244 }
    245 
    246 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
    247                                       int active_streams,
    248                                       int unclaimed_streams,
    249                                       SpdyGoAwayStatus status,
    250                                       NetLog::LogLevel /* log_level */) {
    251   base::DictionaryValue* dict = new base::DictionaryValue();
    252   dict->SetInteger("last_accepted_stream_id",
    253                    static_cast<int>(last_stream_id));
    254   dict->SetInteger("active_streams", active_streams);
    255   dict->SetInteger("unclaimed_streams", unclaimed_streams);
    256   dict->SetInteger("status", static_cast<int>(status));
    257   return dict;
    258 }
    259 
    260 base::Value* NetLogSpdyPushPromiseReceivedCallback(
    261     const SpdyHeaderBlock* headers,
    262     SpdyStreamId stream_id,
    263     SpdyStreamId promised_stream_id,
    264     NetLog::LogLevel log_level) {
    265   base::DictionaryValue* dict = new base::DictionaryValue();
    266   dict->Set("headers",
    267             SpdyHeaderBlockToListValue(*headers, log_level).release());
    268   dict->SetInteger("id", stream_id);
    269   dict->SetInteger("promised_stream_id", promised_stream_id);
    270   return dict;
    271 }
    272 
    273 // Helper function to return the total size of an array of objects
    274 // with .size() member functions.
    275 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
    276   size_t total_size = 0;
    277   for (size_t i = 0; i < N; ++i) {
    278     total_size += arr[i].size();
    279   }
    280   return total_size;
    281 }
    282 
    283 // Helper class for std:find_if on STL container containing
    284 // SpdyStreamRequest weak pointers.
    285 class RequestEquals {
    286  public:
    287   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
    288       : request_(request) {}
    289 
    290   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
    291     return request_.get() == request.get();
    292   }
    293 
    294  private:
    295   const base::WeakPtr<SpdyStreamRequest> request_;
    296 };
    297 
    298 // The maximum number of concurrent streams we will ever create.  Even if
    299 // the server permits more, we will never exceed this limit.
    300 const size_t kMaxConcurrentStreamLimit = 256;
    301 
    302 }  // namespace
    303 
    304 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
    305     SpdyFramer::SpdyError err) {
    306   switch(err) {
    307     case SpdyFramer::SPDY_NO_ERROR:
    308       return SPDY_ERROR_NO_ERROR;
    309     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
    310       return SPDY_ERROR_INVALID_CONTROL_FRAME;
    311     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
    312       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
    313     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
    314       return SPDY_ERROR_ZLIB_INIT_FAILURE;
    315     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
    316       return SPDY_ERROR_UNSUPPORTED_VERSION;
    317     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
    318       return SPDY_ERROR_DECOMPRESS_FAILURE;
    319     case SpdyFramer::SPDY_COMPRESS_FAILURE:
    320       return SPDY_ERROR_COMPRESS_FAILURE;
    321     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
    322       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
    323     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
    324       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
    325     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
    326       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
    327     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
    328       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
    329     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
    330       return SPDY_ERROR_UNEXPECTED_FRAME;
    331     default:
    332       NOTREACHED();
    333       return static_cast<SpdyProtocolErrorDetails>(-1);
    334   }
    335 }
    336 
    337 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
    338   switch (err) {
    339     case SpdyFramer::SPDY_NO_ERROR:
    340       return OK;
    341     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
    342       return ERR_SPDY_PROTOCOL_ERROR;
    343     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
    344       return ERR_SPDY_FRAME_SIZE_ERROR;
    345     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
    346       return ERR_SPDY_COMPRESSION_ERROR;
    347     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
    348       return ERR_SPDY_PROTOCOL_ERROR;
    349     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
    350       return ERR_SPDY_COMPRESSION_ERROR;
    351     case SpdyFramer::SPDY_COMPRESS_FAILURE:
    352       return ERR_SPDY_COMPRESSION_ERROR;
    353     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
    354       return ERR_SPDY_PROTOCOL_ERROR;
    355     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
    356       return ERR_SPDY_PROTOCOL_ERROR;
    357     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
    358       return ERR_SPDY_PROTOCOL_ERROR;
    359     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
    360       return ERR_SPDY_PROTOCOL_ERROR;
    361     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
    362       return ERR_SPDY_PROTOCOL_ERROR;
    363     default:
    364       NOTREACHED();
    365       return ERR_SPDY_PROTOCOL_ERROR;
    366   }
    367 }
    368 
    369 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
    370     SpdyRstStreamStatus status) {
    371   switch(status) {
    372     case RST_STREAM_PROTOCOL_ERROR:
    373       return STATUS_CODE_PROTOCOL_ERROR;
    374     case RST_STREAM_INVALID_STREAM:
    375       return STATUS_CODE_INVALID_STREAM;
    376     case RST_STREAM_REFUSED_STREAM:
    377       return STATUS_CODE_REFUSED_STREAM;
    378     case RST_STREAM_UNSUPPORTED_VERSION:
    379       return STATUS_CODE_UNSUPPORTED_VERSION;
    380     case RST_STREAM_CANCEL:
    381       return STATUS_CODE_CANCEL;
    382     case RST_STREAM_INTERNAL_ERROR:
    383       return STATUS_CODE_INTERNAL_ERROR;
    384     case RST_STREAM_FLOW_CONTROL_ERROR:
    385       return STATUS_CODE_FLOW_CONTROL_ERROR;
    386     case RST_STREAM_STREAM_IN_USE:
    387       return STATUS_CODE_STREAM_IN_USE;
    388     case RST_STREAM_STREAM_ALREADY_CLOSED:
    389       return STATUS_CODE_STREAM_ALREADY_CLOSED;
    390     case RST_STREAM_INVALID_CREDENTIALS:
    391       return STATUS_CODE_INVALID_CREDENTIALS;
    392     case RST_STREAM_FRAME_SIZE_ERROR:
    393       return STATUS_CODE_FRAME_SIZE_ERROR;
    394     case RST_STREAM_SETTINGS_TIMEOUT:
    395       return STATUS_CODE_SETTINGS_TIMEOUT;
    396     case RST_STREAM_CONNECT_ERROR:
    397       return STATUS_CODE_CONNECT_ERROR;
    398     case RST_STREAM_ENHANCE_YOUR_CALM:
    399       return STATUS_CODE_ENHANCE_YOUR_CALM;
    400     default:
    401       NOTREACHED();
    402       return static_cast<SpdyProtocolErrorDetails>(-1);
    403   }
    404 }
    405 
    406 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
    407   switch (err) {
    408     case OK:
    409       return GOAWAY_NO_ERROR;
    410     case ERR_SPDY_PROTOCOL_ERROR:
    411       return GOAWAY_PROTOCOL_ERROR;
    412     case ERR_SPDY_FLOW_CONTROL_ERROR:
    413       return GOAWAY_FLOW_CONTROL_ERROR;
    414     case ERR_SPDY_FRAME_SIZE_ERROR:
    415       return GOAWAY_FRAME_SIZE_ERROR;
    416     case ERR_SPDY_COMPRESSION_ERROR:
    417       return GOAWAY_COMPRESSION_ERROR;
    418     case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
    419       return GOAWAY_INADEQUATE_SECURITY;
    420     default:
    421       return GOAWAY_PROTOCOL_ERROR;
    422   }
    423 }
    424 
    425 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
    426                                             SpdyMajorVersion protocol_version,
    427                                             SpdyHeaderBlock* request_headers,
    428                                             SpdyHeaderBlock* response_headers) {
    429   DCHECK(response_headers);
    430   DCHECK(request_headers);
    431   for (SpdyHeaderBlock::const_iterator it = headers.begin();
    432        it != headers.end();
    433        ++it) {
    434     SpdyHeaderBlock* to_insert = response_headers;
    435     if (protocol_version == SPDY2) {
    436       if (it->first == "url")
    437         to_insert = request_headers;
    438     } else {
    439       const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
    440       static const char* scheme = ":scheme";
    441       static const char* path = ":path";
    442       if (it->first == host || it->first == scheme || it->first == path)
    443         to_insert = request_headers;
    444     }
    445     to_insert->insert(*it);
    446   }
    447 }
    448 
    449 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
    450   Reset();
    451 }
    452 
    453 SpdyStreamRequest::~SpdyStreamRequest() {
    454   CancelRequest();
    455 }
    456 
    457 int SpdyStreamRequest::StartRequest(
    458     SpdyStreamType type,
    459     const base::WeakPtr<SpdySession>& session,
    460     const GURL& url,
    461     RequestPriority priority,
    462     const BoundNetLog& net_log,
    463     const CompletionCallback& callback) {
    464   DCHECK(session);
    465   DCHECK(!session_);
    466   DCHECK(!stream_);
    467   DCHECK(callback_.is_null());
    468 
    469   type_ = type;
    470   session_ = session;
    471   url_ = url;
    472   priority_ = priority;
    473   net_log_ = net_log;
    474   callback_ = callback;
    475 
    476   base::WeakPtr<SpdyStream> stream;
    477   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
    478   if (rv == OK) {
    479     Reset();
    480     stream_ = stream;
    481   }
    482   return rv;
    483 }
    484 
    485 void SpdyStreamRequest::CancelRequest() {
    486   if (session_)
    487     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
    488   Reset();
    489   // Do this to cancel any pending CompleteStreamRequest() tasks.
    490   weak_ptr_factory_.InvalidateWeakPtrs();
    491 }
    492 
    493 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
    494   DCHECK(!session_);
    495   base::WeakPtr<SpdyStream> stream = stream_;
    496   DCHECK(stream);
    497   Reset();
    498   return stream;
    499 }
    500 
    501 void SpdyStreamRequest::OnRequestCompleteSuccess(
    502     const base::WeakPtr<SpdyStream>& stream) {
    503   DCHECK(session_);
    504   DCHECK(!stream_);
    505   DCHECK(!callback_.is_null());
    506   CompletionCallback callback = callback_;
    507   Reset();
    508   DCHECK(stream);
    509   stream_ = stream;
    510   callback.Run(OK);
    511 }
    512 
    513 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
    514   DCHECK(session_);
    515   DCHECK(!stream_);
    516   DCHECK(!callback_.is_null());
    517   CompletionCallback callback = callback_;
    518   Reset();
    519   DCHECK_NE(rv, OK);
    520   callback.Run(rv);
    521 }
    522 
    523 void SpdyStreamRequest::Reset() {
    524   type_ = SPDY_BIDIRECTIONAL_STREAM;
    525   session_.reset();
    526   stream_.reset();
    527   url_ = GURL();
    528   priority_ = MINIMUM_PRIORITY;
    529   net_log_ = BoundNetLog();
    530   callback_.Reset();
    531 }
    532 
    533 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
    534     : stream(NULL),
    535       waiting_for_syn_reply(false) {}
    536 
    537 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
    538     : stream(stream),
    539       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
    540 }
    541 
    542 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
    543 
    544 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
    545 
    546 SpdySession::PushedStreamInfo::PushedStreamInfo(
    547     SpdyStreamId stream_id,
    548     base::TimeTicks creation_time)
    549     : stream_id(stream_id),
    550       creation_time(creation_time) {}
    551 
    552 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
    553 
    554 // static
    555 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
    556                           const SSLInfo& ssl_info,
    557                           const std::string& old_hostname,
    558                           const std::string& new_hostname) {
    559   // Pooling is prohibited if the server cert is not valid for the new domain,
    560   // and for connections on which client certs were sent. It is also prohibited
    561   // when channel ID was sent if the hosts are from different eTLDs+1.
    562   if (IsCertStatusError(ssl_info.cert_status))
    563     return false;
    564 
    565   if (ssl_info.client_cert_sent)
    566     return false;
    567 
    568   if (ssl_info.channel_id_sent &&
    569       ChannelIDService::GetDomainForHost(new_hostname) !=
    570       ChannelIDService::GetDomainForHost(old_hostname)) {
    571     return false;
    572   }
    573 
    574   bool unused = false;
    575   if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
    576     return false;
    577 
    578   std::string pinning_failure_log;
    579   if (!transport_security_state->CheckPublicKeyPins(
    580           new_hostname,
    581           ssl_info.is_issued_by_known_root,
    582           ssl_info.public_key_hashes,
    583           &pinning_failure_log)) {
    584     return false;
    585   }
    586 
    587   return true;
    588 }
    589 
    590 SpdySession::SpdySession(
    591     const SpdySessionKey& spdy_session_key,
    592     const base::WeakPtr<HttpServerProperties>& http_server_properties,
    593     TransportSecurityState* transport_security_state,
    594     bool verify_domain_authentication,
    595     bool enable_sending_initial_data,
    596     bool enable_compression,
    597     bool enable_ping_based_connection_checking,
    598     NextProto default_protocol,
    599     size_t stream_initial_recv_window_size,
    600     size_t initial_max_concurrent_streams,
    601     size_t max_concurrent_streams_limit,
    602     TimeFunc time_func,
    603     const HostPortPair& trusted_spdy_proxy,
    604     NetLog* net_log)
    605     : in_io_loop_(false),
    606       spdy_session_key_(spdy_session_key),
    607       pool_(NULL),
    608       http_server_properties_(http_server_properties),
    609       transport_security_state_(transport_security_state),
    610       read_buffer_(new IOBuffer(kReadBufferSize)),
    611       stream_hi_water_mark_(kFirstStreamId),
    612       last_accepted_push_stream_id_(0),
    613       num_pushed_streams_(0u),
    614       num_active_pushed_streams_(0u),
    615       in_flight_write_frame_type_(DATA),
    616       in_flight_write_frame_size_(0),
    617       is_secure_(false),
    618       certificate_error_code_(OK),
    619       availability_state_(STATE_AVAILABLE),
    620       read_state_(READ_STATE_DO_READ),
    621       write_state_(WRITE_STATE_IDLE),
    622       error_on_close_(OK),
    623       max_concurrent_streams_(initial_max_concurrent_streams == 0
    624                                   ? kInitialMaxConcurrentStreams
    625                                   : initial_max_concurrent_streams),
    626       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
    627                                         ? kMaxConcurrentStreamLimit
    628                                         : max_concurrent_streams_limit),
    629       max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
    630       streams_initiated_count_(0),
    631       streams_pushed_count_(0),
    632       streams_pushed_and_claimed_count_(0),
    633       streams_abandoned_count_(0),
    634       total_bytes_received_(0),
    635       sent_settings_(false),
    636       received_settings_(false),
    637       stalled_streams_(0),
    638       pings_in_flight_(0),
    639       next_ping_id_(1),
    640       last_activity_time_(time_func()),
    641       last_compressed_frame_len_(0),
    642       check_ping_status_pending_(false),
    643       send_connection_header_prefix_(false),
    644       flow_control_state_(FLOW_CONTROL_NONE),
    645       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
    646       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
    647                                            ? kDefaultInitialRecvWindowSize
    648                                            : stream_initial_recv_window_size),
    649       session_send_window_size_(0),
    650       session_recv_window_size_(0),
    651       session_unacked_recv_window_bytes_(0),
    652       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
    653       verify_domain_authentication_(verify_domain_authentication),
    654       enable_sending_initial_data_(enable_sending_initial_data),
    655       enable_compression_(enable_compression),
    656       enable_ping_based_connection_checking_(
    657           enable_ping_based_connection_checking),
    658       protocol_(default_protocol),
    659       connection_at_risk_of_loss_time_(
    660           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
    661       hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
    662       trusted_spdy_proxy_(trusted_spdy_proxy),
    663       time_func_(time_func),
    664       weak_factory_(this) {
    665   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
    666   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    667   DCHECK(HttpStreamFactory::spdy_enabled());
    668   net_log_.BeginEvent(
    669       NetLog::TYPE_SPDY_SESSION,
    670       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
    671   next_unclaimed_push_stream_sweep_time_ = time_func_() +
    672       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
    673   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    674 }
    675 
    676 SpdySession::~SpdySession() {
    677   CHECK(!in_io_loop_);
    678   DcheckDraining();
    679 
    680   // TODO(akalin): Check connection->is_initialized() instead. This
    681   // requires re-working CreateFakeSpdySession(), though.
    682   DCHECK(connection_->socket());
    683   // With SPDY we can't recycle sockets.
    684   connection_->socket()->Disconnect();
    685 
    686   RecordHistograms();
    687 
    688   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
    689 }
    690 
    691 void SpdySession::InitializeWithSocket(
    692     scoped_ptr<ClientSocketHandle> connection,
    693     SpdySessionPool* pool,
    694     bool is_secure,
    695     int certificate_error_code) {
    696   CHECK(!in_io_loop_);
    697   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
    698   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
    699   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
    700   DCHECK(!connection_);
    701 
    702   DCHECK(certificate_error_code == OK ||
    703          certificate_error_code < ERR_IO_PENDING);
    704   // TODO(akalin): Check connection->is_initialized() instead. This
    705   // requires re-working CreateFakeSpdySession(), though.
    706   DCHECK(connection->socket());
    707 
    708   base::StatsCounter spdy_sessions("spdy.sessions");
    709   spdy_sessions.Increment();
    710 
    711   connection_ = connection.Pass();
    712   is_secure_ = is_secure;
    713   certificate_error_code_ = certificate_error_code;
    714 
    715   NextProto protocol_negotiated =
    716       connection_->socket()->GetNegotiatedProtocol();
    717   if (protocol_negotiated != kProtoUnknown) {
    718     protocol_ = protocol_negotiated;
    719   }
    720   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
    721   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
    722 
    723   if (protocol_ == kProtoSPDY4)
    724     send_connection_header_prefix_ = true;
    725 
    726   if (protocol_ >= kProtoSPDY31) {
    727     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
    728     session_send_window_size_ = kSpdySessionInitialWindowSize;
    729     session_recv_window_size_ = kSpdySessionInitialWindowSize;
    730   } else if (protocol_ >= kProtoSPDY3) {
    731     flow_control_state_ = FLOW_CONTROL_STREAM;
    732   } else {
    733     flow_control_state_ = FLOW_CONTROL_NONE;
    734   }
    735 
    736   buffered_spdy_framer_.reset(
    737       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
    738                              enable_compression_));
    739   buffered_spdy_framer_->set_visitor(this);
    740   buffered_spdy_framer_->set_debug_visitor(this);
    741   UMA_HISTOGRAM_ENUMERATION(
    742       "Net.SpdyVersion2",
    743       protocol_ - kProtoSPDYMinimumVersion,
    744       kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1);
    745 
    746   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED,
    747                     base::Bind(&NetLogSpdyInitializedCallback,
    748                                connection_->socket()->NetLog().source(),
    749                                protocol_));
    750 
    751   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
    752   connection_->AddHigherLayeredPool(this);
    753   if (enable_sending_initial_data_)
    754     SendInitialData();
    755   pool_ = pool;
    756 
    757   // Bootstrap the read loop.
    758   base::MessageLoop::current()->PostTask(
    759       FROM_HERE,
    760       base::Bind(&SpdySession::PumpReadLoop,
    761                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
    762 }
    763 
    764 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
    765   if (!verify_domain_authentication_)
    766     return true;
    767 
    768   if (availability_state_ == STATE_DRAINING)
    769     return false;
    770 
    771   SSLInfo ssl_info;
    772   bool was_npn_negotiated;
    773   NextProto protocol_negotiated = kProtoUnknown;
    774   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
    775     return true;   // This is not a secure session, so all domains are okay.
    776 
    777   return CanPool(transport_security_state_, ssl_info,
    778                  host_port_pair().host(), domain);
    779 }
    780 
    781 int SpdySession::GetPushStream(
    782     const GURL& url,
    783     base::WeakPtr<SpdyStream>* stream,
    784     const BoundNetLog& stream_net_log) {
    785   CHECK(!in_io_loop_);
    786 
    787   stream->reset();
    788 
    789   if (availability_state_ == STATE_DRAINING)
    790     return ERR_CONNECTION_CLOSED;
    791 
    792   Error err = TryAccessStream(url);
    793   if (err != OK)
    794     return err;
    795 
    796   *stream = GetActivePushStream(url);
    797   if (*stream) {
    798     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
    799     streams_pushed_and_claimed_count_++;
    800   }
    801   return OK;
    802 }
    803 
    804 // {,Try}CreateStream() and TryAccessStream() can be called with
    805 // |in_io_loop_| set if a stream is being created in response to
    806 // another being closed due to received data.
    807 
    808 Error SpdySession::TryAccessStream(const GURL& url) {
    809   if (is_secure_ && certificate_error_code_ != OK &&
    810       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    811     RecordProtocolErrorHistogram(
    812         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
    813     DoDrainSession(
    814         static_cast<Error>(certificate_error_code_),
    815         "Tried to get SPDY stream for secure content over an unauthenticated "
    816         "session.");
    817     return ERR_SPDY_PROTOCOL_ERROR;
    818   }
    819   return OK;
    820 }
    821 
    822 int SpdySession::TryCreateStream(
    823     const base::WeakPtr<SpdyStreamRequest>& request,
    824     base::WeakPtr<SpdyStream>* stream) {
    825   DCHECK(request);
    826 
    827   if (availability_state_ == STATE_GOING_AWAY)
    828     return ERR_FAILED;
    829 
    830   if (availability_state_ == STATE_DRAINING)
    831     return ERR_CONNECTION_CLOSED;
    832 
    833   Error err = TryAccessStream(request->url());
    834   if (err != OK)
    835     return err;
    836 
    837   if (!max_concurrent_streams_ ||
    838       (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
    839        max_concurrent_streams_)) {
    840     return CreateStream(*request, stream);
    841   }
    842 
    843   stalled_streams_++;
    844   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
    845   RequestPriority priority = request->priority();
    846   CHECK_GE(priority, MINIMUM_PRIORITY);
    847   CHECK_LE(priority, MAXIMUM_PRIORITY);
    848   pending_create_stream_queues_[priority].push_back(request);
    849   return ERR_IO_PENDING;
    850 }
    851 
    852 int SpdySession::CreateStream(const SpdyStreamRequest& request,
    853                               base::WeakPtr<SpdyStream>* stream) {
    854   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
    855   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
    856 
    857   if (availability_state_ == STATE_GOING_AWAY)
    858     return ERR_FAILED;
    859 
    860   if (availability_state_ == STATE_DRAINING)
    861     return ERR_CONNECTION_CLOSED;
    862 
    863   Error err = TryAccessStream(request.url());
    864   if (err != OK) {
    865     // This should have been caught in TryCreateStream().
    866     NOTREACHED();
    867     return err;
    868   }
    869 
    870   DCHECK(connection_->socket());
    871   DCHECK(connection_->socket()->IsConnected());
    872   if (connection_->socket()) {
    873     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
    874                           connection_->socket()->IsConnected());
    875     if (!connection_->socket()->IsConnected()) {
    876       DoDrainSession(
    877           ERR_CONNECTION_CLOSED,
    878           "Tried to create SPDY stream for a closed socket connection.");
    879       return ERR_CONNECTION_CLOSED;
    880     }
    881   }
    882 
    883   scoped_ptr<SpdyStream> new_stream(
    884       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
    885                      request.priority(),
    886                      stream_initial_send_window_size_,
    887                      stream_initial_recv_window_size_,
    888                      request.net_log()));
    889   *stream = new_stream->GetWeakPtr();
    890   InsertCreatedStream(new_stream.Pass());
    891 
    892   UMA_HISTOGRAM_CUSTOM_COUNTS(
    893       "Net.SpdyPriorityCount",
    894       static_cast<int>(request.priority()), 0, 10, 11);
    895 
    896   return OK;
    897 }
    898 
    899 void SpdySession::CancelStreamRequest(
    900     const base::WeakPtr<SpdyStreamRequest>& request) {
    901   DCHECK(request);
    902   RequestPriority priority = request->priority();
    903   CHECK_GE(priority, MINIMUM_PRIORITY);
    904   CHECK_LE(priority, MAXIMUM_PRIORITY);
    905 
    906 #if DCHECK_IS_ON
    907   // |request| should not be in a queue not matching its priority.
    908   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
    909     if (priority == i)
    910       continue;
    911     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
    912     DCHECK(std::find_if(queue->begin(),
    913                         queue->end(),
    914                         RequestEquals(request)) == queue->end());
    915   }
    916 #endif
    917 
    918   PendingStreamRequestQueue* queue =
    919       &pending_create_stream_queues_[priority];
    920   // Remove |request| from |queue| while preserving the order of the
    921   // other elements.
    922   PendingStreamRequestQueue::iterator it =
    923       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
    924   // The request may already be removed if there's a
    925   // CompleteStreamRequest() in flight.
    926   if (it != queue->end()) {
    927     it = queue->erase(it);
    928     // |request| should be in the queue at most once, and if it is
    929     // present, should not be pending completion.
    930     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
    931            queue->end());
    932   }
    933 }
    934 
    935 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
    936   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
    937     if (pending_create_stream_queues_[j].empty())
    938       continue;
    939 
    940     base::WeakPtr<SpdyStreamRequest> pending_request =
    941         pending_create_stream_queues_[j].front();
    942     DCHECK(pending_request);
    943     pending_create_stream_queues_[j].pop_front();
    944     return pending_request;
    945   }
    946   return base::WeakPtr<SpdyStreamRequest>();
    947 }
    948 
    949 void SpdySession::ProcessPendingStreamRequests() {
    950   // Like |max_concurrent_streams_|, 0 means infinite for
    951   // |max_requests_to_process|.
    952   size_t max_requests_to_process = 0;
    953   if (max_concurrent_streams_ != 0) {
    954     max_requests_to_process =
    955         max_concurrent_streams_ -
    956         (active_streams_.size() + created_streams_.size());
    957   }
    958   for (size_t i = 0;
    959        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
    960     base::WeakPtr<SpdyStreamRequest> pending_request =
    961         GetNextPendingStreamRequest();
    962     if (!pending_request)
    963       break;
    964 
    965     // Note that this post can race with other stream creations, and it's
    966     // possible that the un-stalled stream will be stalled again if it loses.
    967     // TODO(jgraettinger): Provide stronger ordering guarantees.
    968     base::MessageLoop::current()->PostTask(
    969         FROM_HERE,
    970         base::Bind(&SpdySession::CompleteStreamRequest,
    971                    weak_factory_.GetWeakPtr(),
    972                    pending_request));
    973   }
    974 }
    975 
    976 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
    977   pooled_aliases_.insert(alias_key);
    978 }
    979 
    980 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
    981   DCHECK(buffered_spdy_framer_.get());
    982   return buffered_spdy_framer_->protocol_version();
    983 }
    984 
    985 bool SpdySession::HasAcceptableTransportSecurity() const {
    986   // If we're not even using TLS, we have no standards to meet.
    987   if (!is_secure_) {
    988     return true;
    989   }
    990 
    991   // We don't enforce transport security standards for older SPDY versions.
    992   if (GetProtocolVersion() < SPDY4) {
    993     return true;
    994   }
    995 
    996   SSLInfo ssl_info;
    997   CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
    998 
    999   // HTTP/2 requires TLS 1.2+
   1000   if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
   1001       SSL_CONNECTION_VERSION_TLS1_2) {
   1002     return false;
   1003   }
   1004 
   1005   if (!IsSecureTLSCipherSuite(
   1006           SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
   1007     return false;
   1008   }
   1009 
   1010   return true;
   1011 }
   1012 
   1013 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
   1014   return weak_factory_.GetWeakPtr();
   1015 }
   1016 
   1017 bool SpdySession::CloseOneIdleConnection() {
   1018   CHECK(!in_io_loop_);
   1019   DCHECK(pool_);
   1020   if (active_streams_.empty()) {
   1021     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
   1022   }
   1023   // Return false as the socket wasn't immediately closed.
   1024   return false;
   1025 }
   1026 
   1027 void SpdySession::EnqueueStreamWrite(
   1028     const base::WeakPtr<SpdyStream>& stream,
   1029     SpdyFrameType frame_type,
   1030     scoped_ptr<SpdyBufferProducer> producer) {
   1031   DCHECK(frame_type == HEADERS ||
   1032          frame_type == DATA ||
   1033          frame_type == CREDENTIAL ||
   1034          frame_type == SYN_STREAM);
   1035   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
   1036 }
   1037 
   1038 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
   1039     SpdyStreamId stream_id,
   1040     RequestPriority priority,
   1041     SpdyControlFlags flags,
   1042     const SpdyHeaderBlock& block) {
   1043   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   1044   CHECK(it != active_streams_.end());
   1045   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   1046 
   1047   SendPrefacePingIfNoneInFlight();
   1048 
   1049   DCHECK(buffered_spdy_framer_.get());
   1050   SpdyPriority spdy_priority =
   1051       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
   1052 
   1053   scoped_ptr<SpdyFrame> syn_frame;
   1054   // TODO(hkhalil): Avoid copy of |block|.
   1055   if (GetProtocolVersion() <= SPDY3) {
   1056     SpdySynStreamIR syn_stream(stream_id);
   1057     syn_stream.set_associated_to_stream_id(0);
   1058     syn_stream.set_priority(spdy_priority);
   1059     syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
   1060     syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
   1061     syn_stream.set_name_value_block(block);
   1062     syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
   1063   } else {
   1064     SpdyHeadersIR headers(stream_id);
   1065     headers.set_priority(spdy_priority);
   1066     headers.set_has_priority(true);
   1067     headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
   1068     headers.set_name_value_block(block);
   1069     syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
   1070   }
   1071 
   1072   base::StatsCounter spdy_requests("spdy.requests");
   1073   spdy_requests.Increment();
   1074   streams_initiated_count_++;
   1075 
   1076   if (net_log().IsLogging()) {
   1077     net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
   1078                        base::Bind(&NetLogSpdySynStreamSentCallback,
   1079                                   &block,
   1080                                   (flags & CONTROL_FLAG_FIN) != 0,
   1081                                   (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
   1082                                   spdy_priority,
   1083                                   stream_id));
   1084   }
   1085 
   1086   return syn_frame.Pass();
   1087 }
   1088 
   1089 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
   1090                                                      IOBuffer* data,
   1091                                                      int len,
   1092                                                      SpdyDataFlags flags) {
   1093   if (availability_state_ == STATE_DRAINING) {
   1094     return scoped_ptr<SpdyBuffer>();
   1095   }
   1096 
   1097   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   1098   CHECK(it != active_streams_.end());
   1099   SpdyStream* stream = it->second.stream;
   1100   CHECK_EQ(stream->stream_id(), stream_id);
   1101 
   1102   if (len < 0) {
   1103     NOTREACHED();
   1104     return scoped_ptr<SpdyBuffer>();
   1105   }
   1106 
   1107   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
   1108 
   1109   bool send_stalled_by_stream =
   1110       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
   1111       (stream->send_window_size() <= 0);
   1112   bool send_stalled_by_session = IsSendStalled();
   1113 
   1114   // NOTE: There's an enum of the same name in histograms.xml.
   1115   enum SpdyFrameFlowControlState {
   1116     SEND_NOT_STALLED,
   1117     SEND_STALLED_BY_STREAM,
   1118     SEND_STALLED_BY_SESSION,
   1119     SEND_STALLED_BY_STREAM_AND_SESSION,
   1120   };
   1121 
   1122   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
   1123   if (send_stalled_by_stream) {
   1124     if (send_stalled_by_session) {
   1125       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
   1126     } else {
   1127       frame_flow_control_state = SEND_STALLED_BY_STREAM;
   1128     }
   1129   } else if (send_stalled_by_session) {
   1130     frame_flow_control_state = SEND_STALLED_BY_SESSION;
   1131   }
   1132 
   1133   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
   1134     UMA_HISTOGRAM_ENUMERATION(
   1135         "Net.SpdyFrameStreamFlowControlState",
   1136         frame_flow_control_state,
   1137         SEND_STALLED_BY_STREAM + 1);
   1138   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1139     UMA_HISTOGRAM_ENUMERATION(
   1140         "Net.SpdyFrameStreamAndSessionFlowControlState",
   1141         frame_flow_control_state,
   1142         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
   1143   }
   1144 
   1145   // Obey send window size of the stream if stream flow control is
   1146   // enabled.
   1147   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
   1148     if (send_stalled_by_stream) {
   1149       stream->set_send_stalled_by_flow_control(true);
   1150       // Even though we're currently stalled only by the stream, we
   1151       // might end up being stalled by the session also.
   1152       QueueSendStalledStream(*stream);
   1153       net_log().AddEvent(
   1154           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
   1155           NetLog::IntegerCallback("stream_id", stream_id));
   1156       return scoped_ptr<SpdyBuffer>();
   1157     }
   1158 
   1159     effective_len = std::min(effective_len, stream->send_window_size());
   1160   }
   1161 
   1162   // Obey send window size of the session if session flow control is
   1163   // enabled.
   1164   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   1165     if (send_stalled_by_session) {
   1166       stream->set_send_stalled_by_flow_control(true);
   1167       QueueSendStalledStream(*stream);
   1168       net_log().AddEvent(
   1169           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
   1170           NetLog::IntegerCallback("stream_id", stream_id));
   1171       return scoped_ptr<SpdyBuffer>();
   1172     }
   1173 
   1174     effective_len = std::min(effective_len, session_send_window_size_);
   1175   }
   1176 
   1177   DCHECK_GE(effective_len, 0);
   1178 
   1179   // Clear FIN flag if only some of the data will be in the data
   1180   // frame.
   1181   if (effective_len < len)
   1182     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
   1183 
   1184   if (net_log().IsLogging()) {
   1185     net_log().AddEvent(
   1186         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
   1187         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
   1188                    (flags & DATA_FLAG_FIN) != 0));
   1189   }
   1190 
   1191   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
   1192   if (effective_len > 0)
   1193     SendPrefacePingIfNoneInFlight();
   1194 
   1195   // TODO(mbelshe): reduce memory copies here.
   1196   DCHECK(buffered_spdy_framer_.get());
   1197   scoped_ptr<SpdyFrame> frame(
   1198       buffered_spdy_framer_->CreateDataFrame(
   1199           stream_id, data->data(),
   1200           static_cast<uint32>(effective_len), flags));
   1201 
   1202   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
   1203 
   1204   // Send window size is based on payload size, so nothing to do if this is
   1205   // just a FIN with no payload.
   1206   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
   1207       effective_len != 0) {
   1208     DecreaseSendWindowSize(static_cast<int32>(effective_len));
   1209     data_buffer->AddConsumeCallback(
   1210         base::Bind(&SpdySession::OnWriteBufferConsumed,
   1211                    weak_factory_.GetWeakPtr(),
   1212                    static_cast<size_t>(effective_len)));
   1213   }
   1214 
   1215   return data_buffer.Pass();
   1216 }
   1217 
   1218 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
   1219   DCHECK_NE(stream_id, 0u);
   1220 
   1221   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1222   if (it == active_streams_.end()) {
   1223     NOTREACHED();
   1224     return;
   1225   }
   1226 
   1227   CloseActiveStreamIterator(it, status);
   1228 }
   1229 
   1230 void SpdySession::CloseCreatedStream(
   1231     const base::WeakPtr<SpdyStream>& stream, int status) {
   1232   DCHECK_EQ(stream->stream_id(), 0u);
   1233 
   1234   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
   1235   if (it == created_streams_.end()) {
   1236     NOTREACHED();
   1237     return;
   1238   }
   1239 
   1240   CloseCreatedStreamIterator(it, status);
   1241 }
   1242 
   1243 void SpdySession::ResetStream(SpdyStreamId stream_id,
   1244                               SpdyRstStreamStatus status,
   1245                               const std::string& description) {
   1246   DCHECK_NE(stream_id, 0u);
   1247 
   1248   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1249   if (it == active_streams_.end()) {
   1250     NOTREACHED();
   1251     return;
   1252   }
   1253 
   1254   ResetStreamIterator(it, status, description);
   1255 }
   1256 
   1257 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
   1258   return ContainsKey(active_streams_, stream_id);
   1259 }
   1260 
   1261 LoadState SpdySession::GetLoadState() const {
   1262   // Just report that we're idle since the session could be doing
   1263   // many things concurrently.
   1264   return LOAD_STATE_IDLE;
   1265 }
   1266 
   1267 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
   1268                                             int status) {
   1269   // TODO(mbelshe): We should send a RST_STREAM control frame here
   1270   //                so that the server can cancel a large send.
   1271 
   1272   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
   1273   active_streams_.erase(it);
   1274 
   1275   // TODO(akalin): When SpdyStream was ref-counted (and
   1276   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
   1277   // was only done when status was not OK. This meant that pushed
   1278   // streams can still be claimed after they're closed. This is
   1279   // probably something that we still want to support, although server
   1280   // push is hardly used. Write tests for this and fix this. (See
   1281   // http://crbug.com/261712 .)
   1282   if (owned_stream->type() == SPDY_PUSH_STREAM) {
   1283     unclaimed_pushed_streams_.erase(owned_stream->url());
   1284     num_pushed_streams_--;
   1285     if (!owned_stream->IsReservedRemote())
   1286       num_active_pushed_streams_--;
   1287   }
   1288 
   1289   DeleteStream(owned_stream.Pass(), status);
   1290   MaybeFinishGoingAway();
   1291 
   1292   // If there are no active streams and the socket pool is stalled, close the
   1293   // session to free up a socket slot.
   1294   if (active_streams_.empty() && connection_->IsPoolStalled()) {
   1295     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
   1296   }
   1297 }
   1298 
   1299 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
   1300                                              int status) {
   1301   scoped_ptr<SpdyStream> owned_stream(*it);
   1302   created_streams_.erase(it);
   1303   DeleteStream(owned_stream.Pass(), status);
   1304 }
   1305 
   1306 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
   1307                                       SpdyRstStreamStatus status,
   1308                                       const std::string& description) {
   1309   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
   1310   // may close us.
   1311   SpdyStreamId stream_id = it->first;
   1312   RequestPriority priority = it->second.stream->priority();
   1313   EnqueueResetStreamFrame(stream_id, priority, status, description);
   1314 
   1315   // Removes any pending writes for the stream except for possibly an
   1316   // in-flight one.
   1317   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   1318 }
   1319 
   1320 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
   1321                                           RequestPriority priority,
   1322                                           SpdyRstStreamStatus status,
   1323                                           const std::string& description) {
   1324   DCHECK_NE(stream_id, 0u);
   1325 
   1326   net_log().AddEvent(
   1327       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
   1328       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
   1329 
   1330   DCHECK(buffered_spdy_framer_.get());
   1331   scoped_ptr<SpdyFrame> rst_frame(
   1332       buffered_spdy_framer_->CreateRstStream(stream_id, status));
   1333 
   1334   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
   1335   RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
   1336 }
   1337 
   1338 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
   1339   CHECK(!in_io_loop_);
   1340   if (availability_state_ == STATE_DRAINING) {
   1341     return;
   1342   }
   1343   ignore_result(DoReadLoop(expected_read_state, result));
   1344 }
   1345 
   1346 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
   1347   CHECK(!in_io_loop_);
   1348   CHECK_EQ(read_state_, expected_read_state);
   1349 
   1350   in_io_loop_ = true;
   1351 
   1352   int bytes_read_without_yielding = 0;
   1353 
   1354   // Loop until the session is draining, the read becomes blocked, or
   1355   // the read limit is exceeded.
   1356   while (true) {
   1357     switch (read_state_) {
   1358       case READ_STATE_DO_READ:
   1359         CHECK_EQ(result, OK);
   1360         result = DoRead();
   1361         break;
   1362       case READ_STATE_DO_READ_COMPLETE:
   1363         if (result > 0)
   1364           bytes_read_without_yielding += result;
   1365         result = DoReadComplete(result);
   1366         break;
   1367       default:
   1368         NOTREACHED() << "read_state_: " << read_state_;
   1369         break;
   1370     }
   1371 
   1372     if (availability_state_ == STATE_DRAINING)
   1373       break;
   1374 
   1375     if (result == ERR_IO_PENDING)
   1376       break;
   1377 
   1378     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
   1379       read_state_ = READ_STATE_DO_READ;
   1380       base::MessageLoop::current()->PostTask(
   1381           FROM_HERE,
   1382           base::Bind(&SpdySession::PumpReadLoop,
   1383                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
   1384       result = ERR_IO_PENDING;
   1385       break;
   1386     }
   1387   }
   1388 
   1389   CHECK(in_io_loop_);
   1390   in_io_loop_ = false;
   1391 
   1392   return result;
   1393 }
   1394 
   1395 int SpdySession::DoRead() {
   1396   CHECK(in_io_loop_);
   1397 
   1398   CHECK(connection_);
   1399   CHECK(connection_->socket());
   1400   read_state_ = READ_STATE_DO_READ_COMPLETE;
   1401   return connection_->socket()->Read(
   1402       read_buffer_.get(),
   1403       kReadBufferSize,
   1404       base::Bind(&SpdySession::PumpReadLoop,
   1405                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
   1406 }
   1407 
   1408 int SpdySession::DoReadComplete(int result) {
   1409   CHECK(in_io_loop_);
   1410 
   1411   // Parse a frame.  For now this code requires that the frame fit into our
   1412   // buffer (kReadBufferSize).
   1413   // TODO(mbelshe): support arbitrarily large frames!
   1414 
   1415   if (result == 0) {
   1416     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
   1417                                 total_bytes_received_, 1, 100000000, 50);
   1418     DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
   1419 
   1420     return ERR_CONNECTION_CLOSED;
   1421   }
   1422 
   1423   if (result < 0) {
   1424     DoDrainSession(static_cast<Error>(result), "result is < 0.");
   1425     return result;
   1426   }
   1427   CHECK_LE(result, kReadBufferSize);
   1428   total_bytes_received_ += result;
   1429 
   1430   last_activity_time_ = time_func_();
   1431 
   1432   DCHECK(buffered_spdy_framer_.get());
   1433   char* data = read_buffer_->data();
   1434   while (result > 0) {
   1435     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
   1436     result -= bytes_processed;
   1437     data += bytes_processed;
   1438 
   1439     if (availability_state_ == STATE_DRAINING) {
   1440       return ERR_CONNECTION_CLOSED;
   1441     }
   1442 
   1443     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
   1444   }
   1445 
   1446   read_state_ = READ_STATE_DO_READ;
   1447   return OK;
   1448 }
   1449 
   1450 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
   1451   CHECK(!in_io_loop_);
   1452   DCHECK_EQ(write_state_, expected_write_state);
   1453 
   1454   DoWriteLoop(expected_write_state, result);
   1455 
   1456   if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
   1457       write_queue_.IsEmpty()) {
   1458     pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
   1459     return;
   1460   }
   1461 }
   1462 
   1463 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
   1464   CHECK(!in_io_loop_);
   1465   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
   1466   DCHECK_EQ(write_state_, expected_write_state);
   1467 
   1468   in_io_loop_ = true;
   1469 
   1470   // Loop until the session is closed or the write becomes blocked.
   1471   while (true) {
   1472     switch (write_state_) {
   1473       case WRITE_STATE_DO_WRITE:
   1474         DCHECK_EQ(result, OK);
   1475         result = DoWrite();
   1476         break;
   1477       case WRITE_STATE_DO_WRITE_COMPLETE:
   1478         result = DoWriteComplete(result);
   1479         break;
   1480       case WRITE_STATE_IDLE:
   1481       default:
   1482         NOTREACHED() << "write_state_: " << write_state_;
   1483         break;
   1484     }
   1485 
   1486     if (write_state_ == WRITE_STATE_IDLE) {
   1487       DCHECK_EQ(result, ERR_IO_PENDING);
   1488       break;
   1489     }
   1490 
   1491     if (result == ERR_IO_PENDING)
   1492       break;
   1493   }
   1494 
   1495   CHECK(in_io_loop_);
   1496   in_io_loop_ = false;
   1497 
   1498   return result;
   1499 }
   1500 
   1501 int SpdySession::DoWrite() {
   1502   CHECK(in_io_loop_);
   1503 
   1504   DCHECK(buffered_spdy_framer_);
   1505   if (in_flight_write_) {
   1506     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1507   } else {
   1508     // Grab the next frame to send.
   1509     SpdyFrameType frame_type = DATA;
   1510     scoped_ptr<SpdyBufferProducer> producer;
   1511     base::WeakPtr<SpdyStream> stream;
   1512     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
   1513       write_state_ = WRITE_STATE_IDLE;
   1514       return ERR_IO_PENDING;
   1515     }
   1516 
   1517     if (stream.get())
   1518       CHECK(!stream->IsClosed());
   1519 
   1520     // Activate the stream only when sending the SYN_STREAM frame to
   1521     // guarantee monotonically-increasing stream IDs.
   1522     if (frame_type == SYN_STREAM) {
   1523       CHECK(stream.get());
   1524       CHECK_EQ(stream->stream_id(), 0u);
   1525       scoped_ptr<SpdyStream> owned_stream =
   1526           ActivateCreatedStream(stream.get());
   1527       InsertActivatedStream(owned_stream.Pass());
   1528 
   1529       if (stream_hi_water_mark_ > kLastStreamId) {
   1530         CHECK_EQ(stream->stream_id(), kLastStreamId);
   1531         // We've exhausted the stream ID space, and no new streams may be
   1532         // created after this one.
   1533         MakeUnavailable();
   1534         StartGoingAway(kLastStreamId, ERR_ABORTED);
   1535       }
   1536     }
   1537 
   1538     in_flight_write_ = producer->ProduceBuffer();
   1539     if (!in_flight_write_) {
   1540       NOTREACHED();
   1541       return ERR_UNEXPECTED;
   1542     }
   1543     in_flight_write_frame_type_ = frame_type;
   1544     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
   1545     DCHECK_GE(in_flight_write_frame_size_,
   1546               buffered_spdy_framer_->GetFrameMinimumSize());
   1547     in_flight_write_stream_ = stream;
   1548   }
   1549 
   1550   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
   1551 
   1552   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
   1553   // with Socket implementations that don't store their IOBuffer
   1554   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
   1555   scoped_refptr<IOBuffer> write_io_buffer =
   1556       in_flight_write_->GetIOBufferForRemainingData();
   1557   return connection_->socket()->Write(
   1558       write_io_buffer.get(),
   1559       in_flight_write_->GetRemainingSize(),
   1560       base::Bind(&SpdySession::PumpWriteLoop,
   1561                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
   1562 }
   1563 
   1564 int SpdySession::DoWriteComplete(int result) {
   1565   CHECK(in_io_loop_);
   1566   DCHECK_NE(result, ERR_IO_PENDING);
   1567   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
   1568 
   1569   last_activity_time_ = time_func_();
   1570 
   1571   if (result < 0) {
   1572     DCHECK_NE(result, ERR_IO_PENDING);
   1573     in_flight_write_.reset();
   1574     in_flight_write_frame_type_ = DATA;
   1575     in_flight_write_frame_size_ = 0;
   1576     in_flight_write_stream_.reset();
   1577     write_state_ = WRITE_STATE_DO_WRITE;
   1578     DoDrainSession(static_cast<Error>(result), "Write error");
   1579     return OK;
   1580   }
   1581 
   1582   // It should not be possible to have written more bytes than our
   1583   // in_flight_write_.
   1584   DCHECK_LE(static_cast<size_t>(result),
   1585             in_flight_write_->GetRemainingSize());
   1586 
   1587   if (result > 0) {
   1588     in_flight_write_->Consume(static_cast<size_t>(result));
   1589 
   1590     // We only notify the stream when we've fully written the pending frame.
   1591     if (in_flight_write_->GetRemainingSize() == 0) {
   1592       // It is possible that the stream was cancelled while we were
   1593       // writing to the socket.
   1594       if (in_flight_write_stream_.get()) {
   1595         DCHECK_GT(in_flight_write_frame_size_, 0u);
   1596         in_flight_write_stream_->OnFrameWriteComplete(
   1597             in_flight_write_frame_type_,
   1598             in_flight_write_frame_size_);
   1599       }
   1600 
   1601       // Cleanup the write which just completed.
   1602       in_flight_write_.reset();
   1603       in_flight_write_frame_type_ = DATA;
   1604       in_flight_write_frame_size_ = 0;
   1605       in_flight_write_stream_.reset();
   1606     }
   1607   }
   1608 
   1609   write_state_ = WRITE_STATE_DO_WRITE;
   1610   return OK;
   1611 }
   1612 
   1613 void SpdySession::DcheckGoingAway() const {
   1614 #if DCHECK_IS_ON
   1615   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1616   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
   1617     DCHECK(pending_create_stream_queues_[i].empty());
   1618   }
   1619   DCHECK(created_streams_.empty());
   1620 #endif
   1621 }
   1622 
   1623 void SpdySession::DcheckDraining() const {
   1624   DcheckGoingAway();
   1625   DCHECK_EQ(availability_state_, STATE_DRAINING);
   1626   DCHECK(active_streams_.empty());
   1627   DCHECK(unclaimed_pushed_streams_.empty());
   1628 }
   1629 
   1630 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
   1631                                  Error status) {
   1632   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
   1633 
   1634   // The loops below are carefully written to avoid reentrancy problems.
   1635 
   1636   while (true) {
   1637     size_t old_size = GetTotalSize(pending_create_stream_queues_);
   1638     base::WeakPtr<SpdyStreamRequest> pending_request =
   1639         GetNextPendingStreamRequest();
   1640     if (!pending_request)
   1641       break;
   1642     // No new stream requests should be added while the session is
   1643     // going away.
   1644     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
   1645     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
   1646   }
   1647 
   1648   while (true) {
   1649     size_t old_size = active_streams_.size();
   1650     ActiveStreamMap::iterator it =
   1651         active_streams_.lower_bound(last_good_stream_id + 1);
   1652     if (it == active_streams_.end())
   1653       break;
   1654     LogAbandonedActiveStream(it, status);
   1655     CloseActiveStreamIterator(it, status);
   1656     // No new streams should be activated while the session is going
   1657     // away.
   1658     DCHECK_GT(old_size, active_streams_.size());
   1659   }
   1660 
   1661   while (!created_streams_.empty()) {
   1662     size_t old_size = created_streams_.size();
   1663     CreatedStreamSet::iterator it = created_streams_.begin();
   1664     LogAbandonedStream(*it, status);
   1665     CloseCreatedStreamIterator(it, status);
   1666     // No new streams should be created while the session is going
   1667     // away.
   1668     DCHECK_GT(old_size, created_streams_.size());
   1669   }
   1670 
   1671   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
   1672 
   1673   DcheckGoingAway();
   1674 }
   1675 
   1676 void SpdySession::MaybeFinishGoingAway() {
   1677   if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
   1678     DoDrainSession(OK, "Finished going away");
   1679   }
   1680 }
   1681 
   1682 void SpdySession::DoDrainSession(Error err, const std::string& description) {
   1683   if (availability_state_ == STATE_DRAINING) {
   1684     return;
   1685   }
   1686   MakeUnavailable();
   1687 
   1688   // If |err| indicates an error occurred, inform the peer that we're closing
   1689   // and why. Don't GOAWAY on a graceful or idle close, as that may
   1690   // unnecessarily wake the radio. We could technically GOAWAY on network errors
   1691   // (we'll probably fail to actually write it, but that's okay), however many
   1692   // unit-tests would need to be updated.
   1693   if (err != OK &&
   1694       err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
   1695       err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
   1696       err != ERR_SOCKET_NOT_CONNECTED &&
   1697       err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
   1698     // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
   1699     SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_,
   1700                            MapNetErrorToGoAwayStatus(err),
   1701                            description);
   1702     EnqueueSessionWrite(HIGHEST,
   1703                         GOAWAY,
   1704                         scoped_ptr<SpdyFrame>(
   1705                             buffered_spdy_framer_->SerializeFrame(goaway_ir)));
   1706   }
   1707 
   1708   availability_state_ = STATE_DRAINING;
   1709   error_on_close_ = err;
   1710 
   1711   net_log_.AddEvent(
   1712       NetLog::TYPE_SPDY_SESSION_CLOSE,
   1713       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
   1714 
   1715   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
   1716   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
   1717                               total_bytes_received_, 1, 100000000, 50);
   1718 
   1719   if (err == OK) {
   1720     // We ought to be going away already, as this is a graceful close.
   1721     DcheckGoingAway();
   1722   } else {
   1723     StartGoingAway(0, err);
   1724   }
   1725   DcheckDraining();
   1726   MaybePostWriteLoop();
   1727 }
   1728 
   1729 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
   1730   DCHECK(stream);
   1731   std::string description = base::StringPrintf(
   1732       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
   1733       stream->url().spec();
   1734   stream->LogStreamError(status, description);
   1735   // We don't increment the streams abandoned counter here. If the
   1736   // stream isn't active (i.e., it hasn't written anything to the wire
   1737   // yet) then it's as if it never existed. If it is active, then
   1738   // LogAbandonedActiveStream() will increment the counters.
   1739 }
   1740 
   1741 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
   1742                                            Error status) {
   1743   DCHECK_GT(it->first, 0u);
   1744   LogAbandonedStream(it->second.stream, status);
   1745   ++streams_abandoned_count_;
   1746   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
   1747   abandoned_streams.Increment();
   1748   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
   1749       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
   1750       unclaimed_pushed_streams_.end()) {
   1751     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
   1752     abandoned_push_streams.Increment();
   1753   }
   1754 }
   1755 
   1756 SpdyStreamId SpdySession::GetNewStreamId() {
   1757   CHECK_LE(stream_hi_water_mark_, kLastStreamId);
   1758   SpdyStreamId id = stream_hi_water_mark_;
   1759   stream_hi_water_mark_ += 2;
   1760   return id;
   1761 }
   1762 
   1763 void SpdySession::CloseSessionOnError(Error err,
   1764                                       const std::string& description) {
   1765   DCHECK_LT(err, ERR_IO_PENDING);
   1766   DoDrainSession(err, description);
   1767 }
   1768 
   1769 void SpdySession::MakeUnavailable() {
   1770   if (availability_state_ == STATE_AVAILABLE) {
   1771     availability_state_ = STATE_GOING_AWAY;
   1772     pool_->MakeSessionUnavailable(GetWeakPtr());
   1773   }
   1774 }
   1775 
   1776 base::Value* SpdySession::GetInfoAsValue() const {
   1777   base::DictionaryValue* dict = new base::DictionaryValue();
   1778 
   1779   dict->SetInteger("source_id", net_log_.source().id);
   1780 
   1781   dict->SetString("host_port_pair", host_port_pair().ToString());
   1782   if (!pooled_aliases_.empty()) {
   1783     base::ListValue* alias_list = new base::ListValue();
   1784     for (std::set<SpdySessionKey>::const_iterator it =
   1785              pooled_aliases_.begin();
   1786          it != pooled_aliases_.end(); it++) {
   1787       alias_list->Append(new base::StringValue(
   1788           it->host_port_pair().ToString()));
   1789     }
   1790     dict->Set("aliases", alias_list);
   1791   }
   1792   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
   1793 
   1794   dict->SetInteger("active_streams", active_streams_.size());
   1795 
   1796   dict->SetInteger("unclaimed_pushed_streams",
   1797                    unclaimed_pushed_streams_.size());
   1798 
   1799   dict->SetBoolean("is_secure", is_secure_);
   1800 
   1801   dict->SetString("protocol_negotiated",
   1802                   SSLClientSocket::NextProtoToString(
   1803                       connection_->socket()->GetNegotiatedProtocol()));
   1804 
   1805   dict->SetInteger("error", error_on_close_);
   1806   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
   1807 
   1808   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
   1809   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
   1810   dict->SetInteger("streams_pushed_and_claimed_count",
   1811       streams_pushed_and_claimed_count_);
   1812   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
   1813   DCHECK(buffered_spdy_framer_.get());
   1814   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
   1815 
   1816   dict->SetBoolean("sent_settings", sent_settings_);
   1817   dict->SetBoolean("received_settings", received_settings_);
   1818 
   1819   dict->SetInteger("send_window_size", session_send_window_size_);
   1820   dict->SetInteger("recv_window_size", session_recv_window_size_);
   1821   dict->SetInteger("unacked_recv_window_bytes",
   1822                    session_unacked_recv_window_bytes_);
   1823   return dict;
   1824 }
   1825 
   1826 bool SpdySession::IsReused() const {
   1827   return buffered_spdy_framer_->frames_received() > 0 ||
   1828       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
   1829 }
   1830 
   1831 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
   1832                                     LoadTimingInfo* load_timing_info) const {
   1833   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
   1834                                         load_timing_info);
   1835 }
   1836 
   1837 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
   1838   int rv = ERR_SOCKET_NOT_CONNECTED;
   1839   if (connection_->socket()) {
   1840     rv = connection_->socket()->GetPeerAddress(address);
   1841   }
   1842 
   1843   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
   1844                         rv == ERR_SOCKET_NOT_CONNECTED);
   1845 
   1846   return rv;
   1847 }
   1848 
   1849 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
   1850   int rv = ERR_SOCKET_NOT_CONNECTED;
   1851   if (connection_->socket()) {
   1852     rv = connection_->socket()->GetLocalAddress(address);
   1853   }
   1854 
   1855   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
   1856                         rv == ERR_SOCKET_NOT_CONNECTED);
   1857 
   1858   return rv;
   1859 }
   1860 
   1861 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
   1862                                       SpdyFrameType frame_type,
   1863                                       scoped_ptr<SpdyFrame> frame) {
   1864   DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
   1865          frame_type == WINDOW_UPDATE || frame_type == PING ||
   1866          frame_type == GOAWAY);
   1867   EnqueueWrite(
   1868       priority, frame_type,
   1869       scoped_ptr<SpdyBufferProducer>(
   1870           new SimpleBufferProducer(
   1871               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
   1872       base::WeakPtr<SpdyStream>());
   1873 }
   1874 
   1875 void SpdySession::EnqueueWrite(RequestPriority priority,
   1876                                SpdyFrameType frame_type,
   1877                                scoped_ptr<SpdyBufferProducer> producer,
   1878                                const base::WeakPtr<SpdyStream>& stream) {
   1879   if (availability_state_ == STATE_DRAINING)
   1880     return;
   1881 
   1882   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
   1883   MaybePostWriteLoop();
   1884 }
   1885 
   1886 void SpdySession::MaybePostWriteLoop() {
   1887   if (write_state_ == WRITE_STATE_IDLE) {
   1888     CHECK(!in_flight_write_);
   1889     write_state_ = WRITE_STATE_DO_WRITE;
   1890     base::MessageLoop::current()->PostTask(
   1891         FROM_HERE,
   1892         base::Bind(&SpdySession::PumpWriteLoop,
   1893                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
   1894   }
   1895 }
   1896 
   1897 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
   1898   CHECK_EQ(stream->stream_id(), 0u);
   1899   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
   1900   created_streams_.insert(stream.release());
   1901 }
   1902 
   1903 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
   1904   CHECK_EQ(stream->stream_id(), 0u);
   1905   CHECK(created_streams_.find(stream) != created_streams_.end());
   1906   stream->set_stream_id(GetNewStreamId());
   1907   scoped_ptr<SpdyStream> owned_stream(stream);
   1908   created_streams_.erase(stream);
   1909   return owned_stream.Pass();
   1910 }
   1911 
   1912 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
   1913   SpdyStreamId stream_id = stream->stream_id();
   1914   CHECK_NE(stream_id, 0u);
   1915   std::pair<ActiveStreamMap::iterator, bool> result =
   1916       active_streams_.insert(
   1917           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
   1918   CHECK(result.second);
   1919   ignore_result(stream.release());
   1920 }
   1921 
   1922 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
   1923   if (in_flight_write_stream_.get() == stream.get()) {
   1924     // If we're deleting the stream for the in-flight write, we still
   1925     // need to let the write complete, so we clear
   1926     // |in_flight_write_stream_| and let the write finish on its own
   1927     // without notifying |in_flight_write_stream_|.
   1928     in_flight_write_stream_.reset();
   1929   }
   1930 
   1931   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
   1932   stream->OnClose(status);
   1933 
   1934   if (availability_state_ == STATE_AVAILABLE) {
   1935     ProcessPendingStreamRequests();
   1936   }
   1937 }
   1938 
   1939 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
   1940   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
   1941 
   1942   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
   1943   if (unclaimed_it == unclaimed_pushed_streams_.end())
   1944     return base::WeakPtr<SpdyStream>();
   1945 
   1946   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
   1947   unclaimed_pushed_streams_.erase(unclaimed_it);
   1948 
   1949   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   1950   if (active_it == active_streams_.end()) {
   1951     NOTREACHED();
   1952     return base::WeakPtr<SpdyStream>();
   1953   }
   1954 
   1955   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
   1956   used_push_streams.Increment();
   1957   return active_it->second.stream->GetWeakPtr();
   1958 }
   1959 
   1960 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
   1961                              bool* was_npn_negotiated,
   1962                              NextProto* protocol_negotiated) {
   1963   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
   1964   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
   1965   return connection_->socket()->GetSSLInfo(ssl_info);
   1966 }
   1967 
   1968 bool SpdySession::GetSSLCertRequestInfo(
   1969     SSLCertRequestInfo* cert_request_info) {
   1970   if (!is_secure_)
   1971     return false;
   1972   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
   1973   return true;
   1974 }
   1975 
   1976 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
   1977   CHECK(in_io_loop_);
   1978 
   1979   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
   1980   std::string description =
   1981       base::StringPrintf("Framer error: %d (%s).",
   1982                          error_code,
   1983                          SpdyFramer::ErrorCodeToString(error_code));
   1984   DoDrainSession(MapFramerErrorToNetError(error_code), description);
   1985 }
   1986 
   1987 void SpdySession::OnStreamError(SpdyStreamId stream_id,
   1988                                 const std::string& description) {
   1989   CHECK(in_io_loop_);
   1990 
   1991   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   1992   if (it == active_streams_.end()) {
   1993     // We still want to send a frame to reset the stream even if we
   1994     // don't know anything about it.
   1995     EnqueueResetStreamFrame(
   1996         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
   1997     return;
   1998   }
   1999 
   2000   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
   2001 }
   2002 
   2003 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
   2004                                     size_t length,
   2005                                     bool fin) {
   2006   CHECK(in_io_loop_);
   2007 
   2008   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2009 
   2010   // By the time data comes in, the stream may already be inactive.
   2011   if (it == active_streams_.end())
   2012     return;
   2013 
   2014   SpdyStream* stream = it->second.stream;
   2015   CHECK_EQ(stream->stream_id(), stream_id);
   2016 
   2017   DCHECK(buffered_spdy_framer_);
   2018   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
   2019   stream->IncrementRawReceivedBytes(header_len);
   2020 }
   2021 
   2022 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
   2023                                     const char* data,
   2024                                     size_t len,
   2025                                     bool fin) {
   2026   CHECK(in_io_loop_);
   2027 
   2028   if (data == NULL && len != 0) {
   2029     // This is notification of consumed data padding.
   2030     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
   2031     // See crbug.com/353012.
   2032     return;
   2033   }
   2034 
   2035   DCHECK_LT(len, 1u << 24);
   2036   if (net_log().IsLogging()) {
   2037     net_log().AddEvent(
   2038         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
   2039         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
   2040   }
   2041 
   2042   // Build the buffer as early as possible so that we go through the
   2043   // session flow control checks and update
   2044   // |unacked_recv_window_bytes_| properly even when the stream is
   2045   // inactive (since the other side has still reduced its session send
   2046   // window).
   2047   scoped_ptr<SpdyBuffer> buffer;
   2048   if (data) {
   2049     DCHECK_GT(len, 0u);
   2050     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
   2051     buffer.reset(new SpdyBuffer(data, len));
   2052 
   2053     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   2054       DecreaseRecvWindowSize(static_cast<int32>(len));
   2055       buffer->AddConsumeCallback(
   2056           base::Bind(&SpdySession::OnReadBufferConsumed,
   2057                      weak_factory_.GetWeakPtr()));
   2058     }
   2059   } else {
   2060     DCHECK_EQ(len, 0u);
   2061   }
   2062 
   2063   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2064 
   2065   // By the time data comes in, the stream may already be inactive.
   2066   if (it == active_streams_.end())
   2067     return;
   2068 
   2069   SpdyStream* stream = it->second.stream;
   2070   CHECK_EQ(stream->stream_id(), stream_id);
   2071 
   2072   stream->IncrementRawReceivedBytes(len);
   2073 
   2074   if (it->second.waiting_for_syn_reply) {
   2075     const std::string& error = "Data received before SYN_REPLY.";
   2076     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2077     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2078     return;
   2079   }
   2080 
   2081   stream->OnDataReceived(buffer.Pass());
   2082 }
   2083 
   2084 void SpdySession::OnSettings(bool clear_persisted) {
   2085   CHECK(in_io_loop_);
   2086 
   2087   if (clear_persisted)
   2088     http_server_properties_->ClearSpdySettings(host_port_pair());
   2089 
   2090   if (net_log_.IsLogging()) {
   2091     net_log_.AddEvent(
   2092         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
   2093         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
   2094                    clear_persisted));
   2095   }
   2096 
   2097   if (GetProtocolVersion() >= SPDY4) {
   2098     // Send an acknowledgment of the setting.
   2099     SpdySettingsIR settings_ir;
   2100     settings_ir.set_is_ack(true);
   2101     EnqueueSessionWrite(
   2102         HIGHEST,
   2103         SETTINGS,
   2104         scoped_ptr<SpdyFrame>(
   2105             buffered_spdy_framer_->SerializeFrame(settings_ir)));
   2106   }
   2107 }
   2108 
   2109 void SpdySession::OnSetting(SpdySettingsIds id,
   2110                             uint8 flags,
   2111                             uint32 value) {
   2112   CHECK(in_io_loop_);
   2113 
   2114   HandleSetting(id, value);
   2115   http_server_properties_->SetSpdySetting(
   2116       host_port_pair(),
   2117       id,
   2118       static_cast<SpdySettingsFlags>(flags),
   2119       value);
   2120   received_settings_ = true;
   2121 
   2122   // Log the setting.
   2123   const SpdyMajorVersion protocol_version = GetProtocolVersion();
   2124   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
   2125                     base::Bind(&NetLogSpdySettingCallback,
   2126                                id,
   2127                                protocol_version,
   2128                                static_cast<SpdySettingsFlags>(flags),
   2129                                value));
   2130 }
   2131 
   2132 void SpdySession::OnSendCompressedFrame(
   2133     SpdyStreamId stream_id,
   2134     SpdyFrameType type,
   2135     size_t payload_len,
   2136     size_t frame_len) {
   2137   if (type != SYN_STREAM && type != HEADERS)
   2138     return;
   2139 
   2140   DCHECK(buffered_spdy_framer_.get());
   2141   size_t compressed_len =
   2142       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
   2143 
   2144   if (payload_len) {
   2145     // Make sure we avoid early decimal truncation.
   2146     int compression_pct = 100 - (100 * compressed_len) / payload_len;
   2147     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
   2148                              compression_pct);
   2149   }
   2150 }
   2151 
   2152 void SpdySession::OnReceiveCompressedFrame(
   2153     SpdyStreamId stream_id,
   2154     SpdyFrameType type,
   2155     size_t frame_len) {
   2156   last_compressed_frame_len_ = frame_len;
   2157 }
   2158 
   2159 int SpdySession::OnInitialResponseHeadersReceived(
   2160     const SpdyHeaderBlock& response_headers,
   2161     base::Time response_time,
   2162     base::TimeTicks recv_first_byte_time,
   2163     SpdyStream* stream) {
   2164   CHECK(in_io_loop_);
   2165   SpdyStreamId stream_id = stream->stream_id();
   2166 
   2167   if (stream->type() == SPDY_PUSH_STREAM) {
   2168     DCHECK(stream->IsReservedRemote());
   2169     if (max_concurrent_pushed_streams_ &&
   2170         num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
   2171       ResetStream(stream_id,
   2172                   RST_STREAM_REFUSED_STREAM,
   2173                   "Stream concurrency limit reached.");
   2174       return STATUS_CODE_REFUSED_STREAM;
   2175     }
   2176   }
   2177 
   2178   if (stream->type() == SPDY_PUSH_STREAM) {
   2179     // Will be balanced in DeleteStream.
   2180     num_active_pushed_streams_++;
   2181   }
   2182 
   2183   // May invalidate |stream|.
   2184   int rv = stream->OnInitialResponseHeadersReceived(
   2185       response_headers, response_time, recv_first_byte_time);
   2186   if (rv < 0) {
   2187     DCHECK_NE(rv, ERR_IO_PENDING);
   2188     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   2189   }
   2190 
   2191   return rv;
   2192 }
   2193 
   2194 void SpdySession::OnSynStream(SpdyStreamId stream_id,
   2195                               SpdyStreamId associated_stream_id,
   2196                               SpdyPriority priority,
   2197                               bool fin,
   2198                               bool unidirectional,
   2199                               const SpdyHeaderBlock& headers) {
   2200   CHECK(in_io_loop_);
   2201 
   2202   if (GetProtocolVersion() >= SPDY4) {
   2203     DCHECK_EQ(0u, associated_stream_id);
   2204     OnHeaders(stream_id, fin, headers);
   2205     return;
   2206   }
   2207 
   2208   base::Time response_time = base::Time::Now();
   2209   base::TimeTicks recv_first_byte_time = time_func_();
   2210 
   2211   if (net_log_.IsLogging()) {
   2212     net_log_.AddEvent(
   2213         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
   2214         base::Bind(&NetLogSpdySynStreamReceivedCallback,
   2215                    &headers, fin, unidirectional, priority,
   2216                    stream_id, associated_stream_id));
   2217   }
   2218 
   2219   // Split headers to simulate push promise and response.
   2220   SpdyHeaderBlock request_headers;
   2221   SpdyHeaderBlock response_headers;
   2222   SplitPushedHeadersToRequestAndResponse(
   2223       headers, GetProtocolVersion(), &request_headers, &response_headers);
   2224 
   2225   if (!TryCreatePushStream(
   2226           stream_id, associated_stream_id, priority, request_headers))
   2227     return;
   2228 
   2229   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   2230   if (active_it == active_streams_.end()) {
   2231     NOTREACHED();
   2232     return;
   2233   }
   2234 
   2235   if (OnInitialResponseHeadersReceived(response_headers,
   2236                                        response_time,
   2237                                        recv_first_byte_time,
   2238                                        active_it->second.stream) != OK)
   2239     return;
   2240 
   2241   base::StatsCounter push_requests("spdy.pushed_streams");
   2242   push_requests.Increment();
   2243 }
   2244 
   2245 void SpdySession::DeleteExpiredPushedStreams() {
   2246   if (unclaimed_pushed_streams_.empty())
   2247     return;
   2248 
   2249   // Check that adequate time has elapsed since the last sweep.
   2250   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
   2251     return;
   2252 
   2253   // Gather old streams to delete.
   2254   base::TimeTicks minimum_freshness = time_func_() -
   2255       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2256   std::vector<SpdyStreamId> streams_to_close;
   2257   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
   2258        it != unclaimed_pushed_streams_.end(); ++it) {
   2259     if (minimum_freshness > it->second.creation_time)
   2260       streams_to_close.push_back(it->second.stream_id);
   2261   }
   2262 
   2263   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
   2264            streams_to_close.begin();
   2265        to_close_it != streams_to_close.end(); ++to_close_it) {
   2266     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
   2267     if (active_it == active_streams_.end())
   2268       continue;
   2269 
   2270     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
   2271     // CloseActiveStreamIterator() will remove the stream from
   2272     // |unclaimed_pushed_streams_|.
   2273     ResetStreamIterator(
   2274         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
   2275   }
   2276 
   2277   next_unclaimed_push_stream_sweep_time_ = time_func_() +
   2278       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
   2279 }
   2280 
   2281 void SpdySession::OnSynReply(SpdyStreamId stream_id,
   2282                              bool fin,
   2283                              const SpdyHeaderBlock& headers) {
   2284   CHECK(in_io_loop_);
   2285 
   2286   base::Time response_time = base::Time::Now();
   2287   base::TimeTicks recv_first_byte_time = time_func_();
   2288 
   2289   if (net_log().IsLogging()) {
   2290     net_log().AddEvent(
   2291         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
   2292         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
   2293                    &headers, fin, stream_id));
   2294   }
   2295 
   2296   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2297   if (it == active_streams_.end()) {
   2298     // NOTE:  it may just be that the stream was cancelled.
   2299     return;
   2300   }
   2301 
   2302   SpdyStream* stream = it->second.stream;
   2303   CHECK_EQ(stream->stream_id(), stream_id);
   2304 
   2305   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2306   last_compressed_frame_len_ = 0;
   2307 
   2308   if (GetProtocolVersion() >= SPDY4) {
   2309     const std::string& error =
   2310         "SPDY4 wasn't expecting SYN_REPLY.";
   2311     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2312     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2313     return;
   2314   }
   2315   if (!it->second.waiting_for_syn_reply) {
   2316     const std::string& error =
   2317         "Received duplicate SYN_REPLY for stream.";
   2318     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2319     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2320     return;
   2321   }
   2322   it->second.waiting_for_syn_reply = false;
   2323 
   2324   ignore_result(OnInitialResponseHeadersReceived(
   2325       headers, response_time, recv_first_byte_time, stream));
   2326 }
   2327 
   2328 void SpdySession::OnHeaders(SpdyStreamId stream_id,
   2329                             bool fin,
   2330                             const SpdyHeaderBlock& headers) {
   2331   CHECK(in_io_loop_);
   2332 
   2333   if (net_log().IsLogging()) {
   2334     net_log().AddEvent(
   2335         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
   2336         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
   2337                    &headers, fin, stream_id));
   2338   }
   2339 
   2340   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2341   if (it == active_streams_.end()) {
   2342     // NOTE:  it may just be that the stream was cancelled.
   2343     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
   2344     return;
   2345   }
   2346 
   2347   SpdyStream* stream = it->second.stream;
   2348   CHECK_EQ(stream->stream_id(), stream_id);
   2349 
   2350   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2351   last_compressed_frame_len_ = 0;
   2352 
   2353   base::Time response_time = base::Time::Now();
   2354   base::TimeTicks recv_first_byte_time = time_func_();
   2355 
   2356   if (it->second.waiting_for_syn_reply) {
   2357     if (GetProtocolVersion() < SPDY4) {
   2358       const std::string& error =
   2359           "Was expecting SYN_REPLY, not HEADERS.";
   2360       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
   2361       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
   2362       return;
   2363     }
   2364 
   2365     it->second.waiting_for_syn_reply = false;
   2366     ignore_result(OnInitialResponseHeadersReceived(
   2367         headers, response_time, recv_first_byte_time, stream));
   2368   } else if (it->second.stream->IsReservedRemote()) {
   2369     ignore_result(OnInitialResponseHeadersReceived(
   2370         headers, response_time, recv_first_byte_time, stream));
   2371   } else {
   2372     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
   2373     if (rv < 0) {
   2374       DCHECK_NE(rv, ERR_IO_PENDING);
   2375       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
   2376     }
   2377   }
   2378 }
   2379 
   2380 bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) {
   2381   // Validate stream id.
   2382   // Was the frame sent on a stream id that has not been used in this session?
   2383   if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_)
   2384     return false;
   2385 
   2386   if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_)
   2387     return false;
   2388 
   2389   return true;
   2390 }
   2391 
   2392 void SpdySession::OnRstStream(SpdyStreamId stream_id,
   2393                               SpdyRstStreamStatus status) {
   2394   CHECK(in_io_loop_);
   2395 
   2396   std::string description;
   2397   net_log().AddEvent(
   2398       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
   2399       base::Bind(&NetLogSpdyRstCallback,
   2400                  stream_id, status, &description));
   2401 
   2402   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2403   if (it == active_streams_.end()) {
   2404     // NOTE:  it may just be that the stream was cancelled.
   2405     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
   2406     return;
   2407   }
   2408 
   2409   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2410 
   2411   if (status == 0) {
   2412     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
   2413   } else if (status == RST_STREAM_REFUSED_STREAM) {
   2414     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
   2415   } else {
   2416     RecordProtocolErrorHistogram(
   2417         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
   2418     it->second.stream->LogStreamError(
   2419         ERR_SPDY_PROTOCOL_ERROR,
   2420         base::StringPrintf("SPDY stream closed with status: %d", status));
   2421     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
   2422     //                For now, it doesn't matter much - it is a protocol error.
   2423     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
   2424   }
   2425 }
   2426 
   2427 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
   2428                            SpdyGoAwayStatus status) {
   2429   CHECK(in_io_loop_);
   2430 
   2431   // TODO(jgraettinger): UMA histogram on |status|.
   2432 
   2433   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
   2434       base::Bind(&NetLogSpdyGoAwayCallback,
   2435                  last_accepted_stream_id,
   2436                  active_streams_.size(),
   2437                  unclaimed_pushed_streams_.size(),
   2438                  status));
   2439   MakeUnavailable();
   2440   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
   2441   // This is to handle the case when we already don't have any active
   2442   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
   2443   // active streams and so the last one being closed will finish the
   2444   // going away process (see DeleteStream()).
   2445   MaybeFinishGoingAway();
   2446 }
   2447 
   2448 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
   2449   CHECK(in_io_loop_);
   2450 
   2451   net_log_.AddEvent(
   2452       NetLog::TYPE_SPDY_SESSION_PING,
   2453       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
   2454 
   2455   // Send response to a PING from server.
   2456   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
   2457       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
   2458     WritePingFrame(unique_id, true);
   2459     return;
   2460   }
   2461 
   2462   --pings_in_flight_;
   2463   if (pings_in_flight_ < 0) {
   2464     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
   2465     DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
   2466     pings_in_flight_ = 0;
   2467     return;
   2468   }
   2469 
   2470   if (pings_in_flight_ > 0)
   2471     return;
   2472 
   2473   // We will record RTT in histogram when there are no more client sent
   2474   // pings_in_flight_.
   2475   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
   2476 }
   2477 
   2478 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
   2479                                  uint32 delta_window_size) {
   2480   CHECK(in_io_loop_);
   2481 
   2482   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
   2483   net_log_.AddEvent(
   2484       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
   2485       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2486                  stream_id, delta_window_size));
   2487 
   2488   if (stream_id == kSessionFlowControlStreamId) {
   2489     // WINDOW_UPDATE for the session.
   2490     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
   2491       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
   2492                    << "session flow control is not turned on";
   2493       // TODO(akalin): Record an error and close the session.
   2494       return;
   2495     }
   2496 
   2497     if (delta_window_size < 1u) {
   2498       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   2499       DoDrainSession(
   2500           ERR_SPDY_PROTOCOL_ERROR,
   2501           "Received WINDOW_UPDATE with an invalid delta_window_size " +
   2502               base::UintToString(delta_window_size));
   2503       return;
   2504     }
   2505 
   2506     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
   2507   } else {
   2508     // WINDOW_UPDATE for a stream.
   2509     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2510       // TODO(akalin): Record an error and close the session.
   2511       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
   2512                    << " when flow control is not turned on";
   2513       return;
   2514     }
   2515 
   2516     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
   2517 
   2518     if (it == active_streams_.end()) {
   2519       // NOTE:  it may just be that the stream was cancelled.
   2520       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
   2521       return;
   2522     }
   2523 
   2524     SpdyStream* stream = it->second.stream;
   2525     CHECK_EQ(stream->stream_id(), stream_id);
   2526 
   2527     if (delta_window_size < 1u) {
   2528       ResetStreamIterator(it,
   2529                           RST_STREAM_FLOW_CONTROL_ERROR,
   2530                           base::StringPrintf(
   2531                               "Received WINDOW_UPDATE with an invalid "
   2532                               "delta_window_size %ud", delta_window_size));
   2533       return;
   2534     }
   2535 
   2536     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2537     it->second.stream->IncreaseSendWindowSize(
   2538         static_cast<int32>(delta_window_size));
   2539   }
   2540 }
   2541 
   2542 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
   2543                                       SpdyStreamId associated_stream_id,
   2544                                       SpdyPriority priority,
   2545                                       const SpdyHeaderBlock& headers) {
   2546   // Server-initiated streams should have even sequence numbers.
   2547   if ((stream_id & 0x1) != 0) {
   2548     LOG(WARNING) << "Received invalid push stream id " << stream_id;
   2549     if (GetProtocolVersion() > SPDY2)
   2550       CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id.");
   2551     return false;
   2552   }
   2553 
   2554   if (GetProtocolVersion() > SPDY2) {
   2555     if (stream_id <= last_accepted_push_stream_id_) {
   2556       LOG(WARNING) << "Received push stream id lesser or equal to the last "
   2557                    << "accepted before " << stream_id;
   2558       CloseSessionOnError(
   2559           ERR_SPDY_PROTOCOL_ERROR,
   2560           "New push stream id must be greater than the last accepted.");
   2561       return false;
   2562     }
   2563   }
   2564 
   2565   if (IsStreamActive(stream_id)) {
   2566     // For SPDY3 and higher we should not get here, we'll start going away
   2567     // earlier on |last_seen_push_stream_id_| check.
   2568     CHECK_GT(SPDY3, GetProtocolVersion());
   2569     LOG(WARNING) << "Received push for active stream " << stream_id;
   2570     return false;
   2571   }
   2572 
   2573   last_accepted_push_stream_id_ = stream_id;
   2574 
   2575   RequestPriority request_priority =
   2576       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
   2577 
   2578   if (availability_state_ == STATE_GOING_AWAY) {
   2579     // TODO(akalin): This behavior isn't in the SPDY spec, although it
   2580     // probably should be.
   2581     EnqueueResetStreamFrame(stream_id,
   2582                             request_priority,
   2583                             RST_STREAM_REFUSED_STREAM,
   2584                             "push stream request received when going away");
   2585     return false;
   2586   }
   2587 
   2588   if (associated_stream_id == 0) {
   2589     // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
   2590     // session going away. We should never get here.
   2591     CHECK_GT(SPDY4, GetProtocolVersion());
   2592     std::string description = base::StringPrintf(
   2593         "Received invalid associated stream id %d for pushed stream %d",
   2594         associated_stream_id,
   2595         stream_id);
   2596     EnqueueResetStreamFrame(
   2597         stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
   2598     return false;
   2599   }
   2600 
   2601   streams_pushed_count_++;
   2602 
   2603   // TODO(mbelshe): DCHECK that this is a GET method?
   2604 
   2605   // Verify that the response had a URL for us.
   2606   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
   2607   if (!gurl.is_valid()) {
   2608     EnqueueResetStreamFrame(stream_id,
   2609                             request_priority,
   2610                             RST_STREAM_PROTOCOL_ERROR,
   2611                             "Pushed stream url was invalid: " + gurl.spec());
   2612     return false;
   2613   }
   2614 
   2615   // Verify we have a valid stream association.
   2616   ActiveStreamMap::iterator associated_it =
   2617       active_streams_.find(associated_stream_id);
   2618   if (associated_it == active_streams_.end()) {
   2619     EnqueueResetStreamFrame(
   2620         stream_id,
   2621         request_priority,
   2622         RST_STREAM_INVALID_STREAM,
   2623         base::StringPrintf("Received push for inactive associated stream %d",
   2624                            associated_stream_id));
   2625     return false;
   2626   }
   2627 
   2628   // Check that the pushed stream advertises the same origin as its associated
   2629   // stream. Bypass this check if and only if this session is with a SPDY proxy
   2630   // that is trusted explicitly via the --trusted-spdy-proxy switch.
   2631   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
   2632     // Disallow pushing of HTTPS content.
   2633     if (gurl.SchemeIs("https")) {
   2634       EnqueueResetStreamFrame(
   2635           stream_id,
   2636           request_priority,
   2637           RST_STREAM_REFUSED_STREAM,
   2638           base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
   2639                              associated_stream_id));
   2640     }
   2641   } else {
   2642     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
   2643     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
   2644       EnqueueResetStreamFrame(
   2645           stream_id,
   2646           request_priority,
   2647           RST_STREAM_REFUSED_STREAM,
   2648           base::StringPrintf("Rejected Cross Origin Push Stream %d",
   2649                              associated_stream_id));
   2650       return false;
   2651     }
   2652   }
   2653 
   2654   // There should not be an existing pushed stream with the same path.
   2655   PushedStreamMap::iterator pushed_it =
   2656       unclaimed_pushed_streams_.lower_bound(gurl);
   2657   if (pushed_it != unclaimed_pushed_streams_.end() &&
   2658       pushed_it->first == gurl) {
   2659     EnqueueResetStreamFrame(
   2660         stream_id,
   2661         request_priority,
   2662         RST_STREAM_PROTOCOL_ERROR,
   2663         "Received duplicate pushed stream with url: " + gurl.spec());
   2664     return false;
   2665   }
   2666 
   2667   scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
   2668                                                GetWeakPtr(),
   2669                                                gurl,
   2670                                                request_priority,
   2671                                                stream_initial_send_window_size_,
   2672                                                stream_initial_recv_window_size_,
   2673                                                net_log_));
   2674   stream->set_stream_id(stream_id);
   2675 
   2676   // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
   2677   if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
   2678     associated_it->second.stream->IncrementRawReceivedBytes(
   2679         last_compressed_frame_len_);
   2680   } else {
   2681     stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
   2682   }
   2683 
   2684   last_compressed_frame_len_ = 0;
   2685 
   2686   DeleteExpiredPushedStreams();
   2687   PushedStreamMap::iterator inserted_pushed_it =
   2688       unclaimed_pushed_streams_.insert(
   2689           pushed_it,
   2690           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
   2691   DCHECK(inserted_pushed_it != pushed_it);
   2692 
   2693   InsertActivatedStream(stream.Pass());
   2694 
   2695   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
   2696   if (active_it == active_streams_.end()) {
   2697     NOTREACHED();
   2698     return false;
   2699   }
   2700 
   2701   active_it->second.stream->OnPushPromiseHeadersReceived(headers);
   2702   DCHECK(active_it->second.stream->IsReservedRemote());
   2703   num_pushed_streams_++;
   2704   return true;
   2705 }
   2706 
   2707 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
   2708                                 SpdyStreamId promised_stream_id,
   2709                                 const SpdyHeaderBlock& headers) {
   2710   CHECK(in_io_loop_);
   2711 
   2712   if (net_log_.IsLogging()) {
   2713     net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
   2714                       base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
   2715                                  &headers,
   2716                                  stream_id,
   2717                                  promised_stream_id));
   2718   }
   2719 
   2720   // Any priority will do.
   2721   // TODO(baranovich): pass parent stream id priority?
   2722   if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
   2723     return;
   2724 
   2725   base::StatsCounter push_requests("spdy.pushed_streams");
   2726   push_requests.Increment();
   2727 }
   2728 
   2729 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
   2730                                          uint32 delta_window_size) {
   2731   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2732   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2733   CHECK(it != active_streams_.end());
   2734   CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2735   SendWindowUpdateFrame(
   2736       stream_id, delta_window_size, it->second.stream->priority());
   2737 }
   2738 
   2739 void SpdySession::SendInitialData() {
   2740   DCHECK(enable_sending_initial_data_);
   2741 
   2742   if (send_connection_header_prefix_) {
   2743     DCHECK_EQ(protocol_, kProtoSPDY4);
   2744     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
   2745         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
   2746                       kHttp2ConnectionHeaderPrefixSize,
   2747                       false /* take_ownership */));
   2748     // Count the prefix as part of the subsequent SETTINGS frame.
   2749     EnqueueSessionWrite(HIGHEST, SETTINGS,
   2750                         connection_header_prefix_frame.Pass());
   2751   }
   2752 
   2753   // First, notify the server about the settings they should use when
   2754   // communicating with us.
   2755   SettingsMap settings_map;
   2756   // Create a new settings frame notifying the server of our
   2757   // max concurrent streams and initial window size.
   2758   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
   2759       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
   2760   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
   2761       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
   2762     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
   2763         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
   2764                               stream_initial_recv_window_size_);
   2765   }
   2766   SendSettings(settings_map);
   2767 
   2768   // Next, notify the server about our initial recv window size.
   2769   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
   2770     // Bump up the receive window size to the real initial value. This
   2771     // has to go here since the WINDOW_UPDATE frame sent by
   2772     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
   2773     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
   2774     // This condition implies that |kDefaultInitialRecvWindowSize| -
   2775     // |session_recv_window_size_| doesn't overflow.
   2776     DCHECK_GT(session_recv_window_size_, 0);
   2777     IncreaseRecvWindowSize(
   2778         kDefaultInitialRecvWindowSize - session_recv_window_size_);
   2779   }
   2780 
   2781   if (protocol_ <= kProtoSPDY31) {
   2782     // Finally, notify the server about the settings they have
   2783     // previously told us to use when communicating with them (after
   2784     // applying them).
   2785     const SettingsMap& server_settings_map =
   2786         http_server_properties_->GetSpdySettings(host_port_pair());
   2787     if (server_settings_map.empty())
   2788       return;
   2789 
   2790     SettingsMap::const_iterator it =
   2791         server_settings_map.find(SETTINGS_CURRENT_CWND);
   2792     uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
   2793     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
   2794 
   2795     for (SettingsMap::const_iterator it = server_settings_map.begin();
   2796          it != server_settings_map.end(); ++it) {
   2797       const SpdySettingsIds new_id = it->first;
   2798       const uint32 new_val = it->second.second;
   2799       HandleSetting(new_id, new_val);
   2800     }
   2801 
   2802     SendSettings(server_settings_map);
   2803   }
   2804 }
   2805 
   2806 
   2807 void SpdySession::SendSettings(const SettingsMap& settings) {
   2808   const SpdyMajorVersion protocol_version = GetProtocolVersion();
   2809   net_log_.AddEvent(
   2810       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
   2811       base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version));
   2812   // Create the SETTINGS frame and send it.
   2813   DCHECK(buffered_spdy_framer_.get());
   2814   scoped_ptr<SpdyFrame> settings_frame(
   2815       buffered_spdy_framer_->CreateSettings(settings));
   2816   sent_settings_ = true;
   2817   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
   2818 }
   2819 
   2820 void SpdySession::HandleSetting(uint32 id, uint32 value) {
   2821   switch (id) {
   2822     case SETTINGS_MAX_CONCURRENT_STREAMS:
   2823       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
   2824                                          kMaxConcurrentStreamLimit);
   2825       ProcessPendingStreamRequests();
   2826       break;
   2827     case SETTINGS_INITIAL_WINDOW_SIZE: {
   2828       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
   2829         net_log().AddEvent(
   2830             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
   2831         return;
   2832       }
   2833 
   2834       if (value > static_cast<uint32>(kint32max)) {
   2835         net_log().AddEvent(
   2836             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
   2837             NetLog::IntegerCallback("initial_window_size", value));
   2838         return;
   2839       }
   2840 
   2841       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
   2842       int32 delta_window_size =
   2843           static_cast<int32>(value) - stream_initial_send_window_size_;
   2844       stream_initial_send_window_size_ = static_cast<int32>(value);
   2845       UpdateStreamsSendWindowSize(delta_window_size);
   2846       net_log().AddEvent(
   2847           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
   2848           NetLog::IntegerCallback("delta_window_size", delta_window_size));
   2849       break;
   2850     }
   2851   }
   2852 }
   2853 
   2854 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
   2855   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2856   for (ActiveStreamMap::iterator it = active_streams_.begin();
   2857        it != active_streams_.end(); ++it) {
   2858     it->second.stream->AdjustSendWindowSize(delta_window_size);
   2859   }
   2860 
   2861   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
   2862        it != created_streams_.end(); it++) {
   2863     (*it)->AdjustSendWindowSize(delta_window_size);
   2864   }
   2865 }
   2866 
   2867 void SpdySession::SendPrefacePingIfNoneInFlight() {
   2868   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
   2869     return;
   2870 
   2871   base::TimeTicks now = time_func_();
   2872   // If there is no activity in the session, then send a preface-PING.
   2873   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
   2874     SendPrefacePing();
   2875 }
   2876 
   2877 void SpdySession::SendPrefacePing() {
   2878   WritePingFrame(next_ping_id_, false);
   2879 }
   2880 
   2881 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
   2882                                         uint32 delta_window_size,
   2883                                         RequestPriority priority) {
   2884   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
   2885   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   2886   if (it != active_streams_.end()) {
   2887     CHECK_EQ(it->second.stream->stream_id(), stream_id);
   2888   } else {
   2889     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   2890     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
   2891   }
   2892 
   2893   net_log_.AddEvent(
   2894       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
   2895       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
   2896                  stream_id, delta_window_size));
   2897 
   2898   DCHECK(buffered_spdy_framer_.get());
   2899   scoped_ptr<SpdyFrame> window_update_frame(
   2900       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
   2901   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
   2902 }
   2903 
   2904 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
   2905   DCHECK(buffered_spdy_framer_.get());
   2906   scoped_ptr<SpdyFrame> ping_frame(
   2907       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
   2908   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
   2909 
   2910   if (net_log().IsLogging()) {
   2911     net_log().AddEvent(
   2912         NetLog::TYPE_SPDY_SESSION_PING,
   2913         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
   2914   }
   2915   if (!is_ack) {
   2916     next_ping_id_ += 2;
   2917     ++pings_in_flight_;
   2918     PlanToCheckPingStatus();
   2919     last_ping_sent_time_ = time_func_();
   2920   }
   2921 }
   2922 
   2923 void SpdySession::PlanToCheckPingStatus() {
   2924   if (check_ping_status_pending_)
   2925     return;
   2926 
   2927   check_ping_status_pending_ = true;
   2928   base::MessageLoop::current()->PostDelayedTask(
   2929       FROM_HERE,
   2930       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2931                  time_func_()), hung_interval_);
   2932 }
   2933 
   2934 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
   2935   CHECK(!in_io_loop_);
   2936 
   2937   // Check if we got a response back for all PINGs we had sent.
   2938   if (pings_in_flight_ == 0) {
   2939     check_ping_status_pending_ = false;
   2940     return;
   2941   }
   2942 
   2943   DCHECK(check_ping_status_pending_);
   2944 
   2945   base::TimeTicks now = time_func_();
   2946   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
   2947 
   2948   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
   2949     // Track all failed PING messages in a separate bucket.
   2950     RecordPingRTTHistogram(base::TimeDelta::Max());
   2951     DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
   2952     return;
   2953   }
   2954 
   2955   // Check the status of connection after a delay.
   2956   base::MessageLoop::current()->PostDelayedTask(
   2957       FROM_HERE,
   2958       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
   2959                  now),
   2960       delay);
   2961 }
   2962 
   2963 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
   2964   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
   2965 }
   2966 
   2967 void SpdySession::RecordProtocolErrorHistogram(
   2968     SpdyProtocolErrorDetails details) {
   2969   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
   2970                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2971   if (EndsWith(host_port_pair().host(), "google.com", false)) {
   2972     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
   2973                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
   2974   }
   2975 }
   2976 
   2977 void SpdySession::RecordHistograms() {
   2978   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
   2979                               streams_initiated_count_,
   2980                               0, 300, 50);
   2981   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
   2982                               streams_pushed_count_,
   2983                               0, 300, 50);
   2984   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
   2985                               streams_pushed_and_claimed_count_,
   2986                               0, 300, 50);
   2987   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
   2988                               streams_abandoned_count_,
   2989                               0, 300, 50);
   2990   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
   2991                             sent_settings_ ? 1 : 0, 2);
   2992   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
   2993                             received_settings_ ? 1 : 0, 2);
   2994   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
   2995                               stalled_streams_,
   2996                               0, 300, 50);
   2997   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
   2998                             stalled_streams_ > 0 ? 1 : 0, 2);
   2999 
   3000   if (received_settings_) {
   3001     // Enumerate the saved settings, and set histograms for it.
   3002     const SettingsMap& settings_map =
   3003         http_server_properties_->GetSpdySettings(host_port_pair());
   3004 
   3005     SettingsMap::const_iterator it;
   3006     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
   3007       const SpdySettingsIds id = it->first;
   3008       const uint32 val = it->second.second;
   3009       switch (id) {
   3010         case SETTINGS_CURRENT_CWND:
   3011           // Record several different histograms to see if cwnd converges
   3012           // for larger volumes of data being sent.
   3013           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
   3014                                       val, 1, 200, 100);
   3015           if (total_bytes_received_ > 10 * 1024) {
   3016             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
   3017                                         val, 1, 200, 100);
   3018             if (total_bytes_received_ > 25 * 1024) {
   3019               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
   3020                                           val, 1, 200, 100);
   3021               if (total_bytes_received_ > 50 * 1024) {
   3022                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
   3023                                             val, 1, 200, 100);
   3024                 if (total_bytes_received_ > 100 * 1024) {
   3025                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
   3026                                               val, 1, 200, 100);
   3027                 }
   3028               }
   3029             }
   3030           }
   3031           break;
   3032         case SETTINGS_ROUND_TRIP_TIME:
   3033           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
   3034                                       val, 1, 1200, 100);
   3035           break;
   3036         case SETTINGS_DOWNLOAD_RETRANS_RATE:
   3037           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
   3038                                       val, 1, 100, 50);
   3039           break;
   3040         default:
   3041           break;
   3042       }
   3043     }
   3044   }
   3045 }
   3046 
   3047 void SpdySession::CompleteStreamRequest(
   3048     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
   3049   // Abort if the request has already been cancelled.
   3050   if (!pending_request)
   3051     return;
   3052 
   3053   base::WeakPtr<SpdyStream> stream;
   3054   int rv = TryCreateStream(pending_request, &stream);
   3055 
   3056   if (rv == OK) {
   3057     DCHECK(stream);
   3058     pending_request->OnRequestCompleteSuccess(stream);
   3059     return;
   3060   }
   3061   DCHECK(!stream);
   3062 
   3063   if (rv != ERR_IO_PENDING) {
   3064     pending_request->OnRequestCompleteFailure(rv);
   3065   }
   3066 }
   3067 
   3068 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
   3069   if (!is_secure_)
   3070     return NULL;
   3071   SSLClientSocket* ssl_socket =
   3072       reinterpret_cast<SSLClientSocket*>(connection_->socket());
   3073   DCHECK(ssl_socket);
   3074   return ssl_socket;
   3075 }
   3076 
   3077 void SpdySession::OnWriteBufferConsumed(
   3078     size_t frame_payload_size,
   3079     size_t consume_size,
   3080     SpdyBuffer::ConsumeSource consume_source) {
   3081   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
   3082   // deleted (e.g., a stream is closed due to incoming data).
   3083 
   3084   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3085 
   3086   if (consume_source == SpdyBuffer::DISCARD) {
   3087     // If we're discarding a frame or part of it, increase the send
   3088     // window by the number of discarded bytes. (Although if we're
   3089     // discarding part of a frame, it's probably because of a write
   3090     // error and we'll be tearing down the session soon.)
   3091     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
   3092     DCHECK_GT(remaining_payload_bytes, 0u);
   3093     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
   3094   }
   3095   // For consumed bytes, the send window is increased when we receive
   3096   // a WINDOW_UPDATE frame.
   3097 }
   3098 
   3099 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
   3100   // We can be called with |in_io_loop_| set if a SpdyBuffer is
   3101   // deleted (e.g., a stream is closed due to incoming data).
   3102 
   3103   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3104   DCHECK_GE(delta_window_size, 1);
   3105 
   3106   // Check for overflow.
   3107   int32 max_delta_window_size = kint32max - session_send_window_size_;
   3108   if (delta_window_size > max_delta_window_size) {
   3109     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
   3110     DoDrainSession(
   3111         ERR_SPDY_PROTOCOL_ERROR,
   3112         "Received WINDOW_UPDATE [delta: " +
   3113             base::IntToString(delta_window_size) +
   3114             "] for session overflows session_send_window_size_ [current: " +
   3115             base::IntToString(session_send_window_size_) + "]");
   3116     return;
   3117   }
   3118 
   3119   session_send_window_size_ += delta_window_size;
   3120 
   3121   net_log_.AddEvent(
   3122       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   3123       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3124                  delta_window_size, session_send_window_size_));
   3125 
   3126   DCHECK(!IsSendStalled());
   3127   ResumeSendStalledStreams();
   3128 }
   3129 
   3130 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
   3131   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3132 
   3133   // We only call this method when sending a frame. Therefore,
   3134   // |delta_window_size| should be within the valid frame size range.
   3135   DCHECK_GE(delta_window_size, 1);
   3136   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
   3137 
   3138   // |send_window_size_| should have been at least |delta_window_size| for
   3139   // this call to happen.
   3140   DCHECK_GE(session_send_window_size_, delta_window_size);
   3141 
   3142   session_send_window_size_ -= delta_window_size;
   3143 
   3144   net_log_.AddEvent(
   3145       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
   3146       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3147                  -delta_window_size, session_send_window_size_));
   3148 }
   3149 
   3150 void SpdySession::OnReadBufferConsumed(
   3151     size_t consume_size,
   3152     SpdyBuffer::ConsumeSource consume_source) {
   3153   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
   3154   // deleted (e.g., discarded by a SpdyReadQueue).
   3155 
   3156   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3157   DCHECK_GE(consume_size, 1u);
   3158   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
   3159 
   3160   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
   3161 }
   3162 
   3163 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
   3164   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3165   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
   3166   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
   3167   DCHECK_GE(delta_window_size, 1);
   3168   // Check for overflow.
   3169   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
   3170 
   3171   session_recv_window_size_ += delta_window_size;
   3172   net_log_.AddEvent(
   3173       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
   3174       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3175                  delta_window_size, session_recv_window_size_));
   3176 
   3177   session_unacked_recv_window_bytes_ += delta_window_size;
   3178   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
   3179     SendWindowUpdateFrame(kSessionFlowControlStreamId,
   3180                           session_unacked_recv_window_bytes_,
   3181                           HIGHEST);
   3182     session_unacked_recv_window_bytes_ = 0;
   3183   }
   3184 }
   3185 
   3186 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
   3187   CHECK(in_io_loop_);
   3188   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3189   DCHECK_GE(delta_window_size, 1);
   3190 
   3191   // Since we never decrease the initial receive window size,
   3192   // |delta_window_size| should never cause |recv_window_size_| to go
   3193   // negative. If we do, the receive window isn't being respected.
   3194   if (delta_window_size > session_recv_window_size_) {
   3195     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
   3196     DoDrainSession(
   3197         ERR_SPDY_FLOW_CONTROL_ERROR,
   3198         "delta_window_size is " + base::IntToString(delta_window_size) +
   3199             " in DecreaseRecvWindowSize, which is larger than the receive " +
   3200             "window size of " + base::IntToString(session_recv_window_size_));
   3201     return;
   3202   }
   3203 
   3204   session_recv_window_size_ -= delta_window_size;
   3205   net_log_.AddEvent(
   3206       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
   3207       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
   3208                  -delta_window_size, session_recv_window_size_));
   3209 }
   3210 
   3211 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
   3212   DCHECK(stream.send_stalled_by_flow_control());
   3213   RequestPriority priority = stream.priority();
   3214   CHECK_GE(priority, MINIMUM_PRIORITY);
   3215   CHECK_LE(priority, MAXIMUM_PRIORITY);
   3216   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
   3217 }
   3218 
   3219 void SpdySession::ResumeSendStalledStreams() {
   3220   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
   3221 
   3222   // We don't have to worry about new streams being queued, since
   3223   // doing so would cause IsSendStalled() to return true. But we do
   3224   // have to worry about streams being closed, as well as ourselves
   3225   // being closed.
   3226 
   3227   while (!IsSendStalled()) {
   3228     size_t old_size = 0;
   3229 #if DCHECK_IS_ON
   3230     old_size = GetTotalSize(stream_send_unstall_queue_);
   3231 #endif
   3232 
   3233     SpdyStreamId stream_id = PopStreamToPossiblyResume();
   3234     if (stream_id == 0)
   3235       break;
   3236     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
   3237     // The stream may actually still be send-stalled after this (due
   3238     // to its own send window) but that's okay -- it'll then be
   3239     // resumed once its send window increases.
   3240     if (it != active_streams_.end())
   3241       it->second.stream->PossiblyResumeIfSendStalled();
   3242 
   3243     // The size should decrease unless we got send-stalled again.
   3244     if (!IsSendStalled())
   3245       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
   3246   }
   3247 }
   3248 
   3249 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
   3250   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
   3251     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
   3252     if (!queue->empty()) {
   3253       SpdyStreamId stream_id = queue->front();
   3254       queue->pop_front();
   3255       return stream_id;
   3256     }
   3257   }
   3258   return 0;
   3259 }
   3260 
   3261 }  // namespace net
   3262