Home | History | Annotate | Download | only in flip
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/flip/flip_session.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/logging.h"
      9 #include "base/message_loop.h"
     10 #include "base/rand_util.h"
     11 #include "base/stats_counters.h"
     12 #include "base/stl_util-inl.h"
     13 #include "base/string_util.h"
     14 #include "net/base/connection_type_histograms.h"
     15 #include "net/base/load_flags.h"
     16 #include "net/base/load_log.h"
     17 #include "net/base/net_util.h"
     18 #include "net/flip/flip_frame_builder.h"
     19 #include "net/flip/flip_protocol.h"
     20 #include "net/flip/flip_stream.h"
     21 #include "net/http/http_network_session.h"
     22 #include "net/http/http_request_info.h"
     23 #include "net/http/http_response_headers.h"
     24 #include "net/http/http_response_info.h"
     25 #include "net/socket/client_socket.h"
     26 #include "net/socket/client_socket_factory.h"
     27 #include "net/socket/ssl_client_socket.h"
     28 #include "net/tools/dump_cache/url_to_filename_encoder.h"
     29 
     30 namespace {
     31 
     32 // Diagnostics function to dump the headers of a request.
     33 // TODO(mbelshe): Remove this function.
     34 void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) {
     35   // Because this function gets called on every request,
     36   // take extra care to optimize it away if logging is turned off.
     37   if (logging::LOG_INFO < logging::GetMinLogLevel())
     38     return;
     39 
     40   flip::FlipHeaderBlock::const_iterator it = headers.begin();
     41   while (it != headers.end()) {
     42     std::string val = (*it).second;
     43     std::string::size_type pos = 0;
     44     while ((pos = val.find('\0', pos)) != val.npos)
     45       val[pos] = '\n';
     46     LOG(INFO) << (*it).first << "==" << val;
     47     ++it;
     48   }
     49 }
     50 
     51 }  // namespace
     52 
     53 namespace net {
     54 
     55 namespace {
     56 
     57 #ifdef WIN32
     58 // We use an artificially small buffer size on windows because the async IO
     59 // system will artifiially delay IO completions when we use large buffers.
     60 const int kReadBufferSize = 2 * 1024;
     61 #else
     62 const int kReadBufferSize = 8 * 1024;
     63 #endif
     64 
     65 // Convert a FlipHeaderBlock into an HttpResponseInfo.
     66 // |headers| input parameter with the FlipHeaderBlock.
     67 // |info| output parameter for the HttpResponseInfo.
     68 // Returns true if successfully converted.  False if there was a failure
     69 // or if the FlipHeaderBlock was invalid.
     70 bool FlipHeadersToHttpResponse(const flip::FlipHeaderBlock& headers,
     71                                HttpResponseInfo* response) {
     72   std::string version;
     73   std::string status;
     74 
     75   // The "status" and "version" headers are required.
     76   flip::FlipHeaderBlock::const_iterator it;
     77   it = headers.find("status");
     78   if (it == headers.end()) {
     79     LOG(ERROR) << "FlipHeaderBlock without status header.";
     80     return false;
     81   }
     82   status = it->second;
     83 
     84   // Grab the version.  If not provided by the server,
     85   it = headers.find("version");
     86   if (it == headers.end()) {
     87     LOG(ERROR) << "FlipHeaderBlock without version header.";
     88     return false;
     89   }
     90   version = it->second;
     91 
     92   std::string raw_headers(version);
     93   raw_headers.push_back(' ');
     94   raw_headers.append(status);
     95   raw_headers.push_back('\0');
     96   for (it = headers.begin(); it != headers.end(); ++it) {
     97     // For each value, if the server sends a NUL-separated
     98     // list of values, we separate that back out into
     99     // individual headers for each value in the list.
    100     // e.g.
    101     //    Set-Cookie "foo\0bar"
    102     // becomes
    103     //    Set-Cookie: foo\0
    104     //    Set-Cookie: bar\0
    105     std::string value = it->second;
    106     size_t start = 0;
    107     size_t end = 0;
    108     do {
    109       end = value.find('\0', start);
    110       std::string tval;
    111       if (end != value.npos)
    112         tval = value.substr(start, (end - start));
    113       else
    114         tval = value.substr(start);
    115       raw_headers.append(it->first);
    116       raw_headers.push_back(':');
    117       raw_headers.append(tval);
    118       raw_headers.push_back('\0');
    119       start = end + 1;
    120     } while (end != value.npos);
    121   }
    122 
    123   response->headers = new HttpResponseHeaders(raw_headers);
    124   response->was_fetched_via_spdy = true;
    125   return true;
    126 }
    127 
    128 // Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
    129 // a HttpRequestInfo block.
    130 void CreateFlipHeadersFromHttpRequest(
    131     const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) {
    132   static const char kHttpProtocolVersion[] = "HTTP/1.1";
    133 
    134   HttpUtil::HeadersIterator it(info.extra_headers.begin(),
    135                                info.extra_headers.end(),
    136                                "\r\n");
    137   while (it.GetNext()) {
    138     std::string name = StringToLowerASCII(it.name());
    139     if (headers->find(name) == headers->end()) {
    140       (*headers)[name] = it.values();
    141     } else {
    142       std::string new_value = (*headers)[name];
    143       new_value += "\0";
    144       new_value += it.values();
    145       (*headers)[name] = new_value;
    146     }
    147   }
    148 
    149   // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc)
    150   // TODO(mbelshe): Add authentication headers here.
    151 
    152   (*headers)["method"] = info.method;
    153   (*headers)["url"] = info.url.spec();
    154   (*headers)["version"] = kHttpProtocolVersion;
    155   if (info.user_agent.length())
    156     (*headers)["user-agent"] = info.user_agent;
    157   if (!info.referrer.is_empty())
    158     (*headers)["referer"] = info.referrer.spec();
    159 
    160   // Honor load flags that impact proxy caches.
    161   if (info.load_flags & LOAD_BYPASS_CACHE) {
    162     (*headers)["pragma"] = "no-cache";
    163     (*headers)["cache-control"] = "no-cache";
    164   } else if (info.load_flags & LOAD_VALIDATE_CACHE) {
    165     (*headers)["cache-control"] = "max-age=0";
    166   }
    167 }
    168 
    169 void AdjustSocketBufferSizes(ClientSocket* socket) {
    170   // Adjust socket buffer sizes.
    171   // FLIP uses one socket, and we want a really big buffer.
    172   // This greatly helps on links with packet loss - we can even
    173   // outperform Vista's dynamic window sizing algorithm.
    174   // TODO(mbelshe): more study.
    175   const int kSocketBufferSize = 512 * 1024;
    176   socket->SetReceiveBufferSize(kSocketBufferSize);
    177   socket->SetSendBufferSize(kSocketBufferSize);
    178 }
    179 
    180 }  // namespace
    181 
    182 // static
    183 bool FlipSession::use_ssl_ = true;
    184 
    185 FlipSession::FlipSession(const std::string& host, HttpNetworkSession* session)
    186     : ALLOW_THIS_IN_INITIALIZER_LIST(
    187           connect_callback_(this, &FlipSession::OnTCPConnect)),
    188       ALLOW_THIS_IN_INITIALIZER_LIST(
    189           ssl_connect_callback_(this, &FlipSession::OnSSLConnect)),
    190       ALLOW_THIS_IN_INITIALIZER_LIST(
    191           read_callback_(this, &FlipSession::OnReadComplete)),
    192       ALLOW_THIS_IN_INITIALIZER_LIST(
    193           write_callback_(this, &FlipSession::OnWriteComplete)),
    194       domain_(host),
    195       session_(session),
    196       connection_(new ClientSocketHandle),
    197       read_buffer_(new IOBuffer(kReadBufferSize)),
    198       read_pending_(false),
    199       stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
    200       write_pending_(false),
    201       delayed_write_pending_(false),
    202       is_secure_(false),
    203       error_(OK),
    204       state_(IDLE),
    205       streams_initiated_count_(0),
    206       streams_pushed_count_(0),
    207       streams_pushed_and_claimed_count_(0),
    208       streams_abandoned_count_(0) {
    209   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
    210 
    211   flip_framer_.set_visitor(this);
    212 
    213   session_->ssl_config_service()->GetSSLConfig(&ssl_config_);
    214 
    215   // TODO(agl): This is a temporary hack for testing reasons. In the medium
    216   // term we'll want to use NPN for all HTTPS connections and use the protocol
    217   // suggested.
    218   //
    219   // In the event that the server supports Next Protocol Negotiation, but
    220   // doesn't support either of these protocols, we'll request the first
    221   // protocol in the list. Because of that, HTTP is listed first because it's
    222   // what we'll actually fallback to in the case that the server doesn't
    223   // support SPDY.
    224   ssl_config_.next_protos = "\007http1.1\004spdy";
    225 }
    226 
    227 FlipSession::~FlipSession() {
    228   // Cleanup all the streams.
    229   CloseAllStreams(net::ERR_ABORTED);
    230 
    231   if (connection_->is_initialized()) {
    232     // With Flip we can't recycle sockets.
    233     connection_->socket()->Disconnect();
    234   }
    235 
    236   // TODO(willchan): Don't hardcode port 80 here.
    237   DCHECK(!session_->flip_session_pool()->HasSession(
    238       HostResolver::RequestInfo(domain_, 80)));
    239 
    240   // Record per-session histograms here.
    241   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
    242       streams_initiated_count_,
    243       0, 300, 50);
    244   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
    245       streams_pushed_count_,
    246       0, 300, 50);
    247   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
    248       streams_pushed_and_claimed_count_,
    249       0, 300, 50);
    250   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
    251       streams_abandoned_count_,
    252       0, 300, 50);
    253 }
    254 
    255 void FlipSession::InitializeWithSocket(ClientSocketHandle* connection) {
    256   static StatsCounter flip_sessions("flip.sessions");
    257   flip_sessions.Increment();
    258 
    259   AdjustSocketBufferSizes(connection->socket());
    260 
    261   state_ = CONNECTED;
    262   connection_.reset(connection);
    263 
    264   // This is a newly initialized session that no client should have a handle to
    265   // yet, so there's no need to start writing data as in OnTCPConnect(), but we
    266   // should start reading data.
    267   ReadSocket();
    268 }
    269 
    270 net::Error FlipSession::Connect(const std::string& group_name,
    271                                 const HostResolver::RequestInfo& host,
    272                                 RequestPriority priority,
    273                                 LoadLog* load_log) {
    274   DCHECK(priority >= FLIP_PRIORITY_HIGHEST && priority <= FLIP_PRIORITY_LOWEST);
    275 
    276   // If the connect process is started, let the caller continue.
    277   if (state_ > IDLE)
    278     return net::OK;
    279 
    280   state_ = CONNECTING;
    281 
    282   static StatsCounter flip_sessions("flip.sessions");
    283   flip_sessions.Increment();
    284 
    285   int rv = connection_->Init(group_name, host, priority, &connect_callback_,
    286                             session_->tcp_socket_pool(), load_log);
    287   DCHECK(rv <= 0);
    288 
    289   // If the connect is pending, we still return ok.  The APIs enqueue
    290   // work until after the connect completes asynchronously later.
    291   if (rv == net::ERR_IO_PENDING)
    292     return net::OK;
    293   return static_cast<net::Error>(rv);
    294 }
    295 
    296 scoped_refptr<FlipStream> FlipSession::GetOrCreateStream(
    297     const HttpRequestInfo& request,
    298     const UploadDataStream* upload_data,
    299     LoadLog* log) {
    300   const GURL& url = request.url;
    301   const std::string& path = url.PathForRequest();
    302 
    303   scoped_refptr<FlipStream> stream;
    304 
    305   // Check if we have a push stream for this path.
    306   if (request.method == "GET") {
    307     stream = GetPushStream(path);
    308     if (stream) {
    309       DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
    310       streams_pushed_and_claimed_count_++;
    311       return stream;
    312     }
    313   }
    314 
    315   // Check if we have a pending push stream for this url.
    316   PendingStreamMap::iterator it;
    317   it = pending_streams_.find(path);
    318   if (it != pending_streams_.end()) {
    319     DCHECK(!it->second);
    320     // Server will assign a stream id when the push stream arrives.  Use 0 for
    321     // now.
    322     LoadLog::AddEvent(log, LoadLog::TYPE_FLIP_STREAM_ADOPTED_PUSH_STREAM);
    323     FlipStream* stream = new FlipStream(this, 0, true, log);
    324     stream->set_path(path);
    325     it->second = stream;
    326     return it->second;
    327   }
    328 
    329   const flip::FlipStreamId stream_id = GetNewStreamId();
    330 
    331   // If we still don't have a stream, activate one now.
    332   stream = new FlipStream(this, stream_id, false, log);
    333   stream->set_priority(request.priority);
    334   stream->set_path(path);
    335   ActivateStream(stream);
    336 
    337   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
    338       static_cast<int>(request.priority), 0, 10, 11);
    339 
    340   LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url;
    341 
    342   // TODO(mbelshe): Optimize memory allocations
    343   DCHECK(request.priority >= FLIP_PRIORITY_HIGHEST &&
    344          request.priority <= FLIP_PRIORITY_LOWEST);
    345 
    346   // Convert from HttpRequestHeaders to Flip Headers.
    347   flip::FlipHeaderBlock headers;
    348   CreateFlipHeadersFromHttpRequest(request, &headers);
    349 
    350   flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE;
    351   if (!request.upload_data || !upload_data->size())
    352     flags = flip::CONTROL_FLAG_FIN;
    353 
    354   // Create a SYN_STREAM packet and add to the output queue.
    355   scoped_ptr<flip::FlipSynStreamControlFrame> syn_frame(
    356       flip_framer_.CreateSynStream(stream_id, request.priority, flags, false,
    357                                    &headers));
    358   int length = flip::FlipFrame::size() + syn_frame->length();
    359   IOBuffer* buffer = new IOBuffer(length);
    360   memcpy(buffer->data(), syn_frame->data(), length);
    361   queue_.push(FlipIOBuffer(buffer, length, request.priority, stream));
    362 
    363   static StatsCounter flip_requests("flip.requests");
    364   flip_requests.Increment();
    365 
    366   LOG(INFO) << "FETCHING: " << request.url.spec();
    367   streams_initiated_count_++;
    368 
    369   LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------";
    370   DumpFlipHeaders(headers);
    371 
    372   // Schedule to write to the socket after we've made it back
    373   // to the message loop so that we can aggregate multiple
    374   // requests.
    375   // TODO(mbelshe): Should we do the "first" request immediately?
    376   //                maybe we should only 'do later' for subsequent
    377   //                requests.
    378   WriteSocketLater();
    379 
    380   return stream;
    381 }
    382 
    383 int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
    384                                  net::IOBuffer* data, int len) {
    385   LOG(INFO) << "Writing Stream Data for stream " << stream_id << " (" << len
    386             << " bytes)";
    387   const int kMss = 1430;  // This is somewhat arbitrary and not really fixed,
    388                           // but it will always work reasonably with ethernet.
    389   // Chop the world into 2-packet chunks.  This is somewhat arbitrary, but
    390   // is reasonably small and ensures that we elicit ACKs quickly from TCP
    391   // (because TCP tries to only ACK every other packet).
    392   const int kMaxFlipFrameChunkSize = (2 * kMss) - flip::FlipFrame::size();
    393 
    394   // Find our stream
    395   DCHECK(IsStreamActive(stream_id));
    396   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
    397   CHECK(stream->stream_id() == stream_id);
    398   if (!stream)
    399     return ERR_INVALID_FLIP_STREAM;
    400 
    401   // TODO(mbelshe):  Setting of the FIN is assuming that the caller will pass
    402   //                 all data to write in a single chunk.  Is this always true?
    403 
    404   // Set the flags on the upload.
    405   flip::FlipDataFlags flags = flip::DATA_FLAG_FIN;
    406   if (len > kMaxFlipFrameChunkSize) {
    407     len = kMaxFlipFrameChunkSize;
    408     flags = flip::DATA_FLAG_NONE;
    409   }
    410 
    411   // TODO(mbelshe): reduce memory copies here.
    412   scoped_ptr<flip::FlipDataFrame> frame(
    413       flip_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
    414   int length = flip::FlipFrame::size() + frame->length();
    415   IOBufferWithSize* buffer = new IOBufferWithSize(length);
    416   memcpy(buffer->data(), frame->data(), length);
    417   queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream));
    418 
    419   // Whenever we queue onto the socket we need to ensure that we will write to
    420   // it later.
    421   WriteSocketLater();
    422 
    423   return ERR_IO_PENDING;
    424 }
    425 
    426 bool FlipSession::CancelStream(flip::FlipStreamId stream_id) {
    427   LOG(INFO) << "Cancelling stream " << stream_id;
    428   if (!IsStreamActive(stream_id))
    429     return false;
    430 
    431   // TODO(mbelshe): We should send a FIN_STREAM control frame here
    432   //                so that the server can cancel a large send.
    433 
    434   // TODO(mbelshe): Write a method for tearing down a stream
    435   //                that cleans it out of the active list, the pending list,
    436   //                etc.
    437   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
    438   DeactivateStream(stream_id);
    439   return true;
    440 }
    441 
    442 bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const {
    443   return ContainsKey(active_streams_, stream_id);
    444 }
    445 
    446 LoadState FlipSession::GetLoadState() const {
    447   // NOTE: The application only queries the LoadState via the
    448   //       FlipNetworkTransaction, and details are only needed when
    449   //       we're in the process of connecting.
    450 
    451   // If we're connecting, defer to the connection to give us the actual
    452   // LoadState.
    453   if (state_ == CONNECTING)
    454     return connection_->GetLoadState();
    455 
    456   // Just report that we're idle since the session could be doing
    457   // many things concurrently.
    458   return LOAD_STATE_IDLE;
    459 }
    460 
    461 void FlipSession::OnTCPConnect(int result) {
    462   LOG(INFO) << "Flip socket connected (result=" << result << ")";
    463 
    464   // We shouldn't be coming through this path if we didn't just open a fresh
    465   // socket (or have an error trying to do so).
    466   DCHECK(!connection_->socket() || !connection_->is_reused());
    467 
    468   UpdateConnectionTypeHistograms(CONNECTION_SPDY, result >= 0);
    469 
    470   if (result != net::OK) {
    471     DCHECK_LT(result, 0);
    472     CloseSessionOnError(static_cast<net::Error>(result));
    473     return;
    474   }
    475 
    476   AdjustSocketBufferSizes(connection_->socket());
    477 
    478   if (use_ssl_) {
    479     // Add a SSL socket on top of our existing transport socket.
    480     ClientSocket* socket = connection_->release_socket();
    481     // TODO(mbelshe): Fix the hostname.  This is BROKEN without having
    482     //                a real hostname.
    483     socket = session_->socket_factory()->CreateSSLClientSocket(
    484         socket, "" /* request_->url.HostNoBrackets() */ , ssl_config_);
    485     connection_->set_socket(socket);
    486     is_secure_ = true;
    487     // TODO(willchan): Plumb LoadLog into FLIP code.
    488     int status = connection_->socket()->Connect(&ssl_connect_callback_, NULL);
    489     if (status != ERR_IO_PENDING)
    490       OnSSLConnect(status);
    491   } else {
    492     DCHECK_EQ(state_, CONNECTING);
    493     state_ = CONNECTED;
    494 
    495     // Make sure we get any pending data sent.
    496     WriteSocketLater();
    497     // Start reading
    498     ReadSocket();
    499   }
    500 }
    501 
    502 void FlipSession::OnSSLConnect(int result) {
    503   // TODO(mbelshe): We need to replicate the functionality of
    504   //   HttpNetworkTransaction::DoSSLConnectComplete here, where it calls
    505   //   HandleCertificateError() and such.
    506   if (IsCertificateError(result))
    507     result = OK;   // TODO(mbelshe): pretend we're happy anyway.
    508 
    509   if (result == OK) {
    510     DCHECK_EQ(state_, CONNECTING);
    511     state_ = CONNECTED;
    512 
    513     // After we've connected, send any data to the server, and then issue
    514     // our read.
    515     WriteSocketLater();
    516     ReadSocket();
    517   } else {
    518     DCHECK_LT(result, 0);  // It should be an error, not a byte count.
    519     CloseSessionOnError(static_cast<net::Error>(result));
    520   }
    521 }
    522 
    523 void FlipSession::OnReadComplete(int bytes_read) {
    524   // Parse a frame.  For now this code requires that the frame fit into our
    525   // buffer (32KB).
    526   // TODO(mbelshe): support arbitrarily large frames!
    527 
    528   LOG(INFO) << "Flip socket read: " << bytes_read << " bytes";
    529 
    530   read_pending_ = false;
    531 
    532   if (bytes_read <= 0) {
    533     // Session is tearing down.
    534     net::Error error = static_cast<net::Error>(bytes_read);
    535     if (error == OK)
    536       error = ERR_CONNECTION_CLOSED;
    537     CloseSessionOnError(error);
    538     return;
    539   }
    540 
    541   // The FlipFramer will use callbacks onto |this| as it parses frames.
    542   // When errors occur, those callbacks can lead to teardown of all references
    543   // to |this|, so maintain a reference to self during this call for safe
    544   // cleanup.
    545   scoped_refptr<FlipSession> self(this);
    546 
    547   char *data = read_buffer_->data();
    548   while (bytes_read &&
    549          flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) {
    550     uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read);
    551     bytes_read -= bytes_processed;
    552     data += bytes_processed;
    553     if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE)
    554       flip_framer_.Reset();
    555   }
    556 
    557   if (state_ != CLOSED)
    558     ReadSocket();
    559 }
    560 
    561 void FlipSession::OnWriteComplete(int result) {
    562   DCHECK(write_pending_);
    563   DCHECK(in_flight_write_.size());
    564   DCHECK(result != 0);  // This shouldn't happen for write.
    565 
    566   write_pending_ = false;
    567 
    568   LOG(INFO) << "Flip write complete (result=" << result << ") for stream: "
    569             << in_flight_write_.stream()->stream_id();
    570 
    571   if (result >= 0) {
    572     // It should not be possible to have written more bytes than our
    573     // in_flight_write_.
    574     DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
    575 
    576     in_flight_write_.buffer()->DidConsume(result);
    577 
    578     // We only notify the stream when we've fully written the pending frame.
    579     if (!in_flight_write_.buffer()->BytesRemaining()) {
    580       scoped_refptr<FlipStream> stream = in_flight_write_.stream();
    581       DCHECK(stream.get());
    582 
    583       // Report the number of bytes written to the caller, but exclude the
    584       // frame size overhead.  NOTE:  if this frame was compressed the reported
    585       // bytes written is the compressed size, not the original size.
    586       if (result > 0) {
    587         result = in_flight_write_.buffer()->size();
    588         DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size()));
    589         result -= static_cast<int>(flip::FlipFrame::size());
    590       }
    591 
    592       // It is possible that the stream was cancelled while we were writing
    593       // to the socket.
    594       if (!stream->cancelled())
    595         stream->OnWriteComplete(result);
    596 
    597       // Cleanup the write which just completed.
    598       in_flight_write_.release();
    599     }
    600 
    601     // Write more data.  We're already in a continuation, so we can
    602     // go ahead and write it immediately (without going back to the
    603     // message loop).
    604     WriteSocketLater();
    605   } else {
    606     in_flight_write_.release();
    607 
    608     // The stream is now errored.  Close it down.
    609     CloseSessionOnError(static_cast<net::Error>(result));
    610   }
    611 }
    612 
    613 void FlipSession::ReadSocket() {
    614   if (read_pending_)
    615     return;
    616 
    617   if (state_ == CLOSED) {
    618     NOTREACHED();
    619     return;
    620   }
    621 
    622   CHECK(connection_.get());
    623   CHECK(connection_->socket());
    624   int bytes_read = connection_->socket()->Read(read_buffer_.get(),
    625                                                kReadBufferSize,
    626                                                &read_callback_);
    627   switch (bytes_read) {
    628     case 0:
    629       // Socket is closed!
    630       // TODO(mbelshe): Need to abort any active streams here.
    631       DCHECK(!active_streams_.size());
    632       return;
    633     case net::ERR_IO_PENDING:
    634       // Waiting for data.  Nothing to do now.
    635       read_pending_ = true;
    636       return;
    637     default:
    638       // Data was read, process it.
    639       // Schedule the work through the message loop to avoid recursive
    640       // callbacks.
    641       read_pending_ = true;
    642       MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
    643           this, &FlipSession::OnReadComplete, bytes_read));
    644       break;
    645   }
    646 }
    647 
    648 void FlipSession::WriteSocketLater() {
    649   if (delayed_write_pending_)
    650     return;
    651 
    652   delayed_write_pending_ = true;
    653   MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
    654       this, &FlipSession::WriteSocket));
    655 }
    656 
    657 void FlipSession::WriteSocket() {
    658   // This function should only be called via WriteSocketLater.
    659   DCHECK(delayed_write_pending_);
    660   delayed_write_pending_ = false;
    661 
    662   // If the socket isn't connected yet, just wait; we'll get called
    663   // again when the socket connection completes.  If the socket is
    664   // closed, just return.
    665   if (state_ < CONNECTED || state_ == CLOSED)
    666     return;
    667 
    668   if (write_pending_)   // Another write is in progress still.
    669     return;
    670 
    671   // Loop sending frames until we've sent everything or until the write
    672   // returns error (or ERR_IO_PENDING).
    673   while (in_flight_write_.buffer() || queue_.size()) {
    674     if (!in_flight_write_.buffer()) {
    675       // Grab the next FlipFrame to send.
    676       FlipIOBuffer next_buffer = queue_.top();
    677       queue_.pop();
    678 
    679       // We've deferred compression until just before we write it to the socket,
    680       // which is now.  At this time, we don't compress our data frames.
    681       flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
    682       size_t size;
    683       if (uncompressed_frame.is_control_frame()) {
    684         scoped_ptr<flip::FlipFrame> compressed_frame(
    685             flip_framer_.CompressFrame(&uncompressed_frame));
    686         size = compressed_frame->length() + flip::FlipFrame::size();
    687 
    688         DCHECK(size > 0);
    689 
    690         // TODO(mbelshe): We have too much copying of data here.
    691         IOBufferWithSize* buffer = new IOBufferWithSize(size);
    692         memcpy(buffer->data(), compressed_frame->data(), size);
    693 
    694         // Attempt to send the frame.
    695         in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream());
    696       } else {
    697         size = uncompressed_frame.length() + flip::FlipFrame::size();
    698         in_flight_write_ = next_buffer;
    699       }
    700     } else {
    701       DCHECK(in_flight_write_.buffer()->BytesRemaining());
    702     }
    703 
    704     write_pending_ = true;
    705     int rv = connection_->socket()->Write(in_flight_write_.buffer(),
    706         in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
    707     if (rv == net::ERR_IO_PENDING)
    708       break;
    709 
    710     // We sent the frame successfully.
    711     OnWriteComplete(rv);
    712 
    713     // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
    714     //                 as in an error state.
    715     if (rv < 0)
    716       break;
    717   }
    718 }
    719 
    720 void FlipSession::CloseAllStreams(net::Error code) {
    721   LOG(INFO) << "Closing all FLIP Streams";
    722 
    723   static StatsCounter abandoned_streams("flip.abandoned_streams");
    724   static StatsCounter abandoned_push_streams("flip.abandoned_push_streams");
    725 
    726   if (active_streams_.size()) {
    727     abandoned_streams.Add(active_streams_.size());
    728 
    729     // Create a copy of the list, since aborting streams can invalidate
    730     // our list.
    731     FlipStream** list = new FlipStream*[active_streams_.size()];
    732     ActiveStreamMap::const_iterator it;
    733     int index = 0;
    734     for (it = active_streams_.begin(); it != active_streams_.end(); ++it)
    735       list[index++] = it->second;
    736 
    737     // Issue the aborts.
    738     for (--index; index >= 0; index--) {
    739       LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id()
    740                  << "): " << list[index]->path();
    741       list[index]->OnClose(code);
    742     }
    743 
    744     // Clear out anything pending.
    745     active_streams_.clear();
    746 
    747     delete[] list;
    748   }
    749 
    750   if (pushed_streams_.size()) {
    751     streams_abandoned_count_ += pushed_streams_.size();
    752     abandoned_push_streams.Add(pushed_streams_.size());
    753     pushed_streams_.clear();
    754   }
    755 }
    756 
    757 int FlipSession::GetNewStreamId() {
    758   int id = stream_hi_water_mark_;
    759   stream_hi_water_mark_ += 2;
    760   if (stream_hi_water_mark_ > 0x7fff)
    761     stream_hi_water_mark_ = 1;
    762   return id;
    763 }
    764 
    765 void FlipSession::CloseSessionOnError(net::Error err) {
    766   DCHECK_LT(err, OK);
    767   LOG(INFO) << "Flip::CloseSessionOnError(" << err << ")";
    768 
    769   // Don't close twice.  This can occur because we can have both
    770   // a read and a write outstanding, and each can complete with
    771   // an error.
    772   if (state_ != CLOSED) {
    773     state_ = CLOSED;
    774     error_ = err;
    775     CloseAllStreams(err);
    776     session_->flip_session_pool()->Remove(this);
    777   }
    778 }
    779 
    780 void FlipSession::ActivateStream(FlipStream* stream) {
    781   const flip::FlipStreamId id = stream->stream_id();
    782   DCHECK(!IsStreamActive(id));
    783 
    784   active_streams_[id] = stream;
    785 }
    786 
    787 void FlipSession::DeactivateStream(flip::FlipStreamId id) {
    788   DCHECK(IsStreamActive(id));
    789 
    790   // Verify it is not on the pushed_streams_ list.
    791   ActiveStreamList::iterator it;
    792   for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
    793     scoped_refptr<FlipStream> curr = *it;
    794     if (id == curr->stream_id()) {
    795       pushed_streams_.erase(it);
    796       break;
    797     }
    798   }
    799 
    800   active_streams_.erase(id);
    801 }
    802 
    803 scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) {
    804   static StatsCounter used_push_streams("flip.claimed_push_streams");
    805 
    806   LOG(INFO) << "Looking for push stream: " << path;
    807 
    808   scoped_refptr<FlipStream> stream;
    809 
    810   // We just walk a linear list here.
    811   ActiveStreamList::iterator it;
    812   for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
    813     stream = *it;
    814     if (path == stream->path()) {
    815       CHECK(stream->pushed());
    816       pushed_streams_.erase(it);
    817       used_push_streams.Increment();
    818       LOG(INFO) << "Push Stream Claim for: " << path;
    819       break;
    820     }
    821   }
    822 
    823   return stream;
    824 }
    825 
    826 void FlipSession::GetSSLInfo(SSLInfo* ssl_info) {
    827   if (is_secure_) {
    828     SSLClientSocket* ssl_socket =
    829         reinterpret_cast<SSLClientSocket*>(connection_->socket());
    830     ssl_socket->GetSSLInfo(ssl_info);
    831   }
    832 }
    833 
    834 void FlipSession::OnError(flip::FlipFramer* framer) {
    835   LOG(ERROR) << "FlipSession error: " << framer->error_code();
    836   CloseSessionOnError(net::ERR_FLIP_PROTOCOL_ERROR);
    837 }
    838 
    839 void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
    840                                     const char* data,
    841                                     size_t len) {
    842   LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes";
    843   bool valid_stream = IsStreamActive(stream_id);
    844   if (!valid_stream) {
    845     // NOTE:  it may just be that the stream was cancelled.
    846     LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
    847     return;
    848   }
    849 
    850   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
    851   bool success = stream->OnDataReceived(data, len);
    852   // |len| == 0 implies a closed stream.
    853   if (!success || !len)
    854     DeactivateStream(stream_id);
    855 }
    856 
    857 void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
    858                         const flip::FlipHeaderBlock* headers) {
    859   flip::FlipStreamId stream_id = frame->stream_id();
    860 
    861   // Server-initiated streams should have even sequence numbers.
    862   if ((stream_id & 0x1) != 0) {
    863     LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id;
    864     return;
    865   }
    866 
    867   if (IsStreamActive(stream_id)) {
    868     LOG(ERROR) << "Received OnSyn for active stream " << stream_id;
    869     return;
    870   }
    871 
    872   streams_pushed_count_++;
    873 
    874   LOG(INFO) << "FlipSession: Syn received for stream: " << stream_id;
    875 
    876   LOG(INFO) << "FLIP SYN RESPONSE HEADERS -----------------------";
    877   DumpFlipHeaders(*headers);
    878 
    879   // TODO(mbelshe): DCHECK that this is a GET method?
    880 
    881   const std::string& path = ContainsKey(*headers, "path") ?
    882       headers->find("path")->second : "";
    883 
    884   // Verify that the response had a URL for us.
    885   DCHECK(!path.empty());
    886   if (path.empty()) {
    887     LOG(WARNING) << "Pushed stream did not contain a path.";
    888     return;
    889   }
    890 
    891   scoped_refptr<FlipStream> stream;
    892 
    893   // Check if we already have a delegate awaiting this stream.
    894   PendingStreamMap::iterator it;
    895   it = pending_streams_.find(path);
    896   if (it != pending_streams_.end()) {
    897     stream = it->second;
    898     pending_streams_.erase(it);
    899     if (stream)
    900       pushed_streams_.push_back(stream);
    901   } else {
    902     pushed_streams_.push_back(stream);
    903   }
    904 
    905   if (stream) {
    906     CHECK(stream->pushed());
    907     CHECK(stream->stream_id() == 0);
    908     stream->set_stream_id(stream_id);
    909   } else {
    910     // TODO(mbelshe): can we figure out how to use a LoadLog here?
    911     stream = new FlipStream(this, stream_id, true, NULL);
    912   }
    913 
    914   // Activate a stream and parse the headers.
    915   ActivateStream(stream);
    916 
    917   stream->set_path(path);
    918 
    919   // TODO(mbelshe): For now we convert from our nice hash map back
    920   // to a string of headers; this is because the HttpResponseInfo
    921   // is a bit rigid for its http (non-flip) design.
    922   HttpResponseInfo response;
    923   if (FlipHeadersToHttpResponse(*headers, &response)) {
    924     GetSSLInfo(&response.ssl_info);
    925     stream->OnResponseReceived(response);
    926   } else {
    927     stream->OnClose(ERR_INVALID_RESPONSE);
    928     DeactivateStream(stream_id);
    929     return;
    930   }
    931 
    932   LOG(INFO) << "Got pushed stream for " << stream->path();
    933 
    934   static StatsCounter push_requests("flip.pushed_streams");
    935   push_requests.Increment();
    936 }
    937 
    938 void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
    939                              const flip::FlipHeaderBlock* headers) {
    940   DCHECK(headers);
    941   flip::FlipStreamId stream_id = frame->stream_id();
    942   bool valid_stream = IsStreamActive(stream_id);
    943   if (!valid_stream) {
    944     // NOTE:  it may just be that the stream was cancelled.
    945     LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
    946     return;
    947   }
    948 
    949   LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS for stream: " << stream_id;
    950   DumpFlipHeaders(*headers);
    951 
    952   // We record content declared as being pushed so that we don't
    953   // request a duplicate stream which is already scheduled to be
    954   // sent to us.
    955   flip::FlipHeaderBlock::const_iterator it;
    956   it = headers->find("X-Associated-Content");
    957   if (it != headers->end()) {
    958     const std::string& content = it->second;
    959     std::string::size_type start = 0;
    960     std::string::size_type end = 0;
    961     do {
    962       end = content.find("||", start);
    963       if (end == std::string::npos)
    964         end = content.length();
    965       std::string url = content.substr(start, end - start);
    966       std::string::size_type pos = url.find("??");
    967       if (pos == std::string::npos)
    968         break;
    969       url = url.substr(pos + 2);
    970       GURL gurl(url);
    971       std::string path = gurl.PathForRequest();
    972       if (path.length())
    973         pending_streams_[path] = NULL;
    974       else
    975         LOG(INFO) << "Invalid X-Associated-Content path: " << url;
    976       start = end + 2;
    977     } while (start < content.length());
    978   }
    979 
    980   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
    981   CHECK(stream->stream_id() == stream_id);
    982   CHECK(!stream->cancelled());
    983   HttpResponseInfo response;
    984   if (FlipHeadersToHttpResponse(*headers, &response)) {
    985     GetSSLInfo(&response.ssl_info);
    986     stream->OnResponseReceived(response);
    987   } else {
    988     stream->OnClose(ERR_INVALID_RESPONSE);
    989     DeactivateStream(stream_id);
    990   }
    991 }
    992 
    993 void FlipSession::OnControl(const flip::FlipControlFrame* frame) {
    994   flip::FlipHeaderBlock headers;
    995   uint32 type = frame->type();
    996   if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) {
    997     if (!flip_framer_.ParseHeaderBlock(frame, &headers)) {
    998       LOG(WARNING) << "Could not parse Flip Control Frame Header";
    999       // TODO(mbelshe):  Error the session?
   1000       return;
   1001     }
   1002   }
   1003 
   1004   switch (type) {
   1005     case flip::SYN_STREAM:
   1006       LOG(INFO) << "Flip SynStream for stream " << frame->stream_id();
   1007       OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame),
   1008             &headers);
   1009       break;
   1010     case flip::SYN_REPLY:
   1011       LOG(INFO) << "Flip SynReply for stream " << frame->stream_id();
   1012       OnSynReply(
   1013           reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame),
   1014           &headers);
   1015       break;
   1016     case flip::FIN_STREAM:
   1017       LOG(INFO) << "Flip Fin for stream " << frame->stream_id();
   1018       OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame));
   1019       break;
   1020     default:
   1021       DCHECK(false);  // Error!
   1022   }
   1023 }
   1024 
   1025 void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
   1026   flip::FlipStreamId stream_id = frame->stream_id();
   1027   bool valid_stream = IsStreamActive(stream_id);
   1028   if (!valid_stream) {
   1029     // NOTE:  it may just be that the stream was cancelled.
   1030     LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
   1031     return;
   1032   }
   1033   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
   1034   CHECK(stream->stream_id() == stream_id);
   1035   CHECK(!stream->cancelled());
   1036   if (frame->status() == 0) {
   1037     stream->OnDataReceived(NULL, 0);
   1038   } else {
   1039     LOG(ERROR) << "Flip stream closed: " << frame->status();
   1040     // TODO(mbelshe): Map from Flip-protocol errors to something sensical.
   1041     //                For now, it doesn't matter much - it is a protocol error.
   1042     stream->OnClose(ERR_FAILED);
   1043   }
   1044 
   1045   DeactivateStream(stream_id);
   1046 }
   1047 
   1048 }  // namespace net
   1049