Home | History | Annotate | Download | only in browser
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/byte_stream.h"
      6 
      7 #include <deque>
      8 #include <set>
      9 #include <utility>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/memory/ref_counted.h"
     14 #include "base/sequenced_task_runner.h"
     15 
     16 namespace content {
     17 namespace {
     18 
     19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
     20 ContentVector;
     21 
     22 class ByteStreamReaderImpl;
     23 
     24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
     25 // cleared in an object destructor and accessed to check for object
     26 // existence.  We can't use weak pointers because they're tightly tied to
     27 // threads rather than task runners.
     28 // TODO(rdsmith): A better solution would be extending weak pointers
     29 // to support SequencedTaskRunners.
     30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
     31  public:
     32   LifetimeFlag() : is_alive(true) { }
     33   bool is_alive;
     34 
     35  protected:
     36   friend class base::RefCountedThreadSafe<LifetimeFlag>;
     37   virtual ~LifetimeFlag() { }
     38 
     39  private:
     40   DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
     41 };
     42 
     43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
     44 // SetPeer may happen anywhere; all other operations on each class must
     45 // happen in the context of their SequencedTaskRunner.
     46 class ByteStreamWriterImpl : public ByteStreamWriter {
     47  public:
     48   ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
     49                        scoped_refptr<LifetimeFlag> lifetime_flag,
     50                        size_t buffer_size);
     51   virtual ~ByteStreamWriterImpl();
     52 
     53   // Must be called before any operations are performed.
     54   void SetPeer(ByteStreamReaderImpl* peer,
     55                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
     56                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
     57 
     58   // Overridden from ByteStreamWriter.
     59   virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
     60                      size_t byte_count) OVERRIDE;
     61   virtual void Flush() OVERRIDE;
     62   virtual void Close(int status) OVERRIDE;
     63   virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
     64 
     65   // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
     66   static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
     67                            ByteStreamWriterImpl* target,
     68                            size_t bytes_consumed);
     69 
     70  private:
     71   // Called from UpdateWindow when object existence has been validated.
     72   void UpdateWindowInternal(size_t bytes_consumed);
     73 
     74   void PostToPeer(bool complete, int status);
     75 
     76   const size_t total_buffer_size_;
     77 
     78   // All data objects in this class are only valid to access on
     79   // this task runner except as otherwise noted.
     80   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
     81 
     82   // True while this object is alive.
     83   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
     84 
     85   base::Closure space_available_callback_;
     86   ContentVector input_contents_;
     87   size_t input_contents_size_;
     88 
     89   // ** Peer information.
     90 
     91   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
     92 
     93   // How much we've sent to the output that for flow control purposes we
     94   // must assume hasn't been read yet.
     95   size_t output_size_used_;
     96 
     97   // Only valid to access on peer_task_runner_.
     98   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
     99 
    100   // Only valid to access on peer_task_runner_ if
    101   // |*peer_lifetime_flag_ == true|
    102   ByteStreamReaderImpl* peer_;
    103 };
    104 
    105 class ByteStreamReaderImpl : public ByteStreamReader {
    106  public:
    107   ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
    108                        scoped_refptr<LifetimeFlag> lifetime_flag,
    109                        size_t buffer_size);
    110   virtual ~ByteStreamReaderImpl();
    111 
    112   // Must be called before any operations are performed.
    113   void SetPeer(ByteStreamWriterImpl* peer,
    114                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    115                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
    116 
    117   // Overridden from ByteStreamReader.
    118   virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
    119                            size_t* length) OVERRIDE;
    120   virtual int GetStatus() const OVERRIDE;
    121   virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
    122 
    123   // PostTask target from |ByteStreamWriterImpl::Write| and
    124   // |ByteStreamWriterImpl::Close|.
    125   // Receive data from our peer.
    126   // static because it may be called after the object it is targeting
    127   // has been destroyed.  It may not access |*target|
    128   // if |*object_lifetime_flag| is false.
    129   static void TransferData(
    130       scoped_refptr<LifetimeFlag> object_lifetime_flag,
    131       ByteStreamReaderImpl* target,
    132       scoped_ptr<ContentVector> transfer_buffer,
    133       size_t transfer_buffer_bytes,
    134       bool source_complete,
    135       int status);
    136 
    137  private:
    138   // Called from TransferData once object existence has been validated.
    139   void TransferDataInternal(
    140       scoped_ptr<ContentVector> transfer_buffer,
    141       size_t transfer_buffer_bytes,
    142       bool source_complete,
    143       int status);
    144 
    145   void MaybeUpdateInput();
    146 
    147   const size_t total_buffer_size_;
    148 
    149   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
    150 
    151   // True while this object is alive.
    152   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
    153 
    154   ContentVector available_contents_;
    155 
    156   bool received_status_;
    157   int status_;
    158 
    159   base::Closure data_available_callback_;
    160 
    161   // Time of last point at which data in stream transitioned from full
    162   // to non-full.  Nulled when a callback is sent.
    163   base::Time last_non_full_time_;
    164 
    165   // ** Peer information
    166 
    167   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
    168 
    169   // How much has been removed from this class that we haven't told
    170   // the input about yet.
    171   size_t unreported_consumed_bytes_;
    172 
    173   // Only valid to access on peer_task_runner_.
    174   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
    175 
    176   // Only valid to access on peer_task_runner_ if
    177   // |*peer_lifetime_flag_ == true|
    178   ByteStreamWriterImpl* peer_;
    179 };
    180 
    181 ByteStreamWriterImpl::ByteStreamWriterImpl(
    182     scoped_refptr<base::SequencedTaskRunner> task_runner,
    183     scoped_refptr<LifetimeFlag> lifetime_flag,
    184     size_t buffer_size)
    185     : total_buffer_size_(buffer_size),
    186       my_task_runner_(task_runner),
    187       my_lifetime_flag_(lifetime_flag),
    188       input_contents_size_(0),
    189       output_size_used_(0),
    190       peer_(NULL) {
    191   DCHECK(my_lifetime_flag_.get());
    192   my_lifetime_flag_->is_alive = true;
    193 }
    194 
    195 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
    196   my_lifetime_flag_->is_alive = false;
    197 }
    198 
    199 void ByteStreamWriterImpl::SetPeer(
    200     ByteStreamReaderImpl* peer,
    201     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    202     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
    203   peer_ = peer;
    204   peer_task_runner_ = peer_task_runner;
    205   peer_lifetime_flag_ = peer_lifetime_flag;
    206 }
    207 
    208 bool ByteStreamWriterImpl::Write(
    209     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
    210   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    211 
    212   input_contents_.push_back(std::make_pair(buffer, byte_count));
    213   input_contents_size_ += byte_count;
    214 
    215   // Arbitrarily, we buffer to a third of the total size before sending.
    216   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
    217     PostToPeer(false, 0);
    218 
    219   return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
    220 }
    221 
    222 void ByteStreamWriterImpl::Flush() {
    223   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    224   if (input_contents_size_ > 0)
    225     PostToPeer(false, 0);
    226 }
    227 
    228 void ByteStreamWriterImpl::Close(int status) {
    229   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    230   PostToPeer(true, status);
    231 }
    232 
    233 void ByteStreamWriterImpl::RegisterCallback(
    234     const base::Closure& source_callback) {
    235   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    236   space_available_callback_ = source_callback;
    237 }
    238 
    239 // static
    240 void ByteStreamWriterImpl::UpdateWindow(
    241     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
    242     size_t bytes_consumed) {
    243   // If the target object isn't alive anymore, we do nothing.
    244   if (!lifetime_flag->is_alive) return;
    245 
    246   target->UpdateWindowInternal(bytes_consumed);
    247 }
    248 
    249 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
    250   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    251   DCHECK_GE(output_size_used_, bytes_consumed);
    252   output_size_used_ -= bytes_consumed;
    253 
    254   // Callback if we were above the limit and we're now <= to it.
    255   size_t total_known_size_used =
    256       input_contents_size_ + output_size_used_;
    257 
    258   if (total_known_size_used <= total_buffer_size_ &&
    259       (total_known_size_used + bytes_consumed > total_buffer_size_) &&
    260       !space_available_callback_.is_null())
    261     space_available_callback_.Run();
    262 }
    263 
    264 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
    265   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    266   // Valid contexts in which to call.
    267   DCHECK(complete || 0 != input_contents_size_);
    268 
    269   scoped_ptr<ContentVector> transfer_buffer;
    270   size_t buffer_size = 0;
    271   if (0 != input_contents_size_) {
    272     transfer_buffer.reset(new ContentVector);
    273     transfer_buffer->swap(input_contents_);
    274     buffer_size = input_contents_size_;
    275     output_size_used_ += input_contents_size_;
    276     input_contents_size_ = 0;
    277   }
    278   peer_task_runner_->PostTask(
    279       FROM_HERE, base::Bind(
    280           &ByteStreamReaderImpl::TransferData,
    281           peer_lifetime_flag_,
    282           peer_,
    283           base::Passed(&transfer_buffer),
    284           buffer_size,
    285           complete,
    286           status));
    287 }
    288 
    289 ByteStreamReaderImpl::ByteStreamReaderImpl(
    290     scoped_refptr<base::SequencedTaskRunner> task_runner,
    291     scoped_refptr<LifetimeFlag> lifetime_flag,
    292     size_t buffer_size)
    293     : total_buffer_size_(buffer_size),
    294       my_task_runner_(task_runner),
    295       my_lifetime_flag_(lifetime_flag),
    296       received_status_(false),
    297       status_(0),
    298       unreported_consumed_bytes_(0),
    299       peer_(NULL) {
    300   DCHECK(my_lifetime_flag_.get());
    301   my_lifetime_flag_->is_alive = true;
    302 }
    303 
    304 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
    305   my_lifetime_flag_->is_alive = false;
    306 }
    307 
    308 void ByteStreamReaderImpl::SetPeer(
    309     ByteStreamWriterImpl* peer,
    310     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    311     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
    312   peer_ = peer;
    313   peer_task_runner_ = peer_task_runner;
    314   peer_lifetime_flag_ = peer_lifetime_flag;
    315 }
    316 
    317 ByteStreamReaderImpl::StreamState
    318 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
    319                            size_t* length) {
    320   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    321 
    322   if (available_contents_.size()) {
    323     *data = available_contents_.front().first;
    324     *length = available_contents_.front().second;
    325     available_contents_.pop_front();
    326     unreported_consumed_bytes_ += *length;
    327 
    328     MaybeUpdateInput();
    329     return STREAM_HAS_DATA;
    330   }
    331   if (received_status_) {
    332     return STREAM_COMPLETE;
    333   }
    334   return STREAM_EMPTY;
    335 }
    336 
    337 int ByteStreamReaderImpl::GetStatus() const {
    338   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    339   DCHECK(received_status_);
    340   return status_;
    341 }
    342 
    343 void ByteStreamReaderImpl::RegisterCallback(
    344     const base::Closure& sink_callback) {
    345   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    346 
    347   data_available_callback_ = sink_callback;
    348 }
    349 
    350 // static
    351 void ByteStreamReaderImpl::TransferData(
    352     scoped_refptr<LifetimeFlag> object_lifetime_flag,
    353     ByteStreamReaderImpl* target,
    354     scoped_ptr<ContentVector> transfer_buffer,
    355     size_t buffer_size,
    356     bool source_complete,
    357     int status) {
    358   // If our target is no longer alive, do nothing.
    359   if (!object_lifetime_flag->is_alive) return;
    360 
    361   target->TransferDataInternal(
    362       transfer_buffer.Pass(), buffer_size, source_complete, status);
    363 }
    364 
    365 void ByteStreamReaderImpl::TransferDataInternal(
    366     scoped_ptr<ContentVector> transfer_buffer,
    367     size_t buffer_size,
    368     bool source_complete,
    369     int status) {
    370   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    371 
    372   bool was_empty = available_contents_.empty();
    373 
    374   if (transfer_buffer) {
    375     available_contents_.insert(available_contents_.end(),
    376                                transfer_buffer->begin(),
    377                                transfer_buffer->end());
    378   }
    379 
    380   if (source_complete) {
    381     received_status_ = true;
    382     status_ = status;
    383   }
    384 
    385   // Callback on transition from empty to non-empty, or
    386   // source complete.
    387   if (((was_empty && !available_contents_.empty()) ||
    388        source_complete) &&
    389       !data_available_callback_.is_null())
    390     data_available_callback_.Run();
    391 }
    392 
    393 // Decide whether or not to send the input a window update.
    394 // Currently we do that whenever we've got unreported consumption
    395 // greater than 1/3 of total size.
    396 void ByteStreamReaderImpl::MaybeUpdateInput() {
    397   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
    398 
    399   if (unreported_consumed_bytes_ <=
    400       total_buffer_size_ / kFractionReadBeforeWindowUpdate)
    401     return;
    402 
    403   peer_task_runner_->PostTask(
    404       FROM_HERE, base::Bind(
    405           &ByteStreamWriterImpl::UpdateWindow,
    406           peer_lifetime_flag_,
    407           peer_,
    408           unreported_consumed_bytes_));
    409   unreported_consumed_bytes_ = 0;
    410 }
    411 
    412 }  // namespace
    413 
    414 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
    415 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
    416 
    417 ByteStreamReader::~ByteStreamReader() { }
    418 
    419 ByteStreamWriter::~ByteStreamWriter() { }
    420 
    421 void CreateByteStream(
    422     scoped_refptr<base::SequencedTaskRunner> input_task_runner,
    423     scoped_refptr<base::SequencedTaskRunner> output_task_runner,
    424     size_t buffer_size,
    425     scoped_ptr<ByteStreamWriter>* input,
    426     scoped_ptr<ByteStreamReader>* output) {
    427   scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
    428   scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
    429 
    430   ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
    431       input_task_runner, input_flag, buffer_size);
    432   ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
    433       output_task_runner, output_flag, buffer_size);
    434 
    435   in->SetPeer(out, output_task_runner, output_flag);
    436   out->SetPeer(in, input_task_runner, input_flag);
    437   input->reset(in);
    438   output->reset(out);
    439 }
    440 
    441 }  // namespace content
    442