Home | History | Annotate | Download | only in drive
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/drive_file_stream_reader.h"
      6 
      7 #include <algorithm>
      8 #include <cstring>
      9 
     10 #include "base/callback_helpers.h"
     11 #include "base/logging.h"
     12 #include "base/sequenced_task_runner.h"
     13 #include "chrome/browser/chromeos/drive/drive.pb.h"
     14 #include "chrome/browser/chromeos/drive/file_system_interface.h"
     15 #include "chrome/browser/chromeos/drive/local_file_reader.h"
     16 #include "chrome/browser/google_apis/task_util.h"
     17 #include "content/public/browser/browser_thread.h"
     18 #include "net/base/io_buffer.h"
     19 #include "net/base/net_errors.h"
     20 #include "net/http/http_byte_range.h"
     21 
     22 using content::BrowserThread;
     23 
     24 namespace drive {
     25 namespace {
     26 
     27 // Converts FileError code to net::Error code.
     28 int FileErrorToNetError(FileError error) {
     29   return net::PlatformFileErrorToNetError(FileErrorToPlatformError(error));
     30 }
     31 
     32 // Runs task on UI thread.
     33 void RunTaskOnUIThread(const base::Closure& task) {
     34   google_apis::RunTaskOnThread(
     35       BrowserThread::GetMessageLoopProxyForThread(BrowserThread::UI), task);
     36 }
     37 
     38 }  // namespace
     39 
     40 namespace internal {
     41 namespace {
     42 
     43 // Copies the content in |pending_data| into |buffer| at most
     44 // |buffer_length| bytes, and erases the copied data from
     45 // |pending_data|. Returns the number of copied bytes.
     46 int ReadInternal(ScopedVector<std::string>* pending_data,
     47                  net::IOBuffer* buffer, int buffer_length) {
     48   size_t index = 0;
     49   int offset = 0;
     50   for (; index < pending_data->size() && offset < buffer_length; ++index) {
     51     const std::string& chunk = *(*pending_data)[index];
     52     DCHECK(!chunk.empty());
     53 
     54     size_t bytes_to_read = std::min(
     55         chunk.size(), static_cast<size_t>(buffer_length - offset));
     56     std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read);
     57     offset += bytes_to_read;
     58     if (bytes_to_read < chunk.size()) {
     59       // The chunk still has some remaining data.
     60       // So remove leading (copied) bytes, and quit the loop so that
     61       // the remaining data won't be deleted in the following erase().
     62       (*pending_data)[index]->erase(0, bytes_to_read);
     63       break;
     64     }
     65   }
     66 
     67   // Consume the copied data.
     68   pending_data->erase(pending_data->begin(), pending_data->begin() + index);
     69 
     70   return offset;
     71 }
     72 
     73 }  // namespace
     74 
     75 LocalReaderProxy::LocalReaderProxy(
     76     scoped_ptr<util::LocalFileReader> file_reader, int64 length)
     77     : file_reader_(file_reader.Pass()),
     78       remaining_length_(length),
     79       weak_ptr_factory_(this) {
     80   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
     81   DCHECK(file_reader_);
     82 }
     83 
     84 LocalReaderProxy::~LocalReaderProxy() {
     85 }
     86 
     87 int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
     88                            const net::CompletionCallback& callback) {
     89   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
     90   DCHECK(file_reader_);
     91 
     92   if (buffer_length > remaining_length_) {
     93     // Here, narrowing is safe.
     94     buffer_length = static_cast<int>(remaining_length_);
     95   }
     96 
     97   file_reader_->Read(buffer, buffer_length,
     98                      base::Bind(&LocalReaderProxy::OnReadCompleted,
     99                                 weak_ptr_factory_.GetWeakPtr(), callback));
    100   return net::ERR_IO_PENDING;
    101 }
    102 
    103 void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
    104   // This method should never be called, because no data should be received
    105   // from the network during the reading of local-cache file.
    106   NOTREACHED();
    107 }
    108 
    109 void LocalReaderProxy::OnCompleted(FileError error) {
    110   // If this method is called, no network error should be happened.
    111   DCHECK_EQ(FILE_ERROR_OK, error);
    112 }
    113 
    114 void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback,
    115                                        int read_result) {
    116   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    117   DCHECK(file_reader_);
    118 
    119   if (read_result >= 0) {
    120     // |read_result| bytes data is read.
    121     DCHECK_LE(read_result, remaining_length_);
    122     remaining_length_ -= read_result;
    123   } else {
    124     // An error occurs. Close the |file_reader_|.
    125     file_reader_.reset();
    126   }
    127   callback.Run(read_result);
    128 }
    129 
    130 NetworkReaderProxy::NetworkReaderProxy(
    131     int64 offset,
    132     int64 content_length,
    133     const base::Closure& job_canceller)
    134     : remaining_offset_(offset),
    135       remaining_content_length_(content_length),
    136       error_code_(net::OK),
    137       buffer_length_(0),
    138       job_canceller_(job_canceller) {
    139   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    140 }
    141 
    142 NetworkReaderProxy::~NetworkReaderProxy() {
    143   if (!job_canceller_.is_null()) {
    144     job_canceller_.Run();
    145   }
    146 }
    147 
    148 int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
    149                              const net::CompletionCallback& callback) {
    150   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    151   // Check if there is no pending Read operation.
    152   DCHECK(!buffer_.get());
    153   DCHECK_EQ(buffer_length_, 0);
    154   DCHECK(callback_.is_null());
    155   // Validate the arguments.
    156   DCHECK(buffer);
    157   DCHECK_GT(buffer_length, 0);
    158   DCHECK(!callback.is_null());
    159 
    160   if (error_code_ != net::OK) {
    161     // An error is already found. Return it immediately.
    162     return error_code_;
    163   }
    164 
    165   if (remaining_content_length_ == 0) {
    166     // If no more data, return immediately.
    167     return 0;
    168   }
    169 
    170   if (buffer_length > remaining_content_length_) {
    171     // Here, narrowing cast should be safe.
    172     buffer_length = static_cast<int>(remaining_content_length_);
    173   }
    174 
    175   if (pending_data_.empty()) {
    176     // No data is available. Keep the arguments, and return pending status.
    177     buffer_ = buffer;
    178     buffer_length_ = buffer_length;
    179     callback_ = callback;
    180     return net::ERR_IO_PENDING;
    181   }
    182 
    183   int result = ReadInternal(&pending_data_, buffer, buffer_length);
    184   remaining_content_length_ -= result;
    185   DCHECK_GE(remaining_content_length_, 0);
    186   return result;
    187 }
    188 
    189 void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
    190   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    191   DCHECK(data && !data->empty());
    192 
    193   if (remaining_offset_ >= static_cast<int64>(data->length())) {
    194     // Skip unneeded leading data.
    195     remaining_offset_ -= data->length();
    196     return;
    197   }
    198 
    199   if (remaining_offset_ > 0) {
    200     // Erase unnecessary leading bytes.
    201     data->erase(0, static_cast<size_t>(remaining_offset_));
    202     remaining_offset_ = 0;
    203   }
    204 
    205   pending_data_.push_back(data.release());
    206   if (!buffer_.get()) {
    207     // No pending Read operation.
    208     return;
    209   }
    210 
    211   int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_);
    212   remaining_content_length_ -= result;
    213   DCHECK_GE(remaining_content_length_, 0);
    214 
    215   buffer_ = NULL;
    216   buffer_length_ = 0;
    217   DCHECK(!callback_.is_null());
    218   base::ResetAndReturn(&callback_).Run(result);
    219 }
    220 
    221 void NetworkReaderProxy::OnCompleted(FileError error) {
    222   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    223   // The downloading is completed, so we do not need to cancel the job
    224   // in the destructor.
    225   job_canceller_.Reset();
    226 
    227   if (error == FILE_ERROR_OK) {
    228     return;
    229   }
    230 
    231   error_code_ = FileErrorToNetError(error);
    232   pending_data_.clear();
    233 
    234   if (callback_.is_null()) {
    235     // No pending Read operation.
    236     return;
    237   }
    238 
    239   buffer_ = NULL;
    240   buffer_length_ = 0;
    241   base::ResetAndReturn(&callback_).Run(error_code_);
    242 }
    243 
    244 }  // namespace internal
    245 
    246 namespace {
    247 
    248 // Calls FileSystemInterface::GetFileContentByPath if the file system
    249 // is available. If not, the |completion_callback| is invoked with
    250 // FILE_ERROR_FAILED.
    251 void GetFileContentByPathOnUIThread(
    252     const DriveFileStreamReader::FileSystemGetter& file_system_getter,
    253     const base::FilePath& drive_file_path,
    254     const GetFileContentInitializedCallback& initialized_callback,
    255     const google_apis::GetContentCallback& get_content_callback,
    256     const FileOperationCallback& completion_callback) {
    257   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    258 
    259   FileSystemInterface* file_system = file_system_getter.Run();
    260   if (!file_system) {
    261     completion_callback.Run(FILE_ERROR_FAILED);
    262     return;
    263   }
    264 
    265   file_system->GetFileContentByPath(drive_file_path,
    266                                     initialized_callback,
    267                                     get_content_callback,
    268                                     completion_callback);
    269 }
    270 
    271 // Helper to run FileSystemInterface::GetFileContentByPath on UI thread.
    272 void GetFileContentByPath(
    273     const DriveFileStreamReader::FileSystemGetter& file_system_getter,
    274     const base::FilePath& drive_file_path,
    275     const GetFileContentInitializedCallback& initialized_callback,
    276     const google_apis::GetContentCallback& get_content_callback,
    277     const FileOperationCallback& completion_callback) {
    278   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    279 
    280   BrowserThread::PostTask(
    281       BrowserThread::UI,
    282       FROM_HERE,
    283       base::Bind(&GetFileContentByPathOnUIThread,
    284                  file_system_getter,
    285                  drive_file_path,
    286                  google_apis::CreateRelayCallback(initialized_callback),
    287                  google_apis::CreateRelayCallback(get_content_callback),
    288                  google_apis::CreateRelayCallback(completion_callback)));
    289 }
    290 
    291 }  // namespace
    292 
    293 DriveFileStreamReader::DriveFileStreamReader(
    294     const FileSystemGetter& file_system_getter,
    295     base::SequencedTaskRunner* file_task_runner)
    296     : file_system_getter_(file_system_getter),
    297       file_task_runner_(file_task_runner),
    298       weak_ptr_factory_(this) {
    299   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    300 }
    301 
    302 DriveFileStreamReader::~DriveFileStreamReader() {
    303 }
    304 
    305 bool DriveFileStreamReader::IsInitialized() const {
    306   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    307   return reader_proxy_.get() != NULL;
    308 }
    309 
    310 void DriveFileStreamReader::Initialize(
    311     const base::FilePath& drive_file_path,
    312     const net::HttpByteRange& byte_range,
    313     const InitializeCompletionCallback& callback) {
    314   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    315   DCHECK(!callback.is_null());
    316 
    317   GetFileContentByPath(
    318       file_system_getter_,
    319       drive_file_path,
    320       base::Bind(&DriveFileStreamReader
    321                      ::InitializeAfterGetFileContentByPathInitialized,
    322                  weak_ptr_factory_.GetWeakPtr(),
    323                  byte_range,
    324                  callback),
    325       base::Bind(&DriveFileStreamReader::OnGetContent,
    326                  weak_ptr_factory_.GetWeakPtr()),
    327       base::Bind(&DriveFileStreamReader::OnGetFileContentByPathCompletion,
    328                  weak_ptr_factory_.GetWeakPtr(),
    329                  callback));
    330 }
    331 
    332 int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length,
    333                                 const net::CompletionCallback& callback) {
    334   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    335   DCHECK(reader_proxy_);
    336   DCHECK(buffer);
    337   DCHECK(!callback.is_null());
    338   return reader_proxy_->Read(buffer, buffer_length, callback);
    339 }
    340 
    341 void DriveFileStreamReader::InitializeAfterGetFileContentByPathInitialized(
    342     const net::HttpByteRange& in_byte_range,
    343     const InitializeCompletionCallback& callback,
    344     FileError error,
    345     scoped_ptr<ResourceEntry> entry,
    346     const base::FilePath& local_cache_file_path,
    347     const base::Closure& ui_cancel_download_closure) {
    348   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    349 
    350   if (error != FILE_ERROR_OK) {
    351     callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
    352     return;
    353   }
    354   DCHECK(entry);
    355 
    356   net::HttpByteRange byte_range = in_byte_range;
    357   if (!byte_range.ComputeBounds(entry->file_info().size())) {
    358     // If |byte_range| is invalid (e.g. out of bounds), return with an error.
    359     // At the same time, we cancel the in-flight downloading operation if
    360     // needed and and invalidate weak pointers so that we won't
    361     // receive unwanted callbacks.
    362     if (!ui_cancel_download_closure.is_null())
    363       ui_cancel_download_closure.Run();
    364     weak_ptr_factory_.InvalidateWeakPtrs();
    365     callback.Run(
    366         net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>());
    367     return;
    368   }
    369 
    370   // Note: both boundary of |byte_range| are inclusive.
    371   int64 range_length =
    372       byte_range.last_byte_position() - byte_range.first_byte_position() + 1;
    373   DCHECK_GE(range_length, 0);
    374 
    375   if (local_cache_file_path.empty()) {
    376     // The file is not cached, and being downloaded.
    377     DCHECK(!ui_cancel_download_closure.is_null());
    378     reader_proxy_.reset(
    379         new internal::NetworkReaderProxy(
    380             byte_range.first_byte_position(), range_length,
    381             base::Bind(&RunTaskOnUIThread, ui_cancel_download_closure)));
    382     callback.Run(net::OK, entry.Pass());
    383     return;
    384   }
    385 
    386   // Otherwise, open the stream for file.
    387   scoped_ptr<util::LocalFileReader> file_reader(
    388       new util::LocalFileReader(file_task_runner_.get()));
    389   util::LocalFileReader* file_reader_ptr = file_reader.get();
    390   file_reader_ptr->Open(
    391       local_cache_file_path,
    392       byte_range.first_byte_position(),
    393       base::Bind(
    394           &DriveFileStreamReader::InitializeAfterLocalFileOpen,
    395           weak_ptr_factory_.GetWeakPtr(),
    396           range_length,
    397           callback,
    398           base::Passed(&entry),
    399           base::Passed(&file_reader)));
    400 }
    401 
    402 void DriveFileStreamReader::InitializeAfterLocalFileOpen(
    403     int64 length,
    404     const InitializeCompletionCallback& callback,
    405     scoped_ptr<ResourceEntry> entry,
    406     scoped_ptr<util::LocalFileReader> file_reader,
    407     int open_result) {
    408   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    409 
    410   if (open_result != net::OK) {
    411     callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>());
    412     return;
    413   }
    414 
    415   reader_proxy_.reset(
    416       new internal::LocalReaderProxy(file_reader.Pass(), length));
    417   callback.Run(net::OK, entry.Pass());
    418 }
    419 
    420 void DriveFileStreamReader::OnGetContent(google_apis::GDataErrorCode error_code,
    421                                          scoped_ptr<std::string> data) {
    422   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    423   DCHECK(reader_proxy_);
    424   reader_proxy_->OnGetContent(data.Pass());
    425 }
    426 
    427 void DriveFileStreamReader::OnGetFileContentByPathCompletion(
    428     const InitializeCompletionCallback& callback,
    429     FileError error) {
    430   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
    431 
    432   if (reader_proxy_) {
    433     // If the proxy object available, send the error to it.
    434     reader_proxy_->OnCompleted(error);
    435   } else {
    436     // Here the proxy object is not yet available.
    437     // There are two cases. 1) Some error happens during the initialization.
    438     // 2) the cache file is found, but the proxy object is not *yet*
    439     // initialized because the file is being opened.
    440     // We are interested in 1) only. The callback for 2) will be called
    441     // after opening the file is completed.
    442     // Note: due to the same reason, LocalReaderProxy::OnCompleted may
    443     // or may not be called. This is timing issue, and it is difficult to avoid
    444     // unfortunately.
    445     if (error != FILE_ERROR_OK) {
    446       callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
    447     }
    448   }
    449 }
    450 
    451 }  // namespace drive
    452