Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_write_queue.h"
      6 
      7 #include <cstddef>
      8 #include <cstring>
      9 #include <string>
     10 
     11 #include "base/basictypes.h"
     12 #include "base/memory/ref_counted.h"
     13 #include "base/memory/scoped_ptr.h"
     14 #include "base/strings/string_number_conversions.h"
     15 #include "net/base/net_log.h"
     16 #include "net/base/request_priority.h"
     17 #include "net/spdy/spdy_buffer_producer.h"
     18 #include "net/spdy/spdy_stream.h"
     19 #include "testing/gtest/include/gtest/gtest.h"
     20 #include "url/gurl.h"
     21 
     22 namespace net {
     23 
     24 namespace {
     25 
     26 using std::string;
     27 // Payloads: RequeingBufferProducer's own buffer, and the one it re-enqueues.
     28 const char kOriginal[] = "original";
     29 const char kRequeued[] = "requeued";
     30 // Fixture for the TEST_F cases below; it declares no members of its own.
     31 class SpdyWriteQueueTest : public ::testing::Test {};
     32 
     33 // Builds a SpdyBufferProducer that yields one buffer wrapping a SpdyFrame
     34 // whose bytes are a copy of |s|.
     35 scoped_ptr<SpdyBufferProducer> StringToProducer(const std::string& s) {
     36   const size_t length = s.size();
     37   scoped_ptr<char[]> bytes(new char[length]);
     38   std::memcpy(bytes.get(), s.data(), length);
     39   // The array is released into the frame (|true|: presumably "owns buffer").
     40   scoped_ptr<SpdyFrame> frame(new SpdyFrame(bytes.release(), length, true));
     41   scoped_ptr<SpdyBuffer> buffer(new SpdyBuffer(frame.Pass()));
     42   return scoped_ptr<SpdyBufferProducer>(
     43       new SimpleBufferProducer(buffer.Pass()));
     44 }
     45 
     46 // Like StringToProducer(), but the frame data is |i| converted to a string.
     47 scoped_ptr<SpdyBufferProducer> IntToProducer(int i) {
     48   const std::string text = base::IntToString(i);
     49   return StringToProducer(text);
     50 }
     51 
     52 // Producer of one kOriginal buffer. When that buffer is destroyed, its consume
     53 // callback enqueues a kRequeued buffer into the SpdyWriteQueue given here.
     54 class RequeingBufferProducer : public SpdyBufferProducer {
     55  public:
     56   explicit RequeingBufferProducer(SpdyWriteQueue* queue)  // Not owned.
     57       : buffer_(new SpdyBuffer(kOriginal, arraysize(kOriginal))) {
     58     buffer_->AddConsumeCallback(
     59         base::Bind(&RequeingBufferProducer::ConsumeCallback, queue));
     60   }
     61 
     62   virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
     63     return buffer_.Pass();
     64   }
     65 
     66   // Queues a new kRequeued write on |queue|; |size| and |source| are unused.
     67   static void ConsumeCallback(SpdyWriteQueue* queue,
     68                               size_t size,
     69                               SpdyBuffer::ConsumeSource source) {
     70     scoped_ptr<SpdyBufferProducer> producer(
     71         new SimpleBufferProducer(scoped_ptr<SpdyBuffer>(
     72             new SpdyBuffer(kRequeued, arraysize(kRequeued)))));
     73     queue->Enqueue(
     74         MEDIUM, RST_STREAM, producer.Pass(), base::WeakPtr<SpdyStream>());
     75   }
     76 
     77  private:
     78   scoped_ptr<SpdyBuffer> buffer_;
     79 };
     80 
     81 // Runs |producer| once and returns a copy of the produced buffer's bytes.
     82 std::string ProducerToString(scoped_ptr<SpdyBufferProducer> producer) {
     83   scoped_ptr<SpdyBuffer> produced(producer->ProduceBuffer());
     84   const char* const bytes = produced->GetRemainingData();
     85   return std::string(bytes, produced->GetRemainingSize());
     86 }
     87 
     88 // Like ProducerToString(), but parses the bytes as an int (EXPECTs success).
     89 int ProducerToInt(scoped_ptr<SpdyBufferProducer> producer) {
     90   const std::string text = ProducerToString(producer.Pass());
     91   int parsed = 0;
     92   EXPECT_TRUE(base::StringToInt(text, &parsed));
     93   return parsed;
     94 }
     95 
     96 // Creates a bidirectional SpdyStream with the given priority and a null
     97 // SpdySession pointer, so tests must not call anything on it that needs
     98 // the session. Ownership of the stream passes to the caller.
     99 SpdyStream* MakeTestStream(RequestPriority priority) {
    100   const base::WeakPtr<SpdySession> no_session;
    101   return new SpdyStream(SPDY_BIDIRECTIONAL_STREAM, no_session, GURL(),
    102                         priority, 0, 0, BoundNetLog());
    103 }
    104 
    105 // Enqueues three writes with distinct priorities, lowest first, and expects
    106 // Dequeue() to return them highest-priority first with the matching stream.
    107 TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
    108   SpdyWriteQueue queue;
    109 
    110   scoped_ptr<SpdyBufferProducer> low = StringToProducer("LOW");
    111   scoped_ptr<SpdyBufferProducer> medium = StringToProducer("MEDIUM");
    112   scoped_ptr<SpdyBufferProducer> highest = StringToProducer("HIGHEST");
    113 
    114   scoped_ptr<SpdyStream> medium_stream(MakeTestStream(MEDIUM));
    115   scoped_ptr<SpdyStream> highest_stream(MakeTestStream(HIGHEST));
    116   const base::WeakPtr<SpdyStream> no_stream;
    117 
    118   // A NULL stream should still work.
    119   queue.Enqueue(LOW, SYN_STREAM, low.Pass(), no_stream);
    120   queue.Enqueue(MEDIUM, SYN_REPLY, medium.Pass(), medium_stream->GetWeakPtr());
    121   queue.Enqueue(
    122       HIGHEST, RST_STREAM, highest.Pass(), highest_stream->GetWeakPtr());
    123 
    124   SpdyFrameType type = DATA;
    125   scoped_ptr<SpdyBufferProducer> producer;
    126   base::WeakPtr<SpdyStream> stream;
    127   // Dequeue order follows priority, not insertion order.
    128   ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    129   EXPECT_EQ(RST_STREAM, type);
    130   EXPECT_EQ("HIGHEST", ProducerToString(producer.Pass()));
    131   EXPECT_EQ(highest_stream.get(), stream.get());
    132 
    133   ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    134   EXPECT_EQ(SYN_REPLY, type);
    135   EXPECT_EQ("MEDIUM", ProducerToString(producer.Pass()));
    136   EXPECT_EQ(medium_stream.get(), stream.get());
    137 
    138   ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    139   EXPECT_EQ(SYN_STREAM, type);
    140   EXPECT_EQ("LOW", ProducerToString(producer.Pass()));
    141   EXPECT_TRUE(stream.get() == NULL);
    142 
    143   // All three writes have been consumed.
    144   EXPECT_FALSE(queue.Dequeue(&type, &producer, &stream));
    145 }
    146 
    147 // Enqueues three writes that share one priority and expects Dequeue() to
    148 // return them in insertion order, each together with its own stream.
    149 TEST_F(SpdyWriteQueueTest, DequeuesFIFO) {
    150   SpdyWriteQueue queue;
    151 
    152   scoped_ptr<SpdyBufferProducer> first = IntToProducer(1);
    153   scoped_ptr<SpdyBufferProducer> second = IntToProducer(2);
    154   scoped_ptr<SpdyBufferProducer> third = IntToProducer(3);
    155 
    156   scoped_ptr<SpdyStream> first_stream(MakeTestStream(DEFAULT_PRIORITY));
    157   scoped_ptr<SpdyStream> second_stream(MakeTestStream(DEFAULT_PRIORITY));
    158   scoped_ptr<SpdyStream> third_stream(MakeTestStream(DEFAULT_PRIORITY));
    159 
    160   queue.Enqueue(
    161       DEFAULT_PRIORITY, SYN_STREAM, first.Pass(), first_stream->GetWeakPtr());
    162   queue.Enqueue(
    163       DEFAULT_PRIORITY, SYN_REPLY, second.Pass(), second_stream->GetWeakPtr());
    164   queue.Enqueue(
    165       DEFAULT_PRIORITY, RST_STREAM, third.Pass(), third_stream->GetWeakPtr());
    166 
    167   SpdyFrameType type = DATA;
    168   scoped_ptr<SpdyBufferProducer> producer;
    169   base::WeakPtr<SpdyStream> stream;
    170   ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    171   EXPECT_EQ(SYN_STREAM, type);
    172   EXPECT_EQ(1, ProducerToInt(producer.Pass()));
    173   EXPECT_EQ(first_stream.get(), stream.get());
    174 
    175   ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    176   EXPECT_EQ(SYN_REPLY, type);
    177   EXPECT_EQ(2, ProducerToInt(producer.Pass()));
    178   EXPECT_EQ(second_stream.get(), stream.get());
    179 
    180   ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    181   EXPECT_EQ(RST_STREAM, type);
    182   EXPECT_EQ(3, ProducerToInt(producer.Pass()));
    183   EXPECT_EQ(third_stream.get(), stream.get());
    184 
    185   EXPECT_FALSE(queue.Dequeue(&type, &producer, &stream));
    186 }
    187 
    188 // Interleaves 100 writes between two streams, removes the pending writes of
    189 // one stream with RemovePendingWritesForStream(), and expects that only the
    190 // other stream's writes can still be dequeued, in their original order.
    191 TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
    192   SpdyWriteQueue queue;
    193 
    194   scoped_ptr<SpdyStream> kept(MakeTestStream(DEFAULT_PRIORITY));
    195   scoped_ptr<SpdyStream> removed(MakeTestStream(DEFAULT_PRIORITY));
    196 
    197   for (int i = 0; i < 100; ++i) {
    198     // Multiples of three go to |kept|, everything else to |removed|.
    199     SpdyStream* const target = (i % 3 == 0) ? kept.get() : removed.get();
    200     queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i),
    201                   target->GetWeakPtr());
    202   }
    203 
    204   queue.RemovePendingWritesForStream(removed->GetWeakPtr());
    205 
    206   SpdyFrameType type = DATA;
    207   scoped_ptr<SpdyBufferProducer> producer;
    208   base::WeakPtr<SpdyStream> stream;
    209   // What is left must be 0, 3, 6, ..., 99, all queued for |kept|.
    210   for (int expected = 0; expected < 100; expected += 3) {
    211     ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream));
    212     EXPECT_EQ(SYN_STREAM, type);
    213     EXPECT_EQ(expected, ProducerToInt(producer.Pass()));
    214     EXPECT_EQ(kept.get(), stream.get());
    215   }
    216 
    217   // Every other write belonged to |removed|, so nothing is left.
    218   EXPECT_FALSE(queue.Dequeue(&type, &producer, &stream));
    219 }
    220 
    221 // Queues writes round-robin over four streams -- ids 1, 3 and 5 plus one
    222 // with no id assigned -- then calls RemovePendingWritesForStreamsAfter()
    223 // with id 1. Only writes for the stream with id 1 should be left: the ones
    224 // for later ids and for the id-less stream must all be gone.
    225 TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) {
    226   SpdyWriteQueue queue;
    227 
    228   scoped_ptr<SpdyStream> first(MakeTestStream(DEFAULT_PRIORITY));
    229   scoped_ptr<SpdyStream> third(MakeTestStream(DEFAULT_PRIORITY));
    230   scoped_ptr<SpdyStream> fifth(MakeTestStream(DEFAULT_PRIORITY));
    231   // Deliberately left without a stream id.
    232   scoped_ptr<SpdyStream> unassigned(MakeTestStream(DEFAULT_PRIORITY));
    233   first->set_stream_id(1);
    234   third->set_stream_id(3);
    235   fifth->set_stream_id(5);
    236   base::WeakPtr<SpdyStream> streams[] = {
    237     first->GetWeakPtr(), third->GetWeakPtr(),
    238     fifth->GetWeakPtr(), unassigned->GetWeakPtr()
    239   };
    240 
    241   // Write |i| is queued for streams[i % 4].
    242   for (int i = 0; i < 100; ++i) {
    243     const base::WeakPtr<SpdyStream>& target = streams[i % arraysize(streams)];
    244     queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), target);
    245   }
    246 
    247   queue.RemovePendingWritesForStreamsAfter(first->stream_id());
    248 
    249   SpdyFrameType type = DATA;
    250   scoped_ptr<SpdyBufferProducer> producer;
    251   base::WeakPtr<SpdyStream> stream;
    252   // Writes 0, 4, 8, ... were the ones queued for |first|.
    253   for (int i = 0; i < 100; i += arraysize(streams)) {
    254     ASSERT_TRUE(queue.Dequeue(&type, &producer, &stream))
    255         << "Unable to Dequeue i: " << i;
    256     EXPECT_EQ(SYN_STREAM, type);
    257     EXPECT_EQ(i, ProducerToInt(producer.Pass()));
    258     EXPECT_EQ(first.get(), stream.get());
    259   }
    260 
    261   // Everything else was removed, so the queue is now empty.
    262   EXPECT_FALSE(queue.Dequeue(&type, &producer, &stream));
    263 }
    264 
    265 // Fills the queue with 100 stream-less writes and calls Clear(). The queue
    266 // is expected to release the pending writes, after which Dequeue() must
    267 // return false.
    268 TEST_F(SpdyWriteQueueTest, Clear) {
    269   SpdyWriteQueue queue;
    270 
    271   const base::WeakPtr<SpdyStream> no_stream;
    272   for (int n = 0; n < 100; ++n) {
    273     queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(n), no_stream);
    274   }
    275 
    276   queue.Clear();
    277 
    278   SpdyFrameType type = DATA;
    279   scoped_ptr<SpdyBufferProducer> producer;
    280   base::WeakPtr<SpdyStream> stream;
    281   EXPECT_FALSE(queue.Dequeue(&type, &producer, &stream));
    282 }
    283 
    284 // The kOriginal buffer is produced and dropped by the test itself, not inside
    285 // a SpdyWriteQueue call; the kRequeued write must then be dequeueable.
    286 TEST_F(SpdyWriteQueueTest, RequeingProducerWithoutReentrance) {
    287   SpdyWriteQueue queue;
    288   queue.Enqueue(
    289       DEFAULT_PRIORITY, SYN_STREAM,
    290       scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
    291       base::WeakPtr<SpdyStream>());
    292   {
    293     SpdyFrameType frame_type = DATA;
    294     scoped_ptr<SpdyBufferProducer> producer;
    295     base::WeakPtr<SpdyStream> stream;
    296     // ASSERT (not EXPECT): |producer| is dereferenced two lines below.
    297     ASSERT_TRUE(queue.Dequeue(&frame_type, &producer, &stream));
    298     EXPECT_TRUE(queue.IsEmpty());
    299     EXPECT_EQ(string(kOriginal), producer->ProduceBuffer()->GetRemainingData());
    300   }
    301   // |producer| was destroyed, and a buffer is re-queued.
    302   EXPECT_FALSE(queue.IsEmpty());
    303 
    304   SpdyFrameType frame_type = DATA;
    305   scoped_ptr<SpdyBufferProducer> producer;
    306   base::WeakPtr<SpdyStream> stream;
    307   ASSERT_TRUE(queue.Dequeue(&frame_type, &producer, &stream));
    308   EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
    309 }
    310 
    311 // After Clear() the queue must not be empty: the kRequeued write enqueued by
    312 // the discarded buffer's consume callback has to be dequeueable.
    313 TEST_F(SpdyWriteQueueTest, ReentranceOnClear) {
    314   SpdyWriteQueue queue;
    315   queue.Enqueue(
    316       DEFAULT_PRIORITY, SYN_STREAM,
    317       scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
    318       base::WeakPtr<SpdyStream>());
    319 
    320   queue.Clear();
    321   EXPECT_FALSE(queue.IsEmpty());
    322 
    323   SpdyFrameType frame_type = DATA;
    324   scoped_ptr<SpdyBufferProducer> producer;
    325   base::WeakPtr<SpdyStream> stream;
    326   ASSERT_TRUE(queue.Dequeue(&frame_type, &producer, &stream));  // Deref below.
    327   EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
    328 }
    329 
    330 // RemovePendingWritesForStreamsAfter(1) is called with a requeuing write
    331 // queued for stream id 2; afterwards the kRequeued write must be dequeueable.
    332 TEST_F(SpdyWriteQueueTest, ReentranceOnRemovePendingWritesAfter) {
    333   scoped_ptr<SpdyStream> stream(MakeTestStream(DEFAULT_PRIORITY));
    334   stream->set_stream_id(2);
    335 
    336   SpdyWriteQueue queue;
    337   queue.Enqueue(
    338       DEFAULT_PRIORITY, SYN_STREAM,
    339       scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
    340       stream->GetWeakPtr());
    341 
    342   queue.RemovePendingWritesForStreamsAfter(1);
    343   EXPECT_FALSE(queue.IsEmpty());
    344 
    345   SpdyFrameType frame_type = DATA;
    346   scoped_ptr<SpdyBufferProducer> producer;
    347   base::WeakPtr<SpdyStream> weak_stream;
    348   ASSERT_TRUE(queue.Dequeue(&frame_type, &producer, &weak_stream));
    349   EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
    350 }
    351 
    352 // RemovePendingWritesForStream() is called for the stream that owns a
    353 // requeuing write; afterwards the kRequeued write must be dequeueable.
    354 TEST_F(SpdyWriteQueueTest, ReentranceOnRemovePendingWritesForStream) {
    355   scoped_ptr<SpdyStream> stream(MakeTestStream(DEFAULT_PRIORITY));
    356   stream->set_stream_id(2);
    357 
    358   SpdyWriteQueue queue;
    359   queue.Enqueue(
    360       DEFAULT_PRIORITY, SYN_STREAM,
    361       scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
    362       stream->GetWeakPtr());
    363 
    364   queue.RemovePendingWritesForStream(stream->GetWeakPtr());
    365   EXPECT_FALSE(queue.IsEmpty());
    366 
    367   SpdyFrameType frame_type = DATA;
    368   scoped_ptr<SpdyBufferProducer> producer;
    369   base::WeakPtr<SpdyStream> weak_stream;
    370   ASSERT_TRUE(queue.Dequeue(&frame_type, &producer, &weak_stream));
    371   EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
    372 }
    373 
    374 }  // namespace
    375 
    376 }  // namespace net
    377