Home | History | Annotate | Download | only in fileapi
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/media_galleries/fileapi/readahead_file_stream_reader.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/message_loop/message_loop.h"
     10 #include "base/numerics/safe_conversions.h"
     11 #include "net/base/io_buffer.h"
     12 #include "net/base/net_errors.h"
     13 
     14 using webkit_blob::FileStreamReader;
     15 
     16 namespace {
     17 
     18 const size_t kDesiredNumberOfBuffers = 2;  // So we are always one buffer ahead.
     19 const int kBufferSize = 1024*1024;  // 1MB to minimize transaction costs.
     20 
     21 }  // namespace
     22 
     23 ReadaheadFileStreamReader::ReadaheadFileStreamReader(FileStreamReader* source)
     24     : source_(source),
     25       source_error_(0),
     26       source_has_pending_read_(false),
     27       weak_factory_(this) {
     28 }
     29 
     30 ReadaheadFileStreamReader::~ReadaheadFileStreamReader() {}
     31 
     32 int ReadaheadFileStreamReader::Read(
     33     net::IOBuffer* buf, int buf_len, const net::CompletionCallback& callback) {
     34   DCHECK(!pending_sink_buffer_.get());
     35   DCHECK(pending_read_callback_.is_null());
     36 
     37   ReadFromSourceIfNeeded();
     38 
     39   scoped_refptr<net::DrainableIOBuffer> sink =
     40       new net::DrainableIOBuffer(buf, buf_len);
     41   int result = FinishReadFromCacheOrStoredError(sink);
     42 
     43   // We are waiting for an source read to complete, so save the request.
     44   if (result == net::ERR_IO_PENDING) {
     45     DCHECK(!pending_sink_buffer_.get());
     46     DCHECK(pending_read_callback_.is_null());
     47     pending_sink_buffer_ = sink;
     48     pending_read_callback_ = callback;
     49   }
     50 
     51   return result;
     52 }
     53 
     54 int64 ReadaheadFileStreamReader::GetLength(
     55     const net::Int64CompletionCallback& callback) {
     56   return source_->GetLength(callback);
     57 }
     58 
     59 int ReadaheadFileStreamReader::FinishReadFromCacheOrStoredError(
     60     net::DrainableIOBuffer* sink) {
     61   // If we don't have any ready cache, return the pending read code, or
     62   // the stored error code.
     63   if (buffers_.empty()) {
     64     if (source_.get()) {
     65       DCHECK(source_has_pending_read_);
     66       return net::ERR_IO_PENDING;
     67     } else {
     68       return source_error_;
     69     }
     70   }
     71 
     72   while (sink->BytesRemaining() > 0 && !buffers_.empty()) {
     73     net::DrainableIOBuffer* source_buffer = buffers_.front().get();
     74 
     75     DCHECK(source_buffer->BytesRemaining() > 0);
     76 
     77     int copy_len = std::min(source_buffer->BytesRemaining(),
     78                             sink->BytesRemaining());
     79     std::copy(source_buffer->data(), source_buffer->data() + copy_len,
     80               sink->data());
     81 
     82     source_buffer->DidConsume(copy_len);
     83     sink->DidConsume(copy_len);
     84 
     85     if (source_buffer->BytesRemaining() == 0) {
     86       buffers_.pop();
     87 
     88       // Get a new buffer to replace the one we just used up.
     89       ReadFromSourceIfNeeded();
     90     }
     91   }
     92 
     93   return sink->BytesConsumed();
     94 }
     95 
     96 void ReadaheadFileStreamReader::ReadFromSourceIfNeeded() {
     97   if (!source_.get() || source_has_pending_read_ ||
     98       buffers_.size() >= kDesiredNumberOfBuffers) {
     99     return;
    100   }
    101 
    102   source_has_pending_read_ = true;
    103 
    104   scoped_refptr<net::IOBuffer> buf(new net::IOBuffer(kBufferSize));
    105   int result = source_->Read(
    106       buf,
    107       kBufferSize,
    108       base::Bind(&ReadaheadFileStreamReader::OnFinishReadFromSource,
    109                  weak_factory_.GetWeakPtr(), buf));
    110 
    111   if (result != net::ERR_IO_PENDING)
    112     OnFinishReadFromSource(buf, result);
    113 }
    114 
    115 void ReadaheadFileStreamReader::OnFinishReadFromSource(net::IOBuffer* buf,
    116                                                        int result) {
    117   DCHECK(result != net::ERR_IO_PENDING);
    118   DCHECK(source_has_pending_read_);
    119   source_has_pending_read_ = false;
    120 
    121   // Either store the data read from |source_|, or store the error code.
    122   if (result > 0) {
    123     scoped_refptr<net::DrainableIOBuffer> drainable_buffer(
    124         new net::DrainableIOBuffer(buf, result));
    125     buffers_.push(drainable_buffer);
    126     ReadFromSourceIfNeeded();
    127   } else {
    128     source_.reset();
    129     source_error_ = result;
    130   }
    131 
    132   // If there's a read request waiting for the source FileStreamReader to
    133   // finish reading, fulfill that request now from the cache or stored error.
    134   if (pending_sink_buffer_.get()) {
    135     DCHECK(!pending_read_callback_.is_null());
    136 
    137     // Free the pending callback before running it, as the callback often
    138     // dispatches another read.
    139     scoped_refptr<net::DrainableIOBuffer> sink = pending_sink_buffer_;
    140     pending_sink_buffer_ = NULL;
    141     net::CompletionCallback completion_callback = pending_read_callback_;
    142     pending_read_callback_.Reset();
    143 
    144     completion_callback.Run(FinishReadFromCacheOrStoredError(sink));
    145   }
    146 }
    147