1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/base/upload_data_stream.h" 6 7 #include "base/logging.h" 8 #include "net/base/io_buffer.h" 9 #include "net/base/net_errors.h" 10 #include "net/base/upload_bytes_element_reader.h" 11 #include "net/base/upload_element_reader.h" 12 13 namespace net { 14 15 UploadDataStream::UploadDataStream( 16 ScopedVector<UploadElementReader> element_readers, 17 int64 identifier) 18 : element_readers_(element_readers.Pass()), 19 element_index_(0), 20 total_size_(0), 21 current_position_(0), 22 identifier_(identifier), 23 is_chunked_(false), 24 last_chunk_appended_(false), 25 read_failed_(false), 26 initialized_successfully_(false), 27 weak_ptr_factory_(this) { 28 } 29 30 UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier) 31 : element_index_(0), 32 total_size_(0), 33 current_position_(0), 34 identifier_(identifier), 35 is_chunked_(true), 36 last_chunk_appended_(false), 37 read_failed_(false), 38 initialized_successfully_(false), 39 weak_ptr_factory_(this) { 40 } 41 42 UploadDataStream::~UploadDataStream() { 43 } 44 45 UploadDataStream* UploadDataStream::CreateWithReader( 46 scoped_ptr<UploadElementReader> reader, 47 int64 identifier) { 48 ScopedVector<UploadElementReader> readers; 49 readers.push_back(reader.release()); 50 return new UploadDataStream(readers.Pass(), identifier); 51 } 52 53 int UploadDataStream::Init(const CompletionCallback& callback) { 54 Reset(); 55 return InitInternal(0, callback); 56 } 57 58 int UploadDataStream::Read(IOBuffer* buf, 59 int buf_len, 60 const CompletionCallback& callback) { 61 DCHECK(initialized_successfully_); 62 DCHECK_GT(buf_len, 0); 63 return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback); 64 } 65 66 bool UploadDataStream::IsEOF() const { 67 DCHECK(initialized_successfully_); 68 if (!is_chunked_) 69 return current_position_ == total_size_; 70 71 // If the upload data is chunked, check if the last chunk is appended and all 72 // elements are consumed. 73 return element_index_ == element_readers_.size() && last_chunk_appended_; 74 } 75 76 bool UploadDataStream::IsInMemory() const { 77 // Chunks are in memory, but UploadData does not have all the chunks at 78 // once. Chunks are provided progressively with AppendChunk() as chunks 79 // are ready. Check is_chunked_ here, rather than relying on the loop 80 // below, as there is a case that is_chunked_ is set to true, but the 81 // first chunk is not yet delivered. 82 if (is_chunked_) 83 return false; 84 85 for (size_t i = 0; i < element_readers_.size(); ++i) { 86 if (!element_readers_[i]->IsInMemory()) 87 return false; 88 } 89 return true; 90 } 91 92 void UploadDataStream::AppendChunk(const char* bytes, 93 int bytes_len, 94 bool is_last_chunk) { 95 DCHECK(is_chunked_); 96 DCHECK(!last_chunk_appended_); 97 last_chunk_appended_ = is_last_chunk; 98 99 // Initialize a reader for the newly appended chunk. We leave |total_size_| at 100 // zero, since for chunked uploads, we may not know the total size. 101 std::vector<char> data(bytes, bytes + bytes_len); 102 UploadElementReader* reader = new UploadOwnedBytesElementReader(&data); 103 const int rv = reader->Init(net::CompletionCallback()); 104 DCHECK_EQ(OK, rv); 105 element_readers_.push_back(reader); 106 107 // Resume pending read. 108 if (!pending_chunked_read_callback_.is_null()) { 109 base::Closure callback = pending_chunked_read_callback_; 110 pending_chunked_read_callback_.Reset(); 111 callback.Run(); 112 } 113 } 114 115 void UploadDataStream::Reset() { 116 weak_ptr_factory_.InvalidateWeakPtrs(); 117 pending_chunked_read_callback_.Reset(); 118 initialized_successfully_ = false; 119 read_failed_ = false; 120 current_position_ = 0; 121 total_size_ = 0; 122 element_index_ = 0; 123 } 124 125 int UploadDataStream::InitInternal(int start_index, 126 const CompletionCallback& callback) { 127 DCHECK(!initialized_successfully_); 128 129 // Call Init() for all elements. 130 for (size_t i = start_index; i < element_readers_.size(); ++i) { 131 UploadElementReader* reader = element_readers_[i]; 132 // When new_result is ERR_IO_PENDING, InitInternal() will be called 133 // with start_index == i + 1 when reader->Init() finishes. 134 const int result = reader->Init( 135 base::Bind(&UploadDataStream::ResumePendingInit, 136 weak_ptr_factory_.GetWeakPtr(), 137 i + 1, 138 callback)); 139 if (result != OK) { 140 DCHECK(result != ERR_IO_PENDING || !callback.is_null()); 141 return result; 142 } 143 } 144 145 // Finalize initialization. 146 if (!is_chunked_) { 147 uint64 total_size = 0; 148 for (size_t i = 0; i < element_readers_.size(); ++i) { 149 UploadElementReader* reader = element_readers_[i]; 150 total_size += reader->GetContentLength(); 151 } 152 total_size_ = total_size; 153 } 154 initialized_successfully_ = true; 155 return OK; 156 } 157 158 void UploadDataStream::ResumePendingInit(int start_index, 159 const CompletionCallback& callback, 160 int previous_result) { 161 DCHECK(!initialized_successfully_); 162 DCHECK(!callback.is_null()); 163 DCHECK_NE(ERR_IO_PENDING, previous_result); 164 165 // Check the last result. 166 if (previous_result != OK) { 167 callback.Run(previous_result); 168 return; 169 } 170 171 const int result = InitInternal(start_index, callback); 172 if (result != ERR_IO_PENDING) 173 callback.Run(result); 174 } 175 176 int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf, 177 const CompletionCallback& callback) { 178 DCHECK(initialized_successfully_); 179 180 while (!read_failed_ && element_index_ < element_readers_.size()) { 181 UploadElementReader* reader = element_readers_[element_index_]; 182 183 if (reader->BytesRemaining() == 0) { 184 ++element_index_; 185 continue; 186 } 187 188 if (buf->BytesRemaining() == 0) 189 break; 190 191 int result = reader->Read( 192 buf.get(), 193 buf->BytesRemaining(), 194 base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead), 195 weak_ptr_factory_.GetWeakPtr(), 196 buf, 197 callback)); 198 if (result == ERR_IO_PENDING) { 199 DCHECK(!callback.is_null()); 200 return ERR_IO_PENDING; 201 } 202 ProcessReadResult(buf, result); 203 } 204 205 if (read_failed_) { 206 // Chunked transfers may only contain byte readers, so cannot have read 207 // failures. 208 DCHECK(!is_chunked_); 209 210 // If an error occured during read operation, then pad with zero. 211 // Otherwise the server will hang waiting for the rest of the data. 212 const int num_bytes_to_fill = 213 std::min(static_cast<uint64>(buf->BytesRemaining()), 214 size() - position() - buf->BytesConsumed()); 215 DCHECK_LE(0, num_bytes_to_fill); 216 memset(buf->data(), 0, num_bytes_to_fill); 217 buf->DidConsume(num_bytes_to_fill); 218 } 219 220 const int bytes_copied = buf->BytesConsumed(); 221 current_position_ += bytes_copied; 222 DCHECK(is_chunked_ || total_size_ >= current_position_); 223 224 if (is_chunked_ && !IsEOF() && bytes_copied == 0) { 225 DCHECK(!callback.is_null()); 226 DCHECK(pending_chunked_read_callback_.is_null()); 227 pending_chunked_read_callback_ = 228 base::Bind(&UploadDataStream::ResumePendingRead, 229 weak_ptr_factory_.GetWeakPtr(), 230 buf, 231 callback, 232 OK); 233 return ERR_IO_PENDING; 234 } 235 236 // Returning 0 is allowed only when IsEOF() == true. 237 DCHECK(bytes_copied != 0 || IsEOF()); 238 return bytes_copied; 239 } 240 241 void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf, 242 const CompletionCallback& callback, 243 int previous_result) { 244 DCHECK(!callback.is_null()); 245 246 ProcessReadResult(buf, previous_result); 247 248 const int result = ReadInternal(buf, callback); 249 if (result != ERR_IO_PENDING) 250 callback.Run(result); 251 } 252 253 void UploadDataStream::ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf, 254 int result) { 255 DCHECK_NE(ERR_IO_PENDING, result); 256 DCHECK(!read_failed_); 257 258 if (result >= 0) 259 buf->DidConsume(result); 260 else 261 read_failed_ = true; 262 } 263 264 } // namespace net 265