1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/media_galleries/fileapi/readahead_file_stream_reader.h" 6 7 #include <algorithm> 8 9 #include "base/message_loop/message_loop.h" 10 #include "base/numerics/safe_conversions.h" 11 #include "net/base/io_buffer.h" 12 #include "net/base/net_errors.h" 13 14 using webkit_blob::FileStreamReader; 15 16 namespace { 17 18 const size_t kDesiredNumberOfBuffers = 2; // So we are always one buffer ahead. 19 const int kBufferSize = 1024*1024; // 1MB to minimize transaction costs. 20 21 } // namespace 22 23 ReadaheadFileStreamReader::ReadaheadFileStreamReader(FileStreamReader* source) 24 : source_(source), 25 source_error_(0), 26 source_has_pending_read_(false), 27 weak_factory_(this) { 28 } 29 30 ReadaheadFileStreamReader::~ReadaheadFileStreamReader() {} 31 32 int ReadaheadFileStreamReader::Read( 33 net::IOBuffer* buf, int buf_len, const net::CompletionCallback& callback) { 34 DCHECK(!pending_sink_buffer_.get()); 35 DCHECK(pending_read_callback_.is_null()); 36 37 ReadFromSourceIfNeeded(); 38 39 scoped_refptr<net::DrainableIOBuffer> sink = 40 new net::DrainableIOBuffer(buf, buf_len); 41 int result = FinishReadFromCacheOrStoredError(sink); 42 43 // We are waiting for an source read to complete, so save the request. 44 if (result == net::ERR_IO_PENDING) { 45 DCHECK(!pending_sink_buffer_.get()); 46 DCHECK(pending_read_callback_.is_null()); 47 pending_sink_buffer_ = sink; 48 pending_read_callback_ = callback; 49 } 50 51 return result; 52 } 53 54 int64 ReadaheadFileStreamReader::GetLength( 55 const net::Int64CompletionCallback& callback) { 56 return source_->GetLength(callback); 57 } 58 59 int ReadaheadFileStreamReader::FinishReadFromCacheOrStoredError( 60 net::DrainableIOBuffer* sink) { 61 // If we don't have any ready cache, return the pending read code, or 62 // the stored error code. 63 if (buffers_.empty()) { 64 if (source_.get()) { 65 DCHECK(source_has_pending_read_); 66 return net::ERR_IO_PENDING; 67 } else { 68 return source_error_; 69 } 70 } 71 72 while (sink->BytesRemaining() > 0 && !buffers_.empty()) { 73 net::DrainableIOBuffer* source_buffer = buffers_.front().get(); 74 75 DCHECK(source_buffer->BytesRemaining() > 0); 76 77 int copy_len = std::min(source_buffer->BytesRemaining(), 78 sink->BytesRemaining()); 79 std::copy(source_buffer->data(), source_buffer->data() + copy_len, 80 sink->data()); 81 82 source_buffer->DidConsume(copy_len); 83 sink->DidConsume(copy_len); 84 85 if (source_buffer->BytesRemaining() == 0) { 86 buffers_.pop(); 87 88 // Get a new buffer to replace the one we just used up. 89 ReadFromSourceIfNeeded(); 90 } 91 } 92 93 return sink->BytesConsumed(); 94 } 95 96 void ReadaheadFileStreamReader::ReadFromSourceIfNeeded() { 97 if (!source_.get() || source_has_pending_read_ || 98 buffers_.size() >= kDesiredNumberOfBuffers) { 99 return; 100 } 101 102 source_has_pending_read_ = true; 103 104 scoped_refptr<net::IOBuffer> buf(new net::IOBuffer(kBufferSize)); 105 int result = source_->Read( 106 buf, 107 kBufferSize, 108 base::Bind(&ReadaheadFileStreamReader::OnFinishReadFromSource, 109 weak_factory_.GetWeakPtr(), buf)); 110 111 if (result != net::ERR_IO_PENDING) 112 OnFinishReadFromSource(buf, result); 113 } 114 115 void ReadaheadFileStreamReader::OnFinishReadFromSource(net::IOBuffer* buf, 116 int result) { 117 DCHECK(result != net::ERR_IO_PENDING); 118 DCHECK(source_has_pending_read_); 119 source_has_pending_read_ = false; 120 121 // Either store the data read from |source_|, or store the error code. 122 if (result > 0) { 123 scoped_refptr<net::DrainableIOBuffer> drainable_buffer( 124 new net::DrainableIOBuffer(buf, result)); 125 buffers_.push(drainable_buffer); 126 ReadFromSourceIfNeeded(); 127 } else { 128 source_.reset(); 129 source_error_ = result; 130 } 131 132 // If there's a read request waiting for the source FileStreamReader to 133 // finish reading, fulfill that request now from the cache or stored error. 134 if (pending_sink_buffer_.get()) { 135 DCHECK(!pending_read_callback_.is_null()); 136 137 // Free the pending callback before running it, as the callback often 138 // dispatches another read. 139 scoped_refptr<net::DrainableIOBuffer> sink = pending_sink_buffer_; 140 pending_sink_buffer_ = NULL; 141 net::CompletionCallback completion_callback = pending_read_callback_; 142 pending_read_callback_.Reset(); 143 144 completion_callback.Run(FinishReadFromCacheOrStoredError(sink)); 145 } 146 } 147