1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/browser/byte_stream.h" 6 7 #include <deque> 8 #include <set> 9 #include <utility> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/memory/ref_counted.h" 14 #include "base/sequenced_task_runner.h" 15 16 namespace content { 17 namespace { 18 19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > 20 ContentVector; 21 22 class ByteStreamReaderImpl; 23 24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be 25 // cleared in an object destructor and accessed to check for object 26 // existence. We can't use weak pointers because they're tightly tied to 27 // threads rather than task runners. 28 // TODO(rdsmith): A better solution would be extending weak pointers 29 // to support SequencedTaskRunners. 30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { 31 public: 32 LifetimeFlag() : is_alive(true) { } 33 bool is_alive; 34 35 protected: 36 friend class base::RefCountedThreadSafe<LifetimeFlag>; 37 virtual ~LifetimeFlag() { } 38 39 private: 40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); 41 }; 42 43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and 44 // SetPeer may happen anywhere; all other operations on each class must 45 // happen in the context of their SequencedTaskRunner. 46 class ByteStreamWriterImpl : public ByteStreamWriter { 47 public: 48 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 49 scoped_refptr<LifetimeFlag> lifetime_flag, 50 size_t buffer_size); 51 virtual ~ByteStreamWriterImpl(); 52 53 // Must be called before any operations are performed. 54 void SetPeer(ByteStreamReaderImpl* peer, 55 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 56 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 57 58 // Overridden from ByteStreamWriter. 59 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, 60 size_t byte_count) OVERRIDE; 61 virtual void Flush() OVERRIDE; 62 virtual void Close(int status) OVERRIDE; 63 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; 64 virtual size_t GetTotalBufferedBytes() const OVERRIDE; 65 66 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. 67 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, 68 ByteStreamWriterImpl* target, 69 size_t bytes_consumed); 70 71 private: 72 // Called from UpdateWindow when object existence has been validated. 73 void UpdateWindowInternal(size_t bytes_consumed); 74 75 void PostToPeer(bool complete, int status); 76 77 const size_t total_buffer_size_; 78 79 // All data objects in this class are only valid to access on 80 // this task runner except as otherwise noted. 81 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 82 83 // True while this object is alive. 84 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 85 86 base::Closure space_available_callback_; 87 ContentVector input_contents_; 88 size_t input_contents_size_; 89 90 // ** Peer information. 91 92 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 93 94 // How much we've sent to the output that for flow control purposes we 95 // must assume hasn't been read yet. 96 size_t output_size_used_; 97 98 // Only valid to access on peer_task_runner_. 99 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 100 101 // Only valid to access on peer_task_runner_ if 102 // |*peer_lifetime_flag_ == true| 103 ByteStreamReaderImpl* peer_; 104 }; 105 106 class ByteStreamReaderImpl : public ByteStreamReader { 107 public: 108 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 109 scoped_refptr<LifetimeFlag> lifetime_flag, 110 size_t buffer_size); 111 virtual ~ByteStreamReaderImpl(); 112 113 // Must be called before any operations are performed. 114 void SetPeer(ByteStreamWriterImpl* peer, 115 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 116 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 117 118 // Overridden from ByteStreamReader. 119 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, 120 size_t* length) OVERRIDE; 121 virtual int GetStatus() const OVERRIDE; 122 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; 123 124 // PostTask target from |ByteStreamWriterImpl::Write| and 125 // |ByteStreamWriterImpl::Close|. 126 // Receive data from our peer. 127 // static because it may be called after the object it is targeting 128 // has been destroyed. It may not access |*target| 129 // if |*object_lifetime_flag| is false. 130 static void TransferData( 131 scoped_refptr<LifetimeFlag> object_lifetime_flag, 132 ByteStreamReaderImpl* target, 133 scoped_ptr<ContentVector> transfer_buffer, 134 size_t transfer_buffer_bytes, 135 bool source_complete, 136 int status); 137 138 private: 139 // Called from TransferData once object existence has been validated. 140 void TransferDataInternal( 141 scoped_ptr<ContentVector> transfer_buffer, 142 size_t transfer_buffer_bytes, 143 bool source_complete, 144 int status); 145 146 void MaybeUpdateInput(); 147 148 const size_t total_buffer_size_; 149 150 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 151 152 // True while this object is alive. 153 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 154 155 ContentVector available_contents_; 156 157 bool received_status_; 158 int status_; 159 160 base::Closure data_available_callback_; 161 162 // Time of last point at which data in stream transitioned from full 163 // to non-full. Nulled when a callback is sent. 164 base::Time last_non_full_time_; 165 166 // ** Peer information 167 168 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 169 170 // How much has been removed from this class that we haven't told 171 // the input about yet. 172 size_t unreported_consumed_bytes_; 173 174 // Only valid to access on peer_task_runner_. 175 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 176 177 // Only valid to access on peer_task_runner_ if 178 // |*peer_lifetime_flag_ == true| 179 ByteStreamWriterImpl* peer_; 180 }; 181 182 ByteStreamWriterImpl::ByteStreamWriterImpl( 183 scoped_refptr<base::SequencedTaskRunner> task_runner, 184 scoped_refptr<LifetimeFlag> lifetime_flag, 185 size_t buffer_size) 186 : total_buffer_size_(buffer_size), 187 my_task_runner_(task_runner), 188 my_lifetime_flag_(lifetime_flag), 189 input_contents_size_(0), 190 output_size_used_(0), 191 peer_(NULL) { 192 DCHECK(my_lifetime_flag_.get()); 193 my_lifetime_flag_->is_alive = true; 194 } 195 196 ByteStreamWriterImpl::~ByteStreamWriterImpl() { 197 // No RunsTasksOnCurrentThread() check to allow deleting a created writer 198 // before we start using it. Once started, should be deleted on the specified 199 // task runner. 200 my_lifetime_flag_->is_alive = false; 201 } 202 203 void ByteStreamWriterImpl::SetPeer( 204 ByteStreamReaderImpl* peer, 205 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 206 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 207 peer_ = peer; 208 peer_task_runner_ = peer_task_runner; 209 peer_lifetime_flag_ = peer_lifetime_flag; 210 } 211 212 bool ByteStreamWriterImpl::Write( 213 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 214 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 215 216 // Check overflow. 217 // 218 // TODO(tyoshino): Discuss with content/browser/download developer and if 219 // they're fine with, set smaller limit and make it configurable. 220 size_t space_limit = std::numeric_limits<size_t>::max() - 221 GetTotalBufferedBytes(); 222 if (byte_count > space_limit) { 223 // TODO(tyoshino): Tell the user that Write() failed. 224 // Ignore input. 225 return false; 226 } 227 228 input_contents_.push_back(std::make_pair(buffer, byte_count)); 229 input_contents_size_ += byte_count; 230 231 // Arbitrarily, we buffer to a third of the total size before sending. 232 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 233 PostToPeer(false, 0); 234 235 return GetTotalBufferedBytes() <= total_buffer_size_; 236 } 237 238 void ByteStreamWriterImpl::Flush() { 239 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 240 if (input_contents_size_ > 0) 241 PostToPeer(false, 0); 242 } 243 244 void ByteStreamWriterImpl::Close(int status) { 245 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 246 PostToPeer(true, status); 247 } 248 249 void ByteStreamWriterImpl::RegisterCallback( 250 const base::Closure& source_callback) { 251 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 252 space_available_callback_ = source_callback; 253 } 254 255 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const { 256 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 257 // This sum doesn't overflow since Write() fails if this sum is going to 258 // overflow. 259 return input_contents_size_ + output_size_used_; 260 } 261 262 // static 263 void ByteStreamWriterImpl::UpdateWindow( 264 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, 265 size_t bytes_consumed) { 266 // If the target object isn't alive anymore, we do nothing. 267 if (!lifetime_flag->is_alive) return; 268 269 target->UpdateWindowInternal(bytes_consumed); 270 } 271 272 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { 273 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 274 275 bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_; 276 277 DCHECK_GE(output_size_used_, bytes_consumed); 278 output_size_used_ -= bytes_consumed; 279 280 // Callback if we were above the limit and we're now <= to it. 281 bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_; 282 283 if (no_longer_above_limit && was_above_limit && 284 !space_available_callback_.is_null()) 285 space_available_callback_.Run(); 286 } 287 288 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { 289 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 290 // Valid contexts in which to call. 291 DCHECK(complete || 0 != input_contents_size_); 292 293 scoped_ptr<ContentVector> transfer_buffer; 294 size_t buffer_size = 0; 295 if (0 != input_contents_size_) { 296 transfer_buffer.reset(new ContentVector); 297 transfer_buffer->swap(input_contents_); 298 buffer_size = input_contents_size_; 299 output_size_used_ += input_contents_size_; 300 input_contents_size_ = 0; 301 } 302 peer_task_runner_->PostTask( 303 FROM_HERE, base::Bind( 304 &ByteStreamReaderImpl::TransferData, 305 peer_lifetime_flag_, 306 peer_, 307 base::Passed(&transfer_buffer), 308 buffer_size, 309 complete, 310 status)); 311 } 312 313 ByteStreamReaderImpl::ByteStreamReaderImpl( 314 scoped_refptr<base::SequencedTaskRunner> task_runner, 315 scoped_refptr<LifetimeFlag> lifetime_flag, 316 size_t buffer_size) 317 : total_buffer_size_(buffer_size), 318 my_task_runner_(task_runner), 319 my_lifetime_flag_(lifetime_flag), 320 received_status_(false), 321 status_(0), 322 unreported_consumed_bytes_(0), 323 peer_(NULL) { 324 DCHECK(my_lifetime_flag_.get()); 325 my_lifetime_flag_->is_alive = true; 326 } 327 328 ByteStreamReaderImpl::~ByteStreamReaderImpl() { 329 // No RunsTasksOnCurrentThread() check to allow deleting a created writer 330 // before we start using it. Once started, should be deleted on the specified 331 // task runner. 332 my_lifetime_flag_->is_alive = false; 333 } 334 335 void ByteStreamReaderImpl::SetPeer( 336 ByteStreamWriterImpl* peer, 337 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 338 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 339 peer_ = peer; 340 peer_task_runner_ = peer_task_runner; 341 peer_lifetime_flag_ = peer_lifetime_flag; 342 } 343 344 ByteStreamReaderImpl::StreamState 345 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, 346 size_t* length) { 347 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 348 349 if (available_contents_.size()) { 350 *data = available_contents_.front().first; 351 *length = available_contents_.front().second; 352 available_contents_.pop_front(); 353 unreported_consumed_bytes_ += *length; 354 355 MaybeUpdateInput(); 356 return STREAM_HAS_DATA; 357 } 358 if (received_status_) { 359 return STREAM_COMPLETE; 360 } 361 return STREAM_EMPTY; 362 } 363 364 int ByteStreamReaderImpl::GetStatus() const { 365 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 366 DCHECK(received_status_); 367 return status_; 368 } 369 370 void ByteStreamReaderImpl::RegisterCallback( 371 const base::Closure& sink_callback) { 372 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 373 374 data_available_callback_ = sink_callback; 375 } 376 377 // static 378 void ByteStreamReaderImpl::TransferData( 379 scoped_refptr<LifetimeFlag> object_lifetime_flag, 380 ByteStreamReaderImpl* target, 381 scoped_ptr<ContentVector> transfer_buffer, 382 size_t buffer_size, 383 bool source_complete, 384 int status) { 385 // If our target is no longer alive, do nothing. 386 if (!object_lifetime_flag->is_alive) return; 387 388 target->TransferDataInternal( 389 transfer_buffer.Pass(), buffer_size, source_complete, status); 390 } 391 392 void ByteStreamReaderImpl::TransferDataInternal( 393 scoped_ptr<ContentVector> transfer_buffer, 394 size_t buffer_size, 395 bool source_complete, 396 int status) { 397 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 398 399 bool was_empty = available_contents_.empty(); 400 401 if (transfer_buffer) { 402 available_contents_.insert(available_contents_.end(), 403 transfer_buffer->begin(), 404 transfer_buffer->end()); 405 } 406 407 if (source_complete) { 408 received_status_ = true; 409 status_ = status; 410 } 411 412 // Callback on transition from empty to non-empty, or 413 // source complete. 414 if (((was_empty && !available_contents_.empty()) || 415 source_complete) && 416 !data_available_callback_.is_null()) 417 data_available_callback_.Run(); 418 } 419 420 // Decide whether or not to send the input a window update. 421 // Currently we do that whenever we've got unreported consumption 422 // greater than 1/3 of total size. 423 void ByteStreamReaderImpl::MaybeUpdateInput() { 424 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 425 426 if (unreported_consumed_bytes_ <= 427 total_buffer_size_ / kFractionReadBeforeWindowUpdate) 428 return; 429 430 peer_task_runner_->PostTask( 431 FROM_HERE, base::Bind( 432 &ByteStreamWriterImpl::UpdateWindow, 433 peer_lifetime_flag_, 434 peer_, 435 unreported_consumed_bytes_)); 436 unreported_consumed_bytes_ = 0; 437 } 438 439 } // namespace 440 441 const int ByteStreamWriter::kFractionBufferBeforeSending = 3; 442 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3; 443 444 ByteStreamReader::~ByteStreamReader() { } 445 446 ByteStreamWriter::~ByteStreamWriter() { } 447 448 void CreateByteStream( 449 scoped_refptr<base::SequencedTaskRunner> input_task_runner, 450 scoped_refptr<base::SequencedTaskRunner> output_task_runner, 451 size_t buffer_size, 452 scoped_ptr<ByteStreamWriter>* input, 453 scoped_ptr<ByteStreamReader>* output) { 454 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); 455 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); 456 457 ByteStreamWriterImpl* in = new ByteStreamWriterImpl( 458 input_task_runner, input_flag, buffer_size); 459 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( 460 output_task_runner, output_flag, buffer_size); 461 462 in->SetPeer(out, output_task_runner, output_flag); 463 out->SetPeer(in, input_task_runner, input_flag); 464 input->reset(in); 465 output->reset(out); 466 } 467 468 } // namespace content 469