Home | History | Annotate | Download | only in streams
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_loop/message_loop.h"
      6 #include "base/test/test_simple_task_runner.h"
      7 #include "content/browser/streams/stream.h"
      8 #include "content/browser/streams/stream_read_observer.h"
      9 #include "content/browser/streams/stream_registry.h"
     10 #include "content/browser/streams/stream_write_observer.h"
     11 #include "testing/gtest/include/gtest/gtest.h"
     12 
     13 namespace content {
     14 
     15 class StreamTest : public testing::Test {
     16  public:
     17   StreamTest() : producing_seed_key_(0) {}
     18 
     19   virtual void SetUp() OVERRIDE {
     20     registry_.reset(new StreamRegistry());
     21   }
     22 
     23   // Create a new IO buffer of the given |buffer_size| and fill it with random
     24   // data.
     25   scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) {
     26     scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
     27     char *bufferp = buffer->data();
     28     for (size_t i = 0; i < buffer_size; i++)
     29       bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
     30     ++producing_seed_key_;
     31     return buffer;
     32   }
     33 
     34  protected:
     35   base::MessageLoop message_loop_;
     36   scoped_ptr<StreamRegistry> registry_;
     37 
     38  private:
     39   int producing_seed_key_;
     40 };
     41 
     42 class TestStreamReader : public StreamReadObserver {
     43  public:
     44   TestStreamReader() : buffer_(new net::GrowableIOBuffer()), completed_(false) {
     45   }
     46   virtual ~TestStreamReader() {}
     47 
     48   void Read(Stream* stream) {
     49     const size_t kBufferSize = 32768;
     50     scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));
     51 
     52     int bytes_read = 0;
     53     while (true) {
     54       Stream::StreamState state =
     55           stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read);
     56       switch (state) {
     57         case Stream::STREAM_HAS_DATA:
     58           // TODO(tyoshino): Move these expectations to the beginning of Read()
     59           // method once Stream::Finalize() is fixed.
     60           EXPECT_FALSE(completed_);
     61           break;
     62         case Stream::STREAM_COMPLETE:
     63           completed_ = true;
     64           return;
     65         case Stream::STREAM_EMPTY:
     66           EXPECT_FALSE(completed_);
     67           return;
     68         case Stream::STREAM_ABORTED:
     69           EXPECT_FALSE(completed_);
     70           return;
     71       }
     72       size_t old_capacity = buffer_->capacity();
     73       buffer_->SetCapacity(old_capacity + bytes_read);
     74       memcpy(buffer_->StartOfBuffer() + old_capacity,
     75              buffer->data(), bytes_read);
     76     }
     77   }
     78 
     79   virtual void OnDataAvailable(Stream* stream) OVERRIDE {
     80     Read(stream);
     81   }
     82 
     83   scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; }
     84 
     85   bool completed() const {
     86     return completed_;
     87   }
     88 
     89  private:
     90   scoped_refptr<net::GrowableIOBuffer> buffer_;
     91   bool completed_;
     92 };
     93 
     94 class TestStreamWriter : public StreamWriteObserver {
     95  public:
     96   TestStreamWriter() {}
     97   virtual ~TestStreamWriter() {}
     98 
     99   void Write(Stream* stream,
    100              scoped_refptr<net::IOBuffer> buffer,
    101              size_t buffer_size) {
    102     stream->AddData(buffer, buffer_size);
    103   }
    104 
    105   virtual void OnSpaceAvailable(Stream* stream) OVERRIDE {
    106   }
    107 
    108   virtual void OnClose(Stream* stream) OVERRIDE {
    109   }
    110 };
    111 
    112 TEST_F(StreamTest, SetReadObserver) {
    113   TestStreamReader reader;
    114   TestStreamWriter writer;
    115 
    116   GURL url("blob://stream");
    117   scoped_refptr<Stream> stream(
    118       new Stream(registry_.get(), &writer, url));
    119   EXPECT_TRUE(stream->SetReadObserver(&reader));
    120 }
    121 
    122 TEST_F(StreamTest, SetReadObserver_SecondFails) {
    123   TestStreamReader reader1;
    124   TestStreamReader reader2;
    125   TestStreamWriter writer;
    126 
    127   GURL url("blob://stream");
    128   scoped_refptr<Stream> stream(
    129       new Stream(registry_.get(), &writer, url));
    130   EXPECT_TRUE(stream->SetReadObserver(&reader1));
    131   EXPECT_FALSE(stream->SetReadObserver(&reader2));
    132 }
    133 
    134 TEST_F(StreamTest, SetReadObserver_TwoReaders) {
    135   TestStreamReader reader1;
    136   TestStreamReader reader2;
    137   TestStreamWriter writer;
    138 
    139   GURL url("blob://stream");
    140   scoped_refptr<Stream> stream(
    141       new Stream(registry_.get(), &writer, url));
    142   EXPECT_TRUE(stream->SetReadObserver(&reader1));
    143 
    144   // Once the first read observer is removed, a new one can be added.
    145   stream->RemoveReadObserver(&reader1);
    146   EXPECT_TRUE(stream->SetReadObserver(&reader2));
    147 }
    148 
    149 TEST_F(StreamTest, Stream) {
    150   TestStreamReader reader;
    151   TestStreamWriter writer;
    152 
    153   GURL url("blob://stream");
    154   scoped_refptr<Stream> stream(
    155       new Stream(registry_.get(), &writer, url));
    156   EXPECT_TRUE(stream->SetReadObserver(&reader));
    157 
    158   const int kBufferSize = 1000000;
    159   scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    160   writer.Write(stream.get(), buffer, kBufferSize);
    161   stream->Finalize();
    162   base::MessageLoop::current()->RunUntilIdle();
    163   EXPECT_TRUE(reader.completed());
    164 
    165   ASSERT_EQ(reader.buffer()->capacity(), kBufferSize);
    166   for (int i = 0; i < kBufferSize; i++)
    167     EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]);
    168 }
    169 
    170 // Test that even if a reader receives an empty buffer, once TransferData()
    171 // method is called on it with |source_complete| = true, following Read() calls
    172 // on it never returns STREAM_EMPTY. Together with StreamTest.Stream above, this
    173 // guarantees that Reader::Read() call returns only STREAM_HAS_DATA
    174 // or STREAM_COMPLETE in |data_available_callback_| call corresponding to
    175 // Writer::Close().
    176 TEST_F(StreamTest, ClosedReaderDoesNotReturnStreamEmpty) {
    177   TestStreamReader reader;
    178   TestStreamWriter writer;
    179 
    180   GURL url("blob://stream");
    181   scoped_refptr<Stream> stream(
    182       new Stream(registry_.get(), &writer, url));
    183   EXPECT_TRUE(stream->SetReadObserver(&reader));
    184 
    185   const int kBufferSize = 0;
    186   scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    187   stream->AddData(buffer, kBufferSize);
    188   stream->Finalize();
    189   base::MessageLoop::current()->RunUntilIdle();
    190   EXPECT_TRUE(reader.completed());
    191   EXPECT_EQ(0, reader.buffer()->capacity());
    192 }
    193 
    194 TEST_F(StreamTest, GetStream) {
    195   TestStreamWriter writer;
    196 
    197   GURL url("blob://stream");
    198   scoped_refptr<Stream> stream1(
    199       new Stream(registry_.get(), &writer, url));
    200 
    201   scoped_refptr<Stream> stream2 = registry_->GetStream(url);
    202   ASSERT_EQ(stream1, stream2);
    203 }
    204 
    205 TEST_F(StreamTest, GetStream_Missing) {
    206   TestStreamWriter writer;
    207 
    208   GURL url1("blob://stream");
    209   scoped_refptr<Stream> stream1(
    210       new Stream(registry_.get(), &writer, url1));
    211 
    212   GURL url2("blob://stream2");
    213   scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
    214   ASSERT_FALSE(stream2.get());
    215 }
    216 
    217 TEST_F(StreamTest, CloneStream) {
    218   TestStreamWriter writer;
    219 
    220   GURL url1("blob://stream");
    221   scoped_refptr<Stream> stream1(
    222       new Stream(registry_.get(), &writer, url1));
    223 
    224   GURL url2("blob://stream2");
    225   ASSERT_TRUE(registry_->CloneStream(url2, url1));
    226   scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
    227   ASSERT_EQ(stream1, stream2);
    228 }
    229 
    230 TEST_F(StreamTest, CloneStream_Missing) {
    231   TestStreamWriter writer;
    232 
    233   GURL url1("blob://stream");
    234   scoped_refptr<Stream> stream1(
    235       new Stream(registry_.get(), &writer, url1));
    236 
    237   GURL url2("blob://stream2");
    238   GURL url3("blob://stream3");
    239   ASSERT_FALSE(registry_->CloneStream(url2, url3));
    240   scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
    241   ASSERT_FALSE(stream2.get());
    242 }
    243 
    244 TEST_F(StreamTest, UnregisterStream) {
    245   TestStreamWriter writer;
    246 
    247   GURL url("blob://stream");
    248   scoped_refptr<Stream> stream1(
    249       new Stream(registry_.get(), &writer, url));
    250 
    251   registry_->UnregisterStream(url);
    252   scoped_refptr<Stream> stream2 = registry_->GetStream(url);
    253   ASSERT_FALSE(stream2.get());
    254 }
    255 
    256 TEST_F(StreamTest, MemoryExceedMemoryUsageLimit) {
    257   TestStreamWriter writer1;
    258   TestStreamWriter writer2;
    259 
    260   GURL url1("blob://stream");
    261   scoped_refptr<Stream> stream1(
    262       new Stream(registry_.get(), &writer1, url1));
    263 
    264   GURL url2("blob://stream2");
    265   scoped_refptr<Stream> stream2(
    266       new Stream(registry_.get(), &writer2, url2));
    267 
    268   const int kMaxMemoryUsage = 1500000;
    269   registry_->set_max_memory_usage_for_testing(kMaxMemoryUsage);
    270 
    271   const int kBufferSize = 1000000;
    272   scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    273   writer1.Write(stream1.get(), buffer, kBufferSize);
    274   // Make transfer happen.
    275   base::MessageLoop::current()->RunUntilIdle();
    276 
    277   writer2.Write(stream2.get(), buffer, kBufferSize);
    278 
    279   // Written data (1000000 * 2) exceeded limit (1500000). |stream2| should be
    280   // unregistered with |registry_|.
    281   EXPECT_EQ(NULL, registry_->GetStream(url2).get());
    282 
    283   writer1.Write(stream1.get(), buffer, kMaxMemoryUsage - kBufferSize);
    284   // Should be accepted since stream2 is unregistered and the new data is not
    285   // so big to exceed the limit.
    286   EXPECT_FALSE(registry_->GetStream(url1).get() == NULL);
    287 }
    288 
    289 TEST_F(StreamTest, UnderMemoryUsageLimit) {
    290   TestStreamWriter writer;
    291   TestStreamReader reader;
    292 
    293   GURL url("blob://stream");
    294   scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
    295   EXPECT_TRUE(stream->SetReadObserver(&reader));
    296 
    297   registry_->set_max_memory_usage_for_testing(1500000);
    298 
    299   const int kBufferSize = 1000000;
    300   scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
    301   writer.Write(stream.get(), buffer, kBufferSize);
    302 
    303   // Run loop to make |reader| consume the data.
    304   base::MessageLoop::current()->RunUntilIdle();
    305 
    306   writer.Write(stream.get(), buffer, kBufferSize);
    307 
    308   EXPECT_EQ(stream.get(), registry_->GetStream(url).get());
    309 }
    310 
    311 }  // namespace content
    312