1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/chromeos/drive/drive_file_stream_reader.h" 6 7 #include <algorithm> 8 #include <cstring> 9 10 #include "base/callback_helpers.h" 11 #include "base/logging.h" 12 #include "base/sequenced_task_runner.h" 13 #include "chrome/browser/chromeos/drive/drive.pb.h" 14 #include "chrome/browser/chromeos/drive/file_system_interface.h" 15 #include "chrome/browser/chromeos/drive/local_file_reader.h" 16 #include "chrome/browser/google_apis/task_util.h" 17 #include "content/public/browser/browser_thread.h" 18 #include "net/base/io_buffer.h" 19 #include "net/base/net_errors.h" 20 #include "net/http/http_byte_range.h" 21 22 using content::BrowserThread; 23 24 namespace drive { 25 namespace { 26 27 // Converts FileError code to net::Error code. 28 int FileErrorToNetError(FileError error) { 29 return net::PlatformFileErrorToNetError(FileErrorToPlatformError(error)); 30 } 31 32 // Runs task on UI thread. 33 void RunTaskOnUIThread(const base::Closure& task) { 34 google_apis::RunTaskOnThread( 35 BrowserThread::GetMessageLoopProxyForThread(BrowserThread::UI), task); 36 } 37 38 } // namespace 39 40 namespace internal { 41 namespace { 42 43 // Copies the content in |pending_data| into |buffer| at most 44 // |buffer_length| bytes, and erases the copied data from 45 // |pending_data|. Returns the number of copied bytes. 46 int ReadInternal(ScopedVector<std::string>* pending_data, 47 net::IOBuffer* buffer, int buffer_length) { 48 size_t index = 0; 49 int offset = 0; 50 for (; index < pending_data->size() && offset < buffer_length; ++index) { 51 const std::string& chunk = *(*pending_data)[index]; 52 DCHECK(!chunk.empty()); 53 54 size_t bytes_to_read = std::min( 55 chunk.size(), static_cast<size_t>(buffer_length - offset)); 56 std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read); 57 offset += bytes_to_read; 58 if (bytes_to_read < chunk.size()) { 59 // The chunk still has some remaining data. 60 // So remove leading (copied) bytes, and quit the loop so that 61 // the remaining data won't be deleted in the following erase(). 62 (*pending_data)[index]->erase(0, bytes_to_read); 63 break; 64 } 65 } 66 67 // Consume the copied data. 68 pending_data->erase(pending_data->begin(), pending_data->begin() + index); 69 70 return offset; 71 } 72 73 } // namespace 74 75 LocalReaderProxy::LocalReaderProxy( 76 scoped_ptr<util::LocalFileReader> file_reader, int64 length) 77 : file_reader_(file_reader.Pass()), 78 remaining_length_(length), 79 weak_ptr_factory_(this) { 80 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 81 DCHECK(file_reader_); 82 } 83 84 LocalReaderProxy::~LocalReaderProxy() { 85 } 86 87 int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length, 88 const net::CompletionCallback& callback) { 89 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 90 DCHECK(file_reader_); 91 92 if (buffer_length > remaining_length_) { 93 // Here, narrowing is safe. 94 buffer_length = static_cast<int>(remaining_length_); 95 } 96 97 file_reader_->Read(buffer, buffer_length, 98 base::Bind(&LocalReaderProxy::OnReadCompleted, 99 weak_ptr_factory_.GetWeakPtr(), callback)); 100 return net::ERR_IO_PENDING; 101 } 102 103 void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) { 104 // This method should never be called, because no data should be received 105 // from the network during the reading of local-cache file. 106 NOTREACHED(); 107 } 108 109 void LocalReaderProxy::OnCompleted(FileError error) { 110 // If this method is called, no network error should be happened. 111 DCHECK_EQ(FILE_ERROR_OK, error); 112 } 113 114 void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback, 115 int read_result) { 116 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 117 DCHECK(file_reader_); 118 119 if (read_result >= 0) { 120 // |read_result| bytes data is read. 121 DCHECK_LE(read_result, remaining_length_); 122 remaining_length_ -= read_result; 123 } else { 124 // An error occurs. Close the |file_reader_|. 125 file_reader_.reset(); 126 } 127 callback.Run(read_result); 128 } 129 130 NetworkReaderProxy::NetworkReaderProxy( 131 int64 offset, 132 int64 content_length, 133 const base::Closure& job_canceller) 134 : remaining_offset_(offset), 135 remaining_content_length_(content_length), 136 error_code_(net::OK), 137 buffer_length_(0), 138 job_canceller_(job_canceller) { 139 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 140 } 141 142 NetworkReaderProxy::~NetworkReaderProxy() { 143 if (!job_canceller_.is_null()) { 144 job_canceller_.Run(); 145 } 146 } 147 148 int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length, 149 const net::CompletionCallback& callback) { 150 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 151 // Check if there is no pending Read operation. 152 DCHECK(!buffer_.get()); 153 DCHECK_EQ(buffer_length_, 0); 154 DCHECK(callback_.is_null()); 155 // Validate the arguments. 156 DCHECK(buffer); 157 DCHECK_GT(buffer_length, 0); 158 DCHECK(!callback.is_null()); 159 160 if (error_code_ != net::OK) { 161 // An error is already found. Return it immediately. 162 return error_code_; 163 } 164 165 if (remaining_content_length_ == 0) { 166 // If no more data, return immediately. 167 return 0; 168 } 169 170 if (buffer_length > remaining_content_length_) { 171 // Here, narrowing cast should be safe. 172 buffer_length = static_cast<int>(remaining_content_length_); 173 } 174 175 if (pending_data_.empty()) { 176 // No data is available. Keep the arguments, and return pending status. 177 buffer_ = buffer; 178 buffer_length_ = buffer_length; 179 callback_ = callback; 180 return net::ERR_IO_PENDING; 181 } 182 183 int result = ReadInternal(&pending_data_, buffer, buffer_length); 184 remaining_content_length_ -= result; 185 DCHECK_GE(remaining_content_length_, 0); 186 return result; 187 } 188 189 void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) { 190 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 191 DCHECK(data && !data->empty()); 192 193 if (remaining_offset_ >= static_cast<int64>(data->length())) { 194 // Skip unneeded leading data. 195 remaining_offset_ -= data->length(); 196 return; 197 } 198 199 if (remaining_offset_ > 0) { 200 // Erase unnecessary leading bytes. 201 data->erase(0, static_cast<size_t>(remaining_offset_)); 202 remaining_offset_ = 0; 203 } 204 205 pending_data_.push_back(data.release()); 206 if (!buffer_.get()) { 207 // No pending Read operation. 208 return; 209 } 210 211 int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_); 212 remaining_content_length_ -= result; 213 DCHECK_GE(remaining_content_length_, 0); 214 215 buffer_ = NULL; 216 buffer_length_ = 0; 217 DCHECK(!callback_.is_null()); 218 base::ResetAndReturn(&callback_).Run(result); 219 } 220 221 void NetworkReaderProxy::OnCompleted(FileError error) { 222 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 223 // The downloading is completed, so we do not need to cancel the job 224 // in the destructor. 225 job_canceller_.Reset(); 226 227 if (error == FILE_ERROR_OK) { 228 return; 229 } 230 231 error_code_ = FileErrorToNetError(error); 232 pending_data_.clear(); 233 234 if (callback_.is_null()) { 235 // No pending Read operation. 236 return; 237 } 238 239 buffer_ = NULL; 240 buffer_length_ = 0; 241 base::ResetAndReturn(&callback_).Run(error_code_); 242 } 243 244 } // namespace internal 245 246 namespace { 247 248 // Calls FileSystemInterface::GetFileContentByPath if the file system 249 // is available. If not, the |completion_callback| is invoked with 250 // FILE_ERROR_FAILED. 251 void GetFileContentByPathOnUIThread( 252 const DriveFileStreamReader::FileSystemGetter& file_system_getter, 253 const base::FilePath& drive_file_path, 254 const GetFileContentInitializedCallback& initialized_callback, 255 const google_apis::GetContentCallback& get_content_callback, 256 const FileOperationCallback& completion_callback) { 257 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 258 259 FileSystemInterface* file_system = file_system_getter.Run(); 260 if (!file_system) { 261 completion_callback.Run(FILE_ERROR_FAILED); 262 return; 263 } 264 265 file_system->GetFileContentByPath(drive_file_path, 266 initialized_callback, 267 get_content_callback, 268 completion_callback); 269 } 270 271 // Helper to run FileSystemInterface::GetFileContentByPath on UI thread. 272 void GetFileContentByPath( 273 const DriveFileStreamReader::FileSystemGetter& file_system_getter, 274 const base::FilePath& drive_file_path, 275 const GetFileContentInitializedCallback& initialized_callback, 276 const google_apis::GetContentCallback& get_content_callback, 277 const FileOperationCallback& completion_callback) { 278 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 279 280 BrowserThread::PostTask( 281 BrowserThread::UI, 282 FROM_HERE, 283 base::Bind(&GetFileContentByPathOnUIThread, 284 file_system_getter, 285 drive_file_path, 286 google_apis::CreateRelayCallback(initialized_callback), 287 google_apis::CreateRelayCallback(get_content_callback), 288 google_apis::CreateRelayCallback(completion_callback))); 289 } 290 291 } // namespace 292 293 DriveFileStreamReader::DriveFileStreamReader( 294 const FileSystemGetter& file_system_getter, 295 base::SequencedTaskRunner* file_task_runner) 296 : file_system_getter_(file_system_getter), 297 file_task_runner_(file_task_runner), 298 weak_ptr_factory_(this) { 299 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 300 } 301 302 DriveFileStreamReader::~DriveFileStreamReader() { 303 } 304 305 bool DriveFileStreamReader::IsInitialized() const { 306 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 307 return reader_proxy_.get() != NULL; 308 } 309 310 void DriveFileStreamReader::Initialize( 311 const base::FilePath& drive_file_path, 312 const net::HttpByteRange& byte_range, 313 const InitializeCompletionCallback& callback) { 314 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 315 DCHECK(!callback.is_null()); 316 317 GetFileContentByPath( 318 file_system_getter_, 319 drive_file_path, 320 base::Bind(&DriveFileStreamReader 321 ::InitializeAfterGetFileContentByPathInitialized, 322 weak_ptr_factory_.GetWeakPtr(), 323 byte_range, 324 callback), 325 base::Bind(&DriveFileStreamReader::OnGetContent, 326 weak_ptr_factory_.GetWeakPtr()), 327 base::Bind(&DriveFileStreamReader::OnGetFileContentByPathCompletion, 328 weak_ptr_factory_.GetWeakPtr(), 329 callback)); 330 } 331 332 int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length, 333 const net::CompletionCallback& callback) { 334 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 335 DCHECK(reader_proxy_); 336 DCHECK(buffer); 337 DCHECK(!callback.is_null()); 338 return reader_proxy_->Read(buffer, buffer_length, callback); 339 } 340 341 void DriveFileStreamReader::InitializeAfterGetFileContentByPathInitialized( 342 const net::HttpByteRange& in_byte_range, 343 const InitializeCompletionCallback& callback, 344 FileError error, 345 scoped_ptr<ResourceEntry> entry, 346 const base::FilePath& local_cache_file_path, 347 const base::Closure& ui_cancel_download_closure) { 348 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 349 350 if (error != FILE_ERROR_OK) { 351 callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>()); 352 return; 353 } 354 DCHECK(entry); 355 356 net::HttpByteRange byte_range = in_byte_range; 357 if (!byte_range.ComputeBounds(entry->file_info().size())) { 358 // If |byte_range| is invalid (e.g. out of bounds), return with an error. 359 // At the same time, we cancel the in-flight downloading operation if 360 // needed and and invalidate weak pointers so that we won't 361 // receive unwanted callbacks. 362 if (!ui_cancel_download_closure.is_null()) 363 ui_cancel_download_closure.Run(); 364 weak_ptr_factory_.InvalidateWeakPtrs(); 365 callback.Run( 366 net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>()); 367 return; 368 } 369 370 // Note: both boundary of |byte_range| are inclusive. 371 int64 range_length = 372 byte_range.last_byte_position() - byte_range.first_byte_position() + 1; 373 DCHECK_GE(range_length, 0); 374 375 if (local_cache_file_path.empty()) { 376 // The file is not cached, and being downloaded. 377 DCHECK(!ui_cancel_download_closure.is_null()); 378 reader_proxy_.reset( 379 new internal::NetworkReaderProxy( 380 byte_range.first_byte_position(), range_length, 381 base::Bind(&RunTaskOnUIThread, ui_cancel_download_closure))); 382 callback.Run(net::OK, entry.Pass()); 383 return; 384 } 385 386 // Otherwise, open the stream for file. 387 scoped_ptr<util::LocalFileReader> file_reader( 388 new util::LocalFileReader(file_task_runner_.get())); 389 util::LocalFileReader* file_reader_ptr = file_reader.get(); 390 file_reader_ptr->Open( 391 local_cache_file_path, 392 byte_range.first_byte_position(), 393 base::Bind( 394 &DriveFileStreamReader::InitializeAfterLocalFileOpen, 395 weak_ptr_factory_.GetWeakPtr(), 396 range_length, 397 callback, 398 base::Passed(&entry), 399 base::Passed(&file_reader))); 400 } 401 402 void DriveFileStreamReader::InitializeAfterLocalFileOpen( 403 int64 length, 404 const InitializeCompletionCallback& callback, 405 scoped_ptr<ResourceEntry> entry, 406 scoped_ptr<util::LocalFileReader> file_reader, 407 int open_result) { 408 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 409 410 if (open_result != net::OK) { 411 callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>()); 412 return; 413 } 414 415 reader_proxy_.reset( 416 new internal::LocalReaderProxy(file_reader.Pass(), length)); 417 callback.Run(net::OK, entry.Pass()); 418 } 419 420 void DriveFileStreamReader::OnGetContent(google_apis::GDataErrorCode error_code, 421 scoped_ptr<std::string> data) { 422 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 423 DCHECK(reader_proxy_); 424 reader_proxy_->OnGetContent(data.Pass()); 425 } 426 427 void DriveFileStreamReader::OnGetFileContentByPathCompletion( 428 const InitializeCompletionCallback& callback, 429 FileError error) { 430 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 431 432 if (reader_proxy_) { 433 // If the proxy object available, send the error to it. 434 reader_proxy_->OnCompleted(error); 435 } else { 436 // Here the proxy object is not yet available. 437 // There are two cases. 1) Some error happens during the initialization. 438 // 2) the cache file is found, but the proxy object is not *yet* 439 // initialized because the file is being opened. 440 // We are interested in 1) only. The callback for 2) will be called 441 // after opening the file is completed. 442 // Note: due to the same reason, LocalReaderProxy::OnCompleted may 443 // or may not be called. This is timing issue, and it is difficult to avoid 444 // unfortunately. 445 if (error != FILE_ERROR_OK) { 446 callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>()); 447 } 448 } 449 } 450 451 } // namespace drive 452