1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/browser/streams/stream.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/message_loop/message_loop_proxy.h" 10 #include "content/browser/streams/stream_handle_impl.h" 11 #include "content/browser/streams/stream_read_observer.h" 12 #include "content/browser/streams/stream_registry.h" 13 #include "content/browser/streams/stream_write_observer.h" 14 #include "net/base/io_buffer.h" 15 16 namespace { 17 // Start throttling the connection at about 1MB. 18 const size_t kDeferSizeThreshold = 40 * 32768; 19 } 20 21 namespace content { 22 23 Stream::Stream(StreamRegistry* registry, 24 StreamWriteObserver* write_observer, 25 const GURL& url) 26 : data_bytes_read_(0), 27 can_add_data_(true), 28 url_(url), 29 data_length_(0), 30 registry_(registry), 31 read_observer_(NULL), 32 write_observer_(write_observer), 33 stream_handle_(NULL), 34 weak_ptr_factory_(this) { 35 CreateByteStream(base::MessageLoopProxy::current(), 36 base::MessageLoopProxy::current(), 37 kDeferSizeThreshold, 38 &writer_, 39 &reader_); 40 41 // Setup callback for writing. 42 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, 43 weak_ptr_factory_.GetWeakPtr())); 44 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, 45 weak_ptr_factory_.GetWeakPtr())); 46 47 registry_->RegisterStream(this); 48 } 49 50 Stream::~Stream() { 51 } 52 53 bool Stream::SetReadObserver(StreamReadObserver* observer) { 54 if (read_observer_) 55 return false; 56 read_observer_ = observer; 57 return true; 58 } 59 60 void Stream::RemoveReadObserver(StreamReadObserver* observer) { 61 DCHECK(observer == read_observer_); 62 read_observer_ = NULL; 63 } 64 65 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 66 DCHECK(observer == write_observer_); 67 write_observer_ = NULL; 68 } 69 70 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 71 can_add_data_ = writer_->Write(buffer, size); 72 } 73 74 void Stream::AddData(const char* data, size_t size) { 75 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 76 memcpy(io_buffer->data(), data, size); 77 can_add_data_ = writer_->Write(io_buffer, size); 78 } 79 80 void Stream::Finalize() { 81 writer_->Close(0); 82 writer_.reset(NULL); 83 84 // Continue asynchronously. 85 base::MessageLoopProxy::current()->PostTask( 86 FROM_HERE, 87 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 88 } 89 90 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 91 int buf_size, 92 int* bytes_read) { 93 *bytes_read = 0; 94 if (!data_.get()) { 95 data_length_ = 0; 96 data_bytes_read_ = 0; 97 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 98 switch (state) { 99 case ByteStreamReader::STREAM_HAS_DATA: 100 break; 101 case ByteStreamReader::STREAM_COMPLETE: 102 registry_->UnregisterStream(url()); 103 return STREAM_COMPLETE; 104 case ByteStreamReader::STREAM_EMPTY: 105 return STREAM_EMPTY; 106 } 107 } 108 109 const size_t remaining_bytes = data_length_ - data_bytes_read_; 110 size_t to_read = 111 static_cast<size_t>(buf_size) < remaining_bytes ? 112 buf_size : remaining_bytes; 113 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); 114 data_bytes_read_ += to_read; 115 if (data_bytes_read_ >= data_length_) 116 data_ = NULL; 117 118 *bytes_read = to_read; 119 return STREAM_HAS_DATA; 120 } 121 122 scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url, 123 const std::string& mime_type) { 124 CHECK(!stream_handle_); 125 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), 126 original_url, 127 mime_type); 128 return scoped_ptr<StreamHandle>(stream_handle_).Pass(); 129 } 130 131 void Stream::CloseHandle() { 132 // Prevent deletion until this function ends. 133 scoped_refptr<Stream> ref(this); 134 135 CHECK(stream_handle_); 136 stream_handle_ = NULL; 137 registry_->UnregisterStream(url()); 138 if (write_observer_) 139 write_observer_->OnClose(this); 140 } 141 142 void Stream::OnSpaceAvailable() { 143 can_add_data_ = true; 144 if (write_observer_) 145 write_observer_->OnSpaceAvailable(this); 146 } 147 148 void Stream::OnDataAvailable() { 149 if (read_observer_) 150 read_observer_->OnDataAvailable(this); 151 } 152 153 } // namespace content 154