Home | History | Annotate | Download | only in streams
      1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <brillo/streams/fake_stream.h>
      6 
      7 #include <algorithm>
      8 
      9 #include <base/bind.h>
     10 #include <brillo/message_loops/message_loop.h>
     11 #include <brillo/streams/stream_utils.h>
     12 
     13 namespace brillo {
     14 
     15 namespace {
     16 
     17 // Gets a delta between the two times, makes sure that the delta is positive.
     18 base::TimeDelta CalculateDelay(const base::Time& now,
     19                                const base::Time& delay_until) {
     20   const base::TimeDelta zero_delay;
     21   if (delay_until.is_null() || now >= delay_until) {
     22     return zero_delay;
     23   }
     24 
     25   base::TimeDelta delay = delay_until - now;
     26   if (delay < zero_delay)
     27     delay = zero_delay;
     28   return delay;
     29 }
     30 
     31 // Given the current clock time, and expected delays for read and write
     32 // operations calculates the smaller wait delay of the two and sets the
     33 // resulting operation to |*mode| and the delay to wait for into |*delay|.
     34 void GetMinDelayAndMode(const base::Time& now,
     35                         bool read, const base::Time& delay_read_until,
     36                         bool write, const base::Time& delay_write_until,
     37                         Stream::AccessMode* mode, base::TimeDelta* delay) {
     38   base::TimeDelta read_delay = base::TimeDelta::Max();
     39   base::TimeDelta write_delay = base::TimeDelta::Max();
     40 
     41   if (read)
     42     read_delay = CalculateDelay(now, delay_read_until);
     43   if (write)
     44     write_delay = CalculateDelay(now, delay_write_until);
     45 
     46   if (read_delay > write_delay) {
     47     read = false;
     48   } else if (read_delay < write_delay) {
     49     write = false;
     50   }
     51   *mode = stream_utils::MakeAccessMode(read, write);
     52   *delay = std::min(read_delay, write_delay);
     53 }
     54 
     55 }  // anonymous namespace
     56 
     57 FakeStream::FakeStream(Stream::AccessMode mode,
     58                        base::Clock* clock)
     59     : mode_{mode}, clock_{clock} {}
     60 
     61 void FakeStream::AddReadPacketData(base::TimeDelta delay,
     62                                    const void* data,
     63                                    size_t size) {
     64   auto* byte_ptr = static_cast<const uint8_t*>(data);
     65   AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
     66 }
     67 
     68 void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
     69   InputDataPacket packet;
     70   packet.data = std::move(data);
     71   packet.delay_before = delay;
     72   incoming_queue_.push(std::move(packet));
     73 }
     74 
     75 void FakeStream::AddReadPacketString(base::TimeDelta delay,
     76                                      const std::string& data) {
     77   AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
     78 }
     79 
     80 void FakeStream::QueueReadError(base::TimeDelta delay) {
     81   QueueReadErrorWithMessage(delay, std::string{});
     82 }
     83 
     84 void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
     85                                            const std::string& message) {
     86   InputDataPacket packet;
     87   packet.data.assign(message.begin(), message.end());
     88   packet.delay_before = delay;
     89   packet.read_error = true;
     90   incoming_queue_.push(std::move(packet));
     91 }
     92 
     93 void FakeStream::ClearReadQueue() {
     94   std::queue<InputDataPacket>().swap(incoming_queue_);
     95   delay_input_until_ = base::Time{};
     96   input_buffer_.clear();
     97   input_ptr_ = 0;
     98   report_read_error_ = 0;
     99 }
    100 
    101 void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
    102                                        size_t data_size) {
    103   OutputDataPacket packet;
    104   packet.expected_size = data_size;
    105   packet.delay_before = delay;
    106   outgoing_queue_.push(std::move(packet));
    107 }
    108 
    109 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
    110                                        const void* data,
    111                                        size_t size) {
    112   auto* byte_ptr = static_cast<const uint8_t*>(data);
    113   ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
    114 }
    115 
    116 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
    117                                        brillo::Blob data) {
    118   OutputDataPacket packet;
    119   packet.expected_size = data.size();
    120   packet.data = std::move(data);
    121   packet.delay_before = delay;
    122   outgoing_queue_.push(std::move(packet));
    123 }
    124 
    125 void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
    126                                          const std::string& data) {
    127   ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
    128 }
    129 
    130 void FakeStream::QueueWriteError(base::TimeDelta delay) {
    131   QueueWriteErrorWithMessage(delay, std::string{});
    132 }
    133 
    134 void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
    135                                             const std::string& message) {
    136   OutputDataPacket packet;
    137   packet.expected_size = 0;
    138   packet.data.assign(message.begin(), message.end());
    139   packet.delay_before = delay;
    140   packet.write_error = true;
    141   outgoing_queue_.push(std::move(packet));
    142 }
    143 
    144 void FakeStream::ClearWriteQueue() {
    145   std::queue<OutputDataPacket>().swap(outgoing_queue_);
    146   delay_output_until_ = base::Time{};
    147   output_buffer_.clear();
    148   expected_output_data_.clear();
    149   max_output_buffer_size_ = 0;
    150   all_output_data_.clear();
    151   report_write_error_ = 0;
    152 }
    153 
    154 const brillo::Blob& FakeStream::GetFlushedOutputData() const {
    155   return all_output_data_;
    156 }
    157 
    158 std::string FakeStream::GetFlushedOutputDataAsString() const {
    159   return std::string{all_output_data_.begin(), all_output_data_.end()};
    160 }
    161 
    162 bool FakeStream::CanRead() const {
    163   return stream_utils::IsReadAccessMode(mode_);
    164 }
    165 
    166 bool FakeStream::CanWrite() const {
    167   return stream_utils::IsWriteAccessMode(mode_);
    168 }
    169 
    170 bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
    171   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    172 }
    173 
    174 bool FakeStream::Seek(int64_t /* offset */,
    175                       Whence /* whence */,
    176                       uint64_t* /* new_position */,
    177                       ErrorPtr* error) {
    178   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    179 }
    180 
    181 bool FakeStream::IsReadBufferEmpty() const {
    182   return input_ptr_ >= input_buffer_.size();
    183 }
    184 
    185 bool FakeStream::PopReadPacket() {
    186   if (incoming_queue_.empty())
    187     return false;
    188   const InputDataPacket& packet = incoming_queue_.front();
    189   input_ptr_ = 0;
    190   input_buffer_ = std::move(packet.data);
    191   delay_input_until_ = clock_->Now() + packet.delay_before;
    192   incoming_queue_.pop();
    193   report_read_error_ = packet.read_error;
    194   return true;
    195 }
    196 
    197 bool FakeStream::ReadNonBlocking(void* buffer,
    198                                  size_t size_to_read,
    199                                  size_t* size_read,
    200                                  bool* end_of_stream,
    201                                  ErrorPtr* error) {
    202   if (!CanRead())
    203     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    204 
    205   if (!IsOpen())
    206     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
    207 
    208   for (;;) {
    209     if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
    210       *size_read = 0;
    211       if (end_of_stream)
    212         *end_of_stream = false;
    213       break;
    214     }
    215 
    216     if (report_read_error_) {
    217       report_read_error_ = false;
    218       std::string message{input_buffer_.begin(), input_buffer_.end()};
    219       if (message.empty())
    220         message = "Simulating read error for tests";
    221       input_buffer_.clear();
    222       Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
    223       return false;
    224     }
    225 
    226     if (!IsReadBufferEmpty()) {
    227       size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
    228       std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
    229       input_ptr_ += size_to_read;
    230       *size_read = size_to_read;
    231       if (end_of_stream)
    232         *end_of_stream = false;
    233       break;
    234     }
    235 
    236     if (!PopReadPacket()) {
    237       *size_read = 0;
    238       if (end_of_stream)
    239         *end_of_stream = true;
    240       break;
    241     }
    242   }
    243   return true;
    244 }
    245 
    246 bool FakeStream::IsWriteBufferFull() const {
    247   return output_buffer_.size() >= max_output_buffer_size_;
    248 }
    249 
    250 bool FakeStream::PopWritePacket() {
    251   if (outgoing_queue_.empty())
    252     return false;
    253   const OutputDataPacket& packet = outgoing_queue_.front();
    254   expected_output_data_ = std::move(packet.data);
    255   delay_output_until_ = clock_->Now() + packet.delay_before;
    256   max_output_buffer_size_ = packet.expected_size;
    257   report_write_error_ = packet.write_error;
    258   outgoing_queue_.pop();
    259   return true;
    260 }
    261 
    262 bool FakeStream::WriteNonBlocking(const void* buffer,
    263                                   size_t size_to_write,
    264                                   size_t* size_written,
    265                                   ErrorPtr* error) {
    266   if (!CanWrite())
    267     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    268 
    269   if (!IsOpen())
    270     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
    271 
    272   for (;;) {
    273     if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
    274       *size_written = 0;
    275       return true;
    276     }
    277 
    278     if (report_write_error_) {
    279       report_write_error_ = false;
    280       std::string message{expected_output_data_.begin(),
    281                           expected_output_data_.end()};
    282       if (message.empty())
    283         message = "Simulating write error for tests";
    284       output_buffer_.clear();
    285       max_output_buffer_size_ = 0;
    286       expected_output_data_.clear();
    287       Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
    288       return false;
    289     }
    290 
    291     if (!IsWriteBufferFull()) {
    292       bool success = true;
    293       size_to_write = std::min(size_to_write,
    294                                max_output_buffer_size_ - output_buffer_.size());
    295       auto byte_ptr = static_cast<const uint8_t*>(buffer);
    296       output_buffer_.insert(output_buffer_.end(),
    297                             byte_ptr, byte_ptr + size_to_write);
    298       if (output_buffer_.size()  == max_output_buffer_size_) {
    299         if (!expected_output_data_.empty() &&
    300             expected_output_data_ != output_buffer_) {
    301           // We expected different data to be written, report an error.
    302           Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
    303                        "Unexpected data written");
    304           success = false;
    305         }
    306 
    307         all_output_data_.insert(all_output_data_.end(),
    308                                 output_buffer_.begin(), output_buffer_.end());
    309 
    310         output_buffer_.clear();
    311         max_output_buffer_size_ = 0;
    312         expected_output_data_.clear();
    313       }
    314       *size_written = size_to_write;
    315       return success;
    316     }
    317 
    318     if (!PopWritePacket()) {
    319       // No more data expected.
    320       Error::AddTo(error, FROM_HERE, "fake_stream", "full",
    321                    "No more output data expected");
    322       return false;
    323     }
    324   }
    325 }
    326 
    327 bool FakeStream::FlushBlocking(ErrorPtr* error) {
    328   if (!CanWrite())
    329     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    330 
    331   if (!IsOpen())
    332     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
    333 
    334   bool success = true;
    335   if (!output_buffer_.empty()) {
    336     if (!expected_output_data_.empty() &&
    337         expected_output_data_ != output_buffer_) {
    338       // We expected different data to be written, report an error.
    339       Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
    340                    "Unexpected data written");
    341       success = false;
    342     }
    343     all_output_data_.insert(all_output_data_.end(),
    344                             output_buffer_.begin(), output_buffer_.end());
    345 
    346     output_buffer_.clear();
    347     max_output_buffer_size_ = 0;
    348     expected_output_data_.clear();
    349   }
    350   return success;
    351 }
    352 
    353 bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
    354   is_open_ = false;
    355   return true;
    356 }
    357 
    358 bool FakeStream::WaitForData(AccessMode mode,
    359                              const base::Callback<void(AccessMode)>& callback,
    360                              ErrorPtr* error) {
    361   bool read_requested = stream_utils::IsReadAccessMode(mode);
    362   bool write_requested = stream_utils::IsWriteAccessMode(mode);
    363 
    364   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
    365     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    366 
    367   if (read_requested && IsReadBufferEmpty())
    368     PopReadPacket();
    369   if (write_requested && IsWriteBufferFull())
    370     PopWritePacket();
    371 
    372   base::TimeDelta delay;
    373   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
    374                      write_requested, delay_output_until_, &mode, &delay);
    375   MessageLoop::current()->PostDelayedTask(
    376       FROM_HERE, base::Bind(callback, mode), delay);
    377   return true;
    378 }
    379 
    380 bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
    381                                      base::TimeDelta timeout,
    382                                      AccessMode* out_mode,
    383                                      ErrorPtr* error) {
    384   const base::TimeDelta zero_delay;
    385   bool read_requested = stream_utils::IsReadAccessMode(in_mode);
    386   bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
    387 
    388   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
    389     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
    390 
    391   base::TimeDelta delay;
    392   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
    393                      write_requested, delay_output_until_, out_mode, &delay);
    394 
    395   if (timeout < delay)
    396     return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
    397 
    398   LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
    399             << " ms.";
    400 
    401   return true;
    402 }
    403 
    404 }  // namespace brillo
    405