1 // Copyright 2015 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <brillo/streams/fake_stream.h> 6 7 #include <algorithm> 8 9 #include <base/bind.h> 10 #include <brillo/message_loops/message_loop.h> 11 #include <brillo/streams/stream_utils.h> 12 13 namespace brillo { 14 15 namespace { 16 17 // Gets a delta between the two times, makes sure that the delta is positive. 18 base::TimeDelta CalculateDelay(const base::Time& now, 19 const base::Time& delay_until) { 20 const base::TimeDelta zero_delay; 21 if (delay_until.is_null() || now >= delay_until) { 22 return zero_delay; 23 } 24 25 base::TimeDelta delay = delay_until - now; 26 if (delay < zero_delay) 27 delay = zero_delay; 28 return delay; 29 } 30 31 // Given the current clock time, and expected delays for read and write 32 // operations calculates the smaller wait delay of the two and sets the 33 // resulting operation to |*mode| and the delay to wait for into |*delay|. 34 void GetMinDelayAndMode(const base::Time& now, 35 bool read, const base::Time& delay_read_until, 36 bool write, const base::Time& delay_write_until, 37 Stream::AccessMode* mode, base::TimeDelta* delay) { 38 base::TimeDelta read_delay = base::TimeDelta::Max(); 39 base::TimeDelta write_delay = base::TimeDelta::Max(); 40 41 if (read) 42 read_delay = CalculateDelay(now, delay_read_until); 43 if (write) 44 write_delay = CalculateDelay(now, delay_write_until); 45 46 if (read_delay > write_delay) { 47 read = false; 48 } else if (read_delay < write_delay) { 49 write = false; 50 } 51 *mode = stream_utils::MakeAccessMode(read, write); 52 *delay = std::min(read_delay, write_delay); 53 } 54 55 } // anonymous namespace 56 57 FakeStream::FakeStream(Stream::AccessMode mode, 58 base::Clock* clock) 59 : mode_{mode}, clock_{clock} {} 60 61 void FakeStream::AddReadPacketData(base::TimeDelta delay, 62 const void* data, 63 size_t size) { 64 auto* byte_ptr = static_cast<const uint8_t*>(data); 65 AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size}); 66 } 67 68 void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) { 69 InputDataPacket packet; 70 packet.data = std::move(data); 71 packet.delay_before = delay; 72 incoming_queue_.push(std::move(packet)); 73 } 74 75 void FakeStream::AddReadPacketString(base::TimeDelta delay, 76 const std::string& data) { 77 AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()}); 78 } 79 80 void FakeStream::QueueReadError(base::TimeDelta delay) { 81 QueueReadErrorWithMessage(delay, std::string{}); 82 } 83 84 void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay, 85 const std::string& message) { 86 InputDataPacket packet; 87 packet.data.assign(message.begin(), message.end()); 88 packet.delay_before = delay; 89 packet.read_error = true; 90 incoming_queue_.push(std::move(packet)); 91 } 92 93 void FakeStream::ClearReadQueue() { 94 std::queue<InputDataPacket>().swap(incoming_queue_); 95 delay_input_until_ = base::Time{}; 96 input_buffer_.clear(); 97 input_ptr_ = 0; 98 report_read_error_ = 0; 99 } 100 101 void FakeStream::ExpectWritePacketSize(base::TimeDelta delay, 102 size_t data_size) { 103 OutputDataPacket packet; 104 packet.expected_size = data_size; 105 packet.delay_before = delay; 106 outgoing_queue_.push(std::move(packet)); 107 } 108 109 void FakeStream::ExpectWritePacketData(base::TimeDelta delay, 110 const void* data, 111 size_t size) { 112 auto* byte_ptr = static_cast<const uint8_t*>(data); 113 ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size}); 114 } 115 116 void FakeStream::ExpectWritePacketData(base::TimeDelta delay, 117 brillo::Blob data) { 118 OutputDataPacket packet; 119 packet.expected_size = data.size(); 120 packet.data = std::move(data); 121 packet.delay_before = delay; 122 outgoing_queue_.push(std::move(packet)); 123 } 124 125 void FakeStream::ExpectWritePacketString(base::TimeDelta delay, 126 const std::string& data) { 127 ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()}); 128 } 129 130 void FakeStream::QueueWriteError(base::TimeDelta delay) { 131 QueueWriteErrorWithMessage(delay, std::string{}); 132 } 133 134 void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay, 135 const std::string& message) { 136 OutputDataPacket packet; 137 packet.expected_size = 0; 138 packet.data.assign(message.begin(), message.end()); 139 packet.delay_before = delay; 140 packet.write_error = true; 141 outgoing_queue_.push(std::move(packet)); 142 } 143 144 void FakeStream::ClearWriteQueue() { 145 std::queue<OutputDataPacket>().swap(outgoing_queue_); 146 delay_output_until_ = base::Time{}; 147 output_buffer_.clear(); 148 expected_output_data_.clear(); 149 max_output_buffer_size_ = 0; 150 all_output_data_.clear(); 151 report_write_error_ = 0; 152 } 153 154 const brillo::Blob& FakeStream::GetFlushedOutputData() const { 155 return all_output_data_; 156 } 157 158 std::string FakeStream::GetFlushedOutputDataAsString() const { 159 return std::string{all_output_data_.begin(), all_output_data_.end()}; 160 } 161 162 bool FakeStream::CanRead() const { 163 return stream_utils::IsReadAccessMode(mode_); 164 } 165 166 bool FakeStream::CanWrite() const { 167 return stream_utils::IsWriteAccessMode(mode_); 168 } 169 170 bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) { 171 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 172 } 173 174 bool FakeStream::Seek(int64_t /* offset */, 175 Whence /* whence */, 176 uint64_t* /* new_position */, 177 ErrorPtr* error) { 178 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 179 } 180 181 bool FakeStream::IsReadBufferEmpty() const { 182 return input_ptr_ >= input_buffer_.size(); 183 } 184 185 bool FakeStream::PopReadPacket() { 186 if (incoming_queue_.empty()) 187 return false; 188 const InputDataPacket& packet = incoming_queue_.front(); 189 input_ptr_ = 0; 190 input_buffer_ = std::move(packet.data); 191 delay_input_until_ = clock_->Now() + packet.delay_before; 192 incoming_queue_.pop(); 193 report_read_error_ = packet.read_error; 194 return true; 195 } 196 197 bool FakeStream::ReadNonBlocking(void* buffer, 198 size_t size_to_read, 199 size_t* size_read, 200 bool* end_of_stream, 201 ErrorPtr* error) { 202 if (!CanRead()) 203 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 204 205 if (!IsOpen()) 206 return stream_utils::ErrorStreamClosed(FROM_HERE, error); 207 208 for (;;) { 209 if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) { 210 *size_read = 0; 211 if (end_of_stream) 212 *end_of_stream = false; 213 break; 214 } 215 216 if (report_read_error_) { 217 report_read_error_ = false; 218 std::string message{input_buffer_.begin(), input_buffer_.end()}; 219 if (message.empty()) 220 message = "Simulating read error for tests"; 221 input_buffer_.clear(); 222 Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message); 223 return false; 224 } 225 226 if (!IsReadBufferEmpty()) { 227 size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_); 228 std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read); 229 input_ptr_ += size_to_read; 230 *size_read = size_to_read; 231 if (end_of_stream) 232 *end_of_stream = false; 233 break; 234 } 235 236 if (!PopReadPacket()) { 237 *size_read = 0; 238 if (end_of_stream) 239 *end_of_stream = true; 240 break; 241 } 242 } 243 return true; 244 } 245 246 bool FakeStream::IsWriteBufferFull() const { 247 return output_buffer_.size() >= max_output_buffer_size_; 248 } 249 250 bool FakeStream::PopWritePacket() { 251 if (outgoing_queue_.empty()) 252 return false; 253 const OutputDataPacket& packet = outgoing_queue_.front(); 254 expected_output_data_ = std::move(packet.data); 255 delay_output_until_ = clock_->Now() + packet.delay_before; 256 max_output_buffer_size_ = packet.expected_size; 257 report_write_error_ = packet.write_error; 258 outgoing_queue_.pop(); 259 return true; 260 } 261 262 bool FakeStream::WriteNonBlocking(const void* buffer, 263 size_t size_to_write, 264 size_t* size_written, 265 ErrorPtr* error) { 266 if (!CanWrite()) 267 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 268 269 if (!IsOpen()) 270 return stream_utils::ErrorStreamClosed(FROM_HERE, error); 271 272 for (;;) { 273 if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) { 274 *size_written = 0; 275 return true; 276 } 277 278 if (report_write_error_) { 279 report_write_error_ = false; 280 std::string message{expected_output_data_.begin(), 281 expected_output_data_.end()}; 282 if (message.empty()) 283 message = "Simulating write error for tests"; 284 output_buffer_.clear(); 285 max_output_buffer_size_ = 0; 286 expected_output_data_.clear(); 287 Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message); 288 return false; 289 } 290 291 if (!IsWriteBufferFull()) { 292 bool success = true; 293 size_to_write = std::min(size_to_write, 294 max_output_buffer_size_ - output_buffer_.size()); 295 auto byte_ptr = static_cast<const uint8_t*>(buffer); 296 output_buffer_.insert(output_buffer_.end(), 297 byte_ptr, byte_ptr + size_to_write); 298 if (output_buffer_.size() == max_output_buffer_size_) { 299 if (!expected_output_data_.empty() && 300 expected_output_data_ != output_buffer_) { 301 // We expected different data to be written, report an error. 302 Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch", 303 "Unexpected data written"); 304 success = false; 305 } 306 307 all_output_data_.insert(all_output_data_.end(), 308 output_buffer_.begin(), output_buffer_.end()); 309 310 output_buffer_.clear(); 311 max_output_buffer_size_ = 0; 312 expected_output_data_.clear(); 313 } 314 *size_written = size_to_write; 315 return success; 316 } 317 318 if (!PopWritePacket()) { 319 // No more data expected. 320 Error::AddTo(error, FROM_HERE, "fake_stream", "full", 321 "No more output data expected"); 322 return false; 323 } 324 } 325 } 326 327 bool FakeStream::FlushBlocking(ErrorPtr* error) { 328 if (!CanWrite()) 329 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 330 331 if (!IsOpen()) 332 return stream_utils::ErrorStreamClosed(FROM_HERE, error); 333 334 bool success = true; 335 if (!output_buffer_.empty()) { 336 if (!expected_output_data_.empty() && 337 expected_output_data_ != output_buffer_) { 338 // We expected different data to be written, report an error. 339 Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch", 340 "Unexpected data written"); 341 success = false; 342 } 343 all_output_data_.insert(all_output_data_.end(), 344 output_buffer_.begin(), output_buffer_.end()); 345 346 output_buffer_.clear(); 347 max_output_buffer_size_ = 0; 348 expected_output_data_.clear(); 349 } 350 return success; 351 } 352 353 bool FakeStream::CloseBlocking(ErrorPtr* /* error */) { 354 is_open_ = false; 355 return true; 356 } 357 358 bool FakeStream::WaitForData(AccessMode mode, 359 const base::Callback<void(AccessMode)>& callback, 360 ErrorPtr* error) { 361 bool read_requested = stream_utils::IsReadAccessMode(mode); 362 bool write_requested = stream_utils::IsWriteAccessMode(mode); 363 364 if ((read_requested && !CanRead()) || (write_requested && !CanWrite())) 365 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 366 367 if (read_requested && IsReadBufferEmpty()) 368 PopReadPacket(); 369 if (write_requested && IsWriteBufferFull()) 370 PopWritePacket(); 371 372 base::TimeDelta delay; 373 GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_, 374 write_requested, delay_output_until_, &mode, &delay); 375 MessageLoop::current()->PostDelayedTask( 376 FROM_HERE, base::Bind(callback, mode), delay); 377 return true; 378 } 379 380 bool FakeStream::WaitForDataBlocking(AccessMode in_mode, 381 base::TimeDelta timeout, 382 AccessMode* out_mode, 383 ErrorPtr* error) { 384 bool read_requested = stream_utils::IsReadAccessMode(in_mode); 385 bool write_requested = stream_utils::IsWriteAccessMode(in_mode); 386 387 if ((read_requested && !CanRead()) || (write_requested && !CanWrite())) 388 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 389 390 base::TimeDelta delay; 391 GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_, 392 write_requested, delay_output_until_, out_mode, &delay); 393 394 if (timeout < delay) 395 return stream_utils::ErrorOperationTimeout(FROM_HERE, error); 396 397 LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds() 398 << " ms."; 399 400 return true; 401 } 402 403 } // namespace brillo 404