Home | History | Annotate | Download | only in drive
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/drive_file_stream_reader.h"
      6 
      7 #include <algorithm>
      8 #include <cstring>
      9 
     10 #include "base/callback_helpers.h"
     11 #include "base/logging.h"
     12 #include "base/sequenced_task_runner.h"
     13 #include "chrome/browser/chromeos/drive/drive.pb.h"
     14 #include "chrome/browser/chromeos/drive/file_system_interface.h"
     15 #include "chrome/browser/chromeos/drive/local_file_reader.h"
     16 #include "content/public/browser/browser_thread.h"
     17 #include "google_apis/drive/task_util.h"
     18 #include "net/base/io_buffer.h"
     19 #include "net/base/net_errors.h"
     20 #include "net/http/http_byte_range.h"
     21 
     22 using content::BrowserThread;
     23 
     24 namespace drive {
     25 namespace {
     26 
     27 // Converts FileError code to net::Error code.
     28 int FileErrorToNetError(FileError error) {
     29   return net::FileErrorToNetError(FileErrorToBaseFileError(error));
     30 }
     31 
     32 // Computes the concrete |start| offset and the |length| of |range| in a file
     33 // of |total| size.
     34 //
     35 // This is a thin wrapper of HttpByteRange::ComputeBounds, extended to allow
     36 // an empty range at the end of the file, like "Range: bytes 0-" on a zero byte
     37 // file. This is for convenience in unifying implementation with the seek
     38 // operation of stream reader. HTTP doesn't allow such ranges but we want to
     39 // treat such seeking as valid.
     40 bool ComputeConcretePosition(net::HttpByteRange range, int64 total,
     41                              int64* start, int64* length) {
     42   // The special case when empty range in the end of the file is selected.
     43   if (range.HasFirstBytePosition() && range.first_byte_position() == total) {
     44     *start = range.first_byte_position();
     45     *length = 0;
     46     return true;
     47   }
     48 
     49   // Otherwise forward to HttpByteRange::ComputeBounds.
     50   if (!range.ComputeBounds(total))
     51     return false;
     52   *start = range.first_byte_position();
     53   *length = range.last_byte_position() - range.first_byte_position() + 1;
     54   return true;
     55 }
     56 
     57 }  // namespace
     58 
     59 namespace internal {
     60 namespace {
     61 
     62 // Copies the content in |pending_data| into |buffer| at most
     63 // |buffer_length| bytes, and erases the copied data from
     64 // |pending_data|. Returns the number of copied bytes.
     65 int ReadInternal(ScopedVector<std::string>* pending_data,
     66                  net::IOBuffer* buffer, int buffer_length) {
     67   size_t index = 0;
     68   int offset = 0;
     69   for (; index < pending_data->size() && offset < buffer_length; ++index) {
     70     const std::string& chunk = *(*pending_data)[index];
     71     DCHECK(!chunk.empty());
     72 
     73     size_t bytes_to_read = std::min(
     74         chunk.size(), static_cast<size_t>(buffer_length - offset));
     75     std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read);
     76     offset += bytes_to_read;
     77     if (bytes_to_read < chunk.size()) {
     78       // The chunk still has some remaining data.
     79       // So remove leading (copied) bytes, and quit the loop so that
     80       // the remaining data won't be deleted in the following erase().
     81       (*pending_data)[index]->erase(0, bytes_to_read);
     82       break;
     83     }
     84   }
     85 
     86   // Consume the copied data.
     87   pending_data->erase(pending_data->begin(), pending_data->begin() + index);
     88 
     89   return offset;
     90 }
     91 
     92 }  // namespace
     93 
     94 LocalReaderProxy::LocalReaderProxy(
     95     scoped_ptr<util::LocalFileReader> file_reader, int64 length)
     96     : file_reader_(file_reader.Pass()),
     97       remaining_length_(length),
     98       weak_ptr_factory_(this) {
     99   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    100   DCHECK(file_reader_);
    101 }
    102 
    103 LocalReaderProxy::~LocalReaderProxy() {
    104 }
    105 
    106 int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
    107                            const net::CompletionCallback& callback) {
    108   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    109   DCHECK(file_reader_);
    110 
    111   if (buffer_length > remaining_length_) {
    112     // Here, narrowing is safe.
    113     buffer_length = static_cast<int>(remaining_length_);
    114   }
    115 
    116   if (!buffer_length)
    117     return 0;
    118 
    119   file_reader_->Read(buffer, buffer_length,
    120                      base::Bind(&LocalReaderProxy::OnReadCompleted,
    121                                 weak_ptr_factory_.GetWeakPtr(), callback));
    122   return net::ERR_IO_PENDING;
    123 }
    124 
    125 void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
    126   // This method should never be called, because no data should be received
    127   // from the network during the reading of local-cache file.
    128   NOTREACHED();
    129 }
    130 
    131 void LocalReaderProxy::OnCompleted(FileError error) {
    132   // If this method is called, no network error should be happened.
    133   DCHECK_EQ(FILE_ERROR_OK, error);
    134 }
    135 
    136 void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback,
    137                                        int read_result) {
    138   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    139   DCHECK(file_reader_);
    140 
    141   if (read_result >= 0) {
    142     // |read_result| bytes data is read.
    143     DCHECK_LE(read_result, remaining_length_);
    144     remaining_length_ -= read_result;
    145   } else {
    146     // An error occurs. Close the |file_reader_|.
    147     file_reader_.reset();
    148   }
    149   callback.Run(read_result);
    150 }
    151 
    152 NetworkReaderProxy::NetworkReaderProxy(
    153     int64 offset,
    154     int64 content_length,
    155     int64 full_content_length,
    156     const base::Closure& job_canceller)
    157     : remaining_offset_(offset),
    158       remaining_content_length_(content_length),
    159       is_full_download_(offset + content_length == full_content_length),
    160       error_code_(net::OK),
    161       buffer_length_(0),
    162       job_canceller_(job_canceller) {
    163   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    164 }
    165 
    166 NetworkReaderProxy::~NetworkReaderProxy() {
    167   if (!job_canceller_.is_null()) {
    168     job_canceller_.Run();
    169   }
    170 }
    171 
    172 int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
    173                              const net::CompletionCallback& callback) {
    174   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    175   // Check if there is no pending Read operation.
    176   DCHECK(!buffer_.get());
    177   DCHECK_EQ(buffer_length_, 0);
    178   DCHECK(callback_.is_null());
    179   // Validate the arguments.
    180   DCHECK(buffer);
    181   DCHECK_GT(buffer_length, 0);
    182   DCHECK(!callback.is_null());
    183 
    184   if (error_code_ != net::OK) {
    185     // An error is already found. Return it immediately.
    186     return error_code_;
    187   }
    188 
    189   if (remaining_content_length_ == 0) {
    190     // If no more data, return immediately.
    191     return 0;
    192   }
    193 
    194   if (buffer_length > remaining_content_length_) {
    195     // Here, narrowing cast should be safe.
    196     buffer_length = static_cast<int>(remaining_content_length_);
    197   }
    198 
    199   if (pending_data_.empty()) {
    200     // No data is available. Keep the arguments, and return pending status.
    201     buffer_ = buffer;
    202     buffer_length_ = buffer_length;
    203     callback_ = callback;
    204     return net::ERR_IO_PENDING;
    205   }
    206 
    207   int result = ReadInternal(&pending_data_, buffer, buffer_length);
    208   remaining_content_length_ -= result;
    209   DCHECK_GE(remaining_content_length_, 0);
    210 
    211   // Although OnCompleted() should reset |job_canceller_| when download is done,
    212   // due to timing issues the ReaderProxy instance may be destructed before the
    213   // notification. To fix the case we reset here earlier.
    214   if (is_full_download_ && remaining_content_length_ == 0)
    215     job_canceller_.Reset();
    216 
    217   return result;
    218 }
    219 
    220 void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
    221   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    222   DCHECK(data && !data->empty());
    223 
    224   if (remaining_offset_ >= static_cast<int64>(data->length())) {
    225     // Skip unneeded leading data.
    226     remaining_offset_ -= data->length();
    227     return;
    228   }
    229 
    230   if (remaining_offset_ > 0) {
    231     // Erase unnecessary leading bytes.
    232     data->erase(0, static_cast<size_t>(remaining_offset_));
    233     remaining_offset_ = 0;
    234   }
    235 
    236   pending_data_.push_back(data.release());
    237   if (!buffer_.get()) {
    238     // No pending Read operation.
    239     return;
    240   }
    241 
    242   int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_);
    243   remaining_content_length_ -= result;
    244   DCHECK_GE(remaining_content_length_, 0);
    245 
    246   if (is_full_download_ && remaining_content_length_ == 0)
    247     job_canceller_.Reset();
    248 
    249   buffer_ = NULL;
    250   buffer_length_ = 0;
    251   DCHECK(!callback_.is_null());
    252   base::ResetAndReturn(&callback_).Run(result);
    253 }
    254 
    255 void NetworkReaderProxy::OnCompleted(FileError error) {
    256   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    257   // The downloading is completed, so we do not need to cancel the job
    258   // in the destructor.
    259   job_canceller_.Reset();
    260 
    261   if (error == FILE_ERROR_OK) {
    262     return;
    263   }
    264 
    265   error_code_ = FileErrorToNetError(error);
    266   pending_data_.clear();
    267 
    268   if (callback_.is_null()) {
    269     // No pending Read operation.
    270     return;
    271   }
    272 
    273   buffer_ = NULL;
    274   buffer_length_ = 0;
    275   base::ResetAndReturn(&callback_).Run(error_code_);
    276 }
    277 
    278 }  // namespace internal
    279 
    280 namespace {
    281 
    282 // Calls FileSystemInterface::GetFileContent if the file system
    283 // is available. If not, the |completion_callback| is invoked with
    284 // FILE_ERROR_FAILED.
    285 base::Closure GetFileContentOnUIThread(
    286     const DriveFileStreamReader::FileSystemGetter& file_system_getter,
    287     const base::FilePath& drive_file_path,
    288     const GetFileContentInitializedCallback& initialized_callback,
    289     const google_apis::GetContentCallback& get_content_callback,
    290     const FileOperationCallback& completion_callback) {
    291   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    292 
    293   FileSystemInterface* file_system = file_system_getter.Run();
    294   if (!file_system) {
    295     completion_callback.Run(FILE_ERROR_FAILED);
    296     return base::Closure();
    297   }
    298 
    299   return google_apis::CreateRelayCallback(
    300       file_system->GetFileContent(drive_file_path,
    301                                   initialized_callback,
    302                                   get_content_callback,
    303                                   completion_callback));
    304 }
    305 
    306 // Helper to run FileSystemInterface::GetFileContent on UI thread.
    307 void GetFileContent(
    308     const DriveFileStreamReader::FileSystemGetter& file_system_getter,
    309     const base::FilePath& drive_file_path,
    310     const GetFileContentInitializedCallback& initialized_callback,
    311     const google_apis::GetContentCallback& get_content_callback,
    312     const FileOperationCallback& completion_callback,
    313     const base::Callback<void(const base::Closure&)>& reply_callback) {
    314   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    315 
    316   BrowserThread::PostTaskAndReplyWithResult(
    317       BrowserThread::UI,
    318       FROM_HERE,
    319       base::Bind(&GetFileContentOnUIThread,
    320                  file_system_getter,
    321                  drive_file_path,
    322                  google_apis::CreateRelayCallback(initialized_callback),
    323                  google_apis::CreateRelayCallback(get_content_callback),
    324                  google_apis::CreateRelayCallback(completion_callback)),
    325       reply_callback);
    326 }
    327 
    328 }  // namespace
    329 
    330 DriveFileStreamReader::DriveFileStreamReader(
    331     const FileSystemGetter& file_system_getter,
    332     base::SequencedTaskRunner* file_task_runner)
    333     : file_system_getter_(file_system_getter),
    334       file_task_runner_(file_task_runner),
    335       weak_ptr_factory_(this) {
    336   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    337 }
    338 
    339 DriveFileStreamReader::~DriveFileStreamReader() {
    340 }
    341 
    342 bool DriveFileStreamReader::IsInitialized() const {
    343   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    344   return reader_proxy_.get() != NULL;
    345 }
    346 
    347 void DriveFileStreamReader::Initialize(
    348     const base::FilePath& drive_file_path,
    349     const net::HttpByteRange& byte_range,
    350     const InitializeCompletionCallback& callback) {
    351   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    352   DCHECK(!callback.is_null());
    353 
    354   GetFileContent(
    355       file_system_getter_,
    356       drive_file_path,
    357       base::Bind(&DriveFileStreamReader
    358                      ::InitializeAfterGetFileContentInitialized,
    359                  weak_ptr_factory_.GetWeakPtr(),
    360                  byte_range,
    361                  callback),
    362       base::Bind(&DriveFileStreamReader::OnGetContent,
    363                  weak_ptr_factory_.GetWeakPtr()),
    364       base::Bind(&DriveFileStreamReader::OnGetFileContentCompletion,
    365                  weak_ptr_factory_.GetWeakPtr(),
    366                  callback),
    367       base::Bind(&DriveFileStreamReader::StoreCancelDownloadClosure,
    368                  weak_ptr_factory_.GetWeakPtr()));
    369 }
    370 
    371 int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length,
    372                                 const net::CompletionCallback& callback) {
    373   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    374   DCHECK(reader_proxy_);
    375   DCHECK(buffer);
    376   DCHECK(!callback.is_null());
    377   return reader_proxy_->Read(buffer, buffer_length, callback);
    378 }
    379 
    380 void DriveFileStreamReader::StoreCancelDownloadClosure(
    381     const base::Closure& cancel_download_closure) {
    382   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    383   cancel_download_closure_ = cancel_download_closure;
    384 }
    385 
    386 void DriveFileStreamReader::InitializeAfterGetFileContentInitialized(
    387     const net::HttpByteRange& byte_range,
    388     const InitializeCompletionCallback& callback,
    389     FileError error,
    390     const base::FilePath& local_cache_file_path,
    391     scoped_ptr<ResourceEntry> entry) {
    392   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    393   // StoreCancelDownloadClosure() should be called before this function.
    394   DCHECK(!cancel_download_closure_.is_null());
    395 
    396   if (error != FILE_ERROR_OK) {
    397     callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
    398     return;
    399   }
    400   DCHECK(entry);
    401 
    402   int64 range_start = 0, range_length = 0;
    403   if (!ComputeConcretePosition(byte_range, entry->file_info().size(),
    404                                &range_start, &range_length)) {
    405     // If |byte_range| is invalid (e.g. out of bounds), return with an error.
    406     // At the same time, we cancel the in-flight downloading operation if
    407     // needed and and invalidate weak pointers so that we won't
    408     // receive unwanted callbacks.
    409     cancel_download_closure_.Run();
    410     weak_ptr_factory_.InvalidateWeakPtrs();
    411     callback.Run(
    412         net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>());
    413     return;
    414   }
    415 
    416   if (local_cache_file_path.empty()) {
    417     // The file is not cached, and being downloaded.
    418     reader_proxy_.reset(
    419         new internal::NetworkReaderProxy(
    420             range_start, range_length,
    421             entry->file_info().size(), cancel_download_closure_));
    422     callback.Run(net::OK, entry.Pass());
    423     return;
    424   }
    425 
    426   // Otherwise, open the stream for file.
    427   scoped_ptr<util::LocalFileReader> file_reader(
    428       new util::LocalFileReader(file_task_runner_.get()));
    429   util::LocalFileReader* file_reader_ptr = file_reader.get();
    430   file_reader_ptr->Open(
    431       local_cache_file_path,
    432       range_start,
    433       base::Bind(
    434           &DriveFileStreamReader::InitializeAfterLocalFileOpen,
    435           weak_ptr_factory_.GetWeakPtr(),
    436           range_length,
    437           callback,
    438           base::Passed(&entry),
    439           base::Passed(&file_reader)));
    440 }
    441 
    442 void DriveFileStreamReader::InitializeAfterLocalFileOpen(
    443     int64 length,
    444     const InitializeCompletionCallback& callback,
    445     scoped_ptr<ResourceEntry> entry,
    446     scoped_ptr<util::LocalFileReader> file_reader,
    447     int open_result) {
    448   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    449 
    450   if (open_result != net::OK) {
    451     callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>());
    452     return;
    453   }
    454 
    455   reader_proxy_.reset(
    456       new internal::LocalReaderProxy(file_reader.Pass(), length));
    457   callback.Run(net::OK, entry.Pass());
    458 }
    459 
    460 void DriveFileStreamReader::OnGetContent(google_apis::GDataErrorCode error_code,
    461                                          scoped_ptr<std::string> data) {
    462   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    463   DCHECK(reader_proxy_);
    464   reader_proxy_->OnGetContent(data.Pass());
    465 }
    466 
    467 void DriveFileStreamReader::OnGetFileContentCompletion(
    468     const InitializeCompletionCallback& callback,
    469     FileError error) {
    470   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    471 
    472   if (reader_proxy_) {
    473     // If the proxy object available, send the error to it.
    474     reader_proxy_->OnCompleted(error);
    475   } else {
    476     // Here the proxy object is not yet available.
    477     // There are two cases. 1) Some error happens during the initialization.
    478     // 2) the cache file is found, but the proxy object is not *yet*
    479     // initialized because the file is being opened.
    480     // We are interested in 1) only. The callback for 2) will be called
    481     // after opening the file is completed.
    482     // Note: due to the same reason, LocalReaderProxy::OnCompleted may
    483     // or may not be called. This is timing issue, and it is difficult to avoid
    484     // unfortunately.
    485     if (error != FILE_ERROR_OK) {
    486       callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
    487     }
    488   }
    489 }
    490 
    491 }  // namespace drive
    492