1 // Copyright 2017 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/system/file_data_pipe_producer.h" 6 7 #include <algorithm> 8 #include <limits> 9 #include <memory> 10 #include <utility> 11 12 #include "base/bind.h" 13 #include "base/callback.h" 14 #include "base/location.h" 15 #include "base/memory/ref_counted_delete_on_sequence.h" 16 #include "base/numerics/safe_conversions.h" 17 #include "base/sequenced_task_runner.h" 18 #include "base/synchronization/lock.h" 19 #include "base/task_scheduler/post_task.h" 20 #include "base/threading/sequenced_task_runner_handle.h" 21 #include "mojo/public/cpp/system/simple_watcher.h" 22 23 namespace mojo { 24 25 namespace { 26 27 // No good reason not to attempt very large pipe transactions in case the data 28 // pipe in use has a very large capacity available, so we default to trying 29 // 64 MB chunks whenever a producer is writable. 30 constexpr uint32_t kDefaultMaxReadSize = 64 * 1024 * 1024; 31 32 MojoResult FileErrorToMojoResult(base::File::Error error) { 33 switch (error) { 34 case base::File::FILE_OK: 35 return MOJO_RESULT_OK; 36 case base::File::FILE_ERROR_NOT_FOUND: 37 return MOJO_RESULT_NOT_FOUND; 38 case base::File::FILE_ERROR_SECURITY: 39 case base::File::FILE_ERROR_ACCESS_DENIED: 40 return MOJO_RESULT_PERMISSION_DENIED; 41 case base::File::FILE_ERROR_TOO_MANY_OPENED: 42 case base::File::FILE_ERROR_NO_MEMORY: 43 return MOJO_RESULT_RESOURCE_EXHAUSTED; 44 case base::File::FILE_ERROR_ABORT: 45 return MOJO_RESULT_ABORTED; 46 default: 47 return MOJO_RESULT_UNKNOWN; 48 } 49 } 50 51 } // namespace 52 53 class FileDataPipeProducer::FileSequenceState 54 : public base::RefCountedDeleteOnSequence<FileSequenceState> { 55 public: 56 using CompletionCallback = 57 base::OnceCallback<void(ScopedDataPipeProducerHandle producer, 58 MojoResult result)>; 59 60 FileSequenceState( 61 ScopedDataPipeProducerHandle producer_handle, 62 scoped_refptr<base::SequencedTaskRunner> file_task_runner, 63 CompletionCallback callback, 64 scoped_refptr<base::SequencedTaskRunner> callback_task_runner, 65 std::unique_ptr<Observer> observer) 66 : base::RefCountedDeleteOnSequence<FileSequenceState>( 67 std::move(file_task_runner)), 68 callback_task_runner_(std::move(callback_task_runner)), 69 producer_handle_(std::move(producer_handle)), 70 callback_(std::move(callback)), 71 observer_(std::move(observer)) {} 72 73 void Cancel() { 74 base::AutoLock lock(lock_); 75 is_cancelled_ = true; 76 } 77 78 void StartFromFile(base::File file, size_t max_bytes) { 79 owning_task_runner()->PostTask( 80 FROM_HERE, 81 base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this, 82 std::move(file), max_bytes)); 83 } 84 85 void StartFromPath(const base::FilePath& path) { 86 owning_task_runner()->PostTask( 87 FROM_HERE, 88 base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this, 89 path)); 90 } 91 92 private: 93 friend class base::DeleteHelper<FileSequenceState>; 94 friend class base::RefCountedDeleteOnSequence<FileSequenceState>; 95 96 ~FileSequenceState() = default; 97 98 void StartFromFileOnFileSequence(base::File file, size_t max_bytes) { 99 if (file.error_details() != base::File::FILE_OK) { 100 Finish(FileErrorToMojoResult(file.error_details())); 101 return; 102 } 103 file_ = std::move(file); 104 max_bytes_ = max_bytes; 105 TransferSomeBytes(); 106 if (producer_handle_.is_valid()) { 107 // If we didn't nail it all on the first transaction attempt, setup a 108 // watcher and complete the read asynchronously. 109 watcher_ = std::make_unique<SimpleWatcher>( 110 FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC, 111 base::SequencedTaskRunnerHandle::Get()); 112 watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, 113 MOJO_WATCH_CONDITION_SATISFIED, 114 base::Bind(&FileSequenceState::OnHandleReady, this)); 115 } 116 } 117 118 void StartFromPathOnFileSequence(const base::FilePath& path) { 119 StartFromFileOnFileSequence( 120 base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ), 121 std::numeric_limits<size_t>::max()); 122 } 123 124 void OnHandleReady(MojoResult result, const HandleSignalsState& state) { 125 { 126 // Stop ourselves from doing redundant work if we've been cancelled from 127 // another thread. Note that we do not rely on this for any kind of thread 128 // safety concerns. 129 base::AutoLock lock(lock_); 130 if (is_cancelled_) 131 return; 132 } 133 134 if (result != MOJO_RESULT_OK) { 135 // Either the consumer pipe has been closed or something terrible 136 // happened. In any case, we'll never be able to write more data. 137 Finish(result); 138 return; 139 } 140 141 TransferSomeBytes(); 142 } 143 144 void TransferSomeBytes() { 145 while (true) { 146 // Lock as much of the pipe as we can. 147 void* pipe_buffer; 148 uint32_t size = kDefaultMaxReadSize; 149 MojoResult result = producer_handle_->BeginWriteData( 150 &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE); 151 if (result == MOJO_RESULT_SHOULD_WAIT) 152 return; 153 if (result != MOJO_RESULT_OK) { 154 Finish(result); 155 return; 156 } 157 158 // Attempt to read that many bytes from the file, directly into the data 159 // pipe. Note that while |max_bytes_remaining| may be very large, the 160 // length we attempt read is bounded by the much smaller 161 // |kDefaultMaxReadSize| via |size|. 162 DCHECK(base::IsValueInRangeForNumericType<int>(size)); 163 const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_; 164 int attempted_read_size = static_cast<int>( 165 std::min(static_cast<size_t>(size), max_bytes_remaining)); 166 int read_size = file_.ReadAtCurrentPos(static_cast<char*>(pipe_buffer), 167 attempted_read_size); 168 base::File::Error read_error; 169 if (read_size < 0) { 170 read_error = base::File::GetLastFileError(); 171 DCHECK_NE(base::File::FILE_OK, read_error); 172 if (observer_) 173 observer_->OnBytesRead(pipe_buffer, 0u, read_error); 174 } else { 175 read_error = base::File::FILE_OK; 176 if (observer_) { 177 observer_->OnBytesRead(pipe_buffer, static_cast<size_t>(read_size), 178 base::File::FILE_OK); 179 } 180 } 181 producer_handle_->EndWriteData( 182 read_size >= 0 ? static_cast<uint32_t>(read_size) : 0); 183 184 if (read_size < 0) { 185 Finish(FileErrorToMojoResult(read_error)); 186 return; 187 } 188 189 bytes_transferred_ += read_size; 190 DCHECK_LE(bytes_transferred_, max_bytes_); 191 192 if (read_size < attempted_read_size) { 193 // ReadAtCurrentPos makes a best effort to read all requested bytes. We 194 // reasonably assume if it fails to read what we ask for, we've hit EOF. 195 Finish(MOJO_RESULT_OK); 196 return; 197 } 198 199 if (bytes_transferred_ == max_bytes_) { 200 // We've read as much as we were asked to read. 201 Finish(MOJO_RESULT_OK); 202 return; 203 } 204 } 205 } 206 207 void Finish(MojoResult result) { 208 if (observer_) { 209 observer_->OnDoneReading(); 210 observer_ = nullptr; 211 } 212 watcher_.reset(); 213 callback_task_runner_->PostTask( 214 FROM_HERE, base::BindOnce(std::move(callback_), 215 std::move(producer_handle_), result)); 216 } 217 218 const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_; 219 220 // State which is effectively owned and used only on the file sequence. 221 ScopedDataPipeProducerHandle producer_handle_; 222 base::File file_; 223 size_t max_bytes_ = 0; 224 size_t bytes_transferred_ = 0; 225 CompletionCallback callback_; 226 std::unique_ptr<SimpleWatcher> watcher_; 227 228 // Guards |is_cancelled_|. 229 base::Lock lock_; 230 bool is_cancelled_ = false; 231 std::unique_ptr<Observer> observer_; 232 233 DISALLOW_COPY_AND_ASSIGN(FileSequenceState); 234 }; 235 236 FileDataPipeProducer::FileDataPipeProducer( 237 ScopedDataPipeProducerHandle producer, 238 std::unique_ptr<Observer> observer) 239 : producer_(std::move(producer)), 240 observer_(std::move(observer)), 241 weak_factory_(this) {} 242 243 FileDataPipeProducer::~FileDataPipeProducer() { 244 if (file_sequence_state_) 245 file_sequence_state_->Cancel(); 246 } 247 248 void FileDataPipeProducer::WriteFromFile(base::File file, 249 CompletionCallback callback) { 250 WriteFromFile(std::move(file), std::numeric_limits<size_t>::max(), 251 std::move(callback)); 252 } 253 254 void FileDataPipeProducer::WriteFromFile(base::File file, 255 size_t max_bytes, 256 CompletionCallback callback) { 257 InitializeNewRequest(std::move(callback)); 258 file_sequence_state_->StartFromFile(std::move(file), max_bytes); 259 } 260 261 void FileDataPipeProducer::WriteFromPath(const base::FilePath& path, 262 CompletionCallback callback) { 263 InitializeNewRequest(std::move(callback)); 264 file_sequence_state_->StartFromPath(path); 265 } 266 267 void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) { 268 DCHECK(!file_sequence_state_); 269 270 LOG(FATAL) << "unsupported in libchrome"; 271 // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits( 272 // {base::MayBlock(), base::TaskPriority::BACKGROUND}); 273 // file_sequence_state_ = new FileSequenceState( 274 // std::move(producer_), file_task_runner, 275 // base::BindOnce(&FileDataPipeProducer::OnWriteComplete, 276 // weak_factory_.GetWeakPtr(), std::move(callback)), 277 // base::SequencedTaskRunnerHandle::Get(), std::move(observer_)); 278 } 279 280 void FileDataPipeProducer::OnWriteComplete( 281 CompletionCallback callback, 282 ScopedDataPipeProducerHandle producer, 283 MojoResult ready_result) { 284 producer_ = std::move(producer); 285 file_sequence_state_ = nullptr; 286 std::move(callback).Run(ready_result); 287 } 288 289 } // namespace mojo 290