1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "google_apis/gcm/base/socket_stream.h" 6 7 #include "base/basictypes.h" 8 #include "base/bind.h" 9 #include "base/memory/scoped_ptr.h" 10 #include "base/run_loop.h" 11 #include "base/stl_util.h" 12 #include "base/strings/string_piece.h" 13 #include "net/socket/socket_test_util.h" 14 #include "testing/gtest/include/gtest/gtest.h" 15 16 namespace gcm { 17 namespace { 18 19 typedef std::vector<net::MockRead> ReadList; 20 typedef std::vector<net::MockWrite> WriteList; 21 22 const char kReadData[] = "read_data"; 23 const uint64 kReadDataSize = arraysize(kReadData) - 1; 24 const char kReadData2[] = "read_alternate_data"; 25 const uint64 kReadData2Size = arraysize(kReadData2) - 1; 26 const char kWriteData[] = "write_data"; 27 const uint64 kWriteDataSize = arraysize(kWriteData) - 1; 28 29 class GCMSocketStreamTest : public testing::Test { 30 public: 31 GCMSocketStreamTest(); 32 virtual ~GCMSocketStreamTest(); 33 34 // Build a socket with the expected reads and writes. 35 void BuildSocket(const ReadList& read_list, const WriteList& write_list); 36 37 // Pump the message loop until idle. 38 void PumpLoop(); 39 40 // Simulates a google::protobuf::io::CodedInputStream read. 41 base::StringPiece DoInputStreamRead(uint64 bytes); 42 // Simulates a google::protobuf::io::CodedOutputStream write. 43 uint64 DoOutputStreamWrite(const base::StringPiece& write_src); 44 45 // Synchronous Refresh wrapper. 46 void WaitForData(size_t msg_size); 47 48 base::MessageLoop* message_loop() { return &message_loop_; }; 49 net::DelayedSocketData* data_provider() { return data_provider_.get(); } 50 SocketInputStream* input_stream() { return socket_input_stream_.get(); } 51 SocketOutputStream* output_stream() { return socket_output_stream_.get(); } 52 net::StreamSocket* socket() { return socket_.get(); } 53 54 private: 55 void OpenConnection(); 56 void ResetInputStream(); 57 void ResetOutputStream(); 58 59 void ConnectCallback(int result); 60 61 // SocketStreams and their data providers. 62 ReadList mock_reads_; 63 WriteList mock_writes_; 64 scoped_ptr<net::DelayedSocketData> data_provider_; 65 scoped_ptr<SocketInputStream> socket_input_stream_; 66 scoped_ptr<SocketOutputStream> socket_output_stream_; 67 68 // net:: components. 69 scoped_ptr<net::StreamSocket> socket_; 70 net::MockClientSocketFactory socket_factory_; 71 net::AddressList address_list_; 72 73 base::MessageLoopForIO message_loop_; 74 }; 75 76 GCMSocketStreamTest::GCMSocketStreamTest() { 77 net::IPAddressNumber ip_number; 78 net::ParseIPLiteralToNumber("127.0.0.1", &ip_number); 79 address_list_ = net::AddressList::CreateFromIPAddress(ip_number, 5228); 80 } 81 82 GCMSocketStreamTest::~GCMSocketStreamTest() {} 83 84 void GCMSocketStreamTest::BuildSocket(const ReadList& read_list, 85 const WriteList& write_list) { 86 mock_reads_ = read_list; 87 mock_writes_ = write_list; 88 data_provider_.reset( 89 new net::DelayedSocketData( 90 0, 91 vector_as_array(&mock_reads_), mock_reads_.size(), 92 vector_as_array(&mock_writes_), mock_writes_.size())); 93 socket_factory_.AddSocketDataProvider(data_provider_.get()); 94 OpenConnection(); 95 ResetInputStream(); 96 ResetOutputStream(); 97 } 98 99 void GCMSocketStreamTest::PumpLoop() { 100 base::RunLoop run_loop; 101 run_loop.RunUntilIdle(); 102 } 103 104 base::StringPiece GCMSocketStreamTest::DoInputStreamRead(uint64 bytes) { 105 uint64 total_bytes_read = 0; 106 const void* initial_buffer = NULL; 107 const void* buffer = NULL; 108 int size = 0; 109 110 do { 111 DCHECK(socket_input_stream_->GetState() == SocketInputStream::EMPTY || 112 socket_input_stream_->GetState() == SocketInputStream::READY); 113 if (!socket_input_stream_->Next(&buffer, &size)) 114 break; 115 total_bytes_read += size; 116 if (initial_buffer) { // Verify the buffer doesn't skip data. 117 EXPECT_EQ(static_cast<const uint8*>(initial_buffer) + total_bytes_read, 118 static_cast<const uint8*>(buffer) + size); 119 } else { 120 initial_buffer = buffer; 121 } 122 } while (total_bytes_read < bytes); 123 124 if (total_bytes_read > bytes) { 125 socket_input_stream_->BackUp(total_bytes_read - bytes); 126 total_bytes_read = bytes; 127 } 128 129 return base::StringPiece(static_cast<const char*>(initial_buffer), 130 total_bytes_read); 131 } 132 133 uint64 GCMSocketStreamTest::DoOutputStreamWrite( 134 const base::StringPiece& write_src) { 135 DCHECK_EQ(socket_output_stream_->GetState(), SocketOutputStream::EMPTY); 136 uint64 total_bytes_written = 0; 137 void* buffer = NULL; 138 int size = 0; 139 size_t bytes = write_src.size(); 140 141 do { 142 if (!socket_output_stream_->Next(&buffer, &size)) 143 break; 144 uint64 bytes_to_write = (static_cast<uint64>(size) < bytes ? size : bytes); 145 memcpy(buffer, 146 write_src.data() + total_bytes_written, 147 bytes_to_write); 148 if (bytes_to_write < static_cast<uint64>(size)) 149 socket_output_stream_->BackUp(size - bytes_to_write); 150 total_bytes_written += bytes_to_write; 151 } while (total_bytes_written < bytes); 152 153 base::RunLoop run_loop; 154 if (socket_output_stream_->Flush(run_loop.QuitClosure()) == 155 net::ERR_IO_PENDING) { 156 run_loop.Run(); 157 } 158 159 return total_bytes_written; 160 } 161 162 void GCMSocketStreamTest::WaitForData(size_t msg_size) { 163 while (input_stream()->UnreadByteCount() < msg_size) { 164 base::RunLoop run_loop; 165 if (input_stream()->Refresh(run_loop.QuitClosure(), 166 msg_size - input_stream()->UnreadByteCount()) == 167 net::ERR_IO_PENDING) { 168 run_loop.Run(); 169 } 170 if (input_stream()->GetState() == SocketInputStream::CLOSED) 171 return; 172 } 173 } 174 175 void GCMSocketStreamTest::OpenConnection() { 176 socket_ = socket_factory_.CreateTransportClientSocket( 177 address_list_, NULL, net::NetLog::Source()); 178 socket_->Connect( 179 base::Bind(&GCMSocketStreamTest::ConnectCallback, 180 base::Unretained(this))); 181 PumpLoop(); 182 } 183 184 void GCMSocketStreamTest::ConnectCallback(int result) {} 185 186 void GCMSocketStreamTest::ResetInputStream() { 187 DCHECK(socket_.get()); 188 socket_input_stream_.reset(new SocketInputStream(socket_.get())); 189 } 190 191 void GCMSocketStreamTest::ResetOutputStream() { 192 DCHECK(socket_.get()); 193 socket_output_stream_.reset(new SocketOutputStream(socket_.get())); 194 } 195 196 // A read where all data is already available. 197 TEST_F(GCMSocketStreamTest, ReadDataSync) { 198 BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, 199 kReadData, 200 kReadDataSize)), 201 WriteList()); 202 203 WaitForData(kReadDataSize); 204 ASSERT_EQ(std::string(kReadData, kReadDataSize), 205 DoInputStreamRead(kReadDataSize)); 206 } 207 208 // A read that comes in two parts. 209 TEST_F(GCMSocketStreamTest, ReadPartialDataSync) { 210 size_t first_read_len = kReadDataSize / 2; 211 size_t second_read_len = kReadDataSize - first_read_len; 212 ReadList read_list; 213 read_list.push_back( 214 net::MockRead(net::SYNCHRONOUS, 215 kReadData, 216 first_read_len)); 217 read_list.push_back( 218 net::MockRead(net::SYNCHRONOUS, 219 &kReadData[first_read_len], 220 second_read_len)); 221 BuildSocket(read_list, WriteList()); 222 223 WaitForData(kReadDataSize); 224 ASSERT_EQ(std::string(kReadData, kReadDataSize), 225 DoInputStreamRead(kReadDataSize)); 226 } 227 228 // A read where no data is available at first (IO_PENDING will be returned). 229 TEST_F(GCMSocketStreamTest, ReadAsync) { 230 size_t first_read_len = kReadDataSize / 2; 231 size_t second_read_len = kReadDataSize - first_read_len; 232 ReadList read_list; 233 read_list.push_back( 234 net::MockRead(net::SYNCHRONOUS, net::ERR_IO_PENDING)); 235 read_list.push_back( 236 net::MockRead(net::ASYNC, kReadData, first_read_len)); 237 read_list.push_back( 238 net::MockRead(net::ASYNC, &kReadData[first_read_len], second_read_len)); 239 BuildSocket(read_list, WriteList()); 240 241 base::MessageLoop::current()->PostTask( 242 FROM_HERE, 243 base::Bind(&net::DelayedSocketData::ForceNextRead, 244 base::Unretained(data_provider()))); 245 WaitForData(kReadDataSize); 246 ASSERT_EQ(std::string(kReadData, kReadDataSize), 247 DoInputStreamRead(kReadDataSize)); 248 } 249 250 // Simulate two packets arriving at once. Read them in two separate calls. 251 TEST_F(GCMSocketStreamTest, TwoReadsAtOnce) { 252 std::string long_data = std::string(kReadData, kReadDataSize) + 253 std::string(kReadData2, kReadData2Size); 254 BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, 255 long_data.c_str(), 256 long_data.size())), 257 WriteList()); 258 259 WaitForData(kReadDataSize); 260 ASSERT_EQ(std::string(kReadData, kReadDataSize), 261 DoInputStreamRead(kReadDataSize)); 262 263 WaitForData(kReadData2Size); 264 ASSERT_EQ(std::string(kReadData2, kReadData2Size), 265 DoInputStreamRead(kReadData2Size)); 266 } 267 268 // Simulate two packets arriving at once. Read them in two calls separated 269 // by a Rebuild. 270 TEST_F(GCMSocketStreamTest, TwoReadsAtOnceWithRebuild) { 271 std::string long_data = std::string(kReadData, kReadDataSize) + 272 std::string(kReadData2, kReadData2Size); 273 BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, 274 long_data.c_str(), 275 long_data.size())), 276 WriteList()); 277 278 WaitForData(kReadDataSize); 279 ASSERT_EQ(std::string(kReadData, kReadDataSize), 280 DoInputStreamRead(kReadDataSize)); 281 282 input_stream()->RebuildBuffer(); 283 WaitForData(kReadData2Size); 284 ASSERT_EQ(std::string(kReadData2, kReadData2Size), 285 DoInputStreamRead(kReadData2Size)); 286 } 287 288 // Simulate a read that is aborted. 289 TEST_F(GCMSocketStreamTest, ReadError) { 290 int result = net::ERR_ABORTED; 291 BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, result)), 292 WriteList()); 293 294 WaitForData(kReadDataSize); 295 ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState()); 296 ASSERT_EQ(result, input_stream()->last_error()); 297 } 298 299 // Simulate a read after the connection is closed. 300 TEST_F(GCMSocketStreamTest, ReadDisconnected) { 301 BuildSocket(ReadList(), WriteList()); 302 socket()->Disconnect(); 303 WaitForData(kReadDataSize); 304 ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState()); 305 ASSERT_EQ(net::ERR_CONNECTION_CLOSED, input_stream()->last_error()); 306 } 307 308 // Write a full message in one go. 309 TEST_F(GCMSocketStreamTest, WriteFull) { 310 BuildSocket(ReadList(), 311 WriteList(1, net::MockWrite(net::SYNCHRONOUS, 312 kWriteData, 313 kWriteDataSize))); 314 ASSERT_EQ(kWriteDataSize, 315 DoOutputStreamWrite(base::StringPiece(kWriteData, 316 kWriteDataSize))); 317 } 318 319 // Write a message in two go's. 320 TEST_F(GCMSocketStreamTest, WritePartial) { 321 WriteList write_list; 322 write_list.push_back(net::MockWrite(net::SYNCHRONOUS, 323 kWriteData, 324 kWriteDataSize / 2)); 325 write_list.push_back(net::MockWrite(net::SYNCHRONOUS, 326 kWriteData + kWriteDataSize / 2, 327 kWriteDataSize / 2)); 328 BuildSocket(ReadList(), write_list); 329 ASSERT_EQ(kWriteDataSize, 330 DoOutputStreamWrite(base::StringPiece(kWriteData, 331 kWriteDataSize))); 332 } 333 334 // Write a message completely asynchronously (returns IO_PENDING before 335 // finishing the write in two go's). 336 TEST_F(GCMSocketStreamTest, WriteNone) { 337 WriteList write_list; 338 write_list.push_back(net::MockWrite(net::SYNCHRONOUS, 339 kWriteData, 340 kWriteDataSize / 2)); 341 write_list.push_back(net::MockWrite(net::SYNCHRONOUS, 342 kWriteData + kWriteDataSize / 2, 343 kWriteDataSize / 2)); 344 BuildSocket(ReadList(), write_list); 345 ASSERT_EQ(kWriteDataSize, 346 DoOutputStreamWrite(base::StringPiece(kWriteData, 347 kWriteDataSize))); 348 } 349 350 // Write a message then read a message. 351 TEST_F(GCMSocketStreamTest, WriteThenRead) { 352 BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, 353 kReadData, 354 kReadDataSize)), 355 WriteList(1, net::MockWrite(net::SYNCHRONOUS, 356 kWriteData, 357 kWriteDataSize))); 358 359 ASSERT_EQ(kWriteDataSize, 360 DoOutputStreamWrite(base::StringPiece(kWriteData, 361 kWriteDataSize))); 362 363 WaitForData(kReadDataSize); 364 ASSERT_EQ(std::string(kReadData, kReadDataSize), 365 DoInputStreamRead(kReadDataSize)); 366 } 367 368 // Read a message then write a message. 369 TEST_F(GCMSocketStreamTest, ReadThenWrite) { 370 BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, 371 kReadData, 372 kReadDataSize)), 373 WriteList(1, net::MockWrite(net::SYNCHRONOUS, 374 kWriteData, 375 kWriteDataSize))); 376 377 WaitForData(kReadDataSize); 378 ASSERT_EQ(std::string(kReadData, kReadDataSize), 379 DoInputStreamRead(kReadDataSize)); 380 381 ASSERT_EQ(kWriteDataSize, 382 DoOutputStreamWrite(base::StringPiece(kWriteData, 383 kWriteDataSize))); 384 } 385 386 // Simulate a write that gets aborted. 387 TEST_F(GCMSocketStreamTest, WriteError) { 388 int result = net::ERR_ABORTED; 389 BuildSocket(ReadList(), 390 WriteList(1, net::MockWrite(net::SYNCHRONOUS, result))); 391 DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize)); 392 ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState()); 393 ASSERT_EQ(result, output_stream()->last_error()); 394 } 395 396 // Simulate a write after the connection is closed. 397 TEST_F(GCMSocketStreamTest, WriteDisconnected) { 398 BuildSocket(ReadList(), WriteList()); 399 socket()->Disconnect(); 400 DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize)); 401 ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState()); 402 ASSERT_EQ(net::ERR_CONNECTION_CLOSED, output_stream()->last_error()); 403 } 404 405 } // namespace 406 } // namespace gcm 407