1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/browser/byte_stream.h" 6 7 #include <deque> 8 #include <set> 9 #include <utility> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/memory/ref_counted.h" 14 #include "base/sequenced_task_runner.h" 15 16 namespace content { 17 namespace { 18 19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > 20 ContentVector; 21 22 class ByteStreamReaderImpl; 23 24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be 25 // cleared in an object destructor and accessed to check for object 26 // existence. We can't use weak pointers because they're tightly tied to 27 // threads rather than task runners. 28 // TODO(rdsmith): A better solution would be extending weak pointers 29 // to support SequencedTaskRunners. 30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { 31 public: 32 LifetimeFlag() : is_alive(true) { } 33 bool is_alive; 34 35 protected: 36 friend class base::RefCountedThreadSafe<LifetimeFlag>; 37 virtual ~LifetimeFlag() { } 38 39 private: 40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); 41 }; 42 43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and 44 // SetPeer may happen anywhere; all other operations on each class must 45 // happen in the context of their SequencedTaskRunner. 46 class ByteStreamWriterImpl : public ByteStreamWriter { 47 public: 48 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 49 scoped_refptr<LifetimeFlag> lifetime_flag, 50 size_t buffer_size); 51 virtual ~ByteStreamWriterImpl(); 52 53 // Must be called before any operations are performed. 54 void SetPeer(ByteStreamReaderImpl* peer, 55 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 56 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 57 58 // Overridden from ByteStreamWriter. 59 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, 60 size_t byte_count) OVERRIDE; 61 virtual void Flush() OVERRIDE; 62 virtual void Close(int status) OVERRIDE; 63 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; 64 virtual size_t GetTotalBufferedBytes() const OVERRIDE; 65 66 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. 67 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, 68 ByteStreamWriterImpl* target, 69 size_t bytes_consumed); 70 71 private: 72 // Called from UpdateWindow when object existence has been validated. 73 void UpdateWindowInternal(size_t bytes_consumed); 74 75 void PostToPeer(bool complete, int status); 76 77 const size_t total_buffer_size_; 78 79 // All data objects in this class are only valid to access on 80 // this task runner except as otherwise noted. 81 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 82 83 // True while this object is alive. 84 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 85 86 base::Closure space_available_callback_; 87 ContentVector input_contents_; 88 size_t input_contents_size_; 89 90 // ** Peer information. 91 92 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 93 94 // How much we've sent to the output that for flow control purposes we 95 // must assume hasn't been read yet. 96 size_t output_size_used_; 97 98 // Only valid to access on peer_task_runner_. 99 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 100 101 // Only valid to access on peer_task_runner_ if 102 // |*peer_lifetime_flag_ == true| 103 ByteStreamReaderImpl* peer_; 104 }; 105 106 class ByteStreamReaderImpl : public ByteStreamReader { 107 public: 108 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 109 scoped_refptr<LifetimeFlag> lifetime_flag, 110 size_t buffer_size); 111 virtual ~ByteStreamReaderImpl(); 112 113 // Must be called before any operations are performed. 114 void SetPeer(ByteStreamWriterImpl* peer, 115 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 116 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 117 118 // Overridden from ByteStreamReader. 119 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, 120 size_t* length) OVERRIDE; 121 virtual int GetStatus() const OVERRIDE; 122 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; 123 124 // PostTask target from |ByteStreamWriterImpl::Write| and 125 // |ByteStreamWriterImpl::Close|. 126 // Receive data from our peer. 127 // static because it may be called after the object it is targeting 128 // has been destroyed. It may not access |*target| 129 // if |*object_lifetime_flag| is false. 130 static void TransferData( 131 scoped_refptr<LifetimeFlag> object_lifetime_flag, 132 ByteStreamReaderImpl* target, 133 scoped_ptr<ContentVector> transfer_buffer, 134 size_t transfer_buffer_bytes, 135 bool source_complete, 136 int status); 137 138 private: 139 // Called from TransferData once object existence has been validated. 140 void TransferDataInternal( 141 scoped_ptr<ContentVector> transfer_buffer, 142 size_t transfer_buffer_bytes, 143 bool source_complete, 144 int status); 145 146 void MaybeUpdateInput(); 147 148 const size_t total_buffer_size_; 149 150 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 151 152 // True while this object is alive. 153 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 154 155 ContentVector available_contents_; 156 157 bool received_status_; 158 int status_; 159 160 base::Closure data_available_callback_; 161 162 // Time of last point at which data in stream transitioned from full 163 // to non-full. Nulled when a callback is sent. 164 base::Time last_non_full_time_; 165 166 // ** Peer information 167 168 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 169 170 // How much has been removed from this class that we haven't told 171 // the input about yet. 172 size_t unreported_consumed_bytes_; 173 174 // Only valid to access on peer_task_runner_. 175 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 176 177 // Only valid to access on peer_task_runner_ if 178 // |*peer_lifetime_flag_ == true| 179 ByteStreamWriterImpl* peer_; 180 }; 181 182 ByteStreamWriterImpl::ByteStreamWriterImpl( 183 scoped_refptr<base::SequencedTaskRunner> task_runner, 184 scoped_refptr<LifetimeFlag> lifetime_flag, 185 size_t buffer_size) 186 : total_buffer_size_(buffer_size), 187 my_task_runner_(task_runner), 188 my_lifetime_flag_(lifetime_flag), 189 input_contents_size_(0), 190 output_size_used_(0), 191 peer_(NULL) { 192 DCHECK(my_lifetime_flag_.get()); 193 my_lifetime_flag_->is_alive = true; 194 } 195 196 ByteStreamWriterImpl::~ByteStreamWriterImpl() { 197 my_lifetime_flag_->is_alive = false; 198 } 199 200 void ByteStreamWriterImpl::SetPeer( 201 ByteStreamReaderImpl* peer, 202 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 203 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 204 peer_ = peer; 205 peer_task_runner_ = peer_task_runner; 206 peer_lifetime_flag_ = peer_lifetime_flag; 207 } 208 209 bool ByteStreamWriterImpl::Write( 210 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 211 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 212 213 // Check overflow. 214 // 215 // TODO(tyoshino): Discuss with content/browser/download developer and if 216 // they're fine with, set smaller limit and make it configurable. 217 size_t space_limit = std::numeric_limits<size_t>::max() - 218 GetTotalBufferedBytes(); 219 if (byte_count > space_limit) { 220 // TODO(tyoshino): Tell the user that Write() failed. 221 // Ignore input. 222 return false; 223 } 224 225 input_contents_.push_back(std::make_pair(buffer, byte_count)); 226 input_contents_size_ += byte_count; 227 228 // Arbitrarily, we buffer to a third of the total size before sending. 229 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 230 PostToPeer(false, 0); 231 232 return GetTotalBufferedBytes() <= total_buffer_size_; 233 } 234 235 void ByteStreamWriterImpl::Flush() { 236 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 237 if (input_contents_size_ > 0) 238 PostToPeer(false, 0); 239 } 240 241 void ByteStreamWriterImpl::Close(int status) { 242 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 243 PostToPeer(true, status); 244 } 245 246 void ByteStreamWriterImpl::RegisterCallback( 247 const base::Closure& source_callback) { 248 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 249 space_available_callback_ = source_callback; 250 } 251 252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const { 253 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 254 // This sum doesn't overflow since Write() fails if this sum is going to 255 // overflow. 256 return input_contents_size_ + output_size_used_; 257 } 258 259 // static 260 void ByteStreamWriterImpl::UpdateWindow( 261 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, 262 size_t bytes_consumed) { 263 // If the target object isn't alive anymore, we do nothing. 264 if (!lifetime_flag->is_alive) return; 265 266 target->UpdateWindowInternal(bytes_consumed); 267 } 268 269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { 270 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 271 272 bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_; 273 274 DCHECK_GE(output_size_used_, bytes_consumed); 275 output_size_used_ -= bytes_consumed; 276 277 // Callback if we were above the limit and we're now <= to it. 278 bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_; 279 280 if (no_longer_above_limit && was_above_limit && 281 !space_available_callback_.is_null()) 282 space_available_callback_.Run(); 283 } 284 285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { 286 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 287 // Valid contexts in which to call. 288 DCHECK(complete || 0 != input_contents_size_); 289 290 scoped_ptr<ContentVector> transfer_buffer; 291 size_t buffer_size = 0; 292 if (0 != input_contents_size_) { 293 transfer_buffer.reset(new ContentVector); 294 transfer_buffer->swap(input_contents_); 295 buffer_size = input_contents_size_; 296 output_size_used_ += input_contents_size_; 297 input_contents_size_ = 0; 298 } 299 peer_task_runner_->PostTask( 300 FROM_HERE, base::Bind( 301 &ByteStreamReaderImpl::TransferData, 302 peer_lifetime_flag_, 303 peer_, 304 base::Passed(&transfer_buffer), 305 buffer_size, 306 complete, 307 status)); 308 } 309 310 ByteStreamReaderImpl::ByteStreamReaderImpl( 311 scoped_refptr<base::SequencedTaskRunner> task_runner, 312 scoped_refptr<LifetimeFlag> lifetime_flag, 313 size_t buffer_size) 314 : total_buffer_size_(buffer_size), 315 my_task_runner_(task_runner), 316 my_lifetime_flag_(lifetime_flag), 317 received_status_(false), 318 status_(0), 319 unreported_consumed_bytes_(0), 320 peer_(NULL) { 321 DCHECK(my_lifetime_flag_.get()); 322 my_lifetime_flag_->is_alive = true; 323 } 324 325 ByteStreamReaderImpl::~ByteStreamReaderImpl() { 326 my_lifetime_flag_->is_alive = false; 327 } 328 329 void ByteStreamReaderImpl::SetPeer( 330 ByteStreamWriterImpl* peer, 331 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 332 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 333 peer_ = peer; 334 peer_task_runner_ = peer_task_runner; 335 peer_lifetime_flag_ = peer_lifetime_flag; 336 } 337 338 ByteStreamReaderImpl::StreamState 339 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, 340 size_t* length) { 341 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 342 343 if (available_contents_.size()) { 344 *data = available_contents_.front().first; 345 *length = available_contents_.front().second; 346 available_contents_.pop_front(); 347 unreported_consumed_bytes_ += *length; 348 349 MaybeUpdateInput(); 350 return STREAM_HAS_DATA; 351 } 352 if (received_status_) { 353 return STREAM_COMPLETE; 354 } 355 return STREAM_EMPTY; 356 } 357 358 int ByteStreamReaderImpl::GetStatus() const { 359 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 360 DCHECK(received_status_); 361 return status_; 362 } 363 364 void ByteStreamReaderImpl::RegisterCallback( 365 const base::Closure& sink_callback) { 366 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 367 368 data_available_callback_ = sink_callback; 369 } 370 371 // static 372 void ByteStreamReaderImpl::TransferData( 373 scoped_refptr<LifetimeFlag> object_lifetime_flag, 374 ByteStreamReaderImpl* target, 375 scoped_ptr<ContentVector> transfer_buffer, 376 size_t buffer_size, 377 bool source_complete, 378 int status) { 379 // If our target is no longer alive, do nothing. 380 if (!object_lifetime_flag->is_alive) return; 381 382 target->TransferDataInternal( 383 transfer_buffer.Pass(), buffer_size, source_complete, status); 384 } 385 386 void ByteStreamReaderImpl::TransferDataInternal( 387 scoped_ptr<ContentVector> transfer_buffer, 388 size_t buffer_size, 389 bool source_complete, 390 int status) { 391 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 392 393 bool was_empty = available_contents_.empty(); 394 395 if (transfer_buffer) { 396 available_contents_.insert(available_contents_.end(), 397 transfer_buffer->begin(), 398 transfer_buffer->end()); 399 } 400 401 if (source_complete) { 402 received_status_ = true; 403 status_ = status; 404 } 405 406 // Callback on transition from empty to non-empty, or 407 // source complete. 408 if (((was_empty && !available_contents_.empty()) || 409 source_complete) && 410 !data_available_callback_.is_null()) 411 data_available_callback_.Run(); 412 } 413 414 // Decide whether or not to send the input a window update. 415 // Currently we do that whenever we've got unreported consumption 416 // greater than 1/3 of total size. 417 void ByteStreamReaderImpl::MaybeUpdateInput() { 418 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 419 420 if (unreported_consumed_bytes_ <= 421 total_buffer_size_ / kFractionReadBeforeWindowUpdate) 422 return; 423 424 peer_task_runner_->PostTask( 425 FROM_HERE, base::Bind( 426 &ByteStreamWriterImpl::UpdateWindow, 427 peer_lifetime_flag_, 428 peer_, 429 unreported_consumed_bytes_)); 430 unreported_consumed_bytes_ = 0; 431 } 432 433 } // namespace 434 435 const int ByteStreamWriter::kFractionBufferBeforeSending = 3; 436 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3; 437 438 ByteStreamReader::~ByteStreamReader() { } 439 440 ByteStreamWriter::~ByteStreamWriter() { } 441 442 void CreateByteStream( 443 scoped_refptr<base::SequencedTaskRunner> input_task_runner, 444 scoped_refptr<base::SequencedTaskRunner> output_task_runner, 445 size_t buffer_size, 446 scoped_ptr<ByteStreamWriter>* input, 447 scoped_ptr<ByteStreamReader>* output) { 448 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); 449 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); 450 451 ByteStreamWriterImpl* in = new ByteStreamWriterImpl( 452 input_task_runner, input_flag, buffer_size); 453 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( 454 output_task_runner, output_flag, buffer_size); 455 456 in->SetPeer(out, output_task_runner, output_flag); 457 out->SetPeer(in, input_task_runner, input_flag); 458 input->reset(in); 459 output->reset(out); 460 } 461 462 } // namespace content 463