Home | History | Annotate | Download | only in system
      1 // Copyright 2017 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/system/file_data_pipe_producer.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 #include <memory>
     10 #include <utility>
     11 
     12 #include "base/bind.h"
     13 #include "base/callback.h"
     14 #include "base/location.h"
     15 #include "base/memory/ref_counted_delete_on_sequence.h"
     16 #include "base/numerics/safe_conversions.h"
     17 #include "base/sequenced_task_runner.h"
     18 #include "base/synchronization/lock.h"
     19 #include "base/task_scheduler/post_task.h"
     20 #include "base/threading/sequenced_task_runner_handle.h"
     21 #include "mojo/public/cpp/system/simple_watcher.h"
     22 
     23 namespace mojo {
     24 
     25 namespace {
     26 
     27 // No good reason not to attempt very large pipe transactions in case the data
     28 // pipe in use has a very large capacity available, so we default to trying
     29 // 64 MB chunks whenever a producer is writable.
     30 constexpr uint32_t kDefaultMaxReadSize = 64 * 1024 * 1024;
     31 
     32 MojoResult FileErrorToMojoResult(base::File::Error error) {
     33   switch (error) {
     34     case base::File::FILE_OK:
     35       return MOJO_RESULT_OK;
     36     case base::File::FILE_ERROR_NOT_FOUND:
     37       return MOJO_RESULT_NOT_FOUND;
     38     case base::File::FILE_ERROR_SECURITY:
     39     case base::File::FILE_ERROR_ACCESS_DENIED:
     40       return MOJO_RESULT_PERMISSION_DENIED;
     41     case base::File::FILE_ERROR_TOO_MANY_OPENED:
     42     case base::File::FILE_ERROR_NO_MEMORY:
     43       return MOJO_RESULT_RESOURCE_EXHAUSTED;
     44     case base::File::FILE_ERROR_ABORT:
     45       return MOJO_RESULT_ABORTED;
     46     default:
     47       return MOJO_RESULT_UNKNOWN;
     48   }
     49 }
     50 
     51 }  // namespace
     52 
     53 class FileDataPipeProducer::FileSequenceState
     54     : public base::RefCountedDeleteOnSequence<FileSequenceState> {
     55  public:
     56   using CompletionCallback =
     57       base::OnceCallback<void(ScopedDataPipeProducerHandle producer,
     58                               MojoResult result)>;
     59 
     60   FileSequenceState(
     61       ScopedDataPipeProducerHandle producer_handle,
     62       scoped_refptr<base::SequencedTaskRunner> file_task_runner,
     63       CompletionCallback callback,
     64       scoped_refptr<base::SequencedTaskRunner> callback_task_runner,
     65       std::unique_ptr<Observer> observer)
     66       : base::RefCountedDeleteOnSequence<FileSequenceState>(
     67             std::move(file_task_runner)),
     68         callback_task_runner_(std::move(callback_task_runner)),
     69         producer_handle_(std::move(producer_handle)),
     70         callback_(std::move(callback)),
     71         observer_(std::move(observer)) {}
     72 
     73   void Cancel() {
     74     base::AutoLock lock(lock_);
     75     is_cancelled_ = true;
     76   }
     77 
     78   void StartFromFile(base::File file, size_t max_bytes) {
     79     owning_task_runner()->PostTask(
     80         FROM_HERE,
     81         base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this,
     82                        std::move(file), max_bytes));
     83   }
     84 
     85   void StartFromPath(const base::FilePath& path) {
     86     owning_task_runner()->PostTask(
     87         FROM_HERE,
     88         base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this,
     89                        path));
     90   }
     91 
     92  private:
     93   friend class base::DeleteHelper<FileSequenceState>;
     94   friend class base::RefCountedDeleteOnSequence<FileSequenceState>;
     95 
     96   ~FileSequenceState() = default;
     97 
     98   void StartFromFileOnFileSequence(base::File file, size_t max_bytes) {
     99     if (file.error_details() != base::File::FILE_OK) {
    100       Finish(FileErrorToMojoResult(file.error_details()));
    101       return;
    102     }
    103     file_ = std::move(file);
    104     max_bytes_ = max_bytes;
    105     TransferSomeBytes();
    106     if (producer_handle_.is_valid()) {
    107       // If we didn't nail it all on the first transaction attempt, setup a
    108       // watcher and complete the read asynchronously.
    109       watcher_ = std::make_unique<SimpleWatcher>(
    110           FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
    111           base::SequencedTaskRunnerHandle::Get());
    112       watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
    113                       MOJO_WATCH_CONDITION_SATISFIED,
    114                       base::Bind(&FileSequenceState::OnHandleReady, this));
    115     }
    116   }
    117 
    118   void StartFromPathOnFileSequence(const base::FilePath& path) {
    119     StartFromFileOnFileSequence(
    120         base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ),
    121         std::numeric_limits<size_t>::max());
    122   }
    123 
    124   void OnHandleReady(MojoResult result, const HandleSignalsState& state) {
    125     {
    126       // Stop ourselves from doing redundant work if we've been cancelled from
    127       // another thread. Note that we do not rely on this for any kind of thread
    128       // safety concerns.
    129       base::AutoLock lock(lock_);
    130       if (is_cancelled_)
    131         return;
    132     }
    133 
    134     if (result != MOJO_RESULT_OK) {
    135       // Either the consumer pipe has been closed or something terrible
    136       // happened. In any case, we'll never be able to write more data.
    137       Finish(result);
    138       return;
    139     }
    140 
    141     TransferSomeBytes();
    142   }
    143 
    144   void TransferSomeBytes() {
    145     while (true) {
    146       // Lock as much of the pipe as we can.
    147       void* pipe_buffer;
    148       uint32_t size = kDefaultMaxReadSize;
    149       MojoResult result = producer_handle_->BeginWriteData(
    150           &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE);
    151       if (result == MOJO_RESULT_SHOULD_WAIT)
    152         return;
    153       if (result != MOJO_RESULT_OK) {
    154         Finish(result);
    155         return;
    156       }
    157 
    158       // Attempt to read that many bytes from the file, directly into the data
    159       // pipe. Note that while |max_bytes_remaining| may be very large, the
    160       // length we attempt read is bounded by the much smaller
    161       // |kDefaultMaxReadSize| via |size|.
    162       DCHECK(base::IsValueInRangeForNumericType<int>(size));
    163       const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_;
    164       int attempted_read_size = static_cast<int>(
    165           std::min(static_cast<size_t>(size), max_bytes_remaining));
    166       int read_size = file_.ReadAtCurrentPos(static_cast<char*>(pipe_buffer),
    167                                              attempted_read_size);
    168       base::File::Error read_error;
    169       if (read_size < 0) {
    170         read_error = base::File::GetLastFileError();
    171         DCHECK_NE(base::File::FILE_OK, read_error);
    172         if (observer_)
    173           observer_->OnBytesRead(pipe_buffer, 0u, read_error);
    174       } else {
    175         read_error = base::File::FILE_OK;
    176         if (observer_) {
    177           observer_->OnBytesRead(pipe_buffer, static_cast<size_t>(read_size),
    178                                  base::File::FILE_OK);
    179         }
    180       }
    181       producer_handle_->EndWriteData(
    182           read_size >= 0 ? static_cast<uint32_t>(read_size) : 0);
    183 
    184       if (read_size < 0) {
    185         Finish(FileErrorToMojoResult(read_error));
    186         return;
    187       }
    188 
    189       bytes_transferred_ += read_size;
    190       DCHECK_LE(bytes_transferred_, max_bytes_);
    191 
    192       if (read_size < attempted_read_size) {
    193         // ReadAtCurrentPos makes a best effort to read all requested bytes. We
    194         // reasonably assume if it fails to read what we ask for, we've hit EOF.
    195         Finish(MOJO_RESULT_OK);
    196         return;
    197       }
    198 
    199       if (bytes_transferred_ == max_bytes_) {
    200         // We've read as much as we were asked to read.
    201         Finish(MOJO_RESULT_OK);
    202         return;
    203       }
    204     }
    205   }
    206 
    207   void Finish(MojoResult result) {
    208     if (observer_) {
    209       observer_->OnDoneReading();
    210       observer_ = nullptr;
    211     }
    212     watcher_.reset();
    213     callback_task_runner_->PostTask(
    214         FROM_HERE, base::BindOnce(std::move(callback_),
    215                                   std::move(producer_handle_), result));
    216   }
    217 
    218   const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
    219 
    220   // State which is effectively owned and used only on the file sequence.
    221   ScopedDataPipeProducerHandle producer_handle_;
    222   base::File file_;
    223   size_t max_bytes_ = 0;
    224   size_t bytes_transferred_ = 0;
    225   CompletionCallback callback_;
    226   std::unique_ptr<SimpleWatcher> watcher_;
    227 
    228   // Guards |is_cancelled_|.
    229   base::Lock lock_;
    230   bool is_cancelled_ = false;
    231   std::unique_ptr<Observer> observer_;
    232 
    233   DISALLOW_COPY_AND_ASSIGN(FileSequenceState);
    234 };
    235 
    236 FileDataPipeProducer::FileDataPipeProducer(
    237     ScopedDataPipeProducerHandle producer,
    238     std::unique_ptr<Observer> observer)
    239     : producer_(std::move(producer)),
    240       observer_(std::move(observer)),
    241       weak_factory_(this) {}
    242 
    243 FileDataPipeProducer::~FileDataPipeProducer() {
    244   if (file_sequence_state_)
    245     file_sequence_state_->Cancel();
    246 }
    247 
    248 void FileDataPipeProducer::WriteFromFile(base::File file,
    249                                          CompletionCallback callback) {
    250   WriteFromFile(std::move(file), std::numeric_limits<size_t>::max(),
    251                 std::move(callback));
    252 }
    253 
    254 void FileDataPipeProducer::WriteFromFile(base::File file,
    255                                          size_t max_bytes,
    256                                          CompletionCallback callback) {
    257   InitializeNewRequest(std::move(callback));
    258   file_sequence_state_->StartFromFile(std::move(file), max_bytes);
    259 }
    260 
    261 void FileDataPipeProducer::WriteFromPath(const base::FilePath& path,
    262                                          CompletionCallback callback) {
    263   InitializeNewRequest(std::move(callback));
    264   file_sequence_state_->StartFromPath(path);
    265 }
    266 
    267 void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) {
    268   DCHECK(!file_sequence_state_);
    269 
    270   LOG(FATAL) << "unsupported in libchrome";
    271   // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits(
    272   //     {base::MayBlock(), base::TaskPriority::BACKGROUND});
    273   // file_sequence_state_ = new FileSequenceState(
    274   //     std::move(producer_), file_task_runner,
    275   //     base::BindOnce(&FileDataPipeProducer::OnWriteComplete,
    276   //                    weak_factory_.GetWeakPtr(), std::move(callback)),
    277   //     base::SequencedTaskRunnerHandle::Get(), std::move(observer_));
    278 }
    279 
    280 void FileDataPipeProducer::OnWriteComplete(
    281     CompletionCallback callback,
    282     ScopedDataPipeProducerHandle producer,
    283     MojoResult ready_result) {
    284   producer_ = std::move(producer);
    285   file_sequence_state_ = nullptr;
    286   std::move(callback).Run(ready_result);
    287 }
    288 
    289 }  // namespace mojo
    290