Home | History | Annotate | Download | only in browser
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/byte_stream.h"
      6 
      7 #include <deque>
      8 
      9 #include "base/bind.h"
     10 #include "base/callback.h"
     11 #include "base/memory/ref_counted.h"
     12 #include "base/message_loop/message_loop.h"
     13 #include "base/test/test_simple_task_runner.h"
     14 #include "net/base/io_buffer.h"
     15 #include "testing/gtest/include/gtest/gtest.h"
     16 
     17 namespace content {
     18 namespace {
     19 
     20 void CountCallbacks(int* counter) {
     21   ++*counter;
     22 }
     23 
     24 }  // namespace
     25 
     26 class ByteStreamTest : public testing::Test {
     27  public:
     28   ByteStreamTest();
     29 
     30   // Create a new IO buffer of the given |buffer_size|.  Details of the
     31   // contents of the created buffer will be kept, and can be validated
     32   // by ValidateIOBuffer.
     33   scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) {
     34     scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
     35     char *bufferp = buffer->data();
     36     for (size_t i = 0; i < buffer_size; i++)
     37       bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
     38     pointer_queue_.push_back(bufferp);
     39     length_queue_.push_back(buffer_size);
     40     ++producing_seed_key_;
     41     return buffer;
     42   }
     43 
     44   // Create an IOBuffer of the appropriate size and add it to the
     45   // ByteStream, returning the result of the ByteStream::Write.
     46   // Separate function to avoid duplication of buffer_size in test
     47   // calls.
     48   bool Write(ByteStreamWriter* byte_stream_input, size_t buffer_size) {
     49     return byte_stream_input->Write(NewIOBuffer(buffer_size), buffer_size);
     50   }
     51 
     52   // Validate that we have the IOBuffer we expect.  This routine must be
     53   // called on buffers that were allocated from NewIOBuffer, and in the
     54   // order that they were allocated.  Calls to NewIOBuffer &&
     55   // ValidateIOBuffer may be interleaved.
     56   bool ValidateIOBuffer(
     57       scoped_refptr<net::IOBuffer> buffer, size_t buffer_size) {
     58     char *bufferp = buffer->data();
     59 
     60     char *expected_ptr = pointer_queue_.front();
     61     size_t expected_length = length_queue_.front();
     62     pointer_queue_.pop_front();
     63     length_queue_.pop_front();
     64     ++consuming_seed_key_;
     65 
     66     EXPECT_EQ(expected_ptr, bufferp);
     67     if (expected_ptr != bufferp)
     68       return false;
     69 
     70     EXPECT_EQ(expected_length, buffer_size);
     71     if (expected_length != buffer_size)
     72       return false;
     73 
     74     for (size_t i = 0; i < buffer_size; i++) {
     75       // Already incremented, so subtract one from the key.
     76       EXPECT_EQ(static_cast<int>((i + consuming_seed_key_ - 1)
     77                                  % (1 << sizeof(char))),
     78                 bufferp[i]);
     79       if (static_cast<int>((i + consuming_seed_key_ - 1) %
     80                            (1 << sizeof(char))) != bufferp[i]) {
     81         return false;
     82       }
     83     }
     84     return true;
     85   }
     86 
     87  protected:
     88   base::MessageLoop message_loop_;
     89 
     90  private:
     91   int producing_seed_key_;
     92   int consuming_seed_key_;
     93   std::deque<char*> pointer_queue_;
     94   std::deque<size_t> length_queue_;
     95 };
     96 
     97 ByteStreamTest::ByteStreamTest()
     98     : producing_seed_key_(0),
     99       consuming_seed_key_(0) { }
    100 
    101 // Confirm that filling and emptying the stream works properly, and that
    102 // we get full triggers when we expect.
    103 TEST_F(ByteStreamTest, ByteStream_PushBack) {
    104   scoped_ptr<ByteStreamWriter> byte_stream_input;
    105   scoped_ptr<ByteStreamReader> byte_stream_output;
    106   CreateByteStream(
    107       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    108       3 * 1024, &byte_stream_input, &byte_stream_output);
    109 
    110   // Push a series of IO buffers on; test pushback happening and
    111   // that it's advisory.
    112   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    113   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    114   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    115   EXPECT_FALSE(Write(byte_stream_input.get(), 1));
    116   EXPECT_FALSE(Write(byte_stream_input.get(), 1024));
    117   // Flush
    118   byte_stream_input->Close(0);
    119   message_loop_.RunUntilIdle();
    120 
    121   // Pull the IO buffers out; do we get the same buffers and do they
    122   // have the same contents?
    123   scoped_refptr<net::IOBuffer> output_io_buffer;
    124   size_t output_length;
    125   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    126             byte_stream_output->Read(&output_io_buffer, &output_length));
    127   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    128 
    129   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    130             byte_stream_output->Read(&output_io_buffer, &output_length));
    131   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    132 
    133   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    134             byte_stream_output->Read(&output_io_buffer, &output_length));
    135   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    136 
    137   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    138             byte_stream_output->Read(&output_io_buffer, &output_length));
    139   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    140 
    141   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    142             byte_stream_output->Read(&output_io_buffer, &output_length));
    143   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    144 
    145   EXPECT_EQ(ByteStreamReader::STREAM_COMPLETE,
    146             byte_stream_output->Read(&output_io_buffer, &output_length));
    147 }
    148 
    149 // Confirm that Flush() method makes the writer to send written contents to
    150 // the reader.
    151 TEST_F(ByteStreamTest, ByteStream_Flush) {
    152   scoped_ptr<ByteStreamWriter> byte_stream_input;
    153   scoped_ptr<ByteStreamReader> byte_stream_output;
    154   CreateByteStream(
    155       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    156       1024, &byte_stream_input, &byte_stream_output);
    157 
    158   EXPECT_TRUE(Write(byte_stream_input.get(), 1));
    159   message_loop_.RunUntilIdle();
    160 
    161   scoped_refptr<net::IOBuffer> output_io_buffer;
    162   size_t output_length = 0;
    163   // Check that data is not sent to the reader yet.
    164   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    165             byte_stream_output->Read(&output_io_buffer, &output_length));
    166 
    167   byte_stream_input->Flush();
    168   message_loop_.RunUntilIdle();
    169 
    170   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    171             byte_stream_output->Read(&output_io_buffer, &output_length));
    172   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    173 
    174   // Check that it's ok to Flush() an empty writer.
    175   byte_stream_input->Flush();
    176   message_loop_.RunUntilIdle();
    177 
    178   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    179             byte_stream_output->Read(&output_io_buffer, &output_length));
    180 
    181   byte_stream_input->Close(0);
    182   message_loop_.RunUntilIdle();
    183 
    184   EXPECT_EQ(ByteStreamReader::STREAM_COMPLETE,
    185             byte_stream_output->Read(&output_io_buffer, &output_length));
    186 }
    187 
    188 // Same as above, only use knowledge of the internals to confirm
    189 // that we're getting pushback even when data's split across the two
    190 // objects
    191 TEST_F(ByteStreamTest, ByteStream_PushBackSplit) {
    192   scoped_ptr<ByteStreamWriter> byte_stream_input;
    193   scoped_ptr<ByteStreamReader> byte_stream_output;
    194   CreateByteStream(
    195       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    196       9 * 1024, &byte_stream_input, &byte_stream_output);
    197 
    198   // Push a series of IO buffers on; test pushback happening and
    199   // that it's advisory.
    200   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    201   message_loop_.RunUntilIdle();
    202   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    203   message_loop_.RunUntilIdle();
    204   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    205   message_loop_.RunUntilIdle();
    206   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    207   message_loop_.RunUntilIdle();
    208   EXPECT_FALSE(Write(byte_stream_input.get(), 6 * 1024));
    209   message_loop_.RunUntilIdle();
    210 
    211   // Pull the IO buffers out; do we get the same buffers and do they
    212   // have the same contents?
    213   scoped_refptr<net::IOBuffer> output_io_buffer;
    214   size_t output_length;
    215   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    216             byte_stream_output->Read(&output_io_buffer, &output_length));
    217   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    218 
    219   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    220             byte_stream_output->Read(&output_io_buffer, &output_length));
    221   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    222 
    223   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    224             byte_stream_output->Read(&output_io_buffer, &output_length));
    225   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    226 
    227   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    228             byte_stream_output->Read(&output_io_buffer, &output_length));
    229   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    230 
    231   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    232             byte_stream_output->Read(&output_io_buffer, &output_length));
    233   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    234 
    235   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    236             byte_stream_output->Read(&output_io_buffer, &output_length));
    237 }
    238 
    239 // Confirm that a Close() notification transmits in-order
    240 // with data on the stream.
    241 TEST_F(ByteStreamTest, ByteStream_CompleteTransmits) {
    242   scoped_ptr<ByteStreamWriter> byte_stream_input;
    243   scoped_ptr<ByteStreamReader> byte_stream_output;
    244 
    245   scoped_refptr<net::IOBuffer> output_io_buffer;
    246   size_t output_length;
    247 
    248   // Empty stream, non-error case.
    249   CreateByteStream(
    250       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    251       3 * 1024, &byte_stream_input, &byte_stream_output);
    252   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    253             byte_stream_output->Read(&output_io_buffer, &output_length));
    254   byte_stream_input->Close(0);
    255   message_loop_.RunUntilIdle();
    256   ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
    257             byte_stream_output->Read(&output_io_buffer, &output_length));
    258   EXPECT_EQ(0, byte_stream_output->GetStatus());
    259 
    260   // Non-empty stream, non-error case.
    261   CreateByteStream(
    262       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    263       3 * 1024, &byte_stream_input, &byte_stream_output);
    264   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    265             byte_stream_output->Read(&output_io_buffer, &output_length));
    266   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    267   byte_stream_input->Close(0);
    268   message_loop_.RunUntilIdle();
    269   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    270             byte_stream_output->Read(&output_io_buffer, &output_length));
    271   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    272   ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
    273             byte_stream_output->Read(&output_io_buffer, &output_length));
    274   EXPECT_EQ(0, byte_stream_output->GetStatus());
    275 
    276   const int kFakeErrorCode = 22;
    277 
    278   // Empty stream, error case.
    279   CreateByteStream(
    280       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    281       3 * 1024, &byte_stream_input, &byte_stream_output);
    282   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    283             byte_stream_output->Read(&output_io_buffer, &output_length));
    284   byte_stream_input->Close(kFakeErrorCode);
    285   message_loop_.RunUntilIdle();
    286   ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
    287             byte_stream_output->Read(&output_io_buffer, &output_length));
    288   EXPECT_EQ(kFakeErrorCode, byte_stream_output->GetStatus());
    289 
    290   // Non-empty stream, error case.
    291   CreateByteStream(
    292       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    293       3 * 1024, &byte_stream_input, &byte_stream_output);
    294   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    295             byte_stream_output->Read(&output_io_buffer, &output_length));
    296   EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
    297   byte_stream_input->Close(kFakeErrorCode);
    298   message_loop_.RunUntilIdle();
    299   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    300             byte_stream_output->Read(&output_io_buffer, &output_length));
    301   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    302   ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
    303             byte_stream_output->Read(&output_io_buffer, &output_length));
    304   EXPECT_EQ(kFakeErrorCode, byte_stream_output->GetStatus());
    305 }
    306 
    307 // Confirm that callbacks on the sink side are triggered when they should be.
    308 TEST_F(ByteStreamTest, ByteStream_SinkCallback) {
    309   scoped_refptr<base::TestSimpleTaskRunner> task_runner(
    310       new base::TestSimpleTaskRunner());
    311 
    312   scoped_ptr<ByteStreamWriter> byte_stream_input;
    313   scoped_ptr<ByteStreamReader> byte_stream_output;
    314   CreateByteStream(
    315       message_loop_.message_loop_proxy(), task_runner,
    316       10000, &byte_stream_input, &byte_stream_output);
    317 
    318   scoped_refptr<net::IOBuffer> output_io_buffer;
    319   size_t output_length;
    320 
    321   // Note that the specifics of when the callbacks are called with regard
    322   // to how much data is pushed onto the stream is not (currently) part
    323   // of the interface contract.  If it becomes part of the contract, the
    324   // tests below should get much more precise.
    325 
    326   // Confirm callback called when you add more than 33% of the buffer.
    327 
    328   // Setup callback
    329   int num_callbacks = 0;
    330   byte_stream_output->RegisterCallback(
    331       base::Bind(CountCallbacks, &num_callbacks));
    332 
    333   EXPECT_TRUE(Write(byte_stream_input.get(), 4000));
    334   message_loop_.RunUntilIdle();
    335 
    336   EXPECT_EQ(0, num_callbacks);
    337   task_runner->RunUntilIdle();
    338   EXPECT_EQ(1, num_callbacks);
    339 
    340   // Check data and stream state.
    341   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    342             byte_stream_output->Read(&output_io_buffer, &output_length));
    343   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    344   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    345             byte_stream_output->Read(&output_io_buffer, &output_length));
    346 
    347   // Confirm callback *isn't* called at less than 33% (by lack of
    348   // unexpected call on task runner).
    349   EXPECT_TRUE(Write(byte_stream_input.get(), 3000));
    350   message_loop_.RunUntilIdle();
    351 
    352   // This reflects an implementation artifact that data goes with callbacks,
    353   // which should not be considered part of the interface guarantee.
    354   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    355             byte_stream_output->Read(&output_io_buffer, &output_length));
    356 }
    357 
    358 // Confirm that callbacks on the source side are triggered when they should
    359 // be.
    360 TEST_F(ByteStreamTest, ByteStream_SourceCallback) {
    361   scoped_refptr<base::TestSimpleTaskRunner> task_runner(
    362       new base::TestSimpleTaskRunner());
    363 
    364   scoped_ptr<ByteStreamWriter> byte_stream_input;
    365   scoped_ptr<ByteStreamReader> byte_stream_output;
    366   CreateByteStream(
    367       task_runner, message_loop_.message_loop_proxy(),
    368       10000, &byte_stream_input, &byte_stream_output);
    369 
    370   scoped_refptr<net::IOBuffer> output_io_buffer;
    371   size_t output_length;
    372 
    373   // Note that the specifics of when the callbacks are called with regard
    374   // to how much data is pulled from the stream is not (currently) part
    375   // of the interface contract.  If it becomes part of the contract, the
    376   // tests below should get much more precise.
    377 
    378   // Confirm callback called when about 33% space available, and not
    379   // at other transitions.
    380 
    381   // Add data.
    382   int num_callbacks = 0;
    383   byte_stream_input->RegisterCallback(
    384       base::Bind(CountCallbacks, &num_callbacks));
    385   EXPECT_TRUE(Write(byte_stream_input.get(), 2000));
    386   EXPECT_TRUE(Write(byte_stream_input.get(), 2001));
    387   EXPECT_FALSE(Write(byte_stream_input.get(), 6000));
    388 
    389   // Allow bytes to transition (needed for message passing implementation),
    390   // and get and validate the data.
    391   message_loop_.RunUntilIdle();
    392   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    393             byte_stream_output->Read(&output_io_buffer, &output_length));
    394   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    395 
    396   // Grab data, triggering callback.  Recorded on dispatch, but doesn't
    397   // happen because it's caught by the mock.
    398   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    399             byte_stream_output->Read(&output_io_buffer, &output_length));
    400   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    401 
    402   // Confirm that the callback passed to the mock does what we expect.
    403   EXPECT_EQ(0, num_callbacks);
    404   task_runner->RunUntilIdle();
    405   EXPECT_EQ(1, num_callbacks);
    406 
    407   // Same drill with final buffer.
    408   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    409             byte_stream_output->Read(&output_io_buffer, &output_length));
    410   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    411   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    412             byte_stream_output->Read(&output_io_buffer, &output_length));
    413   EXPECT_EQ(1, num_callbacks);
    414   task_runner->RunUntilIdle();
    415   // Should have updated the internal structures but not called the
    416   // callback.
    417   EXPECT_EQ(1, num_callbacks);
    418 }
    419 
    420 // Confirm that racing a change to a sink callback with a post results
    421 // in the new callback being called.
    422 TEST_F(ByteStreamTest, ByteStream_SinkInterrupt) {
    423   scoped_refptr<base::TestSimpleTaskRunner> task_runner(
    424       new base::TestSimpleTaskRunner());
    425 
    426   scoped_ptr<ByteStreamWriter> byte_stream_input;
    427   scoped_ptr<ByteStreamReader> byte_stream_output;
    428   CreateByteStream(
    429       message_loop_.message_loop_proxy(), task_runner,
    430       10000, &byte_stream_input, &byte_stream_output);
    431 
    432   scoped_refptr<net::IOBuffer> output_io_buffer;
    433   size_t output_length;
    434   base::Closure intermediate_callback;
    435 
    436   // Record initial state.
    437   int num_callbacks = 0;
    438   byte_stream_output->RegisterCallback(
    439       base::Bind(CountCallbacks, &num_callbacks));
    440 
    441   // Add data, and pass it across.
    442   EXPECT_TRUE(Write(byte_stream_input.get(), 4000));
    443   message_loop_.RunUntilIdle();
    444 
    445   // The task runner should have been hit, but the callback count
    446   // isn't changed until we actually run the callback.
    447   EXPECT_EQ(0, num_callbacks);
    448 
    449   // If we change the callback now, the new one should be run
    450   // (simulates race with post task).
    451   int num_alt_callbacks = 0;
    452   byte_stream_output->RegisterCallback(
    453       base::Bind(CountCallbacks, &num_alt_callbacks));
    454   task_runner->RunUntilIdle();
    455   EXPECT_EQ(0, num_callbacks);
    456   EXPECT_EQ(1, num_alt_callbacks);
    457 
    458   // Final cleanup.
    459   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    460             byte_stream_output->Read(&output_io_buffer, &output_length));
    461   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    462   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    463             byte_stream_output->Read(&output_io_buffer, &output_length));
    464 
    465 }
    466 
    467 // Confirm that racing a change to a source callback with a post results
    468 // in the new callback being called.
    469 TEST_F(ByteStreamTest, ByteStream_SourceInterrupt) {
    470   scoped_refptr<base::TestSimpleTaskRunner> task_runner(
    471       new base::TestSimpleTaskRunner());
    472 
    473   scoped_ptr<ByteStreamWriter> byte_stream_input;
    474   scoped_ptr<ByteStreamReader> byte_stream_output;
    475   CreateByteStream(
    476       task_runner, message_loop_.message_loop_proxy(),
    477       10000, &byte_stream_input, &byte_stream_output);
    478 
    479   scoped_refptr<net::IOBuffer> output_io_buffer;
    480   size_t output_length;
    481   base::Closure intermediate_callback;
    482 
    483   // Setup state for test.
    484   int num_callbacks = 0;
    485   byte_stream_input->RegisterCallback(
    486       base::Bind(CountCallbacks, &num_callbacks));
    487   EXPECT_TRUE(Write(byte_stream_input.get(), 2000));
    488   EXPECT_TRUE(Write(byte_stream_input.get(), 2001));
    489   EXPECT_FALSE(Write(byte_stream_input.get(), 6000));
    490   message_loop_.RunUntilIdle();
    491 
    492   // Initial get should not trigger callback.
    493   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    494             byte_stream_output->Read(&output_io_buffer, &output_length));
    495   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    496   message_loop_.RunUntilIdle();
    497 
    498   // Second get *should* trigger callback.
    499   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    500             byte_stream_output->Read(&output_io_buffer, &output_length));
    501   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    502 
    503   // Which should do the right thing when it's run.
    504   int num_alt_callbacks = 0;
    505   byte_stream_input->RegisterCallback(
    506       base::Bind(CountCallbacks, &num_alt_callbacks));
    507   task_runner->RunUntilIdle();
    508   EXPECT_EQ(0, num_callbacks);
    509   EXPECT_EQ(1, num_alt_callbacks);
    510 
    511   // Third get should also trigger callback.
    512   EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
    513             byte_stream_output->Read(&output_io_buffer, &output_length));
    514   EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
    515   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    516             byte_stream_output->Read(&output_io_buffer, &output_length));
    517 }
    518 
    519 // Confirm that callback is called on zero data transfer but source
    520 // complete.
    521 TEST_F(ByteStreamTest, ByteStream_ZeroCallback) {
    522   scoped_refptr<base::TestSimpleTaskRunner> task_runner(
    523       new base::TestSimpleTaskRunner());
    524 
    525   scoped_ptr<ByteStreamWriter> byte_stream_input;
    526   scoped_ptr<ByteStreamReader> byte_stream_output;
    527   CreateByteStream(
    528       message_loop_.message_loop_proxy(), task_runner,
    529       10000, &byte_stream_input, &byte_stream_output);
    530 
    531   base::Closure intermediate_callback;
    532 
    533   // Record initial state.
    534   int num_callbacks = 0;
    535   byte_stream_output->RegisterCallback(
    536       base::Bind(CountCallbacks, &num_callbacks));
    537 
    538   // Immediately close the stream.
    539   byte_stream_input->Close(0);
    540   task_runner->RunUntilIdle();
    541   EXPECT_EQ(1, num_callbacks);
    542 }
    543 
    544 TEST_F(ByteStreamTest, ByteStream_CloseWithoutAnyWrite) {
    545   scoped_ptr<ByteStreamWriter> byte_stream_input;
    546   scoped_ptr<ByteStreamReader> byte_stream_output;
    547   CreateByteStream(
    548       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    549       3 * 1024, &byte_stream_input, &byte_stream_output);
    550 
    551   byte_stream_input->Close(0);
    552   message_loop_.RunUntilIdle();
    553 
    554   scoped_refptr<net::IOBuffer> output_io_buffer;
    555   size_t output_length;
    556   EXPECT_EQ(ByteStreamReader::STREAM_COMPLETE,
    557             byte_stream_output->Read(&output_io_buffer, &output_length));
    558 }
    559 
    560 TEST_F(ByteStreamTest, ByteStream_FlushWithoutAnyWrite) {
    561   scoped_ptr<ByteStreamWriter> byte_stream_input;
    562   scoped_ptr<ByteStreamReader> byte_stream_output;
    563   CreateByteStream(
    564       message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
    565       3 * 1024, &byte_stream_input, &byte_stream_output);
    566 
    567   byte_stream_input->Flush();
    568   message_loop_.RunUntilIdle();
    569 
    570   scoped_refptr<net::IOBuffer> output_io_buffer;
    571   size_t output_length;
    572   EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
    573             byte_stream_output->Read(&output_io_buffer, &output_length));
    574 
    575   byte_stream_input->Close(0);
    576   message_loop_.RunUntilIdle();
    577 
    578   EXPECT_EQ(ByteStreamReader::STREAM_COMPLETE,
    579             byte_stream_output->Read(&output_io_buffer, &output_length));
    580 }
    581 
    582 }  // namespace content
    583