Home | History | Annotate | Download | only in browser
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/byte_stream.h"
      6 
      7 #include <deque>
      8 #include <set>
      9 #include <utility>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/memory/ref_counted.h"
     14 #include "base/sequenced_task_runner.h"
     15 
     16 namespace content {
     17 namespace {
     18 
     19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
     20 ContentVector;
     21 
     22 class ByteStreamReaderImpl;
     23 
     24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
     25 // cleared in an object destructor and accessed to check for object
     26 // existence.  We can't use weak pointers because they're tightly tied to
     27 // threads rather than task runners.
     28 // TODO(rdsmith): A better solution would be extending weak pointers
     29 // to support SequencedTaskRunners.
     30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
     31  public:
     32   LifetimeFlag() : is_alive(true) { }
     33   bool is_alive;
     34 
     35  protected:
     36   friend class base::RefCountedThreadSafe<LifetimeFlag>;
     37   virtual ~LifetimeFlag() { }
     38 
     39  private:
     40   DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
     41 };
     42 
     43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
     44 // SetPeer may happen anywhere; all other operations on each class must
     45 // happen in the context of their SequencedTaskRunner.
     46 class ByteStreamWriterImpl : public ByteStreamWriter {
     47  public:
     48   ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
     49                        scoped_refptr<LifetimeFlag> lifetime_flag,
     50                        size_t buffer_size);
     51   virtual ~ByteStreamWriterImpl();
     52 
     53   // Must be called before any operations are performed.
     54   void SetPeer(ByteStreamReaderImpl* peer,
     55                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
     56                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
     57 
     58   // Overridden from ByteStreamWriter.
     59   virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
     60                      size_t byte_count) OVERRIDE;
     61   virtual void Flush() OVERRIDE;
     62   virtual void Close(int status) OVERRIDE;
     63   virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
     64   virtual size_t GetTotalBufferedBytes() const OVERRIDE;
     65 
     66   // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
     67   static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
     68                            ByteStreamWriterImpl* target,
     69                            size_t bytes_consumed);
     70 
     71  private:
     72   // Called from UpdateWindow when object existence has been validated.
     73   void UpdateWindowInternal(size_t bytes_consumed);
     74 
     75   void PostToPeer(bool complete, int status);
     76 
     77   const size_t total_buffer_size_;
     78 
     79   // All data objects in this class are only valid to access on
     80   // this task runner except as otherwise noted.
     81   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
     82 
     83   // True while this object is alive.
     84   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
     85 
     86   base::Closure space_available_callback_;
     87   ContentVector input_contents_;
     88   size_t input_contents_size_;
     89 
     90   // ** Peer information.
     91 
     92   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
     93 
     94   // How much we've sent to the output that for flow control purposes we
     95   // must assume hasn't been read yet.
     96   size_t output_size_used_;
     97 
     98   // Only valid to access on peer_task_runner_.
     99   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
    100 
    101   // Only valid to access on peer_task_runner_ if
    102   // |*peer_lifetime_flag_ == true|
    103   ByteStreamReaderImpl* peer_;
    104 };
    105 
    106 class ByteStreamReaderImpl : public ByteStreamReader {
    107  public:
    108   ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
    109                        scoped_refptr<LifetimeFlag> lifetime_flag,
    110                        size_t buffer_size);
    111   virtual ~ByteStreamReaderImpl();
    112 
    113   // Must be called before any operations are performed.
    114   void SetPeer(ByteStreamWriterImpl* peer,
    115                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    116                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
    117 
    118   // Overridden from ByteStreamReader.
    119   virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
    120                            size_t* length) OVERRIDE;
    121   virtual int GetStatus() const OVERRIDE;
    122   virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
    123 
    124   // PostTask target from |ByteStreamWriterImpl::Write| and
    125   // |ByteStreamWriterImpl::Close|.
    126   // Receive data from our peer.
    127   // static because it may be called after the object it is targeting
    128   // has been destroyed.  It may not access |*target|
    129   // if |*object_lifetime_flag| is false.
    130   static void TransferData(
    131       scoped_refptr<LifetimeFlag> object_lifetime_flag,
    132       ByteStreamReaderImpl* target,
    133       scoped_ptr<ContentVector> transfer_buffer,
    134       size_t transfer_buffer_bytes,
    135       bool source_complete,
    136       int status);
    137 
    138  private:
    139   // Called from TransferData once object existence has been validated.
    140   void TransferDataInternal(
    141       scoped_ptr<ContentVector> transfer_buffer,
    142       size_t transfer_buffer_bytes,
    143       bool source_complete,
    144       int status);
    145 
    146   void MaybeUpdateInput();
    147 
    148   const size_t total_buffer_size_;
    149 
    150   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
    151 
    152   // True while this object is alive.
    153   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
    154 
    155   ContentVector available_contents_;
    156 
    157   bool received_status_;
    158   int status_;
    159 
    160   base::Closure data_available_callback_;
    161 
    162   // Time of last point at which data in stream transitioned from full
    163   // to non-full.  Nulled when a callback is sent.
    164   base::Time last_non_full_time_;
    165 
    166   // ** Peer information
    167 
    168   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
    169 
    170   // How much has been removed from this class that we haven't told
    171   // the input about yet.
    172   size_t unreported_consumed_bytes_;
    173 
    174   // Only valid to access on peer_task_runner_.
    175   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
    176 
    177   // Only valid to access on peer_task_runner_ if
    178   // |*peer_lifetime_flag_ == true|
    179   ByteStreamWriterImpl* peer_;
    180 };
    181 
    182 ByteStreamWriterImpl::ByteStreamWriterImpl(
    183     scoped_refptr<base::SequencedTaskRunner> task_runner,
    184     scoped_refptr<LifetimeFlag> lifetime_flag,
    185     size_t buffer_size)
    186     : total_buffer_size_(buffer_size),
    187       my_task_runner_(task_runner),
    188       my_lifetime_flag_(lifetime_flag),
    189       input_contents_size_(0),
    190       output_size_used_(0),
    191       peer_(NULL) {
    192   DCHECK(my_lifetime_flag_.get());
    193   my_lifetime_flag_->is_alive = true;
    194 }
    195 
    196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
    197   // No RunsTasksOnCurrentThread() check to allow deleting a created writer
    198   // before we start using it. Once started, should be deleted on the specified
    199   // task runner.
    200   my_lifetime_flag_->is_alive = false;
    201 }
    202 
    203 void ByteStreamWriterImpl::SetPeer(
    204     ByteStreamReaderImpl* peer,
    205     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    206     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
    207   peer_ = peer;
    208   peer_task_runner_ = peer_task_runner;
    209   peer_lifetime_flag_ = peer_lifetime_flag;
    210 }
    211 
    212 bool ByteStreamWriterImpl::Write(
    213     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
    214   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    215 
    216   // Check overflow.
    217   //
    218   // TODO(tyoshino): Discuss with content/browser/download developer and if
    219   // they're fine with, set smaller limit and make it configurable.
    220   size_t space_limit = std::numeric_limits<size_t>::max() -
    221       GetTotalBufferedBytes();
    222   if (byte_count > space_limit) {
    223     // TODO(tyoshino): Tell the user that Write() failed.
    224     // Ignore input.
    225     return false;
    226   }
    227 
    228   input_contents_.push_back(std::make_pair(buffer, byte_count));
    229   input_contents_size_ += byte_count;
    230 
    231   // Arbitrarily, we buffer to a third of the total size before sending.
    232   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
    233     PostToPeer(false, 0);
    234 
    235   return GetTotalBufferedBytes() <= total_buffer_size_;
    236 }
    237 
    238 void ByteStreamWriterImpl::Flush() {
    239   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    240   if (input_contents_size_ > 0)
    241     PostToPeer(false, 0);
    242 }
    243 
    244 void ByteStreamWriterImpl::Close(int status) {
    245   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    246   PostToPeer(true, status);
    247 }
    248 
    249 void ByteStreamWriterImpl::RegisterCallback(
    250     const base::Closure& source_callback) {
    251   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    252   space_available_callback_ = source_callback;
    253 }
    254 
    255 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
    256   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    257   // This sum doesn't overflow since Write() fails if this sum is going to
    258   // overflow.
    259   return input_contents_size_ + output_size_used_;
    260 }
    261 
    262 // static
    263 void ByteStreamWriterImpl::UpdateWindow(
    264     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
    265     size_t bytes_consumed) {
    266   // If the target object isn't alive anymore, we do nothing.
    267   if (!lifetime_flag->is_alive) return;
    268 
    269   target->UpdateWindowInternal(bytes_consumed);
    270 }
    271 
    272 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
    273   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    274 
    275   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
    276 
    277   DCHECK_GE(output_size_used_, bytes_consumed);
    278   output_size_used_ -= bytes_consumed;
    279 
    280   // Callback if we were above the limit and we're now <= to it.
    281   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
    282 
    283   if (no_longer_above_limit && was_above_limit &&
    284       !space_available_callback_.is_null())
    285     space_available_callback_.Run();
    286 }
    287 
    288 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
    289   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    290   // Valid contexts in which to call.
    291   DCHECK(complete || 0 != input_contents_size_);
    292 
    293   scoped_ptr<ContentVector> transfer_buffer;
    294   size_t buffer_size = 0;
    295   if (0 != input_contents_size_) {
    296     transfer_buffer.reset(new ContentVector);
    297     transfer_buffer->swap(input_contents_);
    298     buffer_size = input_contents_size_;
    299     output_size_used_ += input_contents_size_;
    300     input_contents_size_ = 0;
    301   }
    302   peer_task_runner_->PostTask(
    303       FROM_HERE, base::Bind(
    304           &ByteStreamReaderImpl::TransferData,
    305           peer_lifetime_flag_,
    306           peer_,
    307           base::Passed(&transfer_buffer),
    308           buffer_size,
    309           complete,
    310           status));
    311 }
    312 
    313 ByteStreamReaderImpl::ByteStreamReaderImpl(
    314     scoped_refptr<base::SequencedTaskRunner> task_runner,
    315     scoped_refptr<LifetimeFlag> lifetime_flag,
    316     size_t buffer_size)
    317     : total_buffer_size_(buffer_size),
    318       my_task_runner_(task_runner),
    319       my_lifetime_flag_(lifetime_flag),
    320       received_status_(false),
    321       status_(0),
    322       unreported_consumed_bytes_(0),
    323       peer_(NULL) {
    324   DCHECK(my_lifetime_flag_.get());
    325   my_lifetime_flag_->is_alive = true;
    326 }
    327 
    328 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
    329   // No RunsTasksOnCurrentThread() check to allow deleting a created writer
    330   // before we start using it. Once started, should be deleted on the specified
    331   // task runner.
    332   my_lifetime_flag_->is_alive = false;
    333 }
    334 
    335 void ByteStreamReaderImpl::SetPeer(
    336     ByteStreamWriterImpl* peer,
    337     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    338     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
    339   peer_ = peer;
    340   peer_task_runner_ = peer_task_runner;
    341   peer_lifetime_flag_ = peer_lifetime_flag;
    342 }
    343 
    344 ByteStreamReaderImpl::StreamState
    345 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
    346                            size_t* length) {
    347   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    348 
    349   if (available_contents_.size()) {
    350     *data = available_contents_.front().first;
    351     *length = available_contents_.front().second;
    352     available_contents_.pop_front();
    353     unreported_consumed_bytes_ += *length;
    354 
    355     MaybeUpdateInput();
    356     return STREAM_HAS_DATA;
    357   }
    358   if (received_status_) {
    359     return STREAM_COMPLETE;
    360   }
    361   return STREAM_EMPTY;
    362 }
    363 
    364 int ByteStreamReaderImpl::GetStatus() const {
    365   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    366   DCHECK(received_status_);
    367   return status_;
    368 }
    369 
    370 void ByteStreamReaderImpl::RegisterCallback(
    371     const base::Closure& sink_callback) {
    372   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    373 
    374   data_available_callback_ = sink_callback;
    375 }
    376 
    377 // static
    378 void ByteStreamReaderImpl::TransferData(
    379     scoped_refptr<LifetimeFlag> object_lifetime_flag,
    380     ByteStreamReaderImpl* target,
    381     scoped_ptr<ContentVector> transfer_buffer,
    382     size_t buffer_size,
    383     bool source_complete,
    384     int status) {
    385   // If our target is no longer alive, do nothing.
    386   if (!object_lifetime_flag->is_alive) return;
    387 
    388   target->TransferDataInternal(
    389       transfer_buffer.Pass(), buffer_size, source_complete, status);
    390 }
    391 
    392 void ByteStreamReaderImpl::TransferDataInternal(
    393     scoped_ptr<ContentVector> transfer_buffer,
    394     size_t buffer_size,
    395     bool source_complete,
    396     int status) {
    397   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    398 
    399   bool was_empty = available_contents_.empty();
    400 
    401   if (transfer_buffer) {
    402     available_contents_.insert(available_contents_.end(),
    403                                transfer_buffer->begin(),
    404                                transfer_buffer->end());
    405   }
    406 
    407   if (source_complete) {
    408     received_status_ = true;
    409     status_ = status;
    410   }
    411 
    412   // Callback on transition from empty to non-empty, or
    413   // source complete.
    414   if (((was_empty && !available_contents_.empty()) ||
    415        source_complete) &&
    416       !data_available_callback_.is_null())
    417     data_available_callback_.Run();
    418 }
    419 
    420 // Decide whether or not to send the input a window update.
    421 // Currently we do that whenever we've got unreported consumption
    422 // greater than 1/3 of total size.
    423 void ByteStreamReaderImpl::MaybeUpdateInput() {
    424   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    425 
    426   if (unreported_consumed_bytes_ <=
    427       total_buffer_size_ / kFractionReadBeforeWindowUpdate)
    428     return;
    429 
    430   peer_task_runner_->PostTask(
    431       FROM_HERE, base::Bind(
    432           &ByteStreamWriterImpl::UpdateWindow,
    433           peer_lifetime_flag_,
    434           peer_,
    435           unreported_consumed_bytes_));
    436   unreported_consumed_bytes_ = 0;
    437 }
    438 
    439 }  // namespace
    440 
    441 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
    442 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
    443 
    444 ByteStreamReader::~ByteStreamReader() { }
    445 
    446 ByteStreamWriter::~ByteStreamWriter() { }
    447 
    448 void CreateByteStream(
    449     scoped_refptr<base::SequencedTaskRunner> input_task_runner,
    450     scoped_refptr<base::SequencedTaskRunner> output_task_runner,
    451     size_t buffer_size,
    452     scoped_ptr<ByteStreamWriter>* input,
    453     scoped_ptr<ByteStreamReader>* output) {
    454   scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
    455   scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
    456 
    457   ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
    458       input_task_runner, input_flag, buffer_size);
    459   ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
    460       output_task_runner, output_flag, buffer_size);
    461 
    462   in->SetPeer(out, output_task_runner, output_flag);
    463   out->SetPeer(in, input_task_runner, input_flag);
    464   input->reset(in);
    465   output->reset(out);
    466 }
    467 
    468 }  // namespace content
    469