Home | History | Annotate | Download | only in streams
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/streams/stream.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/message_loop/message_loop_proxy.h"
     10 #include "content/browser/streams/stream_handle_impl.h"
     11 #include "content/browser/streams/stream_read_observer.h"
     12 #include "content/browser/streams/stream_registry.h"
     13 #include "content/browser/streams/stream_write_observer.h"
     14 #include "net/base/io_buffer.h"
     15 
     16 namespace {
     17 // Start throttling the connection at about 1MB.
     18 const size_t kDeferSizeThreshold = 40 * 32768;
     19 }
     20 
     21 namespace content {
     22 
     23 Stream::Stream(StreamRegistry* registry,
     24                StreamWriteObserver* write_observer,
     25                const GURL& url)
     26     : data_bytes_read_(0),
     27       can_add_data_(true),
     28       url_(url),
     29       data_length_(0),
     30       registry_(registry),
     31       read_observer_(NULL),
     32       write_observer_(write_observer),
     33       stream_handle_(NULL),
     34       weak_ptr_factory_(this) {
     35   CreateByteStream(base::MessageLoopProxy::current(),
     36                    base::MessageLoopProxy::current(),
     37                    kDeferSizeThreshold,
     38                    &writer_,
     39                    &reader_);
     40 
     41   // Setup callback for writing.
     42   writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
     43                                        weak_ptr_factory_.GetWeakPtr()));
     44   reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
     45                                        weak_ptr_factory_.GetWeakPtr()));
     46 
     47   registry_->RegisterStream(this);
     48 }
     49 
     50 Stream::~Stream() {
     51 }
     52 
     53 bool Stream::SetReadObserver(StreamReadObserver* observer) {
     54   if (read_observer_)
     55     return false;
     56   read_observer_ = observer;
     57   return true;
     58 }
     59 
     60 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
     61   DCHECK(observer == read_observer_);
     62   read_observer_ = NULL;
     63 }
     64 
     65 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
     66   DCHECK(observer == write_observer_);
     67   write_observer_ = NULL;
     68 }
     69 
     70 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
     71   can_add_data_ = writer_->Write(buffer, size);
     72 }
     73 
     74 void Stream::AddData(const char* data, size_t size) {
     75   scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
     76   memcpy(io_buffer->data(), data, size);
     77   can_add_data_ = writer_->Write(io_buffer, size);
     78 }
     79 
     80 void Stream::Finalize() {
     81   writer_->Close(0);
     82   writer_.reset(NULL);
     83 
     84   // Continue asynchronously.
     85   base::MessageLoopProxy::current()->PostTask(
     86       FROM_HERE,
     87       base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
     88 }
     89 
     90 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
     91                                         int buf_size,
     92                                         int* bytes_read) {
     93   *bytes_read = 0;
     94   if (!data_.get()) {
     95     data_length_ = 0;
     96     data_bytes_read_ = 0;
     97     ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
     98     switch (state) {
     99       case ByteStreamReader::STREAM_HAS_DATA:
    100         break;
    101       case ByteStreamReader::STREAM_COMPLETE:
    102         registry_->UnregisterStream(url());
    103         return STREAM_COMPLETE;
    104       case ByteStreamReader::STREAM_EMPTY:
    105         return STREAM_EMPTY;
    106     }
    107   }
    108 
    109   const size_t remaining_bytes = data_length_ - data_bytes_read_;
    110   size_t to_read =
    111       static_cast<size_t>(buf_size) < remaining_bytes ?
    112       buf_size : remaining_bytes;
    113   memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
    114   data_bytes_read_ += to_read;
    115   if (data_bytes_read_ >= data_length_)
    116     data_ = NULL;
    117 
    118   *bytes_read = to_read;
    119   return STREAM_HAS_DATA;
    120 }
    121 
    122 scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
    123                                               const std::string& mime_type) {
    124   CHECK(!stream_handle_);
    125   stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
    126                                         original_url,
    127                                         mime_type);
    128   return scoped_ptr<StreamHandle>(stream_handle_).Pass();
    129 }
    130 
    131 void Stream::CloseHandle() {
    132   // Prevent deletion until this function ends.
    133   scoped_refptr<Stream> ref(this);
    134 
    135   CHECK(stream_handle_);
    136   stream_handle_ = NULL;
    137   registry_->UnregisterStream(url());
    138   if (write_observer_)
    139     write_observer_->OnClose(this);
    140 }
    141 
    142 void Stream::OnSpaceAvailable() {
    143   can_add_data_ = true;
    144   if (write_observer_)
    145     write_observer_->OnSpaceAvailable(this);
    146 }
    147 
    148 void Stream::OnDataAvailable() {
    149   if (read_observer_)
    150     read_observer_->OnDataAvailable(this);
    151 }
    152 
    153 }  // namespace content
    154