1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_write_queue.h" 6 7 #include <cstddef> 8 #include <cstring> 9 #include <string> 10 11 #include "base/basictypes.h" 12 #include "base/memory/ref_counted.h" 13 #include "base/memory/scoped_ptr.h" 14 #include "base/strings/string_number_conversions.h" 15 #include "net/base/net_log.h" 16 #include "net/base/request_priority.h" 17 #include "net/spdy/spdy_buffer_producer.h" 18 #include "net/spdy/spdy_stream.h" 19 #include "testing/gtest/include/gtest/gtest.h" 20 #include "url/gurl.h" 21 22 namespace net { 23 24 namespace { 25 26 using std::string; 27 28 const char kOriginal[] = "original"; 29 const char kRequeued[] = "requeued"; 30 31 class SpdyWriteQueueTest : public ::testing::Test {}; 32 33 // Makes a SpdyFrameProducer producing a frame with the data in the 34 // given string. 35 scoped_ptr<SpdyBufferProducer> StringToProducer(const std::string& s) { 36 scoped_ptr<char[]> data(new char[s.size()]); 37 std::memcpy(data.get(), s.data(), s.size()); 38 return scoped_ptr<SpdyBufferProducer>( 39 new SimpleBufferProducer( 40 scoped_ptr<SpdyBuffer>( 41 new SpdyBuffer( 42 scoped_ptr<SpdyFrame>( 43 new SpdyFrame(data.release(), s.size(), true)))))); 44 } 45 46 // Makes a SpdyBufferProducer producing a frame with the data in the 47 // given int (converted to a string). 48 scoped_ptr<SpdyBufferProducer> IntToProducer(int i) { 49 return StringToProducer(base::IntToString(i)); 50 } 51 52 // Producer whose produced buffer will enqueue yet another buffer into the 53 // SpdyWriteQueue upon destruction. 54 class RequeingBufferProducer : public SpdyBufferProducer { 55 public: 56 RequeingBufferProducer(SpdyWriteQueue* queue) { 57 buffer_.reset(new SpdyBuffer(kOriginal, arraysize(kOriginal))); 58 buffer_->AddConsumeCallback( 59 base::Bind(RequeingBufferProducer::ConsumeCallback, queue)); 60 } 61 62 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { 63 return buffer_.Pass(); 64 } 65 66 static void ConsumeCallback(SpdyWriteQueue* queue, 67 size_t size, 68 SpdyBuffer::ConsumeSource source) { 69 scoped_ptr<SpdyBufferProducer> producer( 70 new SimpleBufferProducer(scoped_ptr<SpdyBuffer>( 71 new SpdyBuffer(kRequeued, arraysize(kRequeued))))); 72 73 queue->Enqueue( 74 MEDIUM, RST_STREAM, producer.Pass(), base::WeakPtr<SpdyStream>()); 75 } 76 77 private: 78 scoped_ptr<SpdyBuffer> buffer_; 79 }; 80 81 // Produces a frame with the given producer and returns a copy of its 82 // data as a string. 83 std::string ProducerToString(scoped_ptr<SpdyBufferProducer> producer) { 84 scoped_ptr<SpdyBuffer> buffer = producer->ProduceBuffer(); 85 return std::string(buffer->GetRemainingData(), buffer->GetRemainingSize()); 86 } 87 88 // Produces a frame with the given producer and returns a copy of its 89 // data as an int (converted from a string). 90 int ProducerToInt(scoped_ptr<SpdyBufferProducer> producer) { 91 int i = 0; 92 EXPECT_TRUE(base::StringToInt(ProducerToString(producer.Pass()), &i)); 93 return i; 94 } 95 96 // Makes a SpdyStream with the given priority and a NULL SpdySession 97 // -- be careful to not call any functions that expect the session to 98 // be there. 99 SpdyStream* MakeTestStream(RequestPriority priority) { 100 return new SpdyStream( 101 SPDY_BIDIRECTIONAL_STREAM, base::WeakPtr<SpdySession>(), 102 GURL(), priority, 0, 0, BoundNetLog()); 103 } 104 105 // Add some frame producers of different priority. The producers 106 // should be dequeued in priority order with their associated stream. 107 TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { 108 SpdyWriteQueue write_queue; 109 110 scoped_ptr<SpdyBufferProducer> producer_low = StringToProducer("LOW"); 111 scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM"); 112 scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST"); 113 114 scoped_ptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM)); 115 scoped_ptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST)); 116 117 // A NULL stream should still work. 118 write_queue.Enqueue( 119 LOW, SYN_STREAM, producer_low.Pass(), base::WeakPtr<SpdyStream>()); 120 write_queue.Enqueue( 121 MEDIUM, SYN_REPLY, producer_medium.Pass(), stream_medium->GetWeakPtr()); 122 write_queue.Enqueue( 123 HIGHEST, RST_STREAM, producer_highest.Pass(), 124 stream_highest->GetWeakPtr()); 125 126 SpdyFrameType frame_type = DATA; 127 scoped_ptr<SpdyBufferProducer> frame_producer; 128 base::WeakPtr<SpdyStream> stream; 129 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 130 EXPECT_EQ(RST_STREAM, frame_type); 131 EXPECT_EQ("HIGHEST", ProducerToString(frame_producer.Pass())); 132 EXPECT_EQ(stream_highest, stream.get()); 133 134 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 135 EXPECT_EQ(SYN_REPLY, frame_type); 136 EXPECT_EQ("MEDIUM", ProducerToString(frame_producer.Pass())); 137 EXPECT_EQ(stream_medium, stream.get()); 138 139 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 140 EXPECT_EQ(SYN_STREAM, frame_type); 141 EXPECT_EQ("LOW", ProducerToString(frame_producer.Pass())); 142 EXPECT_EQ(NULL, stream.get()); 143 144 EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 145 } 146 147 // Add some frame producers with the same priority. The producers 148 // should be dequeued in FIFO order with their associated stream. 149 TEST_F(SpdyWriteQueueTest, DequeuesFIFO) { 150 SpdyWriteQueue write_queue; 151 152 scoped_ptr<SpdyBufferProducer> producer1 = IntToProducer(1); 153 scoped_ptr<SpdyBufferProducer> producer2 = IntToProducer(2); 154 scoped_ptr<SpdyBufferProducer> producer3 = IntToProducer(3); 155 156 scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); 157 scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); 158 scoped_ptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); 159 160 write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(), 161 stream1->GetWeakPtr()); 162 write_queue.Enqueue(DEFAULT_PRIORITY, SYN_REPLY, producer2.Pass(), 163 stream2->GetWeakPtr()); 164 write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(), 165 stream3->GetWeakPtr()); 166 167 SpdyFrameType frame_type = DATA; 168 scoped_ptr<SpdyBufferProducer> frame_producer; 169 base::WeakPtr<SpdyStream> stream; 170 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 171 EXPECT_EQ(SYN_STREAM, frame_type); 172 EXPECT_EQ(1, ProducerToInt(frame_producer.Pass())); 173 EXPECT_EQ(stream1, stream.get()); 174 175 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 176 EXPECT_EQ(SYN_REPLY, frame_type); 177 EXPECT_EQ(2, ProducerToInt(frame_producer.Pass())); 178 EXPECT_EQ(stream2, stream.get()); 179 180 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 181 EXPECT_EQ(RST_STREAM, frame_type); 182 EXPECT_EQ(3, ProducerToInt(frame_producer.Pass())); 183 EXPECT_EQ(stream3, stream.get()); 184 185 EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 186 } 187 188 // Enqueue a bunch of writes and then call 189 // RemovePendingWritesForStream() on one of the streams. No dequeued 190 // write should be for that stream. 191 TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { 192 SpdyWriteQueue write_queue; 193 194 scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); 195 scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); 196 197 for (int i = 0; i < 100; ++i) { 198 base::WeakPtr<SpdyStream> stream = 199 (((i % 3) == 0) ? stream1 : stream2)->GetWeakPtr(); 200 write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream); 201 } 202 203 write_queue.RemovePendingWritesForStream(stream2->GetWeakPtr()); 204 205 for (int i = 0; i < 100; i += 3) { 206 SpdyFrameType frame_type = DATA; 207 scoped_ptr<SpdyBufferProducer> frame_producer; 208 base::WeakPtr<SpdyStream> stream; 209 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 210 EXPECT_EQ(SYN_STREAM, frame_type); 211 EXPECT_EQ(i, ProducerToInt(frame_producer.Pass())); 212 EXPECT_EQ(stream1, stream.get()); 213 } 214 215 SpdyFrameType frame_type = DATA; 216 scoped_ptr<SpdyBufferProducer> frame_producer; 217 base::WeakPtr<SpdyStream> stream; 218 EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 219 } 220 221 // Enqueue a bunch of writes and then call 222 // RemovePendingWritesForStreamsAfter(). No dequeued write should be for 223 // those streams without a stream id, or with a stream_id after that 224 // argument. 225 TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) { 226 SpdyWriteQueue write_queue; 227 228 scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); 229 stream1->set_stream_id(1); 230 scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); 231 stream2->set_stream_id(3); 232 scoped_ptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); 233 stream3->set_stream_id(5); 234 // No stream id assigned. 235 scoped_ptr<SpdyStream> stream4(MakeTestStream(DEFAULT_PRIORITY)); 236 base::WeakPtr<SpdyStream> streams[] = { 237 stream1->GetWeakPtr(), stream2->GetWeakPtr(), 238 stream3->GetWeakPtr(), stream4->GetWeakPtr() 239 }; 240 241 for (int i = 0; i < 100; ++i) { 242 write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), 243 streams[i % arraysize(streams)]); 244 } 245 246 write_queue.RemovePendingWritesForStreamsAfter(stream1->stream_id()); 247 248 for (int i = 0; i < 100; i += arraysize(streams)) { 249 SpdyFrameType frame_type = DATA; 250 scoped_ptr<SpdyBufferProducer> frame_producer; 251 base::WeakPtr<SpdyStream> stream; 252 ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)) 253 << "Unable to Dequeue i: " << i; 254 EXPECT_EQ(SYN_STREAM, frame_type); 255 EXPECT_EQ(i, ProducerToInt(frame_producer.Pass())); 256 EXPECT_EQ(stream1, stream.get()); 257 } 258 259 SpdyFrameType frame_type = DATA; 260 scoped_ptr<SpdyBufferProducer> frame_producer; 261 base::WeakPtr<SpdyStream> stream; 262 EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 263 } 264 265 // Enqueue a bunch of writes and then call Clear(). The write queue 266 // should clean up the memory properly, and Dequeue() should return 267 // false. 268 TEST_F(SpdyWriteQueueTest, Clear) { 269 SpdyWriteQueue write_queue; 270 271 for (int i = 0; i < 100; ++i) { 272 write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), 273 base::WeakPtr<SpdyStream>()); 274 } 275 276 write_queue.Clear(); 277 278 SpdyFrameType frame_type = DATA; 279 scoped_ptr<SpdyBufferProducer> frame_producer; 280 base::WeakPtr<SpdyStream> stream; 281 EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); 282 } 283 284 TEST_F(SpdyWriteQueueTest, RequeingProducerWithoutReentrance) { 285 SpdyWriteQueue queue; 286 queue.Enqueue( 287 DEFAULT_PRIORITY, 288 SYN_STREAM, 289 scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)), 290 base::WeakPtr<SpdyStream>()); 291 { 292 SpdyFrameType frame_type; 293 scoped_ptr<SpdyBufferProducer> producer; 294 base::WeakPtr<SpdyStream> stream; 295 296 EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &stream)); 297 EXPECT_TRUE(queue.IsEmpty()); 298 EXPECT_EQ(string(kOriginal), producer->ProduceBuffer()->GetRemainingData()); 299 } 300 // |producer| was destroyed, and a buffer is re-queued. 301 EXPECT_FALSE(queue.IsEmpty()); 302 303 SpdyFrameType frame_type; 304 scoped_ptr<SpdyBufferProducer> producer; 305 base::WeakPtr<SpdyStream> stream; 306 307 EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &stream)); 308 EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData()); 309 } 310 311 TEST_F(SpdyWriteQueueTest, ReentranceOnClear) { 312 SpdyWriteQueue queue; 313 queue.Enqueue( 314 DEFAULT_PRIORITY, 315 SYN_STREAM, 316 scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)), 317 base::WeakPtr<SpdyStream>()); 318 319 queue.Clear(); 320 EXPECT_FALSE(queue.IsEmpty()); 321 322 SpdyFrameType frame_type; 323 scoped_ptr<SpdyBufferProducer> producer; 324 base::WeakPtr<SpdyStream> stream; 325 326 EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &stream)); 327 EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData()); 328 } 329 330 TEST_F(SpdyWriteQueueTest, ReentranceOnRemovePendingWritesAfter) { 331 scoped_ptr<SpdyStream> stream(MakeTestStream(DEFAULT_PRIORITY)); 332 stream->set_stream_id(2); 333 334 SpdyWriteQueue queue; 335 queue.Enqueue( 336 DEFAULT_PRIORITY, 337 SYN_STREAM, 338 scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)), 339 stream->GetWeakPtr()); 340 341 queue.RemovePendingWritesForStreamsAfter(1); 342 EXPECT_FALSE(queue.IsEmpty()); 343 344 SpdyFrameType frame_type; 345 scoped_ptr<SpdyBufferProducer> producer; 346 base::WeakPtr<SpdyStream> weak_stream; 347 348 EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &weak_stream)); 349 EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData()); 350 } 351 352 TEST_F(SpdyWriteQueueTest, ReentranceOnRemovePendingWritesForStream) { 353 scoped_ptr<SpdyStream> stream(MakeTestStream(DEFAULT_PRIORITY)); 354 stream->set_stream_id(2); 355 356 SpdyWriteQueue queue; 357 queue.Enqueue( 358 DEFAULT_PRIORITY, 359 SYN_STREAM, 360 scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)), 361 stream->GetWeakPtr()); 362 363 queue.RemovePendingWritesForStream(stream->GetWeakPtr()); 364 EXPECT_FALSE(queue.IsEmpty()); 365 366 SpdyFrameType frame_type; 367 scoped_ptr<SpdyBufferProducer> producer; 368 base::WeakPtr<SpdyStream> weak_stream; 369 370 EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &weak_stream)); 371 EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData()); 372 } 373 374 } // namespace 375 376 } // namespace net 377