Home | History | Annotate | Download | only in base
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "google_apis/gcm/base/socket_stream.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/bind.h"
      9 #include "base/memory/scoped_ptr.h"
     10 #include "base/run_loop.h"
     11 #include "base/stl_util.h"
     12 #include "base/strings/string_piece.h"
     13 #include "net/socket/socket_test_util.h"
     14 #include "testing/gtest/include/gtest/gtest.h"
     15 
     16 namespace gcm {
     17 namespace {
     18 
     19 typedef std::vector<net::MockRead> ReadList;
     20 typedef std::vector<net::MockWrite> WriteList;
     21 
     22 const char kReadData[] = "read_data";
     23 const uint64 kReadDataSize = arraysize(kReadData) - 1;
     24 const char kReadData2[] = "read_alternate_data";
     25 const uint64 kReadData2Size = arraysize(kReadData2) - 1;
     26 const char kWriteData[] = "write_data";
     27 const uint64 kWriteDataSize = arraysize(kWriteData) - 1;
     28 
     29 class GCMSocketStreamTest : public testing::Test {
     30  public:
     31   GCMSocketStreamTest();
     32   virtual ~GCMSocketStreamTest();
     33 
     34   // Build a socket with the expected reads and writes.
     35   void BuildSocket(const ReadList& read_list, const WriteList& write_list);
     36 
     37   // Pump the message loop until idle.
     38   void PumpLoop();
     39 
     40   // Simulates a google::protobuf::io::CodedInputStream read.
     41   base::StringPiece DoInputStreamRead(uint64 bytes);
     42   // Simulates a google::protobuf::io::CodedOutputStream write.
     43   uint64 DoOutputStreamWrite(const base::StringPiece& write_src);
     44 
     45   // Synchronous Refresh wrapper.
     46   void WaitForData(size_t msg_size);
     47 
     48   base::MessageLoop* message_loop() { return &message_loop_; };
     49   net::DelayedSocketData* data_provider() { return data_provider_.get(); }
     50   SocketInputStream* input_stream() { return socket_input_stream_.get(); }
     51   SocketOutputStream* output_stream() { return socket_output_stream_.get(); }
     52   net::StreamSocket* socket() { return socket_.get(); }
     53 
     54  private:
     55   void OpenConnection();
     56   void ResetInputStream();
     57   void ResetOutputStream();
     58 
     59   void ConnectCallback(int result);
     60 
     61   // SocketStreams and their data providers.
     62   ReadList mock_reads_;
     63   WriteList mock_writes_;
     64   scoped_ptr<net::DelayedSocketData> data_provider_;
     65   scoped_ptr<SocketInputStream> socket_input_stream_;
     66   scoped_ptr<SocketOutputStream> socket_output_stream_;
     67 
     68   // net:: components.
     69   scoped_ptr<net::StreamSocket> socket_;
     70   net::MockClientSocketFactory socket_factory_;
     71   net::AddressList address_list_;
     72 
     73   base::MessageLoopForIO message_loop_;
     74 };
     75 
     76 GCMSocketStreamTest::GCMSocketStreamTest() {
     77   net::IPAddressNumber ip_number;
     78   net::ParseIPLiteralToNumber("127.0.0.1", &ip_number);
     79   address_list_ = net::AddressList::CreateFromIPAddress(ip_number, 5228);
     80 }
     81 
     82 GCMSocketStreamTest::~GCMSocketStreamTest() {}
     83 
     84 void GCMSocketStreamTest::BuildSocket(const ReadList& read_list,
     85                                       const WriteList& write_list) {
     86   mock_reads_ = read_list;
     87   mock_writes_ = write_list;
     88   data_provider_.reset(
     89       new net::DelayedSocketData(
     90           0,
     91           vector_as_array(&mock_reads_), mock_reads_.size(),
     92           vector_as_array(&mock_writes_), mock_writes_.size()));
     93   socket_factory_.AddSocketDataProvider(data_provider_.get());
     94   OpenConnection();
     95   ResetInputStream();
     96   ResetOutputStream();
     97 }
     98 
     99 void GCMSocketStreamTest::PumpLoop() {
    100   base::RunLoop run_loop;
    101   run_loop.RunUntilIdle();
    102 }
    103 
    104 base::StringPiece GCMSocketStreamTest::DoInputStreamRead(uint64 bytes) {
    105   uint64 total_bytes_read = 0;
    106   const void* initial_buffer = NULL;
    107   const void* buffer = NULL;
    108   int size = 0;
    109 
    110   do {
    111     DCHECK(socket_input_stream_->GetState() == SocketInputStream::EMPTY ||
    112            socket_input_stream_->GetState() == SocketInputStream::READY);
    113     if (!socket_input_stream_->Next(&buffer, &size))
    114       break;
    115     total_bytes_read += size;
    116     if (initial_buffer) {  // Verify the buffer doesn't skip data.
    117       EXPECT_EQ(static_cast<const uint8*>(initial_buffer) + total_bytes_read,
    118                 static_cast<const uint8*>(buffer) + size);
    119     } else {
    120       initial_buffer = buffer;
    121     }
    122   } while (total_bytes_read < bytes);
    123 
    124   if (total_bytes_read > bytes) {
    125     socket_input_stream_->BackUp(total_bytes_read - bytes);
    126     total_bytes_read = bytes;
    127   }
    128 
    129   return base::StringPiece(static_cast<const char*>(initial_buffer),
    130                            total_bytes_read);
    131 }
    132 
    133 uint64 GCMSocketStreamTest::DoOutputStreamWrite(
    134     const base::StringPiece& write_src) {
    135   DCHECK_EQ(socket_output_stream_->GetState(), SocketOutputStream::EMPTY);
    136   uint64 total_bytes_written = 0;
    137   void* buffer = NULL;
    138   int size = 0;
    139   size_t bytes = write_src.size();
    140 
    141   do {
    142     if (!socket_output_stream_->Next(&buffer, &size))
    143       break;
    144     uint64 bytes_to_write = (static_cast<uint64>(size) < bytes ? size : bytes);
    145     memcpy(buffer,
    146            write_src.data() + total_bytes_written,
    147            bytes_to_write);
    148     if (bytes_to_write < static_cast<uint64>(size))
    149       socket_output_stream_->BackUp(size - bytes_to_write);
    150     total_bytes_written += bytes_to_write;
    151   } while (total_bytes_written < bytes);
    152 
    153   base::RunLoop run_loop;
    154   if (socket_output_stream_->Flush(run_loop.QuitClosure()) ==
    155           net::ERR_IO_PENDING) {
    156     run_loop.Run();
    157   }
    158 
    159   return total_bytes_written;
    160 }
    161 
    162 void GCMSocketStreamTest::WaitForData(size_t msg_size) {
    163   while (input_stream()->UnreadByteCount() < msg_size) {
    164     base::RunLoop run_loop;
    165     if (input_stream()->Refresh(run_loop.QuitClosure(),
    166                                 msg_size - input_stream()->UnreadByteCount()) ==
    167             net::ERR_IO_PENDING) {
    168       run_loop.Run();
    169     }
    170     if (input_stream()->GetState() == SocketInputStream::CLOSED)
    171       return;
    172   }
    173 }
    174 
    175 void GCMSocketStreamTest::OpenConnection() {
    176   socket_ = socket_factory_.CreateTransportClientSocket(
    177       address_list_, NULL, net::NetLog::Source());
    178   socket_->Connect(
    179       base::Bind(&GCMSocketStreamTest::ConnectCallback,
    180                  base::Unretained(this)));
    181   PumpLoop();
    182 }
    183 
    184 void GCMSocketStreamTest::ConnectCallback(int result) {}
    185 
    186 void GCMSocketStreamTest::ResetInputStream() {
    187   DCHECK(socket_.get());
    188   socket_input_stream_.reset(new SocketInputStream(socket_.get()));
    189 }
    190 
    191 void GCMSocketStreamTest::ResetOutputStream() {
    192   DCHECK(socket_.get());
    193   socket_output_stream_.reset(new SocketOutputStream(socket_.get()));
    194 }
    195 
    196 // A read where all data is already available.
    197 TEST_F(GCMSocketStreamTest, ReadDataSync) {
    198   BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
    199                                         kReadData,
    200                                         kReadDataSize)),
    201               WriteList());
    202 
    203   WaitForData(kReadDataSize);
    204   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    205             DoInputStreamRead(kReadDataSize));
    206 }
    207 
    208 // A read that comes in two parts.
    209 TEST_F(GCMSocketStreamTest, ReadPartialDataSync) {
    210   size_t first_read_len = kReadDataSize / 2;
    211   size_t second_read_len = kReadDataSize - first_read_len;
    212   ReadList read_list;
    213   read_list.push_back(
    214       net::MockRead(net::SYNCHRONOUS,
    215                     kReadData,
    216                     first_read_len));
    217   read_list.push_back(
    218       net::MockRead(net::SYNCHRONOUS,
    219                     &kReadData[first_read_len],
    220                     second_read_len));
    221   BuildSocket(read_list, WriteList());
    222 
    223   WaitForData(kReadDataSize);
    224   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    225             DoInputStreamRead(kReadDataSize));
    226 }
    227 
    228 // A read where no data is available at first (IO_PENDING will be returned).
    229 TEST_F(GCMSocketStreamTest, ReadAsync) {
    230   size_t first_read_len = kReadDataSize / 2;
    231   size_t second_read_len = kReadDataSize - first_read_len;
    232   ReadList read_list;
    233   read_list.push_back(
    234       net::MockRead(net::SYNCHRONOUS, net::ERR_IO_PENDING));
    235   read_list.push_back(
    236       net::MockRead(net::ASYNC, kReadData, first_read_len));
    237   read_list.push_back(
    238       net::MockRead(net::ASYNC, &kReadData[first_read_len], second_read_len));
    239   BuildSocket(read_list, WriteList());
    240 
    241   base::MessageLoop::current()->PostTask(
    242       FROM_HERE,
    243       base::Bind(&net::DelayedSocketData::ForceNextRead,
    244                  base::Unretained(data_provider())));
    245   WaitForData(kReadDataSize);
    246   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    247             DoInputStreamRead(kReadDataSize));
    248 }
    249 
    250 // Simulate two packets arriving at once. Read them in two separate calls.
    251 TEST_F(GCMSocketStreamTest, TwoReadsAtOnce) {
    252   std::string long_data = std::string(kReadData, kReadDataSize) +
    253                           std::string(kReadData2, kReadData2Size);
    254   BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
    255                                         long_data.c_str(),
    256                                         long_data.size())),
    257               WriteList());
    258 
    259   WaitForData(kReadDataSize);
    260   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    261             DoInputStreamRead(kReadDataSize));
    262 
    263   WaitForData(kReadData2Size);
    264   ASSERT_EQ(std::string(kReadData2, kReadData2Size),
    265             DoInputStreamRead(kReadData2Size));
    266 }
    267 
    268 // Simulate two packets arriving at once. Read them in two calls separated
    269 // by a Rebuild.
    270 TEST_F(GCMSocketStreamTest, TwoReadsAtOnceWithRebuild) {
    271   std::string long_data = std::string(kReadData, kReadDataSize) +
    272                           std::string(kReadData2, kReadData2Size);
    273   BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
    274                                         long_data.c_str(),
    275                                         long_data.size())),
    276               WriteList());
    277 
    278   WaitForData(kReadDataSize);
    279   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    280               DoInputStreamRead(kReadDataSize));
    281 
    282   input_stream()->RebuildBuffer();
    283   WaitForData(kReadData2Size);
    284   ASSERT_EQ(std::string(kReadData2, kReadData2Size),
    285             DoInputStreamRead(kReadData2Size));
    286 }
    287 
    288 // Simulate a read that is aborted.
    289 TEST_F(GCMSocketStreamTest, ReadError) {
    290   int result = net::ERR_ABORTED;
    291   BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, result)),
    292               WriteList());
    293 
    294   WaitForData(kReadDataSize);
    295   ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState());
    296   ASSERT_EQ(result, input_stream()->last_error());
    297 }
    298 
    299 // Simulate a read after the connection is closed.
    300 TEST_F(GCMSocketStreamTest, ReadDisconnected) {
    301   BuildSocket(ReadList(), WriteList());
    302   socket()->Disconnect();
    303   WaitForData(kReadDataSize);
    304   ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState());
    305   ASSERT_EQ(net::ERR_CONNECTION_CLOSED, input_stream()->last_error());
    306 }
    307 
    308 // Write a full message in one go.
    309 TEST_F(GCMSocketStreamTest, WriteFull) {
    310   BuildSocket(ReadList(),
    311               WriteList(1, net::MockWrite(net::SYNCHRONOUS,
    312                                           kWriteData,
    313                                           kWriteDataSize)));
    314   ASSERT_EQ(kWriteDataSize,
    315             DoOutputStreamWrite(base::StringPiece(kWriteData,
    316                                                   kWriteDataSize)));
    317 }
    318 
    319 // Write a message in two go's.
    320 TEST_F(GCMSocketStreamTest, WritePartial) {
    321   WriteList write_list;
    322   write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
    323                                       kWriteData,
    324                                       kWriteDataSize / 2));
    325   write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
    326                                       kWriteData + kWriteDataSize / 2,
    327                                       kWriteDataSize / 2));
    328   BuildSocket(ReadList(), write_list);
    329   ASSERT_EQ(kWriteDataSize,
    330             DoOutputStreamWrite(base::StringPiece(kWriteData,
    331                                                   kWriteDataSize)));
    332 }
    333 
    334 // Write a message completely asynchronously (returns IO_PENDING before
    335 // finishing the write in two go's).
    336 TEST_F(GCMSocketStreamTest, WriteNone) {
    337   WriteList write_list;
    338   write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
    339                                       kWriteData,
    340                                       kWriteDataSize / 2));
    341   write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
    342                                       kWriteData + kWriteDataSize / 2,
    343                                       kWriteDataSize / 2));
    344   BuildSocket(ReadList(), write_list);
    345   ASSERT_EQ(kWriteDataSize,
    346             DoOutputStreamWrite(base::StringPiece(kWriteData,
    347                                                   kWriteDataSize)));
    348 }
    349 
    350 // Write a message then read a message.
    351 TEST_F(GCMSocketStreamTest, WriteThenRead) {
    352   BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
    353                                         kReadData,
    354                                         kReadDataSize)),
    355               WriteList(1, net::MockWrite(net::SYNCHRONOUS,
    356                                           kWriteData,
    357                                           kWriteDataSize)));
    358 
    359   ASSERT_EQ(kWriteDataSize,
    360             DoOutputStreamWrite(base::StringPiece(kWriteData,
    361                                                   kWriteDataSize)));
    362 
    363   WaitForData(kReadDataSize);
    364   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    365               DoInputStreamRead(kReadDataSize));
    366 }
    367 
    368 // Read a message then write a message.
    369 TEST_F(GCMSocketStreamTest, ReadThenWrite) {
    370   BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
    371                                         kReadData,
    372                                         kReadDataSize)),
    373               WriteList(1, net::MockWrite(net::SYNCHRONOUS,
    374                                           kWriteData,
    375                                           kWriteDataSize)));
    376 
    377   WaitForData(kReadDataSize);
    378   ASSERT_EQ(std::string(kReadData, kReadDataSize),
    379               DoInputStreamRead(kReadDataSize));
    380 
    381   ASSERT_EQ(kWriteDataSize,
    382             DoOutputStreamWrite(base::StringPiece(kWriteData,
    383                                                   kWriteDataSize)));
    384 }
    385 
    386 // Simulate a write that gets aborted.
    387 TEST_F(GCMSocketStreamTest, WriteError) {
    388   int result = net::ERR_ABORTED;
    389   BuildSocket(ReadList(),
    390               WriteList(1, net::MockWrite(net::SYNCHRONOUS, result)));
    391   DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize));
    392   ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState());
    393   ASSERT_EQ(result, output_stream()->last_error());
    394 }
    395 
    396 // Simulate a write after the connection is closed.
    397 TEST_F(GCMSocketStreamTest, WriteDisconnected) {
    398   BuildSocket(ReadList(), WriteList());
    399   socket()->Disconnect();
    400   DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize));
    401   ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState());
    402   ASSERT_EQ(net::ERR_CONNECTION_CLOSED, output_stream()->last_error());
    403 }
    404 
    405 }  // namespace
    406 }  // namespace gcm
    407