Home | History | Annotate | Download | only in streams
      1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <brillo/streams/stream.h>
      6 
      7 #include <algorithm>
      8 
      9 #include <base/bind.h>
     10 #include <brillo/message_loops/message_loop.h>
     11 #include <brillo/pointer_utils.h>
     12 #include <brillo/streams/stream_errors.h>
     13 #include <brillo/streams/stream_utils.h>
     14 
     15 namespace brillo {
     16 
     17 bool Stream::TruncateBlocking(ErrorPtr* error) {
     18   return SetSizeBlocking(GetPosition(), error);
     19 }
     20 
     21 bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
     22   if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
     23     return false;
     24   return Seek(position, Whence::FROM_BEGIN, nullptr, error);
     25 }
     26 
     27 bool Stream::ReadAsync(void* buffer,
     28                        size_t size_to_read,
     29                        const base::Callback<void(size_t)>& success_callback,
     30                        const ErrorCallback& error_callback,
     31                        ErrorPtr* error) {
     32   if (is_async_read_pending_) {
     33     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
     34                  errors::stream::kOperationNotSupported,
     35                  "Another asynchronous operation is still pending");
     36     return false;
     37   }
     38 
     39   auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
     40   // If we can read some data right away non-blocking we should still run the
     41   // callback from the main loop, so we pass true here for force_async_callback.
     42   return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
     43                        true);
     44 }
     45 
     46 bool Stream::ReadAllAsync(void* buffer,
     47                           size_t size_to_read,
     48                           const base::Closure& success_callback,
     49                           const ErrorCallback& error_callback,
     50                           ErrorPtr* error) {
     51   if (is_async_read_pending_) {
     52     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
     53                  errors::stream::kOperationNotSupported,
     54                  "Another asynchronous operation is still pending");
     55     return false;
     56   }
     57 
     58   auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
     59                              weak_ptr_factory_.GetWeakPtr(), buffer,
     60                              size_to_read, success_callback, error_callback);
     61   return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
     62                        true);
     63 }
     64 
     65 bool Stream::ReadBlocking(void* buffer,
     66                           size_t size_to_read,
     67                           size_t* size_read,
     68                           ErrorPtr* error) {
     69   for (;;) {
     70     bool eos = false;
     71     if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
     72       return false;
     73 
     74     if (*size_read > 0 || eos)
     75       break;
     76 
     77     if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
     78                              error)) {
     79       return false;
     80     }
     81   }
     82   return true;
     83 }
     84 
     85 bool Stream::ReadAllBlocking(void* buffer,
     86                              size_t size_to_read,
     87                              ErrorPtr* error) {
     88   while (size_to_read > 0) {
     89     size_t size_read = 0;
     90     if (!ReadBlocking(buffer, size_to_read, &size_read, error))
     91       return false;
     92 
     93     if (size_read == 0)
     94       return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);
     95 
     96     size_to_read -= size_read;
     97     buffer = AdvancePointer(buffer, size_read);
     98   }
     99   return true;
    100 }
    101 
    102 bool Stream::WriteAsync(const void* buffer,
    103                         size_t size_to_write,
    104                         const base::Callback<void(size_t)>& success_callback,
    105                         const ErrorCallback& error_callback,
    106                         ErrorPtr* error) {
    107   if (is_async_write_pending_) {
    108     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
    109                  errors::stream::kOperationNotSupported,
    110                  "Another asynchronous operation is still pending");
    111     return false;
    112   }
    113   // If we can read some data right away non-blocking we should still run the
    114   // callback from the main loop, so we pass true here for force_async_callback.
    115   return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
    116                         error, true);
    117 }
    118 
    119 bool Stream::WriteAllAsync(const void* buffer,
    120                            size_t size_to_write,
    121                            const base::Closure& success_callback,
    122                            const ErrorCallback& error_callback,
    123                            ErrorPtr* error) {
    124   if (is_async_write_pending_) {
    125     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
    126                  errors::stream::kOperationNotSupported,
    127                  "Another asynchronous operation is still pending");
    128     return false;
    129   }
    130 
    131   auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
    132                              weak_ptr_factory_.GetWeakPtr(), buffer,
    133                              size_to_write, success_callback, error_callback);
    134   return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
    135                         true);
    136 }
    137 
    138 bool Stream::WriteBlocking(const void* buffer,
    139                            size_t size_to_write,
    140                            size_t* size_written,
    141                            ErrorPtr* error) {
    142   for (;;) {
    143     if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
    144       return false;
    145 
    146     if (*size_written > 0 || size_to_write == 0)
    147       break;
    148 
    149     if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
    150                              error)) {
    151       return false;
    152     }
    153   }
    154   return true;
    155 }
    156 
    157 bool Stream::WriteAllBlocking(const void* buffer,
    158                               size_t size_to_write,
    159                               ErrorPtr* error) {
    160   while (size_to_write > 0) {
    161     size_t size_written = 0;
    162     if (!WriteBlocking(buffer, size_to_write, &size_written, error))
    163       return false;
    164 
    165     if (size_written == 0) {
    166       Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
    167                    errors::stream::kPartialData,
    168                    "Failed to write all the data");
    169       return false;
    170     }
    171     size_to_write -= size_written;
    172     buffer = AdvancePointer(buffer, size_written);
    173   }
    174   return true;
    175 }
    176 
    177 bool Stream::FlushAsync(const base::Closure& success_callback,
    178                         const ErrorCallback& error_callback,
    179                         ErrorPtr* /* error */) {
    180   auto callback = base::Bind(&Stream::FlushAsyncCallback,
    181                              weak_ptr_factory_.GetWeakPtr(),
    182                              success_callback, error_callback);
    183   MessageLoop::current()->PostTask(FROM_HERE, callback);
    184   return true;
    185 }
    186 
    187 void Stream::IgnoreEOSCallback(
    188     const base::Callback<void(size_t)>& success_callback,
    189     size_t bytes,
    190     bool /* eos */) {
    191   success_callback.Run(bytes);
    192 }
    193 
    194 bool Stream::ReadAsyncImpl(
    195     void* buffer,
    196     size_t size_to_read,
    197     const base::Callback<void(size_t, bool)>& success_callback,
    198     const ErrorCallback& error_callback,
    199     ErrorPtr* error,
    200     bool force_async_callback) {
    201   CHECK(!is_async_read_pending_);
    202   // We set this value to true early in the function so calling others will
    203   // prevent us from calling WaitForData() to make calls to
    204   // ReadAsync() fail while we run WaitForData().
    205   is_async_read_pending_ = true;
    206 
    207   size_t read = 0;
    208   bool eos = false;
    209   if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
    210     return false;
    211 
    212   if (read > 0 || eos) {
    213     if (force_async_callback) {
    214       MessageLoop::current()->PostTask(
    215           FROM_HERE,
    216           base::Bind(&Stream::OnReadAsyncDone, weak_ptr_factory_.GetWeakPtr(),
    217                      success_callback, read, eos));
    218     } else {
    219       is_async_read_pending_ = false;
    220       success_callback.Run(read, eos);
    221     }
    222     return true;
    223   }
    224 
    225   is_async_read_pending_ = WaitForData(
    226       AccessMode::READ,
    227       base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
    228                  buffer, size_to_read, success_callback, error_callback),
    229       error);
    230   return is_async_read_pending_;
    231 }
    232 
    233 void Stream::OnReadAsyncDone(
    234     const base::Callback<void(size_t, bool)>& success_callback,
    235     size_t bytes_read,
    236     bool eos) {
    237   is_async_read_pending_ = false;
    238   success_callback.Run(bytes_read, eos);
    239 }
    240 
    241 void Stream::OnReadAvailable(
    242     void* buffer,
    243     size_t size_to_read,
    244     const base::Callback<void(size_t, bool)>& success_callback,
    245     const ErrorCallback& error_callback,
    246     AccessMode mode) {
    247   CHECK(stream_utils::IsReadAccessMode(mode));
    248   CHECK(is_async_read_pending_);
    249   is_async_read_pending_ = false;
    250   ErrorPtr error;
    251   // Just reschedule the read operation but don't need to run the callback from
    252   // the main loop since we are already running on a callback.
    253   if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
    254                      &error, false)) {
    255     error_callback.Run(error.get());
    256   }
    257 }
    258 
    259 bool Stream::WriteAsyncImpl(
    260     const void* buffer,
    261     size_t size_to_write,
    262     const base::Callback<void(size_t)>& success_callback,
    263     const ErrorCallback& error_callback,
    264     ErrorPtr* error,
    265     bool force_async_callback) {
    266   CHECK(!is_async_write_pending_);
    267   // We set this value to true early in the function so calling others will
    268   // prevent us from calling WaitForData() to make calls to
    269   // ReadAsync() fail while we run WaitForData().
    270   is_async_write_pending_ = true;
    271 
    272   size_t written = 0;
    273   if (!WriteNonBlocking(buffer, size_to_write, &written, error))
    274     return false;
    275 
    276   if (written > 0) {
    277     if (force_async_callback) {
    278       MessageLoop::current()->PostTask(
    279           FROM_HERE,
    280           base::Bind(&Stream::OnWriteAsyncDone, weak_ptr_factory_.GetWeakPtr(),
    281                      success_callback, written));
    282     } else {
    283       is_async_write_pending_ = false;
    284       success_callback.Run(written);
    285     }
    286     return true;
    287   }
    288   is_async_write_pending_ = WaitForData(
    289       AccessMode::WRITE,
    290       base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
    291                  buffer, size_to_write, success_callback, error_callback),
    292       error);
    293   return is_async_write_pending_;
    294 }
    295 
    296 void Stream::OnWriteAsyncDone(
    297     const base::Callback<void(size_t)>& success_callback,
    298     size_t size_written) {
    299   is_async_write_pending_ = false;
    300   success_callback.Run(size_written);
    301 }
    302 
    303 void Stream::OnWriteAvailable(
    304     const void* buffer,
    305     size_t size,
    306     const base::Callback<void(size_t)>& success_callback,
    307     const ErrorCallback& error_callback,
    308     AccessMode mode) {
    309   CHECK(stream_utils::IsWriteAccessMode(mode));
    310   CHECK(is_async_write_pending_);
    311   is_async_write_pending_ = false;
    312   ErrorPtr error;
    313   // Just reschedule the read operation but don't need to run the callback from
    314   // the main loop since we are already running on a callback.
    315   if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
    316                       false)) {
    317     error_callback.Run(error.get());
    318   }
    319 }
    320 
    321 void Stream::ReadAllAsyncCallback(void* buffer,
    322                                   size_t size_to_read,
    323                                   const base::Closure& success_callback,
    324                                   const ErrorCallback& error_callback,
    325                                   size_t size_read,
    326                                   bool eos) {
    327   ErrorPtr error;
    328   size_to_read -= size_read;
    329   if (size_to_read != 0 && eos) {
    330     stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
    331     error_callback.Run(error.get());
    332     return;
    333   }
    334 
    335   if (size_to_read) {
    336     buffer = AdvancePointer(buffer, size_read);
    337     auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
    338                                weak_ptr_factory_.GetWeakPtr(), buffer,
    339                                size_to_read, success_callback, error_callback);
    340     if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
    341                        false)) {
    342       error_callback.Run(error.get());
    343     }
    344   } else {
    345     success_callback.Run();
    346   }
    347 }
    348 
    349 void Stream::WriteAllAsyncCallback(const void* buffer,
    350                                    size_t size_to_write,
    351                                    const base::Closure& success_callback,
    352                                    const ErrorCallback& error_callback,
    353                                    size_t size_written) {
    354   ErrorPtr error;
    355   if (size_to_write != 0 && size_written == 0) {
    356     Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
    357                  errors::stream::kPartialData, "Failed to write all the data");
    358     error_callback.Run(error.get());
    359     return;
    360   }
    361   size_to_write -= size_written;
    362   if (size_to_write) {
    363     buffer = AdvancePointer(buffer, size_written);
    364     auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
    365                                weak_ptr_factory_.GetWeakPtr(), buffer,
    366                                size_to_write, success_callback, error_callback);
    367     if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
    368                         false)) {
    369       error_callback.Run(error.get());
    370     }
    371   } else {
    372     success_callback.Run();
    373   }
    374 }
    375 
    376 void Stream::FlushAsyncCallback(const base::Closure& success_callback,
    377                                 const ErrorCallback& error_callback) {
    378   ErrorPtr error;
    379   if (FlushBlocking(&error)) {
    380     success_callback.Run();
    381   } else {
    382     error_callback.Run(error.get());
    383   }
    384 }
    385 
    386 void Stream::CancelPendingAsyncOperations() {
    387   weak_ptr_factory_.InvalidateWeakPtrs();
    388   is_async_read_pending_ = false;
    389   is_async_write_pending_ = false;
    390 }
    391 
    392 }  // namespace brillo
    393