1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/browser/streams/stream.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/message_loop/message_loop_proxy.h" 10 #include "base/values.h" 11 #include "content/browser/streams/stream_handle_impl.h" 12 #include "content/browser/streams/stream_read_observer.h" 13 #include "content/browser/streams/stream_registry.h" 14 #include "content/browser/streams/stream_write_observer.h" 15 #include "net/base/io_buffer.h" 16 #include "net/http/http_response_headers.h" 17 18 namespace { 19 // Start throttling the connection at about 1MB. 20 const size_t kDeferSizeThreshold = 40 * 32768; 21 } 22 23 namespace content { 24 25 Stream::Stream(StreamRegistry* registry, 26 StreamWriteObserver* write_observer, 27 const GURL& url) 28 : can_add_data_(true), 29 url_(url), 30 data_length_(0), 31 data_bytes_read_(0), 32 last_total_buffered_bytes_(0), 33 registry_(registry), 34 read_observer_(NULL), 35 write_observer_(write_observer), 36 stream_handle_(NULL), 37 weak_ptr_factory_(this) { 38 CreateByteStream(base::MessageLoopProxy::current(), 39 base::MessageLoopProxy::current(), 40 kDeferSizeThreshold, 41 &writer_, 42 &reader_); 43 44 // Setup callback for writing. 45 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, 46 weak_ptr_factory_.GetWeakPtr())); 47 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, 48 weak_ptr_factory_.GetWeakPtr())); 49 50 registry_->RegisterStream(this); 51 } 52 53 Stream::~Stream() { 54 } 55 56 bool Stream::SetReadObserver(StreamReadObserver* observer) { 57 if (read_observer_) 58 return false; 59 read_observer_ = observer; 60 return true; 61 } 62 63 void Stream::RemoveReadObserver(StreamReadObserver* observer) { 64 DCHECK(observer == read_observer_); 65 read_observer_ = NULL; 66 } 67 68 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 69 DCHECK(observer == write_observer_); 70 write_observer_ = NULL; 71 } 72 73 void Stream::Abort() { 74 // Clear all buffer. It's safe to clear reader_ here since the same thread 75 // is used for both input and output operation. 76 writer_.reset(); 77 reader_.reset(); 78 ClearBuffer(); 79 can_add_data_ = false; 80 registry_->UnregisterStream(url()); 81 } 82 83 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 84 if (!writer_.get()) 85 return; 86 87 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes(); 88 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) { 89 Abort(); 90 return; 91 } 92 93 // Now it's guaranteed that this doesn't overflow. This must be done before 94 // Write() since GetTotalBufferedBytes() may return different value after 95 // Write() call, so if we use the new value, information in this instance and 96 // one in |registry_| become inconsistent. 97 last_total_buffered_bytes_ = current_buffered_bytes + size; 98 99 can_add_data_ = writer_->Write(buffer, size); 100 } 101 102 void Stream::AddData(const char* data, size_t size) { 103 if (!writer_.get()) 104 return; 105 106 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 107 memcpy(io_buffer->data(), data, size); 108 AddData(io_buffer, size); 109 } 110 111 void Stream::Finalize() { 112 if (!writer_.get()) 113 return; 114 115 writer_->Close(0); 116 writer_.reset(); 117 118 // Continue asynchronously. 119 base::MessageLoopProxy::current()->PostTask( 120 FROM_HERE, 121 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 122 } 123 124 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 125 int buf_size, 126 int* bytes_read) { 127 DCHECK(buf); 128 DCHECK(bytes_read); 129 130 *bytes_read = 0; 131 if (!data_.get()) { 132 DCHECK(!data_length_); 133 DCHECK(!data_bytes_read_); 134 135 if (!reader_.get()) 136 return STREAM_ABORTED; 137 138 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 139 switch (state) { 140 case ByteStreamReader::STREAM_HAS_DATA: 141 break; 142 case ByteStreamReader::STREAM_COMPLETE: 143 registry_->UnregisterStream(url()); 144 return STREAM_COMPLETE; 145 case ByteStreamReader::STREAM_EMPTY: 146 return STREAM_EMPTY; 147 } 148 } 149 150 const size_t remaining_bytes = data_length_ - data_bytes_read_; 151 size_t to_read = 152 static_cast<size_t>(buf_size) < remaining_bytes ? 153 buf_size : remaining_bytes; 154 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); 155 data_bytes_read_ += to_read; 156 if (data_bytes_read_ >= data_length_) 157 ClearBuffer(); 158 159 *bytes_read = to_read; 160 return STREAM_HAS_DATA; 161 } 162 163 scoped_ptr<StreamHandle> Stream::CreateHandle( 164 const GURL& original_url, 165 const std::string& mime_type, 166 scoped_refptr<net::HttpResponseHeaders> response_headers) { 167 CHECK(!stream_handle_); 168 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), 169 original_url, 170 mime_type, 171 response_headers); 172 return scoped_ptr<StreamHandle>(stream_handle_).Pass(); 173 } 174 175 void Stream::CloseHandle() { 176 // Prevent deletion until this function ends. 177 scoped_refptr<Stream> ref(this); 178 179 CHECK(stream_handle_); 180 stream_handle_ = NULL; 181 registry_->UnregisterStream(url()); 182 if (write_observer_) 183 write_observer_->OnClose(this); 184 } 185 186 void Stream::OnSpaceAvailable() { 187 can_add_data_ = true; 188 if (write_observer_) 189 write_observer_->OnSpaceAvailable(this); 190 } 191 192 void Stream::OnDataAvailable() { 193 if (read_observer_) 194 read_observer_->OnDataAvailable(this); 195 } 196 197 void Stream::ClearBuffer() { 198 data_ = NULL; 199 data_length_ = 0; 200 data_bytes_read_ = 0; 201 } 202 203 } // namespace content 204