Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/base/upload_data_stream.h"
      6 
      7 #include "base/logging.h"
      8 #include "net/base/io_buffer.h"
      9 #include "net/base/net_errors.h"
     10 #include "net/base/upload_bytes_element_reader.h"
     11 #include "net/base/upload_element_reader.h"
     12 
     13 namespace net {
     14 
// Constructs a non-chunked stream (|is_chunked_| is false) over the given
// element readers. The readers are moved into |element_readers_| via Pass().
// All position/size counters start at zero and the stream is not usable
// until Init() has completed: |initialized_successfully_| starts out false.
UploadDataStream::UploadDataStream(
    ScopedVector<UploadElementReader> element_readers,
    int64 identifier)
    : element_readers_(element_readers.Pass()),
      element_index_(0),
      total_size_(0),
      current_position_(0),
      identifier_(identifier),
      is_chunked_(false),
      last_chunk_appended_(false),
      read_failed_(false),
      initialized_successfully_(false),
      weak_ptr_factory_(this) {
}
     29 
// Constructs a chunked stream (|is_chunked_| is true). It starts with no
// element readers; they are added later, one per chunk, by AppendChunk().
// The Chunked argument is unused and only selects this overload.
UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier)
    : element_index_(0),
      total_size_(0),
      current_position_(0),
      identifier_(identifier),
      is_chunked_(true),
      last_chunk_appended_(false),
      read_failed_(false),
      initialized_successfully_(false),
      weak_ptr_factory_(this) {
}
     41 
// Nothing to do explicitly; members clean up through their own destructors.
UploadDataStream::~UploadDataStream() {
}
     44 
     45 UploadDataStream* UploadDataStream::CreateWithReader(
     46     scoped_ptr<UploadElementReader> reader,
     47     int64 identifier) {
     48   ScopedVector<UploadElementReader> readers;
     49   readers.push_back(reader.release());
     50   return new UploadDataStream(readers.Pass(), identifier);
     51 }
     52 
     53 int UploadDataStream::Init(const CompletionCallback& callback) {
     54   Reset();
     55   return InitInternal(0, callback);
     56 }
     57 
     58 int UploadDataStream::Read(IOBuffer* buf,
     59                            int buf_len,
     60                            const CompletionCallback& callback) {
     61   DCHECK(initialized_successfully_);
     62   DCHECK_GT(buf_len, 0);
     63   return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback);
     64 }
     65 
     66 bool UploadDataStream::IsEOF() const {
     67   DCHECK(initialized_successfully_);
     68   if (!is_chunked_)
     69     return current_position_ == total_size_;
     70 
     71   // If the upload data is chunked, check if the last chunk is appended and all
     72   // elements are consumed.
     73   return element_index_ == element_readers_.size() && last_chunk_appended_;
     74 }
     75 
     76 bool UploadDataStream::IsInMemory() const {
     77   // Chunks are in memory, but UploadData does not have all the chunks at
     78   // once. Chunks are provided progressively with AppendChunk() as chunks
     79   // are ready. Check is_chunked_ here, rather than relying on the loop
     80   // below, as there is a case that is_chunked_ is set to true, but the
     81   // first chunk is not yet delivered.
     82   if (is_chunked_)
     83     return false;
     84 
     85   for (size_t i = 0; i < element_readers_.size(); ++i) {
     86     if (!element_readers_[i]->IsInMemory())
     87       return false;
     88   }
     89   return true;
     90 }
     91 
     92 void UploadDataStream::AppendChunk(const char* bytes,
     93                                    int bytes_len,
     94                                    bool is_last_chunk) {
     95   DCHECK(is_chunked_);
     96   DCHECK(!last_chunk_appended_);
     97   last_chunk_appended_ = is_last_chunk;
     98 
     99   // Initialize a reader for the newly appended chunk. We leave |total_size_| at
    100   // zero, since for chunked uploads, we may not know the total size.
    101   std::vector<char> data(bytes, bytes + bytes_len);
    102   UploadElementReader* reader = new UploadOwnedBytesElementReader(&data);
    103   const int rv = reader->Init(net::CompletionCallback());
    104   DCHECK_EQ(OK, rv);
    105   element_readers_.push_back(reader);
    106 
    107   // Resume pending read.
    108   if (!pending_chunked_read_callback_.is_null()) {
    109     base::Closure callback = pending_chunked_read_callback_;
    110     pending_chunked_read_callback_.Reset();
    111     callback.Run();
    112   }
    113 }
    114 
    115 void UploadDataStream::Reset() {
    116   weak_ptr_factory_.InvalidateWeakPtrs();
    117   pending_chunked_read_callback_.Reset();
    118   initialized_successfully_ = false;
    119   read_failed_ = false;
    120   current_position_ = 0;
    121   total_size_ = 0;
    122   element_index_ = 0;
    123 }
    124 
    125 int UploadDataStream::InitInternal(int start_index,
    126                                    const CompletionCallback& callback) {
    127   DCHECK(!initialized_successfully_);
    128 
    129   // Call Init() for all elements.
    130   for (size_t i = start_index; i < element_readers_.size(); ++i) {
    131     UploadElementReader* reader = element_readers_[i];
    132     // When new_result is ERR_IO_PENDING, InitInternal() will be called
    133     // with start_index == i + 1 when reader->Init() finishes.
    134     const int result = reader->Init(
    135         base::Bind(&UploadDataStream::ResumePendingInit,
    136                    weak_ptr_factory_.GetWeakPtr(),
    137                    i + 1,
    138                    callback));
    139     if (result != OK) {
    140       DCHECK(result != ERR_IO_PENDING || !callback.is_null());
    141       return result;
    142     }
    143   }
    144 
    145   // Finalize initialization.
    146   if (!is_chunked_) {
    147     uint64 total_size = 0;
    148     for (size_t i = 0; i < element_readers_.size(); ++i) {
    149       UploadElementReader* reader = element_readers_[i];
    150       total_size += reader->GetContentLength();
    151     }
    152     total_size_ = total_size;
    153   }
    154   initialized_successfully_ = true;
    155   return OK;
    156 }
    157 
    158 void UploadDataStream::ResumePendingInit(int start_index,
    159                                          const CompletionCallback& callback,
    160                                          int previous_result) {
    161   DCHECK(!initialized_successfully_);
    162   DCHECK(!callback.is_null());
    163   DCHECK_NE(ERR_IO_PENDING, previous_result);
    164 
    165   // Check the last result.
    166   if (previous_result != OK) {
    167     callback.Run(previous_result);
    168     return;
    169   }
    170 
    171   const int result = InitInternal(start_index, callback);
    172   if (result != ERR_IO_PENDING)
    173     callback.Run(result);
    174 }
    175 
// Fills |buf| from the element readers, starting at |element_index_|, until
// the buffer is full, the readers are exhausted, or a read fails.
//
// Returns the number of bytes placed in |buf| by this read, or
// ERR_IO_PENDING when the read will complete asynchronously; in that case
// |callback| is run later via ResumePendingRead(). A return value of 0 is only
// allowed at EOF.
//
// A failed element read is not returned as an error code from here. Instead
// |read_failed_| is set and the rest of the buffer is padded with zeros.
int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf,
                                   const CompletionCallback& callback) {
  DCHECK(initialized_successfully_);

  while (!read_failed_ && element_index_ < element_readers_.size()) {
    UploadElementReader* reader = element_readers_[element_index_];

    // Skip readers that have nothing left to give.
    if (reader->BytesRemaining() == 0) {
      ++element_index_;
      continue;
    }

    // The destination buffer is full.
    if (buf->BytesRemaining() == 0)
      break;

    // The bound callback re-enters through ResumePendingRead() with the same
    // |buf|, so bytes consumed so far are preserved across the async hop.
    int result = reader->Read(
        buf.get(),
        buf->BytesRemaining(),
        base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead),
                   weak_ptr_factory_.GetWeakPtr(),
                   buf,
                   callback));
    if (result == ERR_IO_PENDING) {
      DCHECK(!callback.is_null());
      // |current_position_| is deliberately not updated here; it is advanced
      // once, further down, when the read finally completes.
      return ERR_IO_PENDING;
    }
    ProcessReadResult(buf, result);
  }

  if (read_failed_) {
    // Chunked transfers may only contain byte readers, so cannot have read
    // failures.
    DCHECK(!is_chunked_);

    // If an error occurred during read operation, then pad with zero.
    // Otherwise the server will hang waiting for the rest of the data.
    // The fill is capped both by the space left in |buf| and by the number of
    // bytes still owed (total size minus what has been delivered so far).
    const int num_bytes_to_fill =
        std::min(static_cast<uint64>(buf->BytesRemaining()),
                 size() - position() - buf->BytesConsumed());
    DCHECK_LE(0, num_bytes_to_fill);
    memset(buf->data(), 0, num_bytes_to_fill);
    buf->DidConsume(num_bytes_to_fill);
  }

  // Everything consumed from |buf| counts as copied, including zero padding.
  const int bytes_copied = buf->BytesConsumed();
  current_position_ += bytes_copied;
  DCHECK(is_chunked_ || total_size_ >= current_position_);

  // Chunked upload with no data available yet and more chunks expected: park
  // the read. AppendChunk() runs the stored closure when a chunk arrives.
  if (is_chunked_ && !IsEOF() && bytes_copied == 0) {
    DCHECK(!callback.is_null());
    DCHECK(pending_chunked_read_callback_.is_null());
    pending_chunked_read_callback_ =
        base::Bind(&UploadDataStream::ResumePendingRead,
                   weak_ptr_factory_.GetWeakPtr(),
                   buf,
                   callback,
                   OK);
    return ERR_IO_PENDING;
  }

  // Returning 0 is allowed only when IsEOF() == true.
  DCHECK(bytes_copied != 0 || IsEOF());
  return bytes_copied;
}
    240 
    241 void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,
    242                                          const CompletionCallback& callback,
    243                                          int previous_result) {
    244   DCHECK(!callback.is_null());
    245 
    246   ProcessReadResult(buf, previous_result);
    247 
    248   const int result = ReadInternal(buf, callback);
    249   if (result != ERR_IO_PENDING)
    250     callback.Run(result);
    251 }
    252 
    253 void UploadDataStream::ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,
    254                                          int result) {
    255   DCHECK_NE(ERR_IO_PENDING, result);
    256   DCHECK(!read_failed_);
    257 
    258   if (result >= 0)
    259     buf->DidConsume(result);
    260   else
    261     read_failed_ = true;
    262 }
    263 
    264 }  // namespace net
    265