1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/browser/streams/stream.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/message_loop/message_loop_proxy.h" 10 #include "content/browser/streams/stream_handle_impl.h" 11 #include "content/browser/streams/stream_read_observer.h" 12 #include "content/browser/streams/stream_registry.h" 13 #include "content/browser/streams/stream_write_observer.h" 14 #include "net/base/io_buffer.h" 15 16 namespace { 17 // Start throttling the connection at about 1MB. 18 const size_t kDeferSizeThreshold = 40 * 32768; 19 } 20 21 namespace content { 22 23 Stream::Stream(StreamRegistry* registry, 24 StreamWriteObserver* write_observer, 25 const GURL& url) 26 : can_add_data_(true), 27 url_(url), 28 data_length_(0), 29 data_bytes_read_(0), 30 last_total_buffered_bytes_(0), 31 registry_(registry), 32 read_observer_(NULL), 33 write_observer_(write_observer), 34 stream_handle_(NULL), 35 weak_ptr_factory_(this) { 36 CreateByteStream(base::MessageLoopProxy::current(), 37 base::MessageLoopProxy::current(), 38 kDeferSizeThreshold, 39 &writer_, 40 &reader_); 41 42 // Setup callback for writing. 43 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, 44 weak_ptr_factory_.GetWeakPtr())); 45 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, 46 weak_ptr_factory_.GetWeakPtr())); 47 48 registry_->RegisterStream(this); 49 } 50 51 Stream::~Stream() { 52 } 53 54 bool Stream::SetReadObserver(StreamReadObserver* observer) { 55 if (read_observer_) 56 return false; 57 read_observer_ = observer; 58 return true; 59 } 60 61 void Stream::RemoveReadObserver(StreamReadObserver* observer) { 62 DCHECK(observer == read_observer_); 63 read_observer_ = NULL; 64 } 65 66 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 67 DCHECK(observer == write_observer_); 68 write_observer_ = NULL; 69 } 70 71 void Stream::Abort() { 72 // Clear all buffer. It's safe to clear reader_ here since the same thread 73 // is used for both input and output operation. 74 writer_.reset(); 75 reader_.reset(); 76 ClearBuffer(); 77 can_add_data_ = false; 78 registry_->UnregisterStream(url()); 79 } 80 81 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 82 if (!writer_.get()) 83 return; 84 85 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes(); 86 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) { 87 Abort(); 88 return; 89 } 90 91 // Now it's guaranteed that this doesn't overflow. This must be done before 92 // Write() since GetTotalBufferedBytes() may return different value after 93 // Write() call, so if we use the new value, information in this instance and 94 // one in |registry_| become inconsistent. 95 last_total_buffered_bytes_ = current_buffered_bytes + size; 96 97 can_add_data_ = writer_->Write(buffer, size); 98 } 99 100 void Stream::AddData(const char* data, size_t size) { 101 if (!writer_.get()) 102 return; 103 104 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 105 memcpy(io_buffer->data(), data, size); 106 AddData(io_buffer, size); 107 } 108 109 void Stream::Finalize() { 110 if (!writer_.get()) 111 return; 112 113 writer_->Close(0); 114 writer_.reset(); 115 116 // Continue asynchronously. 117 base::MessageLoopProxy::current()->PostTask( 118 FROM_HERE, 119 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 120 } 121 122 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 123 int buf_size, 124 int* bytes_read) { 125 DCHECK(buf); 126 DCHECK(bytes_read); 127 128 *bytes_read = 0; 129 if (!data_.get()) { 130 DCHECK(!data_length_); 131 DCHECK(!data_bytes_read_); 132 133 if (!reader_.get()) 134 return STREAM_ABORTED; 135 136 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 137 switch (state) { 138 case ByteStreamReader::STREAM_HAS_DATA: 139 break; 140 case ByteStreamReader::STREAM_COMPLETE: 141 registry_->UnregisterStream(url()); 142 return STREAM_COMPLETE; 143 case ByteStreamReader::STREAM_EMPTY: 144 return STREAM_EMPTY; 145 } 146 } 147 148 const size_t remaining_bytes = data_length_ - data_bytes_read_; 149 size_t to_read = 150 static_cast<size_t>(buf_size) < remaining_bytes ? 151 buf_size : remaining_bytes; 152 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); 153 data_bytes_read_ += to_read; 154 if (data_bytes_read_ >= data_length_) 155 ClearBuffer(); 156 157 *bytes_read = to_read; 158 return STREAM_HAS_DATA; 159 } 160 161 scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url, 162 const std::string& mime_type) { 163 CHECK(!stream_handle_); 164 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), 165 original_url, 166 mime_type); 167 return scoped_ptr<StreamHandle>(stream_handle_).Pass(); 168 } 169 170 void Stream::CloseHandle() { 171 // Prevent deletion until this function ends. 172 scoped_refptr<Stream> ref(this); 173 174 CHECK(stream_handle_); 175 stream_handle_ = NULL; 176 registry_->UnregisterStream(url()); 177 if (write_observer_) 178 write_observer_->OnClose(this); 179 } 180 181 void Stream::OnSpaceAvailable() { 182 can_add_data_ = true; 183 if (write_observer_) 184 write_observer_->OnSpaceAvailable(this); 185 } 186 187 void Stream::OnDataAvailable() { 188 if (read_observer_) 189 read_observer_->OnDataAvailable(this); 190 } 191 192 void Stream::ClearBuffer() { 193 data_ = NULL; 194 data_length_ = 0; 195 data_bytes_read_ = 0; 196 } 197 198 } // namespace content 199