Home | History | Annotate | Download | only in browser
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/byte_stream.h"
      6 
      7 #include <deque>
      8 #include <set>
      9 #include <utility>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/memory/ref_counted.h"
     14 #include "base/sequenced_task_runner.h"
     15 
     16 namespace content {
     17 namespace {
     18 
     19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
     20 ContentVector;
     21 
     22 class ByteStreamReaderImpl;
     23 
     24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
     25 // cleared in an object destructor and accessed to check for object
     26 // existence.  We can't use weak pointers because they're tightly tied to
     27 // threads rather than task runners.
     28 // TODO(rdsmith): A better solution would be extending weak pointers
     29 // to support SequencedTaskRunners.
     30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
     31  public:
     32   LifetimeFlag() : is_alive(true) { }
     33   bool is_alive;
     34 
     35  protected:
     36   friend class base::RefCountedThreadSafe<LifetimeFlag>;
     37   virtual ~LifetimeFlag() { }
     38 
     39  private:
     40   DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
     41 };
     42 
     43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
     44 // SetPeer may happen anywhere; all other operations on each class must
     45 // happen in the context of their SequencedTaskRunner.
     46 class ByteStreamWriterImpl : public ByteStreamWriter {
     47  public:
     48   ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
     49                        scoped_refptr<LifetimeFlag> lifetime_flag,
     50                        size_t buffer_size);
     51   virtual ~ByteStreamWriterImpl();
     52 
     53   // Must be called before any operations are performed.
     54   void SetPeer(ByteStreamReaderImpl* peer,
     55                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
     56                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
     57 
     58   // Overridden from ByteStreamWriter.
     59   virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
     60                      size_t byte_count) OVERRIDE;
     61   virtual void Flush() OVERRIDE;
     62   virtual void Close(int status) OVERRIDE;
     63   virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
     64   virtual size_t GetTotalBufferedBytes() const OVERRIDE;
     65 
     66   // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
     67   static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
     68                            ByteStreamWriterImpl* target,
     69                            size_t bytes_consumed);
     70 
     71  private:
     72   // Called from UpdateWindow when object existence has been validated.
     73   void UpdateWindowInternal(size_t bytes_consumed);
     74 
     75   void PostToPeer(bool complete, int status);
     76 
     77   const size_t total_buffer_size_;
     78 
     79   // All data objects in this class are only valid to access on
     80   // this task runner except as otherwise noted.
     81   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
     82 
     83   // True while this object is alive.
     84   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
     85 
     86   base::Closure space_available_callback_;
     87   ContentVector input_contents_;
     88   size_t input_contents_size_;
     89 
     90   // ** Peer information.
     91 
     92   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
     93 
     94   // How much we've sent to the output that for flow control purposes we
     95   // must assume hasn't been read yet.
     96   size_t output_size_used_;
     97 
     98   // Only valid to access on peer_task_runner_.
     99   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
    100 
    101   // Only valid to access on peer_task_runner_ if
    102   // |*peer_lifetime_flag_ == true|
    103   ByteStreamReaderImpl* peer_;
    104 };
    105 
    106 class ByteStreamReaderImpl : public ByteStreamReader {
    107  public:
    108   ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
    109                        scoped_refptr<LifetimeFlag> lifetime_flag,
    110                        size_t buffer_size);
    111   virtual ~ByteStreamReaderImpl();
    112 
    113   // Must be called before any operations are performed.
    114   void SetPeer(ByteStreamWriterImpl* peer,
    115                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    116                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
    117 
    118   // Overridden from ByteStreamReader.
    119   virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
    120                            size_t* length) OVERRIDE;
    121   virtual int GetStatus() const OVERRIDE;
    122   virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
    123 
    124   // PostTask target from |ByteStreamWriterImpl::Write| and
    125   // |ByteStreamWriterImpl::Close|.
    126   // Receive data from our peer.
    127   // static because it may be called after the object it is targeting
    128   // has been destroyed.  It may not access |*target|
    129   // if |*object_lifetime_flag| is false.
    130   static void TransferData(
    131       scoped_refptr<LifetimeFlag> object_lifetime_flag,
    132       ByteStreamReaderImpl* target,
    133       scoped_ptr<ContentVector> transfer_buffer,
    134       size_t transfer_buffer_bytes,
    135       bool source_complete,
    136       int status);
    137 
    138  private:
    139   // Called from TransferData once object existence has been validated.
    140   void TransferDataInternal(
    141       scoped_ptr<ContentVector> transfer_buffer,
    142       size_t transfer_buffer_bytes,
    143       bool source_complete,
    144       int status);
    145 
    146   void MaybeUpdateInput();
    147 
    148   const size_t total_buffer_size_;
    149 
    150   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
    151 
    152   // True while this object is alive.
    153   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
    154 
    155   ContentVector available_contents_;
    156 
    157   bool received_status_;
    158   int status_;
    159 
    160   base::Closure data_available_callback_;
    161 
    162   // Time of last point at which data in stream transitioned from full
    163   // to non-full.  Nulled when a callback is sent.
    164   base::Time last_non_full_time_;
    165 
    166   // ** Peer information
    167 
    168   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
    169 
    170   // How much has been removed from this class that we haven't told
    171   // the input about yet.
    172   size_t unreported_consumed_bytes_;
    173 
    174   // Only valid to access on peer_task_runner_.
    175   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
    176 
    177   // Only valid to access on peer_task_runner_ if
    178   // |*peer_lifetime_flag_ == true|
    179   ByteStreamWriterImpl* peer_;
    180 };
    181 
    182 ByteStreamWriterImpl::ByteStreamWriterImpl(
    183     scoped_refptr<base::SequencedTaskRunner> task_runner,
    184     scoped_refptr<LifetimeFlag> lifetime_flag,
    185     size_t buffer_size)
    186     : total_buffer_size_(buffer_size),
    187       my_task_runner_(task_runner),
    188       my_lifetime_flag_(lifetime_flag),
    189       input_contents_size_(0),
    190       output_size_used_(0),
    191       peer_(NULL) {
    192   DCHECK(my_lifetime_flag_.get());
    193   my_lifetime_flag_->is_alive = true;
    194 }
    195 
    196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
    197   my_lifetime_flag_->is_alive = false;
    198 }
    199 
    200 void ByteStreamWriterImpl::SetPeer(
    201     ByteStreamReaderImpl* peer,
    202     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    203     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
    204   peer_ = peer;
    205   peer_task_runner_ = peer_task_runner;
    206   peer_lifetime_flag_ = peer_lifetime_flag;
    207 }
    208 
    209 bool ByteStreamWriterImpl::Write(
    210     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
    211   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    212 
    213   // Check overflow.
    214   //
    215   // TODO(tyoshino): Discuss with content/browser/download developer and if
    216   // they're fine with, set smaller limit and make it configurable.
    217   size_t space_limit = std::numeric_limits<size_t>::max() -
    218       GetTotalBufferedBytes();
    219   if (byte_count > space_limit) {
    220     // TODO(tyoshino): Tell the user that Write() failed.
    221     // Ignore input.
    222     return false;
    223   }
    224 
    225   input_contents_.push_back(std::make_pair(buffer, byte_count));
    226   input_contents_size_ += byte_count;
    227 
    228   // Arbitrarily, we buffer to a third of the total size before sending.
    229   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
    230     PostToPeer(false, 0);
    231 
    232   return GetTotalBufferedBytes() <= total_buffer_size_;
    233 }
    234 
    235 void ByteStreamWriterImpl::Flush() {
    236   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    237   if (input_contents_size_ > 0)
    238     PostToPeer(false, 0);
    239 }
    240 
    241 void ByteStreamWriterImpl::Close(int status) {
    242   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    243   PostToPeer(true, status);
    244 }
    245 
    246 void ByteStreamWriterImpl::RegisterCallback(
    247     const base::Closure& source_callback) {
    248   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    249   space_available_callback_ = source_callback;
    250 }
    251 
    252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
    253   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    254   // This sum doesn't overflow since Write() fails if this sum is going to
    255   // overflow.
    256   return input_contents_size_ + output_size_used_;
    257 }
    258 
    259 // static
    260 void ByteStreamWriterImpl::UpdateWindow(
    261     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
    262     size_t bytes_consumed) {
    263   // If the target object isn't alive anymore, we do nothing.
    264   if (!lifetime_flag->is_alive) return;
    265 
    266   target->UpdateWindowInternal(bytes_consumed);
    267 }
    268 
    269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
    270   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    271 
    272   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
    273 
    274   DCHECK_GE(output_size_used_, bytes_consumed);
    275   output_size_used_ -= bytes_consumed;
    276 
    277   // Callback if we were above the limit and we're now <= to it.
    278   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
    279 
    280   if (no_longer_above_limit && was_above_limit &&
    281       !space_available_callback_.is_null())
    282     space_available_callback_.Run();
    283 }
    284 
    285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
    286   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    287   // Valid contexts in which to call.
    288   DCHECK(complete || 0 != input_contents_size_);
    289 
    290   scoped_ptr<ContentVector> transfer_buffer;
    291   size_t buffer_size = 0;
    292   if (0 != input_contents_size_) {
    293     transfer_buffer.reset(new ContentVector);
    294     transfer_buffer->swap(input_contents_);
    295     buffer_size = input_contents_size_;
    296     output_size_used_ += input_contents_size_;
    297     input_contents_size_ = 0;
    298   }
    299   peer_task_runner_->PostTask(
    300       FROM_HERE, base::Bind(
    301           &ByteStreamReaderImpl::TransferData,
    302           peer_lifetime_flag_,
    303           peer_,
    304           base::Passed(&transfer_buffer),
    305           buffer_size,
    306           complete,
    307           status));
    308 }
    309 
    310 ByteStreamReaderImpl::ByteStreamReaderImpl(
    311     scoped_refptr<base::SequencedTaskRunner> task_runner,
    312     scoped_refptr<LifetimeFlag> lifetime_flag,
    313     size_t buffer_size)
    314     : total_buffer_size_(buffer_size),
    315       my_task_runner_(task_runner),
    316       my_lifetime_flag_(lifetime_flag),
    317       received_status_(false),
    318       status_(0),
    319       unreported_consumed_bytes_(0),
    320       peer_(NULL) {
    321   DCHECK(my_lifetime_flag_.get());
    322   my_lifetime_flag_->is_alive = true;
    323 }
    324 
    325 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
    326   my_lifetime_flag_->is_alive = false;
    327 }
    328 
    329 void ByteStreamReaderImpl::SetPeer(
    330     ByteStreamWriterImpl* peer,
    331     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    332     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
    333   peer_ = peer;
    334   peer_task_runner_ = peer_task_runner;
    335   peer_lifetime_flag_ = peer_lifetime_flag;
    336 }
    337 
    338 ByteStreamReaderImpl::StreamState
    339 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
    340                            size_t* length) {
    341   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    342 
    343   if (available_contents_.size()) {
    344     *data = available_contents_.front().first;
    345     *length = available_contents_.front().second;
    346     available_contents_.pop_front();
    347     unreported_consumed_bytes_ += *length;
    348 
    349     MaybeUpdateInput();
    350     return STREAM_HAS_DATA;
    351   }
    352   if (received_status_) {
    353     return STREAM_COMPLETE;
    354   }
    355   return STREAM_EMPTY;
    356 }
    357 
    358 int ByteStreamReaderImpl::GetStatus() const {
    359   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    360   DCHECK(received_status_);
    361   return status_;
    362 }
    363 
    364 void ByteStreamReaderImpl::RegisterCallback(
    365     const base::Closure& sink_callback) {
    366   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    367 
    368   data_available_callback_ = sink_callback;
    369 }
    370 
    371 // static
    372 void ByteStreamReaderImpl::TransferData(
    373     scoped_refptr<LifetimeFlag> object_lifetime_flag,
    374     ByteStreamReaderImpl* target,
    375     scoped_ptr<ContentVector> transfer_buffer,
    376     size_t buffer_size,
    377     bool source_complete,
    378     int status) {
    379   // If our target is no longer alive, do nothing.
    380   if (!object_lifetime_flag->is_alive) return;
    381 
    382   target->TransferDataInternal(
    383       transfer_buffer.Pass(), buffer_size, source_complete, status);
    384 }
    385 
    386 void ByteStreamReaderImpl::TransferDataInternal(
    387     scoped_ptr<ContentVector> transfer_buffer,
    388     size_t buffer_size,
    389     bool source_complete,
    390     int status) {
    391   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    392 
    393   bool was_empty = available_contents_.empty();
    394 
    395   if (transfer_buffer) {
    396     available_contents_.insert(available_contents_.end(),
    397                                transfer_buffer->begin(),
    398                                transfer_buffer->end());
    399   }
    400 
    401   if (source_complete) {
    402     received_status_ = true;
    403     status_ = status;
    404   }
    405 
    406   // Callback on transition from empty to non-empty, or
    407   // source complete.
    408   if (((was_empty && !available_contents_.empty()) ||
    409        source_complete) &&
    410       !data_available_callback_.is_null())
    411     data_available_callback_.Run();
    412 }
    413 
    414 // Decide whether or not to send the input a window update.
    415 // Currently we do that whenever we've got unreported consumption
    416 // greater than 1/3 of total size.
    417 void ByteStreamReaderImpl::MaybeUpdateInput() {
    418   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    419 
    420   if (unreported_consumed_bytes_ <=
    421       total_buffer_size_ / kFractionReadBeforeWindowUpdate)
    422     return;
    423 
    424   peer_task_runner_->PostTask(
    425       FROM_HERE, base::Bind(
    426           &ByteStreamWriterImpl::UpdateWindow,
    427           peer_lifetime_flag_,
    428           peer_,
    429           unreported_consumed_bytes_));
    430   unreported_consumed_bytes_ = 0;
    431 }
    432 
    433 }  // namespace
    434 
    435 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
    436 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
    437 
    438 ByteStreamReader::~ByteStreamReader() { }
    439 
    440 ByteStreamWriter::~ByteStreamWriter() { }
    441 
    442 void CreateByteStream(
    443     scoped_refptr<base::SequencedTaskRunner> input_task_runner,
    444     scoped_refptr<base::SequencedTaskRunner> output_task_runner,
    445     size_t buffer_size,
    446     scoped_ptr<ByteStreamWriter>* input,
    447     scoped_ptr<ByteStreamReader>* output) {
    448   scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
    449   scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
    450 
    451   ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
    452       input_task_runner, input_flag, buffer_size);
    453   ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
    454       output_task_runner, output_flag, buffer_size);
    455 
    456   in->SetPeer(out, output_task_runner, output_flag);
    457   out->SetPeer(in, input_task_runner, input_flag);
    458   input->reset(in);
    459   output->reset(out);
    460 }
    461 
    462 }  // namespace content
    463