Home | History | Annotate | Download | only in streams
      1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef LIBBRILLO_BRILLO_STREAMS_STREAM_H_
      6 #define LIBBRILLO_BRILLO_STREAMS_STREAM_H_
      7 
      8 #include <cstdint>
      9 #include <memory>
     10 
     11 #include <base/callback.h>
     12 #include <base/macros.h>
     13 #include <base/memory/weak_ptr.h>
     14 #include <base/time/time.h>
     15 #include <brillo/brillo_export.h>
     16 #include <brillo/errors/error.h>
     17 
     18 namespace brillo {
     19 
     20 // Stream is a base class that specific stream storage implementations must
     21 // derive from to provide I/O facilities.
     22 // The stream class provides general streaming I/O primitives to read, write and
     23 // seek within a stream. It has methods for asynchronous (callback-based) as
     24 // well as synchronous (both blocking and non-blocking) operations.
     25 // The Stream class is abstract and cannot be created by itself.
     26 // In order to construct a stream, you must use one of the derived classes'
     27 // factory methods which return a stream smart pointer (StreamPtr):
     28 //
     29 //    StreamPtr input_stream = FileStream::Open(path, AccessMode::READ);
     30 //    StreamPtr output_stream = MemoryStream::Create();
     31 //    uint8_t buf[1000];
     32 //    size_t read = 0;
     33 //    while (input_stream->ReadBlocking(buf, sizeof(buf), &read, nullptr)) {
     34 //      if (read == 0) break;
     35 //      output_stream->WriteAllBlocking(buf, read, nullptr);
     36 //    }
     37 //
     38 // NOTE ABOUT ASYNCHRONOUS OPERATIONS: Asynchronous I/O relies on a MessageLoop
     39 // instance to be present on the current thread. Using Stream::ReadAsync(),
     40 // Stream::WriteAsync() and similar will call MessageLoop::current() to access
     41 // the current message loop and abort if there isn't one for the current thread.
     42 // Also, only one outstanding asynchronous operation of a particular kind
     43 // (reading or writing) at a time is supported. Trying to call ReadAsync()
     44 // while another asynchronous read operation is pending will fail with an
     45 // error ("operation_not_supported").
     46 //
     47 // NOTE ABOUT READING FROM/WRITING TO STREAMS: In many cases underlying streams
     48 // use buffered I/O. Using all read/write methods other than ReadAllAsync(),
     49 // ReadAllBlocking(), WriteAllAsync(), WriteAllBlocking() will return
     50 // immediately if there is any data available in the underlying buffer. That is,
     51 // trying to read 1000 bytes while the internal buffer contains only 100 will
     52 // return immediately with just those 100 bytes and no blocking or other I/O
     53 // traffic will be incurred. This guarantee is important for efficient and
     54 // correct implementation of duplex communication over pipes and sockets.
     55 //
     56 // NOTE TO IMPLEMENTERS: When creating new stream types, you must derive
     57 // from this class and provide the implementation for its pure virtual methods.
     58 // For operations that do not apply to your stream, make sure the corresponding
     59 // methods return "false" and set the error to "operation_not_supported".
     60 // You should use stream_utils::ErrorOperationNotSupported() for this. Also
     61 // make sure the stream capability functions like CanRead(), etc. return
     62 // correct values:
     63 //
     64 //    bool MyReadOnlyStream::CanRead() const { return true; }
     65 //    bool MyReadOnlyStream::CanWrite() const { return false; }
     66 //    bool MyReadOnlyStream::WriteBlocking(const void* buffer,
     67 //                                         size_t size_to_write,
     68 //                                         size_t* size_written,
     69 //                                         ErrorPtr* error) {
     70 //      return stream_utils::ErrorOperationNotSupported(error);
     71 //    }
     72 //
     73 // The class should also provide a static factory method to create/open
     74 // a new stream:
     75 //
     76 //    static StreamPtr MyReadOnlyStream::Open(..., ErrorPtr* error) {
     77 //      auto my_stream = std::make_unique<MyReadOnlyStream>(...);
     78 //      if (!my_stream->Initialize(..., error)) {
     79 //        my_stream.reset();
     80 //      }
     81 //      return my_stream;
     82 //    }
     83 //
     84 class BRILLO_EXPORT Stream {
     85  public:
     86   // When seeking in streams, whence specifies the origin of the seek operation.
     87   enum class Whence { FROM_BEGIN, FROM_CURRENT, FROM_END };
     88   // Stream access mode for open operations (used in derived classes).
     89   enum class AccessMode { READ, WRITE, READ_WRITE };
     90 
     91   // Standard error callback for asynchronous operations.
     92   using ErrorCallback = base::Callback<void(const Error*)>;
     93 
     94   virtual ~Stream() = default;
     95 
     96   // == Stream capabilities ===================================================
     97 
     98   // Returns true while stream is open. Closing the last reference to the stream
     99   // will make this method return false.
    100   virtual bool IsOpen() const = 0;
    101 
    102   // Called to determine if read operations are supported on the stream (stream
    103   // is readable). This method does not check if there is actually any data to
    104   // read, only the fact that the stream is open in read mode and can be read
    105   // from in general.
    106   // If CanRead() returns false, it is guaranteed that the stream can't be
    107   // read from. However, if it returns true, there is no guarantee that the
    108   // subsequent read operation will actually succeed (for example, the stream
    109   // position could be at the end of the data stream, or the access mode of
    110   // the stream is unknown beforehand).
    111   virtual bool CanRead() const = 0;
    112 
    113   // Called to determine if write operations are supported on the stream (stream
    114   // is writable).
    115   // If CanWrite() returns false, it is guaranteed that the stream can't be
    116   // written to. However, if it returns true, the subsequent write operation
    117   // is not guaranteed to succeed (e.g. the output media could be out of free
    118   // space or a transport error could occur).
    119   virtual bool CanWrite() const = 0;
    120 
    121   // Called to determine if random access I/O operations are supported on
    122   // the stream. Sequential streams should return false.
    123   // If CanSeek() returns false, it is guaranteed that the stream can't use
    124   // Seek(). However, if it returns true, it might be possible to seek, but this
    125   // is not guaranteed since the actual underlying stream capabilities might
    126   // not be known.
    127   // Note that non-seekable streams might still maintain the current stream
    128   // position and GetPosition method might still be used even if CanSeek()
    129   // returns false. However SetPosition() will almost always fail in such
    130   // a case.
    131   virtual bool CanSeek() const = 0;
    132 
    133   // Called to determine if the size of the stream is known. Size of some
    134   // sequential streams (e.g. based on pipes) is unknown beforehand, so this
    135   // method can be used to check how reliable a call to GetSize() is.
    136   virtual bool CanGetSize() const = 0;
    137 
    138   // == Stream size operations ================================================
    139 
    140   // Returns the size of stream data.
    141   // If the stream size is unavailable/unknown, it returns 0.
    142   virtual uint64_t GetSize() const = 0;
    143 
    144   // Resizes the stream storage to |size|. Stream must be writable and support
    145   // this operation.
    146   virtual bool SetSizeBlocking(uint64_t size, ErrorPtr* error) = 0;
    147 
    148   // Truncates the stream at the current stream pointer.
    149   // Calls SetSizeBlocking(GetPosition(), ...).
    150   bool TruncateBlocking(ErrorPtr* error);
    151 
    152   // Returns the amount of data remaining in the stream. If the size of the
    153   // stream is unknown, or if the stream pointer is at or past the end of the
    154   // stream, the function returns 0.
    155   virtual uint64_t GetRemainingSize() const = 0;
    156 
    157   // == Seek operations =======================================================
    158 
    159   // Gets the position of the stream I/O pointer from the beginning of the
    160   // stream. If the stream position is unavailable/unknown, it returns 0.
    161   virtual uint64_t GetPosition() const = 0;
    162 
    163   // Moves the stream pointer to the specified position, relative to the
    164   // beginning of the stream. This calls Seek(position, Whence::FROM_BEGIN),
    165   // however it also provides proper |position| validation to ensure that
    166   // it doesn't overflow the range of signed int64_t used by Seek.
    167   bool SetPosition(uint64_t position, ErrorPtr* error);
    168 
    169   // Moves the stream pointer by |offset| bytes relative to |whence|.
    170   // When successful, returns true and sets the new pointer position from the
    171   // beginning of the stream to |new_position|. If |new_position| is nullptr,
    172   // new stream position is not returned.
    173   // On error, returns false and specifies additional details in |error| if it
    174   // is not nullptr.
    175   virtual bool Seek(int64_t offset,
    176                     Whence whence,
    177                     uint64_t* new_position,
    178                     ErrorPtr* error) = 0;
    179 
    180   // == Read operations =======================================================
    181 
    182   // -- Asynchronous ----------------------------------------------------------
    183 
    184   // Reads up to |size_to_read| bytes from the stream asynchronously. It is not
    185   // guaranteed that all requested data will be read. It is not an error for
    186   // this function to read fewer bytes than requested. If the function reads
    187   // zero bytes, it means that the end of stream is reached.
    188   // Upon successful read, the |success_callback| will be invoked with the
    189   // actual number of bytes read.
    190   // If an error occurs during the asynchronous operation, the |error_callback|
    191   // is invoked with the error details. The error object pointer passed in as a
    192   // parameter to the |error_callback| is valid only for the duration of that
    193   // callback.
    194   // If this function successfully schedules an asynchronous operation, it
    195   // returns true. If it fails immediately, it will return false and set the
    196   // error details to |error| object and will not call the success or error
    197   // callbacks.
    198   // The |buffer| must be at least |size_to_read| in size and must remain
    199   // valid for the duration of the asynchronous operation (until either
    200   // |success_callback| or |error_callback| is called).
    201   // Only one asynchronous operation at a time is allowed on the stream (read
    202   // and/or write).
    203   // Uses ReadNonBlocking() and WaitForData().
    204   virtual bool ReadAsync(void* buffer,
    205                          size_t size_to_read,
    206                          const base::Callback<void(size_t)>& success_callback,
    207                          const ErrorCallback& error_callback,
    208                          ErrorPtr* error);
    209 
    210   // Similar to ReadAsync() operation above but reads exactly |size_to_read|
    211   // bytes from the stream into the |buffer|. Attempt to read past the end of
    212   // the stream is considered an error in this case and will trigger the
    213   // |error_callback|. The rest of restrictions and conditions of ReadAsync()
    214   // method applies to ReadAllAsync() as well.
    215   // Uses ReadNonBlocking() and WaitForData().
    216   virtual bool ReadAllAsync(void* buffer,
    217                             size_t size_to_read,
    218                             const base::Closure& success_callback,
    219                             const ErrorCallback& error_callback,
    220                             ErrorPtr* error);
    221 
    222   // -- Synchronous non-blocking ----------------------------------------------
    223 
    224   // Reads up to |size_to_read| bytes from the stream without blocking.
    225   // The |buffer| must be at least |size_to_read| in size. It is not an error
    226   // for this function to return without reading all (or any) the data.
    227   // The actual amount of data read (which could be 0 bytes) is returned in
    228   // |size_read|.
    229   // On error, the function returns false and specifies additional error details
    230   // in |error|.
    231   // If end of stream is reached or if no data is currently available to be read
    232   // without blocking, |size_read| will contain 0 and the function will still
    233   // return true (success). In case of end-of-stream scenario, |end_of_stream|
    234   // will also be set to true to indicate that no more data is available.
    235   virtual bool ReadNonBlocking(void* buffer,
    236                                size_t size_to_read,
    237                                size_t* size_read,
    238                                bool* end_of_stream,
    239                                ErrorPtr* error) = 0;
    240 
    241   // -- Synchronous blocking --------------------------------------------------
    242 
    243   // Reads up to |size_to_read| bytes from the stream. This function will block
    244   // until at least one byte is read or the end of stream is reached or until
    245   // the stream is closed.
    246   // The |buffer| must be at least |size_to_read| in size. It is not an error
    247   // for this function to return without reading all the data. The actual amount
    248   // of data read (which could be 0 bytes) is returned in |size_read|.
    249   // On error, the function returns false and specifies additional error details
    250   // in |error|. In this case, the state of the stream pointer is undefined,
    251   // since some bytes might have been read successfully (and the pointer moved)
    252   // before the error has occurred and |size_read| is not updated.
    253   // If end of stream is reached, |size_read| will contain 0 and the function
    254   // will still return true (success).
    255   virtual bool ReadBlocking(void* buffer,
    256                             size_t size_to_read,
    257                             size_t* size_read,
    258                             ErrorPtr* error);
    259 
    260   // Reads exactly |size_to_read| bytes to |buffer|. Returns false on error
    261   // (reading fewer than requested bytes is treated as an error as well).
    262   // Calls ReadBlocking() repeatedly until all the data is read.
    263   virtual bool ReadAllBlocking(void* buffer,
    264                                size_t size_to_read,
    265                                ErrorPtr* error);
    266 
    267   // == Write operations ======================================================
    268 
    269   // -- Asynchronous ----------------------------------------------------------
    270 
    271   // Writes up to |size_to_write| bytes from |buffer| to the stream
    272   // asynchronously. It is not guaranteed that all requested data will be
    273   // written. It is not an error for this function to write fewer bytes than
    274   // requested.
    275   // Upon successful write, the |success_callback| will be invoked with the
    276   // actual number of bytes written.
    277   // If an error occurs during the asynchronous operation, the |error_callback|
    278   // is invoked with the error details. The error object pointer is valid only
    279   // for the duration of the error callback.
    280   // If this function successfully schedules an asynchronous operation, it
    281   // returns true. If it fails immediately, it will return false and set the
    282   // error details to |error| object and will not call the success or error
    283   // callbacks.
    284   // The |buffer| must be at least |size_to_write| in size and must remain
    285   // valid for the duration of the asynchronous operation (until either
    286   // |success_callback| or |error_callback| is called).
    287   // Only one asynchronous operation at a time is allowed on the stream (read
    288   // and/or write).
    289   // Uses WriteNonBlocking() and WaitForData().
    290   virtual bool WriteAsync(const void* buffer,
    291                           size_t size_to_write,
    292                           const base::Callback<void(size_t)>& success_callback,
    293                           const ErrorCallback& error_callback,
    294                           ErrorPtr* error);
    295 
    296   // Similar to WriteAsync() operation above but writes exactly |size_to_write|
    297   // bytes from |buffer| to the stream. When all the data is written
    298   // successfully, the |success_callback| is invoked.
    299   // The rest of restrictions and conditions of WriteAsync() method applies to
    300   // WriteAllAsync() as well.
    301   // Uses WriteNonBlocking() and WaitForData().
    302   virtual bool WriteAllAsync(const void* buffer,
    303                              size_t size_to_write,
    304                              const base::Closure& success_callback,
    305                              const ErrorCallback& error_callback,
    306                              ErrorPtr* error);
    307 
    308   // -- Synchronous non-blocking ----------------------------------------------
    309 
    310   // Writes up to |size_to_write| bytes to the stream. The |buffer| must be at
    311   // least |size_to_write| in size. It is not an error for this function to
    312   // return without writing all the data requested (or any data at all).
    313   // The actual amount of data written is returned in |size_written|.
    314   // On error, the function returns false and specifies additional error details
    315   // in |error|.
    316   virtual bool WriteNonBlocking(const void* buffer,
    317                                 size_t size_to_write,
    318                                 size_t* size_written,
    319                                 ErrorPtr* error) = 0;
    320 
    321   // -- Synchronous blocking --------------------------------------------------
    322 
    323   // Writes up to |size_to_write| bytes to the stream. The |buffer| must be at
    324   // least |size_to_write| in size. It is not an error for this function to
    325   // return without writing all the data requested. The actual amount of data
    326   // written is returned in |size_written|.
    327   // On error, the function returns false and specifies additional error details
    328   // in |error|.
    329   virtual bool WriteBlocking(const void* buffer,
    330                              size_t size_to_write,
    331                              size_t* size_written,
    332                              ErrorPtr* error);
    333 
    334   // Writes exactly |size_to_write| bytes from |buffer|. Returns false on error
    335   // (writing fewer than requested bytes is treated as an error as well).
    336   // Calls WriteBlocking() repeatedly until all the data is written.
    337   virtual bool WriteAllBlocking(const void* buffer,
    338                                 size_t size_to_write,
    339                                 ErrorPtr* error);
    340 
    341   // == Finalizing/closing streams  ===========================================
    342 
    343   // Flushes all the user-space data from cache output buffers to storage
    344   // medium. For read-only streams this is a no-op, however it is still valid
    345   // to call this method on read-only streams.
    346   // If an error occurs, the function returns false and specifies additional
    347   // error details in |error|.
    348   virtual bool FlushBlocking(ErrorPtr* error) = 0;
    349 
    350   // Flushes all the user-space data from the cache output buffer
    351   // asynchronously. When all the data is successfully flushed, the
    352   // |success_callback| is invoked. If an error occurs while flushing, partial
    353   // data might be flushed and |error_callback| is invoked. If there's an error
    354   // scheduling the flush operation, it returns false and neither callback will
    355   // be called.
    356   virtual bool FlushAsync(const base::Closure& success_callback,
    357                           const ErrorCallback& error_callback,
    358                           ErrorPtr* error);
    359 
    360   // Closes the underlying stream. The stream is also automatically closed
    361   // when the stream object is destroyed, but since closing a stream is
    362   // an operation that may fail, in situations when it is important to detect
    363   // the failure to close the stream, CloseBlocking() should be used explicitly
    364   // before destroying the stream object.
    365   virtual bool CloseBlocking(ErrorPtr* error) = 0;
    366 
    367   // == Data availability monitoring ==========================================
    368 
    369   // Overridden by derived classes to provide stream monitoring for read/write
    370   // data availability for the stream. Calls |callback| when data can be read
    371   // and/or written without blocking.
    372   // |mode| specifies the type of operation to monitor for (read, write, both).
    373   virtual bool WaitForData(AccessMode mode,
    374                            const base::Callback<void(AccessMode)>& callback,
    375                            ErrorPtr* error) = 0;
    376 
    377   // Helper function for implementing blocking I/O. Blocks until the
    378   // non-blocking operation specified by |in_mode| can be performed.
    379   // If |out_mode| is not nullptr, it receives the actual operation that can be
    380   // performed. For example, watching a stream for READ_WRITE while only
    381   // READ can be performed, |out_mode| would contain READ even though |in_mode|
    382   // was set to READ_WRITE.
    383   // |timeout| is the maximum amount of time to wait. Set it to TimeDelta::Max()
    384   // to wait indefinitely.
    385   virtual bool WaitForDataBlocking(AccessMode in_mode,
    386                                    base::TimeDelta timeout,
    387                                    AccessMode* out_mode,
    388                                    ErrorPtr* error) = 0;
    389 
    390   // Cancels pending asynchronous read/write operations.
    391   virtual void CancelPendingAsyncOperations();
    392 
    393  protected:
    394   Stream() = default;
    395 
    396  private:
    397   // Simple wrapper to call the externally exposed |success_callback| that only
    398   // receives a size_t.
    399   BRILLO_PRIVATE static void IgnoreEOSCallback(
    400       const base::Callback<void(size_t)>& success_callback,
    401       size_t read,
    402       bool eos);
    403 
    404   // The internal implementation of ReadAsync() and ReadAllAsync().
    405   // Calls ReadNonBlocking and if there's no data available waits for it calling
    406   // WaitForData(). The extra |force_async_callback| tells whether the success
    407   // callback should be called from the main loop instead of directly from this
    408   // method. This method only calls WaitForData() if ReadNonBlocking() returns a
    409   // situation in which it would block (bytes_read = 0 and eos = false),
    410   // preventing us from calling WaitForData() on streams that don't support such
    411   // feature.
    412   BRILLO_PRIVATE bool ReadAsyncImpl(
    413       void* buffer,
    414       size_t size_to_read,
    415       const base::Callback<void(size_t, bool)>& success_callback,
    416       const ErrorCallback& error_callback,
    417       ErrorPtr* error,
    418       bool force_async_callback);
    419 
    420   // Called from the main loop when the ReadAsyncImpl finished right away
    421   // without waiting for data. We use this callback to call the
    422   // |success_callback| but invalidate the callback if the Stream is destroyed
    423   // while this call is waiting in the main loop.
    424   BRILLO_PRIVATE void OnReadAsyncDone(
    425       const base::Callback<void(size_t, bool)>& success_callback,
    426       size_t bytes_read,
    427       bool eos);
    428 
    429   // Called from WaitForData() when read operations can be performed
    430   // without blocking (the type of operation is provided in |mode|).
    431   BRILLO_PRIVATE void OnReadAvailable(
    432       void* buffer,
    433       size_t size_to_read,
    434       const base::Callback<void(size_t, bool)>& success_callback,
    435       const ErrorCallback& error_callback,
    436       AccessMode mode);
    437 
    438   // The internal implementation of WriteAsync() and WriteAllAsync().
    439   // Calls WriteNonBlocking and, if the write would block, waits until it can
    440   // proceed by calling WaitForData(). The extra |force_async_callback| tells
    441   // whether the success callback should be called from the main loop instead
    442   // of directly from this method. This method only calls WaitForData() if
    443   // WriteNonBlocking() returns a situation in which it would block
    444   // (size_written = 0), preventing us from calling
    445   // WaitForData() on streams that don't support such feature.
    446   BRILLO_PRIVATE bool WriteAsyncImpl(
    447       const void* buffer,
    448       size_t size_to_write,
    449       const base::Callback<void(size_t)>& success_callback,
    450       const ErrorCallback& error_callback,
    451       ErrorPtr* error,
    452       bool force_async_callback);
    453 
    454   // Called from the main loop when the WriteAsyncImpl finished right away
    455   // without waiting for data. We use this callback to call the
    456   // |success_callback| but invalidate the callback if the Stream is destroyed
    457   // while this call is waiting in the main loop.
    458   BRILLO_PRIVATE void OnWriteAsyncDone(
    459       const base::Callback<void(size_t)>& success_callback,
    460       size_t size_written);
    461 
    462   // Called from WaitForData() when write operations can be performed
    463   // without blocking (the type of operation is provided in |mode|).
    464   BRILLO_PRIVATE void OnWriteAvailable(
    465       const void* buffer,
    466       size_t size,
    467       const base::Callback<void(size_t)>& success_callback,
    468       const ErrorCallback& error_callback,
    469       AccessMode mode);
    470 
    471   // Helper callbacks to implement ReadAllAsync/WriteAllAsync.
    472   BRILLO_PRIVATE void ReadAllAsyncCallback(
    473       void* buffer,
    474       size_t size_to_read,
    475       const base::Closure& success_callback,
    476       const ErrorCallback& error_callback,
    477       size_t size_read,
    478       bool eos);
    479   BRILLO_PRIVATE void WriteAllAsyncCallback(
    480       const void* buffer,
    481       size_t size_to_write,
    482       const base::Closure& success_callback,
    483       const ErrorCallback& error_callback,
    484       size_t size_written);
    485 
    486   // Helper callback to implement FlushAsync().
    487   BRILLO_PRIVATE void FlushAsyncCallback(
    488       const base::Closure& success_callback,
    489       const ErrorCallback& error_callback);
    490 
    491   // Data members for asynchronous read operations.
    492   bool is_async_read_pending_{false};
    493 
    494   // Data members for asynchronous write operations.
    495   bool is_async_write_pending_{false};
    496 
    497   base::WeakPtrFactory<Stream> weak_ptr_factory_{this};
    498   DISALLOW_COPY_AND_ASSIGN(Stream);
    499 };
    500 
    501 // A smart pointer to the stream used to pass the stream object around.
    502 using StreamPtr = std::unique_ptr<Stream>;
    503 
    504 }  // namespace brillo
    505 
    506 #endif  // LIBBRILLO_BRILLO_STREAMS_STREAM_H_
    507