Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_write_queue.h"
      6 
      7 #include <cstddef>
      8 #include <vector>
      9 
     10 #include "base/logging.h"
     11 #include "base/stl_util.h"
     12 #include "net/spdy/spdy_buffer.h"
     13 #include "net/spdy/spdy_buffer_producer.h"
     14 #include "net/spdy/spdy_stream.h"
     15 
     16 namespace net {
     17 
     18 SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {}
     19 
     20 SpdyWriteQueue::PendingWrite::PendingWrite(
     21     SpdyFrameType frame_type,
     22     SpdyBufferProducer* frame_producer,
     23     const base::WeakPtr<SpdyStream>& stream)
     24     : frame_type(frame_type),
     25       frame_producer(frame_producer),
     26       stream(stream),
     27       has_stream(stream.get() != NULL) {}
     28 
     29 SpdyWriteQueue::PendingWrite::~PendingWrite() {}
     30 
     31 SpdyWriteQueue::SpdyWriteQueue() : removing_writes_(false) {}
     32 
     33 SpdyWriteQueue::~SpdyWriteQueue() {
     34   Clear();
     35 }
     36 
     37 bool SpdyWriteQueue::IsEmpty() const {
     38   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
     39     if (!queue_[i].empty())
     40       return false;
     41   }
     42   return true;
     43 }
     44 
     45 void SpdyWriteQueue::Enqueue(RequestPriority priority,
     46                              SpdyFrameType frame_type,
     47                              scoped_ptr<SpdyBufferProducer> frame_producer,
     48                              const base::WeakPtr<SpdyStream>& stream) {
     49   CHECK(!removing_writes_);
     50   CHECK_GE(priority, MINIMUM_PRIORITY);
     51   CHECK_LE(priority, MAXIMUM_PRIORITY);
     52   if (stream.get())
     53     DCHECK_EQ(stream->priority(), priority);
     54   queue_[priority].push_back(
     55       PendingWrite(frame_type, frame_producer.release(), stream));
     56 }
     57 
     58 bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
     59                              scoped_ptr<SpdyBufferProducer>* frame_producer,
     60                              base::WeakPtr<SpdyStream>* stream) {
     61   CHECK(!removing_writes_);
     62   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
     63     if (!queue_[i].empty()) {
     64       PendingWrite pending_write = queue_[i].front();
     65       queue_[i].pop_front();
     66       *frame_type = pending_write.frame_type;
     67       frame_producer->reset(pending_write.frame_producer);
     68       *stream = pending_write.stream;
     69       if (pending_write.has_stream)
     70         DCHECK(stream->get());
     71       return true;
     72     }
     73   }
     74   return false;
     75 }
     76 
     77 void SpdyWriteQueue::RemovePendingWritesForStream(
     78     const base::WeakPtr<SpdyStream>& stream) {
     79   CHECK(!removing_writes_);
     80   removing_writes_ = true;
     81   RequestPriority priority = stream->priority();
     82   CHECK_GE(priority, MINIMUM_PRIORITY);
     83   CHECK_LE(priority, MAXIMUM_PRIORITY);
     84 
     85   DCHECK(stream.get());
     86 #if DCHECK_IS_ON
     87   // |stream| should not have pending writes in a queue not matching
     88   // its priority.
     89   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
     90     if (priority == i)
     91       continue;
     92     for (std::deque<PendingWrite>::const_iterator it = queue_[i].begin();
     93          it != queue_[i].end(); ++it) {
     94       DCHECK_NE(it->stream.get(), stream.get());
     95     }
     96   }
     97 #endif
     98 
     99   // Defer deletion until queue iteration is complete, as
    100   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
    101   std::vector<SpdyBufferProducer*> erased_buffer_producers;
    102 
    103   // Do the actual deletion and removal, preserving FIFO-ness.
    104   std::deque<PendingWrite>* queue = &queue_[priority];
    105   std::deque<PendingWrite>::iterator out_it = queue->begin();
    106   for (std::deque<PendingWrite>::const_iterator it = queue->begin();
    107        it != queue->end(); ++it) {
    108     if (it->stream.get() == stream.get()) {
    109       erased_buffer_producers.push_back(it->frame_producer);
    110     } else {
    111       *out_it = *it;
    112       ++out_it;
    113     }
    114   }
    115   queue->erase(out_it, queue->end());
    116   removing_writes_ = false;
    117   STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
    118 }
    119 
    120 void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
    121     SpdyStreamId last_good_stream_id) {
    122   CHECK(!removing_writes_);
    123   removing_writes_ = true;
    124   std::vector<SpdyBufferProducer*> erased_buffer_producers;
    125 
    126   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
    127     // Do the actual deletion and removal, preserving FIFO-ness.
    128     std::deque<PendingWrite>* queue = &queue_[i];
    129     std::deque<PendingWrite>::iterator out_it = queue->begin();
    130     for (std::deque<PendingWrite>::const_iterator it = queue->begin();
    131          it != queue->end(); ++it) {
    132       if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
    133                                it->stream->stream_id() == 0)) {
    134         erased_buffer_producers.push_back(it->frame_producer);
    135       } else {
    136         *out_it = *it;
    137         ++out_it;
    138       }
    139     }
    140     queue->erase(out_it, queue->end());
    141   }
    142   removing_writes_ = false;
    143   STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
    144 }
    145 
    146 void SpdyWriteQueue::Clear() {
    147   CHECK(!removing_writes_);
    148   removing_writes_ = true;
    149   std::vector<SpdyBufferProducer*> erased_buffer_producers;
    150 
    151   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
    152     for (std::deque<PendingWrite>::iterator it = queue_[i].begin();
    153          it != queue_[i].end(); ++it) {
    154       erased_buffer_producers.push_back(it->frame_producer);
    155     }
    156     queue_[i].clear();
    157   }
    158   removing_writes_ = false;
    159   STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
    160 }
    161 
    162 }  // namespace net
    163