Home | History | Annotate | Download | only in streams
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/streams/stream.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/message_loop/message_loop_proxy.h"
     10 #include "base/values.h"
     11 #include "content/browser/streams/stream_handle_impl.h"
     12 #include "content/browser/streams/stream_read_observer.h"
     13 #include "content/browser/streams/stream_registry.h"
     14 #include "content/browser/streams/stream_write_observer.h"
     15 #include "net/base/io_buffer.h"
     16 #include "net/http/http_response_headers.h"
     17 
     18 namespace {
     19 // Start throttling the connection at about 1MB.
     20 const size_t kDeferSizeThreshold = 40 * 32768;
     21 }
     22 
     23 namespace content {
     24 
     25 Stream::Stream(StreamRegistry* registry,
     26                StreamWriteObserver* write_observer,
     27                const GURL& url)
     28     : can_add_data_(true),
     29       url_(url),
     30       data_length_(0),
     31       data_bytes_read_(0),
     32       last_total_buffered_bytes_(0),
     33       registry_(registry),
     34       read_observer_(NULL),
     35       write_observer_(write_observer),
     36       stream_handle_(NULL),
     37       weak_ptr_factory_(this) {
     38   CreateByteStream(base::MessageLoopProxy::current(),
     39                    base::MessageLoopProxy::current(),
     40                    kDeferSizeThreshold,
     41                    &writer_,
     42                    &reader_);
     43 
     44   // Setup callback for writing.
     45   writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
     46                                        weak_ptr_factory_.GetWeakPtr()));
     47   reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
     48                                        weak_ptr_factory_.GetWeakPtr()));
     49 
     50   registry_->RegisterStream(this);
     51 }
     52 
     53 Stream::~Stream() {
     54 }
     55 
     56 bool Stream::SetReadObserver(StreamReadObserver* observer) {
     57   if (read_observer_)
     58     return false;
     59   read_observer_ = observer;
     60   return true;
     61 }
     62 
     63 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
     64   DCHECK(observer == read_observer_);
     65   read_observer_ = NULL;
     66 }
     67 
     68 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
     69   DCHECK(observer == write_observer_);
     70   write_observer_ = NULL;
     71 }
     72 
     73 void Stream::Abort() {
     74   // Clear all buffer. It's safe to clear reader_ here since the same thread
     75   // is used for both input and output operation.
     76   writer_.reset();
     77   reader_.reset();
     78   ClearBuffer();
     79   can_add_data_ = false;
     80   registry_->UnregisterStream(url());
     81 }
     82 
     83 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
     84   if (!writer_.get())
     85     return;
     86 
     87   size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
     88   if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
     89     Abort();
     90     return;
     91   }
     92 
     93   // Now it's guaranteed that this doesn't overflow. This must be done before
     94   // Write() since GetTotalBufferedBytes() may return different value after
     95   // Write() call, so if we use the new value, information in this instance and
     96   // one in |registry_| become inconsistent.
     97   last_total_buffered_bytes_ = current_buffered_bytes + size;
     98 
     99   can_add_data_ = writer_->Write(buffer, size);
    100 }
    101 
    102 void Stream::AddData(const char* data, size_t size) {
    103   if (!writer_.get())
    104     return;
    105 
    106   scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
    107   memcpy(io_buffer->data(), data, size);
    108   AddData(io_buffer, size);
    109 }
    110 
    111 void Stream::Finalize() {
    112   if (!writer_.get())
    113     return;
    114 
    115   writer_->Close(0);
    116   writer_.reset();
    117 
    118   // Continue asynchronously.
    119   base::MessageLoopProxy::current()->PostTask(
    120       FROM_HERE,
    121       base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
    122 }
    123 
    124 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
    125                                         int buf_size,
    126                                         int* bytes_read) {
    127   DCHECK(buf);
    128   DCHECK(bytes_read);
    129 
    130   *bytes_read = 0;
    131   if (!data_.get()) {
    132     DCHECK(!data_length_);
    133     DCHECK(!data_bytes_read_);
    134 
    135     if (!reader_.get())
    136       return STREAM_ABORTED;
    137 
    138     ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
    139     switch (state) {
    140       case ByteStreamReader::STREAM_HAS_DATA:
    141         break;
    142       case ByteStreamReader::STREAM_COMPLETE:
    143         registry_->UnregisterStream(url());
    144         return STREAM_COMPLETE;
    145       case ByteStreamReader::STREAM_EMPTY:
    146         return STREAM_EMPTY;
    147     }
    148   }
    149 
    150   const size_t remaining_bytes = data_length_ - data_bytes_read_;
    151   size_t to_read =
    152       static_cast<size_t>(buf_size) < remaining_bytes ?
    153       buf_size : remaining_bytes;
    154   memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
    155   data_bytes_read_ += to_read;
    156   if (data_bytes_read_ >= data_length_)
    157     ClearBuffer();
    158 
    159   *bytes_read = to_read;
    160   return STREAM_HAS_DATA;
    161 }
    162 
    163 scoped_ptr<StreamHandle> Stream::CreateHandle(
    164     const GURL& original_url,
    165     const std::string& mime_type,
    166     scoped_refptr<net::HttpResponseHeaders> response_headers) {
    167   CHECK(!stream_handle_);
    168   stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
    169                                         original_url,
    170                                         mime_type,
    171                                         response_headers);
    172   return scoped_ptr<StreamHandle>(stream_handle_).Pass();
    173 }
    174 
    175 void Stream::CloseHandle() {
    176   // Prevent deletion until this function ends.
    177   scoped_refptr<Stream> ref(this);
    178 
    179   CHECK(stream_handle_);
    180   stream_handle_ = NULL;
    181   registry_->UnregisterStream(url());
    182   if (write_observer_)
    183     write_observer_->OnClose(this);
    184 }
    185 
    186 void Stream::OnSpaceAvailable() {
    187   can_add_data_ = true;
    188   if (write_observer_)
    189     write_observer_->OnSpaceAvailable(this);
    190 }
    191 
    192 void Stream::OnDataAvailable() {
    193   if (read_observer_)
    194     read_observer_->OnDataAvailable(this);
    195 }
    196 
    197 void Stream::ClearBuffer() {
    198   data_ = NULL;
    199   data_length_ = 0;
    200   data_bytes_read_ = 0;
    201 }
    202 
    203 }  // namespace content
    204