Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_session.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/logging.h"
      9 #include "base/memory/linked_ptr.h"
     10 #include "base/message_loop.h"
     11 #include "base/metrics/field_trial.h"
     12 #include "base/metrics/stats_counters.h"
     13 #include "base/stl_util-inl.h"
     14 #include "base/string_number_conversions.h"
     15 #include "base/string_util.h"
     16 #include "base/stringprintf.h"
     17 #include "base/time.h"
     18 #include "base/utf_string_conversions.h"
     19 #include "base/values.h"
     20 #include "net/base/connection_type_histograms.h"
     21 #include "net/base/net_log.h"
     22 #include "net/base/net_util.h"
     23 #include "net/http/http_network_session.h"
     24 #include "net/socket/ssl_client_socket.h"
     25 #include "net/spdy/spdy_frame_builder.h"
     26 #include "net/spdy/spdy_http_utils.h"
     27 #include "net/spdy/spdy_protocol.h"
     28 #include "net/spdy/spdy_session_pool.h"
     29 #include "net/spdy/spdy_settings_storage.h"
     30 #include "net/spdy/spdy_stream.h"
     31 
     32 namespace net {
     33 
     34 NetLogSpdySynParameter::NetLogSpdySynParameter(
     35     const linked_ptr<spdy::SpdyHeaderBlock>& headers,
     36     spdy::SpdyControlFlags flags,
     37     spdy::SpdyStreamId id,
     38     spdy::SpdyStreamId associated_stream)
     39     : headers_(headers),
     40       flags_(flags),
     41       id_(id),
     42       associated_stream_(associated_stream) {
     43 }
     44 
     45 NetLogSpdySynParameter::~NetLogSpdySynParameter() {
     46 }
     47 
     48 Value* NetLogSpdySynParameter::ToValue() const {
     49   DictionaryValue* dict = new DictionaryValue();
     50   ListValue* headers_list = new ListValue();
     51   for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
     52       it != headers_->end(); ++it) {
     53     headers_list->Append(new StringValue(base::StringPrintf(
     54         "%s: %s", it->first.c_str(), it->second.c_str())));
     55   }
     56   dict->SetInteger("flags", flags_);
     57   dict->Set("headers", headers_list);
     58   dict->SetInteger("id", id_);
     59   if (associated_stream_)
     60     dict->SetInteger("associated_stream", associated_stream_);
     61   return dict;
     62 }
     63 
     64 namespace {
     65 
     66 const int kReadBufferSize = 8 * 1024;
     67 
     68 class NetLogSpdySessionParameter : public NetLog::EventParameters {
     69  public:
     70   NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
     71       : host_pair_(host_pair) {}
     72   virtual Value* ToValue() const {
     73     DictionaryValue* dict = new DictionaryValue();
     74     dict->Set("host", new StringValue(host_pair_.first.ToString()));
     75     dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
     76     return dict;
     77   }
     78  private:
     79   const HostPortProxyPair host_pair_;
     80   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
     81 };
     82 
     83 class NetLogSpdySettingsParameter : public NetLog::EventParameters {
     84  public:
     85   explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
     86       : settings_(settings) {}
     87 
     88   virtual Value* ToValue() const {
     89     DictionaryValue* dict = new DictionaryValue();
     90     ListValue* settings = new ListValue();
     91     for (spdy::SpdySettings::const_iterator it = settings_.begin();
     92          it != settings_.end(); ++it) {
     93       settings->Append(new StringValue(
     94           base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
     95     }
     96     dict->Set("settings", settings);
     97     return dict;
     98   }
     99 
    100  private:
    101   ~NetLogSpdySettingsParameter() {}
    102   const spdy::SpdySettings settings_;
    103 
    104   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
    105 };
    106 
    107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
    108  public:
    109   NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
    110                                   int delta,
    111                                   int window_size)
    112       : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
    113 
    114   virtual Value* ToValue() const {
    115     DictionaryValue* dict = new DictionaryValue();
    116     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
    117     dict->SetInteger("delta", delta_);
    118     dict->SetInteger("window_size", window_size_);
    119     return dict;
    120   }
    121 
    122  private:
    123   ~NetLogSpdyWindowUpdateParameter() {}
    124   const spdy::SpdyStreamId stream_id_;
    125   const int delta_;
    126   const int window_size_;
    127 
    128   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
    129 };
    130 
    131 class NetLogSpdyDataParameter : public NetLog::EventParameters {
    132  public:
    133   NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
    134                           int size,
    135                           spdy::SpdyDataFlags flags)
    136       : stream_id_(stream_id), size_(size), flags_(flags) {}
    137 
    138   virtual Value* ToValue() const {
    139     DictionaryValue* dict = new DictionaryValue();
    140     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
    141     dict->SetInteger("size", size_);
    142     dict->SetInteger("flags", static_cast<int>(flags_));
    143     return dict;
    144   }
    145 
    146  private:
    147   ~NetLogSpdyDataParameter() {}
    148   const spdy::SpdyStreamId stream_id_;
    149   const int size_;
    150   const spdy::SpdyDataFlags flags_;
    151 
    152   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
    153 };
    154 
    155 class NetLogSpdyRstParameter : public NetLog::EventParameters {
    156  public:
    157   NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
    158       : stream_id_(stream_id), status_(status) {}
    159 
    160   virtual Value* ToValue() const {
    161     DictionaryValue* dict = new DictionaryValue();
    162     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
    163     dict->SetInteger("status", status_);
    164     return dict;
    165   }
    166 
    167  private:
    168   ~NetLogSpdyRstParameter() {}
    169   const spdy::SpdyStreamId stream_id_;
    170   const int status_;
    171 
    172   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
    173 };
    174 
    175 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
    176  public:
    177   NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
    178                             int active_streams,
    179                             int unclaimed_streams)
    180       : last_stream_id_(last_stream_id),
    181         active_streams_(active_streams),
    182         unclaimed_streams_(unclaimed_streams) {}
    183 
    184   virtual Value* ToValue() const {
    185     DictionaryValue* dict = new DictionaryValue();
    186     dict->SetInteger("last_accepted_stream_id",
    187                      static_cast<int>(last_stream_id_));
    188     dict->SetInteger("active_streams", active_streams_);
    189     dict->SetInteger("unclaimed_streams", unclaimed_streams_);
    190     return dict;
    191   }
    192 
    193  private:
    194   ~NetLogSpdyGoAwayParameter() {}
    195   const spdy::SpdyStreamId last_stream_id_;
    196   const int active_streams_;
    197   const int unclaimed_streams_;
    198 
    199   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
    200 };
    201 
    202 }  // namespace
    203 
    204 // static
    205 bool SpdySession::use_ssl_ = true;
    206 
    207 // static
    208 bool SpdySession::use_flow_control_ = false;
    209 
    210 // static
    211 size_t SpdySession::max_concurrent_stream_limit_ = 256;
    212 
    213 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
    214                          SpdySessionPool* spdy_session_pool,
    215                          SpdySettingsStorage* spdy_settings,
    216                          NetLog* net_log)
    217     : ALLOW_THIS_IN_INITIALIZER_LIST(
    218           read_callback_(this, &SpdySession::OnReadComplete)),
    219       ALLOW_THIS_IN_INITIALIZER_LIST(
    220           write_callback_(this, &SpdySession::OnWriteComplete)),
    221       ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
    222       host_port_proxy_pair_(host_port_proxy_pair),
    223       spdy_session_pool_(spdy_session_pool),
    224       spdy_settings_(spdy_settings),
    225       connection_(new ClientSocketHandle),
    226       read_buffer_(new IOBuffer(kReadBufferSize)),
    227       read_pending_(false),
    228       stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
    229       write_pending_(false),
    230       delayed_write_pending_(false),
    231       is_secure_(false),
    232       certificate_error_code_(OK),
    233       error_(OK),
    234       state_(IDLE),
    235       max_concurrent_streams_(kDefaultMaxConcurrentStreams),
    236       streams_initiated_count_(0),
    237       streams_pushed_count_(0),
    238       streams_pushed_and_claimed_count_(0),
    239       streams_abandoned_count_(0),
    240       frames_received_(0),
    241       bytes_received_(0),
    242       sent_settings_(false),
    243       received_settings_(false),
    244       stalled_streams_(0),
    245       initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
    246       initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
    247       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
    248   DCHECK(HttpStreamFactory::spdy_enabled());
    249   net_log_.BeginEvent(
    250       NetLog::TYPE_SPDY_SESSION,
    251       make_scoped_refptr(
    252           new NetLogSpdySessionParameter(host_port_proxy_pair_)));
    253 
    254   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    255 
    256   spdy_framer_.set_visitor(this);
    257 
    258   SendSettings();
    259 }
    260 
    261 SpdySession::~SpdySession() {
    262   if (state_ != CLOSED) {
    263     state_ = CLOSED;
    264 
    265     // Cleanup all the streams.
    266     CloseAllStreams(net::ERR_ABORTED);
    267   }
    268 
    269   if (connection_->is_initialized()) {
    270     // With Spdy we can't recycle sockets.
    271     connection_->socket()->Disconnect();
    272   }
    273 
    274   // Streams should all be gone now.
    275   DCHECK_EQ(0u, num_active_streams());
    276   DCHECK_EQ(0u, num_unclaimed_pushed_streams());
    277 
    278   DCHECK(pending_callback_map_.empty());
    279 
    280   RecordHistograms();
    281 
    282   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
    283 }
    284 
    285 net::Error SpdySession::InitializeWithSocket(
    286     ClientSocketHandle* connection,
    287     bool is_secure,
    288     int certificate_error_code) {
    289   base::StatsCounter spdy_sessions("spdy.sessions");
    290   spdy_sessions.Increment();
    291 
    292   state_ = CONNECTED;
    293   connection_.reset(connection);
    294   is_secure_ = is_secure;
    295   certificate_error_code_ = certificate_error_code;
    296 
    297   // Write out any data that we might have to send, such as the settings frame.
    298   WriteSocketLater();
    299   net::Error error = ReadSocket();
    300   if (error == ERR_IO_PENDING)
    301     return OK;
    302   return error;
    303 }
    304 
    305 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
    306   if (state_ != CONNECTED)
    307     return false;
    308 
    309   SSLInfo ssl_info;
    310   bool was_npn_negotiated;
    311   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
    312     return true;   // This is not a secure session, so all domains are okay.
    313 
    314   return ssl_info.cert->VerifyNameMatch(domain);
    315 }
    316 
    317 int SpdySession::GetPushStream(
    318     const GURL& url,
    319     scoped_refptr<SpdyStream>* stream,
    320     const BoundNetLog& stream_net_log) {
    321   CHECK_NE(state_, CLOSED);
    322 
    323   *stream = NULL;
    324 
    325   // Don't allow access to secure push streams over an unauthenticated, but
    326   // encrypted SSL socket.
    327   if (is_secure_ && certificate_error_code_ != OK &&
    328       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    329     LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
    330                << "unauthenticated session.";
    331     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
    332     return ERR_SPDY_PROTOCOL_ERROR;
    333   }
    334 
    335   *stream = GetActivePushStream(url.spec());
    336   if (stream->get()) {
    337     DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
    338     streams_pushed_and_claimed_count_++;
    339     return OK;
    340   }
    341   return 0;
    342 }
    343 
    344 int SpdySession::CreateStream(
    345     const GURL& url,
    346     RequestPriority priority,
    347     scoped_refptr<SpdyStream>* spdy_stream,
    348     const BoundNetLog& stream_net_log,
    349     CompletionCallback* callback) {
    350   if (!max_concurrent_streams_ ||
    351       active_streams_.size() < max_concurrent_streams_) {
    352     return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
    353   }
    354 
    355   stalled_streams_++;
    356   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
    357   create_stream_queues_[priority].push(
    358       PendingCreateStream(url, priority, spdy_stream,
    359                           stream_net_log, callback));
    360   return ERR_IO_PENDING;
    361 }
    362 
    363 void SpdySession::ProcessPendingCreateStreams() {
    364   while (!max_concurrent_streams_ ||
    365          active_streams_.size() < max_concurrent_streams_) {
    366     bool no_pending_create_streams = true;
    367     for (int i = 0;i < NUM_PRIORITIES;++i) {
    368       if (!create_stream_queues_[i].empty()) {
    369         PendingCreateStream pending_create = create_stream_queues_[i].front();
    370         create_stream_queues_[i].pop();
    371         no_pending_create_streams = false;
    372         int error = CreateStreamImpl(*pending_create.url,
    373                                      pending_create.priority,
    374                                      pending_create.spdy_stream,
    375                                      *pending_create.stream_net_log);
    376         scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
    377         DCHECK(!ContainsKey(pending_callback_map_, stream));
    378         pending_callback_map_[stream] =
    379             CallbackResultPair(pending_create.callback, error);
    380         MessageLoop::current()->PostTask(
    381             FROM_HERE,
    382             method_factory_.NewRunnableMethod(
    383                 &SpdySession::InvokeUserStreamCreationCallback, stream));
    384         break;
    385       }
    386     }
    387     if (no_pending_create_streams)
    388       return;  // there were no streams in any queue
    389   }
    390 }
    391 
    392 void SpdySession::CancelPendingCreateStreams(
    393     const scoped_refptr<SpdyStream>* spdy_stream) {
    394   PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
    395   if (it != pending_callback_map_.end()) {
    396     pending_callback_map_.erase(it);
    397     return;
    398   }
    399 
    400   for (int i = 0;i < NUM_PRIORITIES;++i) {
    401     PendingCreateStreamQueue tmp;
    402     // Make a copy removing this trans
    403     while (!create_stream_queues_[i].empty()) {
    404       PendingCreateStream pending_create = create_stream_queues_[i].front();
    405       create_stream_queues_[i].pop();
    406       if (pending_create.spdy_stream != spdy_stream)
    407         tmp.push(pending_create);
    408     }
    409     // Now copy it back
    410     while (!tmp.empty()) {
    411       create_stream_queues_[i].push(tmp.front());
    412       tmp.pop();
    413     }
    414   }
    415 }
    416 
    417 int SpdySession::CreateStreamImpl(
    418     const GURL& url,
    419     RequestPriority priority,
    420     scoped_refptr<SpdyStream>* spdy_stream,
    421     const BoundNetLog& stream_net_log) {
    422   // Make sure that we don't try to send https/wss over an unauthenticated, but
    423   // encrypted SSL socket.
    424   if (is_secure_ && certificate_error_code_ != OK &&
    425       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    426     LOG(ERROR) << "Tried to create spdy stream for secure content over an "
    427                << "unauthenticated session.";
    428     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
    429     return ERR_SPDY_PROTOCOL_ERROR;
    430   }
    431 
    432   const std::string& path = url.PathForRequest();
    433 
    434   const spdy::SpdyStreamId stream_id = GetNewStreamId();
    435 
    436   *spdy_stream = new SpdyStream(this,
    437                                 stream_id,
    438                                 false,
    439                                 stream_net_log);
    440   const scoped_refptr<SpdyStream>& stream = *spdy_stream;
    441 
    442   stream->set_priority(priority);
    443   stream->set_path(path);
    444   stream->set_send_window_size(initial_send_window_size_);
    445   stream->set_recv_window_size(initial_recv_window_size_);
    446   ActivateStream(stream);
    447 
    448   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
    449       static_cast<int>(priority), 0, 10, 11);
    450 
    451   // TODO(mbelshe): Optimize memory allocations
    452   DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
    453 
    454   DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
    455   return OK;
    456 }
    457 
    458 int SpdySession::WriteSynStream(
    459     spdy::SpdyStreamId stream_id,
    460     RequestPriority priority,
    461     spdy::SpdyControlFlags flags,
    462     const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
    463   // Find our stream
    464   if (!IsStreamActive(stream_id))
    465     return ERR_INVALID_SPDY_STREAM;
    466   const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
    467   CHECK_EQ(stream->stream_id(), stream_id);
    468 
    469   scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
    470       spdy_framer_.CreateSynStream(
    471           stream_id, 0,
    472           ConvertRequestPriorityToSpdyPriority(priority),
    473           flags, false, headers.get()));
    474   QueueFrame(syn_frame.get(), priority, stream);
    475 
    476   base::StatsCounter spdy_requests("spdy.requests");
    477   spdy_requests.Increment();
    478   streams_initiated_count_++;
    479 
    480   if (net_log().IsLoggingAllEvents()) {
    481     net_log().AddEvent(
    482         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
    483         make_scoped_refptr(
    484             new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
    485   }
    486 
    487   return ERR_IO_PENDING;
    488 }
    489 
    490 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
    491                                  net::IOBuffer* data, int len,
    492                                  spdy::SpdyDataFlags flags) {
    493   // Find our stream
    494   DCHECK(IsStreamActive(stream_id));
    495   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
    496   CHECK_EQ(stream->stream_id(), stream_id);
    497   if (!stream)
    498     return ERR_INVALID_SPDY_STREAM;
    499 
    500   if (len > kMaxSpdyFrameChunkSize) {
    501     len = kMaxSpdyFrameChunkSize;
    502     flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
    503   }
    504 
    505   // Obey send window size of the stream if flow control is enabled.
    506   if (use_flow_control_) {
    507     if (stream->send_window_size() <= 0) {
    508       // Because we queue frames onto the session, it is possible that
    509       // a stream was not flow controlled at the time it attempted the
    510       // write, but when we go to fulfill the write, it is now flow
    511       // controlled.  This is why we need the session to mark the stream
    512       // as stalled - because only the session knows for sure when the
    513       // stall occurs.
    514       stream->set_stalled_by_flow_control(true);
    515       net_log().AddEvent(
    516           NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
    517           make_scoped_refptr(
    518               new NetLogIntegerParameter("stream_id", stream_id)));
    519       return ERR_IO_PENDING;
    520     }
    521     int new_len = std::min(len, stream->send_window_size());
    522     if (new_len < len) {
    523       len = new_len;
    524       flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
    525     }
    526     stream->DecreaseSendWindowSize(len);
    527   }
    528 
    529   if (net_log().IsLoggingAllEvents()) {
    530     net_log().AddEvent(
    531         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
    532         make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
    533   }
    534 
    535   // TODO(mbelshe): reduce memory copies here.
    536   scoped_ptr<spdy::SpdyDataFrame> frame(
    537       spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
    538   QueueFrame(frame.get(), stream->priority(), stream);
    539   return ERR_IO_PENDING;
    540 }
    541 
    542 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
    543   // TODO(mbelshe): We should send a RST_STREAM control frame here
    544   //                so that the server can cancel a large send.
    545 
    546   DeleteStream(stream_id, status);
    547 }
    548 
    549 void SpdySession::ResetStream(
    550     spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
    551 
    552   net_log().AddEvent(
    553       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
    554       make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
    555 
    556   scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
    557       spdy_framer_.CreateRstStream(stream_id, status));
    558 
    559   // Default to lowest priority unless we know otherwise.
    560   int priority = 3;
    561   if(IsStreamActive(stream_id)) {
    562     scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
    563     priority = stream->priority();
    564   }
    565   QueueFrame(rst_frame.get(), priority, NULL);
    566 
    567   DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
    568 }
    569 
    570 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
    571   return ContainsKey(active_streams_, stream_id);
    572 }
    573 
    574 LoadState SpdySession::GetLoadState() const {
    575   // NOTE: The application only queries the LoadState via the
    576   //       SpdyNetworkTransaction, and details are only needed when
    577   //       we're in the process of connecting.
    578 
    579   // If we're connecting, defer to the connection to give us the actual
    580   // LoadState.
    581   if (state_ == CONNECTING)
    582     return connection_->GetLoadState();
    583 
    584   // Just report that we're idle since the session could be doing
    585   // many things concurrently.
    586   return LOAD_STATE_IDLE;
    587 }
    588 
    589 void SpdySession::OnReadComplete(int bytes_read) {
    590   // Parse a frame.  For now this code requires that the frame fit into our
    591   // buffer (32KB).
    592   // TODO(mbelshe): support arbitrarily large frames!
    593 
    594   read_pending_ = false;
    595 
    596   if (bytes_read <= 0) {
    597     // Session is tearing down.
    598     net::Error error = static_cast<net::Error>(bytes_read);
    599     if (bytes_read == 0)
    600       error = ERR_CONNECTION_CLOSED;
    601     CloseSessionOnError(error, true);
    602     return;
    603   }
    604 
    605   bytes_received_ += bytes_read;
    606 
    607   // The SpdyFramer will use callbacks onto |this| as it parses frames.
    608   // When errors occur, those callbacks can lead to teardown of all references
    609   // to |this|, so maintain a reference to self during this call for safe
    610   // cleanup.
    611   scoped_refptr<SpdySession> self(this);
    612 
    613   char *data = read_buffer_->data();
    614   while (bytes_read &&
    615          spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
    616     uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
    617     bytes_read -= bytes_processed;
    618     data += bytes_processed;
    619     if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
    620       spdy_framer_.Reset();
    621   }
    622 
    623   if (state_ != CLOSED)
    624     ReadSocket();
    625 }
    626 
    627 void SpdySession::OnWriteComplete(int result) {
    628   DCHECK(write_pending_);
    629   DCHECK(in_flight_write_.size());
    630 
    631   write_pending_ = false;
    632 
    633   scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
    634 
    635   if (result >= 0) {
    636     // It should not be possible to have written more bytes than our
    637     // in_flight_write_.
    638     DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
    639 
    640     in_flight_write_.buffer()->DidConsume(result);
    641 
    642     // We only notify the stream when we've fully written the pending frame.
    643     if (!in_flight_write_.buffer()->BytesRemaining()) {
    644       if (stream) {
    645         // Report the number of bytes written to the caller, but exclude the
    646         // frame size overhead.  NOTE: if this frame was compressed the
    647         // reported bytes written is the compressed size, not the original
    648         // size.
    649         if (result > 0) {
    650           result = in_flight_write_.buffer()->size();
    651           DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
    652           result -= static_cast<int>(spdy::SpdyFrame::size());
    653         }
    654 
    655         // It is possible that the stream was cancelled while we were writing
    656         // to the socket.
    657         if (!stream->cancelled())
    658           stream->OnWriteComplete(result);
    659       }
    660 
    661       // Cleanup the write which just completed.
    662       in_flight_write_.release();
    663     }
    664 
    665     // Write more data.  We're already in a continuation, so we can
    666     // go ahead and write it immediately (without going back to the
    667     // message loop).
    668     WriteSocketLater();
    669   } else {
    670     in_flight_write_.release();
    671 
    672     // The stream is now errored.  Close it down.
    673     CloseSessionOnError(static_cast<net::Error>(result), true);
    674   }
    675 }
    676 
    677 net::Error SpdySession::ReadSocket() {
    678   if (read_pending_)
    679     return OK;
    680 
    681   if (state_ == CLOSED) {
    682     NOTREACHED();
    683     return ERR_UNEXPECTED;
    684   }
    685 
    686   CHECK(connection_.get());
    687   CHECK(connection_->socket());
    688   int bytes_read = connection_->socket()->Read(read_buffer_.get(),
    689                                                kReadBufferSize,
    690                                                &read_callback_);
    691   switch (bytes_read) {
    692     case 0:
    693       // Socket is closed!
    694       CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
    695       return ERR_CONNECTION_CLOSED;
    696     case net::ERR_IO_PENDING:
    697       // Waiting for data.  Nothing to do now.
    698       read_pending_ = true;
    699       return ERR_IO_PENDING;
    700     default:
    701       // Data was read, process it.
    702       // Schedule the work through the message loop to avoid recursive
    703       // callbacks.
    704       read_pending_ = true;
    705       MessageLoop::current()->PostTask(
    706           FROM_HERE,
    707           method_factory_.NewRunnableMethod(
    708               &SpdySession::OnReadComplete, bytes_read));
    709       break;
    710   }
    711   return OK;
    712 }
    713 
    714 void SpdySession::WriteSocketLater() {
    715   if (delayed_write_pending_)
    716     return;
    717 
    718   if (state_ < CONNECTED)
    719     return;
    720 
    721   delayed_write_pending_ = true;
    722   MessageLoop::current()->PostTask(
    723       FROM_HERE,
    724       method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
    725 }
    726 
    727 void SpdySession::WriteSocket() {
    728   // This function should only be called via WriteSocketLater.
    729   DCHECK(delayed_write_pending_);
    730   delayed_write_pending_ = false;
    731 
    732   // If the socket isn't connected yet, just wait; we'll get called
    733   // again when the socket connection completes.  If the socket is
    734   // closed, just return.
    735   if (state_ < CONNECTED || state_ == CLOSED)
    736     return;
    737 
    738   if (write_pending_)   // Another write is in progress still.
    739     return;
    740 
    741   // Loop sending frames until we've sent everything or until the write
    742   // returns error (or ERR_IO_PENDING).
    743   while (in_flight_write_.buffer() || !queue_.empty()) {
    744     if (!in_flight_write_.buffer()) {
    745       // Grab the next SpdyFrame to send.
    746       SpdyIOBuffer next_buffer = queue_.top();
    747       queue_.pop();
    748 
    749       // We've deferred compression until just before we write it to the socket,
    750       // which is now.  At this time, we don't compress our data frames.
    751       spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
    752       size_t size;
    753       if (spdy_framer_.IsCompressible(uncompressed_frame)) {
    754         scoped_ptr<spdy::SpdyFrame> compressed_frame(
    755             spdy_framer_.CompressFrame(uncompressed_frame));
    756         if (!compressed_frame.get()) {
    757           LOG(ERROR) << "SPDY Compression failure";
    758           CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
    759           return;
    760         }
    761 
    762         size = compressed_frame->length() + spdy::SpdyFrame::size();
    763 
    764         DCHECK_GT(size, 0u);
    765 
    766         // TODO(mbelshe): We have too much copying of data here.
    767         IOBufferWithSize* buffer = new IOBufferWithSize(size);
    768         memcpy(buffer->data(), compressed_frame->data(), size);
    769 
    770         // Attempt to send the frame.
    771         in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
    772       } else {
    773         size = uncompressed_frame.length() + spdy::SpdyFrame::size();
    774         in_flight_write_ = next_buffer;
    775       }
    776     } else {
    777       DCHECK(in_flight_write_.buffer()->BytesRemaining());
    778     }
    779 
    780     write_pending_ = true;
    781     int rv = connection_->socket()->Write(in_flight_write_.buffer(),
    782         in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
    783     if (rv == net::ERR_IO_PENDING)
    784       break;
    785 
    786     // We sent the frame successfully.
    787     OnWriteComplete(rv);
    788 
    789     // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
    790     //                 as in an error state.
    791     if (rv < 0)
    792       break;
    793   }
    794 }
    795 
    796 void SpdySession::CloseAllStreams(net::Error status) {
    797   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
    798   base::StatsCounter abandoned_push_streams(
    799       "spdy.abandoned_push_streams");
    800 
    801   if (!active_streams_.empty())
    802     abandoned_streams.Add(active_streams_.size());
    803   if (!unclaimed_pushed_streams_.empty()) {
    804     streams_abandoned_count_ += unclaimed_pushed_streams_.size();
    805     abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
    806     unclaimed_pushed_streams_.clear();
    807   }
    808 
    809   for (int i = 0;i < NUM_PRIORITIES;++i) {
    810     while (!create_stream_queues_[i].empty()) {
    811       PendingCreateStream pending_create = create_stream_queues_[i].front();
    812       create_stream_queues_[i].pop();
    813       pending_create.callback->Run(ERR_ABORTED);
    814     }
    815   }
    816 
    817   while (!active_streams_.empty()) {
    818     ActiveStreamMap::iterator it = active_streams_.begin();
    819     const scoped_refptr<SpdyStream>& stream = it->second;
    820     DCHECK(stream);
    821     LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
    822                  << "): " << stream->path();
    823     DeleteStream(stream->stream_id(), status);
    824   }
    825 
    826   // We also need to drain the queue.
    827   while (queue_.size())
    828     queue_.pop();
    829 }
    830 
    831 int SpdySession::GetNewStreamId() {
    832   int id = stream_hi_water_mark_;
    833   stream_hi_water_mark_ += 2;
    834   if (stream_hi_water_mark_ > 0x7fff)
    835     stream_hi_water_mark_ = 1;
    836   return id;
    837 }
    838 
    839 void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
    840                              spdy::SpdyPriority priority,
    841                              SpdyStream* stream) {
    842   int length = spdy::SpdyFrame::size() + frame->length();
    843   IOBuffer* buffer = new IOBuffer(length);
    844   memcpy(buffer->data(), frame->data(), length);
    845   queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
    846 
    847   WriteSocketLater();
    848 }
    849 
    850 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
    851   // Closing all streams can have a side-effect of dropping the last reference
    852   // to |this|.  Hold a reference through this function.
    853   scoped_refptr<SpdySession> self(this);
    854 
    855   DCHECK_LT(err, OK);
    856   net_log_.AddEvent(
    857       NetLog::TYPE_SPDY_SESSION_CLOSE,
    858       make_scoped_refptr(new NetLogIntegerParameter("status", err)));
    859 
    860   // Don't close twice.  This can occur because we can have both
    861   // a read and a write outstanding, and each can complete with
    862   // an error.
    863   if (state_ != CLOSED) {
    864     state_ = CLOSED;
    865     error_ = err;
    866     if (remove_from_pool)
    867       RemoveFromPool();
    868     CloseAllStreams(err);
    869   }
    870 }
    871 
    872 Value* SpdySession::GetInfoAsValue() const {
    873   DictionaryValue* dict = new DictionaryValue();
    874 
    875   dict->SetInteger("source_id", net_log_.source().id);
    876 
    877   dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
    878   dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
    879 
    880   dict->SetInteger("active_streams", active_streams_.size());
    881 
    882   dict->SetInteger("unclaimed_pushed_streams",
    883       unclaimed_pushed_streams_.size());
    884 
    885   dict->SetBoolean("is_secure", is_secure_);
    886 
    887   dict->SetInteger("error", error_);
    888   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
    889 
    890   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
    891   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
    892   dict->SetInteger("streams_pushed_and_claimed_count",
    893       streams_pushed_and_claimed_count_);
    894   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
    895   dict->SetInteger("frames_received", frames_received_);
    896 
    897   dict->SetBoolean("sent_settings", sent_settings_);
    898   dict->SetBoolean("received_settings", received_settings_);
    899   return dict;
    900 }
    901 
    902 int SpdySession::GetPeerAddress(AddressList* address) const {
    903   if (!connection_->socket())
    904     return ERR_SOCKET_NOT_CONNECTED;
    905 
    906   return connection_->socket()->GetPeerAddress(address);
    907 }
    908 
    909 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
    910   if (!connection_->socket())
    911     return ERR_SOCKET_NOT_CONNECTED;
    912 
    913   return connection_->socket()->GetLocalAddress(address);
    914 }
    915 
    916 void SpdySession::ActivateStream(SpdyStream* stream) {
    917   const spdy::SpdyStreamId id = stream->stream_id();
    918   DCHECK(!IsStreamActive(id));
    919 
    920   active_streams_[id] = stream;
    921 }
    922 
    923 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
    924   // For push streams, if they are being deleted normally, we leave
    925   // the stream in the unclaimed_pushed_streams_ list.  However, if
    926   // the stream is errored out, clean it up entirely.
    927   if (status != OK) {
    928     PushedStreamMap::iterator it;
    929     for (it = unclaimed_pushed_streams_.begin();
    930          it != unclaimed_pushed_streams_.end(); ++it) {
    931       scoped_refptr<SpdyStream> curr = it->second;
    932       if (id == curr->stream_id()) {
    933         unclaimed_pushed_streams_.erase(it);
    934         break;
    935       }
    936     }
    937   }
    938 
    939   // The stream might have been deleted.
    940   ActiveStreamMap::iterator it2 = active_streams_.find(id);
    941   if (it2 == active_streams_.end())
    942     return;
    943 
    944   // If this is an active stream, call the callback.
    945   const scoped_refptr<SpdyStream> stream(it2->second);
    946   active_streams_.erase(it2);
    947   if (stream)
    948     stream->OnClose(status);
    949   ProcessPendingCreateStreams();
    950 }
    951 
    952 void SpdySession::RemoveFromPool() {
    953   if (spdy_session_pool_) {
    954     spdy_session_pool_->Remove(make_scoped_refptr(this));
    955     spdy_session_pool_ = NULL;
    956   }
    957 }
    958 
    959 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
    960     const std::string& path) {
    961   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
    962 
    963   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
    964   if (it != unclaimed_pushed_streams_.end()) {
    965     net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
    966     scoped_refptr<SpdyStream> stream = it->second;
    967     unclaimed_pushed_streams_.erase(it);
    968     used_push_streams.Increment();
    969     return stream;
    970   }
    971   return NULL;
    972 }
    973 
    974 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
    975   if (is_secure_) {
    976     SSLClientSocket* ssl_socket =
    977         reinterpret_cast<SSLClientSocket*>(connection_->socket());
    978     ssl_socket->GetSSLInfo(ssl_info);
    979     *was_npn_negotiated = ssl_socket->was_npn_negotiated();
    980     return true;
    981   }
    982   return false;
    983 }
    984 
    985 bool SpdySession::GetSSLCertRequestInfo(
    986     SSLCertRequestInfo* cert_request_info) {
    987   if (is_secure_) {
    988     SSLClientSocket* ssl_socket =
    989         reinterpret_cast<SSLClientSocket*>(connection_->socket());
    990     ssl_socket->GetSSLCertRequestInfo(cert_request_info);
    991     return true;
    992   }
    993   return false;
    994 }
    995 
    996 void SpdySession::OnError(spdy::SpdyFramer* framer) {
    997   CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
    998 }
    999 
   1000 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
   1001                                     const char* data,
   1002                                     size_t len) {
   1003   if (net_log().IsLoggingAllEvents()) {
   1004     net_log().AddEvent(
   1005         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
   1006         make_scoped_refptr(new NetLogSpdyDataParameter(
   1007             stream_id, len, spdy::SpdyDataFlags())));
   1008   }
   1009 
   1010   if (!IsStreamActive(stream_id)) {
   1011     // NOTE:  it may just be that the stream was cancelled.
   1012     LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
   1013     return;
   1014   }
   1015 
   1016   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1017   stream->OnDataReceived(data, len);
   1018 }
   1019 
   1020 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
   1021                           const scoped_refptr<SpdyStream> stream) {
   1022   int rv = OK;
   1023   rv = stream->OnResponseReceived(headers);
   1024   if (rv < 0) {
   1025     DCHECK_NE(rv, ERR_IO_PENDING);
   1026     const spdy::SpdyStreamId stream_id = stream->stream_id();
   1027     DeleteStream(stream_id, rv);
   1028     return false;
   1029   }
   1030   return true;
   1031 }
   1032 
   1033 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
   1034                         const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
   1035   spdy::SpdyStreamId stream_id = frame.stream_id();
   1036   spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
   1037 
   1038   if (net_log_.IsLoggingAllEvents()) {
   1039     net_log_.AddEvent(
   1040         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
   1041         make_scoped_refptr(new NetLogSpdySynParameter(
   1042             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
   1043             stream_id, associated_stream_id)));
   1044   }
   1045 
   1046   // Server-initiated streams should have even sequence numbers.
   1047   if ((stream_id & 0x1) != 0) {
   1048     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
   1049     return;
   1050   }
   1051 
   1052   if (IsStreamActive(stream_id)) {
   1053     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
   1054     return;
   1055   }
   1056 
   1057   if (associated_stream_id == 0) {
   1058     LOG(WARNING) << "Received invalid OnSyn associated stream id "
   1059                  << associated_stream_id
   1060                  << " for stream " << stream_id;
   1061     ResetStream(stream_id, spdy::INVALID_STREAM);
   1062     return;
   1063   }
   1064 
   1065   streams_pushed_count_++;
   1066 
   1067   // TODO(mbelshe): DCHECK that this is a GET method?
   1068 
   1069   // Verify that the response had a URL for us.
   1070   const std::string& url = ContainsKey(*headers, "url") ?
   1071       headers->find("url")->second : "";
   1072   if (url.empty()) {
   1073     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1074     LOG(WARNING) << "Pushed stream did not contain a url.";
   1075     return;
   1076   }
   1077 
   1078   GURL gurl(url);
   1079   if (!gurl.is_valid()) {
   1080     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1081     LOG(WARNING) << "Pushed stream url was invalid: " << url;
   1082     return;
   1083   }
   1084 
   1085   // Verify we have a valid stream association.
   1086   if (!IsStreamActive(associated_stream_id)) {
   1087     LOG(WARNING) << "Received OnSyn with inactive associated stream "
   1088                << associated_stream_id;
   1089     ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
   1090     return;
   1091   }
   1092 
   1093   scoped_refptr<SpdyStream> associated_stream =
   1094       active_streams_[associated_stream_id];
   1095   GURL associated_url(associated_stream->GetUrl());
   1096   if (associated_url.GetOrigin() != gurl.GetOrigin()) {
   1097     LOG(WARNING) << "Rejected Cross Origin Push Stream "
   1098                  << associated_stream_id;
   1099     ResetStream(stream_id, spdy::REFUSED_STREAM);
   1100     return;
   1101   }
   1102 
   1103   // There should not be an existing pushed stream with the same path.
   1104   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
   1105   if (it != unclaimed_pushed_streams_.end()) {
   1106     LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
   1107     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1108     return;
   1109   }
   1110 
   1111   scoped_refptr<SpdyStream> stream(
   1112       new SpdyStream(this, stream_id, true, net_log_));
   1113 
   1114   stream->set_path(gurl.PathForRequest());
   1115 
   1116   unclaimed_pushed_streams_[url] = stream;
   1117 
   1118   ActivateStream(stream);
   1119   stream->set_response_received();
   1120 
   1121   // Parse the headers.
   1122   if (!Respond(*headers, stream))
   1123     return;
   1124 
   1125   base::StatsCounter push_requests("spdy.pushed_streams");
   1126   push_requests.Increment();
   1127 }
   1128 
   1129 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
   1130                              const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
   1131   spdy::SpdyStreamId stream_id = frame.stream_id();
   1132 
   1133   bool valid_stream = IsStreamActive(stream_id);
   1134   if (!valid_stream) {
   1135     // NOTE:  it may just be that the stream was cancelled.
   1136     LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
   1137     return;
   1138   }
   1139 
   1140   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1141   CHECK_EQ(stream->stream_id(), stream_id);
   1142   CHECK(!stream->cancelled());
   1143 
   1144   if (stream->response_received()) {
   1145     LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
   1146     CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
   1147     return;
   1148   }
   1149   stream->set_response_received();
   1150 
   1151   if (net_log().IsLoggingAllEvents()) {
   1152     net_log().AddEvent(
   1153         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
   1154         make_scoped_refptr(new NetLogSpdySynParameter(
   1155             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
   1156             stream_id, 0)));
   1157   }
   1158 
   1159   Respond(*headers, stream);
   1160 }
   1161 
   1162 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
   1163                             const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
   1164   spdy::SpdyStreamId stream_id = frame.stream_id();
   1165 
   1166   bool valid_stream = IsStreamActive(stream_id);
   1167   if (!valid_stream) {
   1168     // NOTE:  it may just be that the stream was cancelled.
   1169     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
   1170     return;
   1171   }
   1172 
   1173   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1174   CHECK_EQ(stream->stream_id(), stream_id);
   1175   CHECK(!stream->cancelled());
   1176 
   1177   if (net_log().IsLoggingAllEvents()) {
   1178     net_log().AddEvent(
   1179         NetLog::TYPE_SPDY_SESSION_HEADERS,
   1180         make_scoped_refptr(new NetLogSpdySynParameter(
   1181             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
   1182             stream_id, 0)));
   1183   }
   1184 
   1185   int rv = stream->OnHeaders(*headers);
   1186   if (rv < 0) {
   1187     DCHECK_NE(rv, ERR_IO_PENDING);
   1188     const spdy::SpdyStreamId stream_id = stream->stream_id();
   1189     DeleteStream(stream_id, rv);
   1190   }
   1191 }
   1192 
   1193 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
   1194   const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
   1195   uint32 type = frame->type();
   1196   if (type == spdy::SYN_STREAM ||
   1197       type == spdy::SYN_REPLY ||
   1198       type == spdy::HEADERS) {
   1199     if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
   1200       LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
   1201       int stream_id = 0;
   1202       if (type == spdy::SYN_STREAM) {
   1203         stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
   1204                      (frame))->stream_id();
   1205       } else if (type == spdy::SYN_REPLY) {
   1206         stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
   1207                      (frame))->stream_id();
   1208       } else if (type == spdy::HEADERS) {
   1209         stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
   1210                      (frame))->stream_id();
   1211       }
   1212       if(IsStreamActive(stream_id))
   1213         ResetStream(stream_id, spdy::PROTOCOL_ERROR);
   1214       return;
   1215     }
   1216   }
   1217 
   1218   frames_received_++;
   1219 
   1220   switch (type) {
   1221     case spdy::GOAWAY:
   1222       OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
   1223       break;
   1224     case spdy::SETTINGS:
   1225       OnSettings(
   1226           *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
   1227       break;
   1228     case spdy::RST_STREAM:
   1229       OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
   1230       break;
   1231     case spdy::SYN_STREAM:
   1232       OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
   1233             headers);
   1234       break;
   1235     case spdy::HEADERS:
   1236       OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
   1237                 headers);
   1238       break;
   1239     case spdy::SYN_REPLY:
   1240       OnSynReply(
   1241           *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
   1242           headers);
   1243       break;
   1244     case spdy::WINDOW_UPDATE:
   1245       OnWindowUpdate(
   1246           *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
   1247       break;
   1248     default:
   1249       DCHECK(false);  // Error!
   1250   }
   1251 }
   1252 
   1253 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
   1254   spdy::SpdyStreamId stream_id = frame.stream_id();
   1255 
   1256   net_log().AddEvent(
   1257       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
   1258       make_scoped_refptr(
   1259           new NetLogSpdyRstParameter(stream_id, frame.status())));
   1260 
   1261   bool valid_stream = IsStreamActive(stream_id);
   1262   if (!valid_stream) {
   1263     // NOTE:  it may just be that the stream was cancelled.
   1264     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
   1265     return;
   1266   }
   1267   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1268   CHECK_EQ(stream->stream_id(), stream_id);
   1269   CHECK(!stream->cancelled());
   1270 
   1271   if (frame.status() == 0) {
   1272     stream->OnDataReceived(NULL, 0);
   1273   } else {
   1274     LOG(ERROR) << "Spdy stream closed: " << frame.status();
   1275     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
   1276     //                For now, it doesn't matter much - it is a protocol error.
   1277     DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
   1278   }
   1279 }
   1280 
   1281 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
   1282   net_log_.AddEvent(
   1283       NetLog::TYPE_SPDY_SESSION_GOAWAY,
   1284       make_scoped_refptr(
   1285           new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
   1286                                         active_streams_.size(),
   1287                                         unclaimed_pushed_streams_.size())));
   1288   RemoveFromPool();
   1289   CloseAllStreams(net::ERR_ABORTED);
   1290 
   1291   // TODO(willchan): Cancel any streams that are past the GoAway frame's
   1292   // |last_accepted_stream_id|.
   1293 
   1294   // Don't bother killing any streams that are still reading.  They'll either
   1295   // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
   1296   // closed.
   1297 }
   1298 
   1299 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
   1300   spdy::SpdySettings settings;
   1301   if (spdy_framer_.ParseSettings(&frame, &settings)) {
   1302     HandleSettings(settings);
   1303     spdy_settings_->Set(host_port_pair(), settings);
   1304   }
   1305 
   1306   received_settings_ = true;
   1307 
   1308   net_log_.AddEvent(
   1309       NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
   1310       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
   1311 }
   1312 
   1313 void SpdySession::OnWindowUpdate(
   1314     const spdy::SpdyWindowUpdateControlFrame& frame) {
   1315   spdy::SpdyStreamId stream_id = frame.stream_id();
   1316   if (!IsStreamActive(stream_id)) {
   1317     LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
   1318     return;
   1319   }
   1320 
   1321   int delta_window_size = static_cast<int>(frame.delta_window_size());
   1322   if (delta_window_size < 1) {
   1323     LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
   1324                  << delta_window_size;
   1325     ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
   1326     return;
   1327   }
   1328 
   1329   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1330   CHECK_EQ(stream->stream_id(), stream_id);
   1331   CHECK(!stream->cancelled());
   1332 
   1333   if (use_flow_control_)
   1334     stream->IncreaseSendWindowSize(delta_window_size);
   1335 
   1336   net_log_.AddEvent(
   1337       NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
   1338       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
   1339           stream_id, delta_window_size, stream->send_window_size())));
   1340 }
   1341 
   1342 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
   1343                                    int delta_window_size) {
   1344   DCHECK(IsStreamActive(stream_id));
   1345   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
   1346   CHECK_EQ(stream->stream_id(), stream_id);
   1347 
   1348   net_log_.AddEvent(
   1349       NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
   1350       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
   1351           stream_id, delta_window_size, stream->recv_window_size())));
   1352 
   1353   scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
   1354       spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
   1355   QueueFrame(window_update_frame.get(), stream->priority(), stream);
   1356 }
   1357 
   1358 // Given a cwnd that we would have sent to the server, modify it based on the
   1359 // field trial policy.
   1360 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
   1361   base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
   1362   if (!trial) {
   1363       LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
   1364       return cwnd;
   1365   }
   1366   if (trial->group_name() == "cwnd10")
   1367     return 10;
   1368   else if (trial->group_name() == "cwnd16")
   1369     return 16;
   1370   else if (trial->group_name() == "cwndMin16")
   1371     return std::max(cwnd, 16);
   1372   else if (trial->group_name() == "cwndMin10")
   1373     return std::max(cwnd, 10);
   1374   else if (trial->group_name() == "cwndDynamic")
   1375     return cwnd;
   1376   NOTREACHED();
   1377   return cwnd;
   1378 }
   1379 
   1380 void SpdySession::SendSettings() {
   1381   // Note:  we're copying the settings here, so that we can potentially modify
   1382   // the settings for the field trial.  When removing the field trial, make
   1383   // this a reference to the const SpdySettings again.
   1384   spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
   1385   if (settings.empty())
   1386     return;
   1387 
   1388   // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
   1389   for (spdy::SpdySettings::iterator i = settings.begin(),
   1390            end = settings.end(); i != end; ++i) {
   1391     const uint32 id = i->first.id();
   1392     const uint32 val = i->second;
   1393     switch (id) {
   1394       case spdy::SETTINGS_CURRENT_CWND:
   1395         uint32 cwnd = 0;
   1396         cwnd = ApplyCwndFieldTrialPolicy(val);
   1397         UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
   1398                                     cwnd,
   1399                                     1, 200, 100);
   1400         if (cwnd != val) {
   1401           i->second = cwnd;
   1402           i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
   1403           spdy_settings_->Set(host_port_pair(), settings);
   1404         }
   1405         break;
   1406     }
   1407   }
   1408 
   1409   HandleSettings(settings);
   1410 
   1411   net_log_.AddEvent(
   1412       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
   1413       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
   1414 
   1415   // Create the SETTINGS frame and send it.
   1416   scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
   1417       spdy_framer_.CreateSettings(settings));
   1418   sent_settings_ = true;
   1419   QueueFrame(settings_frame.get(), 0, NULL);
   1420 }
   1421 
   1422 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
   1423   for (spdy::SpdySettings::const_iterator i = settings.begin(),
   1424            end = settings.end(); i != end; ++i) {
   1425     const uint32 id = i->first.id();
   1426     const uint32 val = i->second;
   1427     switch (id) {
   1428       case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
   1429         max_concurrent_streams_ = std::min(static_cast<size_t>(val),
   1430                                            max_concurrent_stream_limit_);
   1431         ProcessPendingCreateStreams();
   1432         break;
   1433     }
   1434   }
   1435 }
   1436 
   1437 void SpdySession::RecordHistograms() {
   1438   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
   1439                               streams_initiated_count_,
   1440                               0, 300, 50);
   1441   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
   1442                               streams_pushed_count_,
   1443                               0, 300, 50);
   1444   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
   1445                               streams_pushed_and_claimed_count_,
   1446                               0, 300, 50);
   1447   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
   1448                               streams_abandoned_count_,
   1449                               0, 300, 50);
   1450   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
   1451                             sent_settings_ ? 1 : 0, 2);
   1452   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
   1453                             received_settings_ ? 1 : 0, 2);
   1454   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
   1455                               stalled_streams_,
   1456                               0, 300, 50);
   1457   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
   1458                             stalled_streams_ > 0 ? 1 : 0, 2);
   1459 
   1460   if (received_settings_) {
   1461     // Enumerate the saved settings, and set histograms for it.
   1462     const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
   1463 
   1464     spdy::SpdySettings::const_iterator it;
   1465     for (it = settings.begin(); it != settings.end(); ++it) {
   1466       const spdy::SpdySetting setting = *it;
   1467       switch (setting.first.id()) {
   1468         case spdy::SETTINGS_CURRENT_CWND:
   1469           // Record several different histograms to see if cwnd converges
   1470           // for larger volumes of data being sent.
   1471           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
   1472                                       setting.second,
   1473                                       1, 200, 100);
   1474           if (bytes_received_ > 10 * 1024) {
   1475             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
   1476                                         setting.second,
   1477                                         1, 200, 100);
   1478             if (bytes_received_ > 25 * 1024) {
   1479               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
   1480                                           setting.second,
   1481                                           1, 200, 100);
   1482               if (bytes_received_ > 50 * 1024) {
   1483                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
   1484                                             setting.second,
   1485                                             1, 200, 100);
   1486                 if (bytes_received_ > 100 * 1024) {
   1487                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
   1488                                               setting.second,
   1489                                               1, 200, 100);
   1490                 }
   1491               }
   1492             }
   1493           }
   1494           break;
   1495         case spdy::SETTINGS_ROUND_TRIP_TIME:
   1496           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
   1497                                       setting.second,
   1498                                       1, 1200, 100);
   1499           break;
   1500         case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
   1501           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
   1502                                       setting.second,
   1503                                       1, 100, 50);
   1504           break;
   1505       }
   1506     }
   1507   }
   1508 }
   1509 
   1510 void SpdySession::InvokeUserStreamCreationCallback(
   1511     scoped_refptr<SpdyStream>* stream) {
   1512   PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
   1513 
   1514   // Exit if the request has already been cancelled.
   1515   if (it == pending_callback_map_.end())
   1516     return;
   1517 
   1518   CompletionCallback* callback = it->second.callback;
   1519   int result = it->second.result;
   1520   pending_callback_map_.erase(it);
   1521   callback->Run(result);
   1522 }
   1523 
   1524 }  // namespace net
   1525