Home | History | Annotate | Download | only in streams
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/streams/stream.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/message_loop/message_loop_proxy.h"
     10 #include "content/browser/streams/stream_handle_impl.h"
     11 #include "content/browser/streams/stream_read_observer.h"
     12 #include "content/browser/streams/stream_registry.h"
     13 #include "content/browser/streams/stream_write_observer.h"
     14 #include "net/base/io_buffer.h"
     15 
     16 namespace {
     17 // Start throttling the connection at about 1MB.
     18 const size_t kDeferSizeThreshold = 40 * 32768;
     19 }
     20 
     21 namespace content {
     22 
     23 Stream::Stream(StreamRegistry* registry,
     24                StreamWriteObserver* write_observer,
     25                const GURL& url)
     26     : can_add_data_(true),
     27       url_(url),
     28       data_length_(0),
     29       data_bytes_read_(0),
     30       last_total_buffered_bytes_(0),
     31       registry_(registry),
     32       read_observer_(NULL),
     33       write_observer_(write_observer),
     34       stream_handle_(NULL),
     35       weak_ptr_factory_(this) {
     36   CreateByteStream(base::MessageLoopProxy::current(),
     37                    base::MessageLoopProxy::current(),
     38                    kDeferSizeThreshold,
     39                    &writer_,
     40                    &reader_);
     41 
     42   // Setup callback for writing.
     43   writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
     44                                        weak_ptr_factory_.GetWeakPtr()));
     45   reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
     46                                        weak_ptr_factory_.GetWeakPtr()));
     47 
     48   registry_->RegisterStream(this);
     49 }
     50 
     51 Stream::~Stream() {
     52 }
     53 
     54 bool Stream::SetReadObserver(StreamReadObserver* observer) {
     55   if (read_observer_)
     56     return false;
     57   read_observer_ = observer;
     58   return true;
     59 }
     60 
     61 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
     62   DCHECK(observer == read_observer_);
     63   read_observer_ = NULL;
     64 }
     65 
     66 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
     67   DCHECK(observer == write_observer_);
     68   write_observer_ = NULL;
     69 }
     70 
     71 void Stream::Abort() {
     72   // Clear all buffer. It's safe to clear reader_ here since the same thread
     73   // is used for both input and output operation.
     74   writer_.reset();
     75   reader_.reset();
     76   ClearBuffer();
     77   can_add_data_ = false;
     78   registry_->UnregisterStream(url());
     79 }
     80 
     81 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
     82   if (!writer_.get())
     83     return;
     84 
     85   size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
     86   if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
     87     Abort();
     88     return;
     89   }
     90 
     91   // Now it's guaranteed that this doesn't overflow. This must be done before
     92   // Write() since GetTotalBufferedBytes() may return different value after
     93   // Write() call, so if we use the new value, information in this instance and
     94   // one in |registry_| become inconsistent.
     95   last_total_buffered_bytes_ = current_buffered_bytes + size;
     96 
     97   can_add_data_ = writer_->Write(buffer, size);
     98 }
     99 
    100 void Stream::AddData(const char* data, size_t size) {
    101   if (!writer_.get())
    102     return;
    103 
    104   scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
    105   memcpy(io_buffer->data(), data, size);
    106   AddData(io_buffer, size);
    107 }
    108 
    109 void Stream::Finalize() {
    110   if (!writer_.get())
    111     return;
    112 
    113   writer_->Close(0);
    114   writer_.reset();
    115 
    116   // Continue asynchronously.
    117   base::MessageLoopProxy::current()->PostTask(
    118       FROM_HERE,
    119       base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
    120 }
    121 
    122 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
    123                                         int buf_size,
    124                                         int* bytes_read) {
    125   DCHECK(buf);
    126   DCHECK(bytes_read);
    127 
    128   *bytes_read = 0;
    129   if (!data_.get()) {
    130     DCHECK(!data_length_);
    131     DCHECK(!data_bytes_read_);
    132 
    133     if (!reader_.get())
    134       return STREAM_ABORTED;
    135 
    136     ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
    137     switch (state) {
    138       case ByteStreamReader::STREAM_HAS_DATA:
    139         break;
    140       case ByteStreamReader::STREAM_COMPLETE:
    141         registry_->UnregisterStream(url());
    142         return STREAM_COMPLETE;
    143       case ByteStreamReader::STREAM_EMPTY:
    144         return STREAM_EMPTY;
    145     }
    146   }
    147 
    148   const size_t remaining_bytes = data_length_ - data_bytes_read_;
    149   size_t to_read =
    150       static_cast<size_t>(buf_size) < remaining_bytes ?
    151       buf_size : remaining_bytes;
    152   memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
    153   data_bytes_read_ += to_read;
    154   if (data_bytes_read_ >= data_length_)
    155     ClearBuffer();
    156 
    157   *bytes_read = to_read;
    158   return STREAM_HAS_DATA;
    159 }
    160 
    161 scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
    162                                               const std::string& mime_type) {
    163   CHECK(!stream_handle_);
    164   stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
    165                                         original_url,
    166                                         mime_type);
    167   return scoped_ptr<StreamHandle>(stream_handle_).Pass();
    168 }
    169 
    170 void Stream::CloseHandle() {
    171   // Prevent deletion until this function ends.
    172   scoped_refptr<Stream> ref(this);
    173 
    174   CHECK(stream_handle_);
    175   stream_handle_ = NULL;
    176   registry_->UnregisterStream(url());
    177   if (write_observer_)
    178     write_observer_->OnClose(this);
    179 }
    180 
    181 void Stream::OnSpaceAvailable() {
    182   can_add_data_ = true;
    183   if (write_observer_)
    184     write_observer_->OnSpaceAvailable(this);
    185 }
    186 
    187 void Stream::OnDataAvailable() {
    188   if (read_observer_)
    189     read_observer_->OnDataAvailable(this);
    190 }
    191 
    192 void Stream::ClearBuffer() {
    193   data_ = NULL;
    194   data_length_ = 0;
    195   data_bytes_read_ = 0;
    196 }
    197 
    198 }  // namespace content
    199