Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef MOJO_SYSTEM_RAW_CHANNEL_H_
      6 #define MOJO_SYSTEM_RAW_CHANNEL_H_
      7 
      8 #include <deque>
      9 #include <vector>
     10 
     11 #include "base/macros.h"
     12 #include "base/memory/scoped_ptr.h"
     13 #include "base/memory/weak_ptr.h"
     14 #include "base/synchronization/lock.h"
     15 #include "mojo/embedder/platform_handle_vector.h"
     16 #include "mojo/embedder/scoped_platform_handle.h"
     17 #include "mojo/system/constants.h"
     18 #include "mojo/system/message_in_transit.h"
     19 #include "mojo/system/system_impl_export.h"
     20 
     21 namespace base {
     22 class MessageLoopForIO;
     23 }
     24 
     25 namespace mojo {
     26 namespace system {
     27 
     28 // |RawChannel| is an interface and base class for objects that wrap an OS
     29 // "pipe". It presents the following interface to users:
     30 //  - Receives and dispatches messages on an I/O thread (running a
     31 //    |MessageLoopForIO|.
     32 //  - Provides a thread-safe way of writing messages (|WriteMessage()|);
     33 //    writing/queueing messages will not block and is atomic from the point of
     34 //    view of the caller. If necessary, messages are queued (to be written on
     35 //    the aforementioned thread).
     36 //
     37 // OS-specific implementation subclasses are to be instantiated using the
     38 // |Create()| static factory method.
     39 //
     40 // With the exception of |WriteMessage()|, this class is thread-unsafe (and in
     41 // general its methods should only be used on the I/O thread, i.e., the thread
     42 // on which |Init()| is called).
     43 class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
     44  public:
     45   virtual ~RawChannel();
     46 
     47   // The |Delegate| is only accessed on the same thread as the message loop
     48   // (passed in on creation).
     49   class MOJO_SYSTEM_IMPL_EXPORT Delegate {
     50    public:
     51     enum FatalError {
     52       FATAL_ERROR_READ = 0,
     53       FATAL_ERROR_WRITE
     54     };
     55 
     56     // Called when a message is read. This may call |Shutdown()| (on the
     57     // |RawChannel|), but must not destroy it.
     58     virtual void OnReadMessage(
     59         const MessageInTransit::View& message_view,
     60         embedder::ScopedPlatformHandleVectorPtr platform_handles) = 0;
     61 
     62     // Called when there's a fatal error, which leads to the channel no longer
     63     // being viable. This may call |Shutdown()| (on the |RawChannel()|), but
     64     // must not destroy it.
     65     //
     66     // For each raw channel, at most one |FATAL_ERROR_READ| and at most one
     67     // |FATAL_ERROR_WRITE| notification will be issued (both may be issued).
     68     // After a |OnFatalError(FATAL_ERROR_READ)|, there will be no further calls
     69     // to |OnReadMessage()|.
     70     virtual void OnFatalError(FatalError fatal_error) = 0;
     71 
     72    protected:
     73     virtual ~Delegate() {}
     74   };
     75 
     76   // Static factory method. |handle| should be a handle to a
     77   // (platform-appropriate) bidirectional communication channel (e.g., a socket
     78   // on POSIX, a named pipe on Windows).
     79   static scoped_ptr<RawChannel> Create(embedder::ScopedPlatformHandle handle);
     80 
     81   // This must be called (on an I/O thread) before this object is used. Does
     82   // *not* take ownership of |delegate|. Both the I/O thread and |delegate| must
     83   // remain alive until |Shutdown()| is called (unless this fails); |delegate|
     84   // will no longer be used after |Shutdown()|. Returns true on success. On
     85   // failure, |Shutdown()| should *not* be called.
     86   bool Init(Delegate* delegate);
     87 
     88   // This must be called (on the I/O thread) before this object is destroyed.
     89   void Shutdown();
     90 
     91   // Writes the given message (or schedules it to be written). |message| must
     92   // have no |Dispatcher|s still attached (i.e.,
     93   // |SerializeAndCloseDispatchers()| should have been called). This method is
     94   // thread-safe and may be called from any thread. Returns true on success.
     95   bool WriteMessage(scoped_ptr<MessageInTransit> message);
     96 
     97   // Returns true if the write buffer is empty (i.e., all messages written using
     98   // |WriteMessage()| have actually been sent.
     99   // TODO(vtl): We should really also notify our delegate when the write buffer
    100   // becomes empty (or something like that).
    101   bool IsWriteBufferEmpty();
    102 
    103   // Returns the amount of space needed in the |MessageInTransit|'s
    104   // |TransportData|'s "platform handle table" per platform handle (to be
    105   // attached to a message). (This amount may be zero.)
    106   virtual size_t GetSerializedPlatformHandleSize() const = 0;
    107 
    108  protected:
    109   // Return values of |[Schedule]Read()| and |[Schedule]WriteNoLock()|.
    110   enum IOResult {
    111     IO_SUCCEEDED,
    112     IO_FAILED,
    113     IO_PENDING
    114   };
    115 
    116   class MOJO_SYSTEM_IMPL_EXPORT ReadBuffer {
    117    public:
    118     ReadBuffer();
    119     ~ReadBuffer();
    120 
    121     void GetBuffer(char** addr, size_t* size);
    122 
    123    private:
    124     friend class RawChannel;
    125 
    126     // We store data from |[Schedule]Read()|s in |buffer_|. The start of
    127     // |buffer_| is always aligned with a message boundary (we will copy memory
    128     // to ensure this), but |buffer_| may be larger than the actual number of
    129     // bytes we have.
    130     std::vector<char> buffer_;
    131     size_t num_valid_bytes_;
    132 
    133     DISALLOW_COPY_AND_ASSIGN(ReadBuffer);
    134   };
    135 
    136   class MOJO_SYSTEM_IMPL_EXPORT WriteBuffer {
    137    public:
    138     struct Buffer {
    139       const char* addr;
    140       size_t size;
    141     };
    142 
    143     explicit WriteBuffer(size_t serialized_platform_handle_size);
    144     ~WriteBuffer();
    145 
    146     // Returns true if there are (more) platform handles to be sent (from the
    147     // front of |message_queue_|).
    148     bool HavePlatformHandlesToSend() const;
    149     // Gets platform handles to be sent (from the front of |message_queue_|).
    150     // This should only be called if |HavePlatformHandlesToSend()| returned
    151     // true. There are two components to this: the actual |PlatformHandle|s
    152     // (which should be closed once sent) and any additional serialization
    153     // information (which will be embedded in the message's data; there are
    154     // |GetSerializedPlatformHandleSize()| bytes per handle). Once all platform
    155     // handles have been sent, the message data should be written next (see
    156     // |GetBuffers()|).
    157     void GetPlatformHandlesToSend(size_t* num_platform_handles,
    158                                   embedder::PlatformHandle** platform_handles,
    159                                   void** serialization_data);
    160 
    161     // Gets buffers to be written. These buffers will always come from the front
    162     // of |message_queue_|. Once they are completely written, the front
    163     // |MessageInTransit| should be popped (and destroyed); this is done in
    164     // |OnWriteCompletedNoLock()|.
    165     void GetBuffers(std::vector<Buffer>* buffers) const;
    166 
    167    private:
    168     friend class RawChannel;
    169 
    170     const size_t serialized_platform_handle_size_;
    171 
    172     // TODO(vtl): When C++11 is available, switch this to a deque of
    173     // |scoped_ptr|/|unique_ptr|s.
    174     std::deque<MessageInTransit*> message_queue_;
    175     // Platform handles are sent before the message data, but doing so may
    176     // require several passes. |platform_handles_offset_| indicates the position
    177     // in the first message's vector of platform handles to send next.
    178     size_t platform_handles_offset_;
    179     // The first message's data may have been partially sent. |data_offset_|
    180     // indicates the position in the first message's data to start the next
    181     // write.
    182     size_t data_offset_;
    183 
    184     DISALLOW_COPY_AND_ASSIGN(WriteBuffer);
    185   };
    186 
    187   RawChannel();
    188 
    189   // Must be called on the I/O thread WITHOUT |write_lock_| held.
    190   void OnReadCompleted(bool result, size_t bytes_read);
    191   // Must be called on the I/O thread WITHOUT |write_lock_| held.
    192   void OnWriteCompleted(bool result,
    193                         size_t platform_handles_written,
    194                         size_t bytes_written);
    195 
    196   base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; }
    197   base::Lock& write_lock() { return write_lock_; }
    198 
    199   // Should only be called on the I/O thread.
    200   ReadBuffer* read_buffer() { return read_buffer_.get(); }
    201 
    202   // Only called under |write_lock_|.
    203   WriteBuffer* write_buffer_no_lock() {
    204     write_lock_.AssertAcquired();
    205     return write_buffer_.get();
    206   }
    207 
    208   // Adds |message| to the write message queue. Implementation subclasses may
    209   // override this to add any additional "control" messages needed. This is
    210   // called (on any thread) with |write_lock_| held.
    211   virtual void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message);
    212 
    213   // Handles any control messages targeted to the |RawChannel| (or
    214   // implementation subclass). Implementation subclasses may override this to
    215   // handle any implementation-specific control messages, but should call
    216   // |RawChannel::OnReadMessageForRawChannel()| for any remaining messages.
    217   // Returns true on success and false on error (e.g., invalid control message).
    218   // This is only called on the I/O thread.
    219   virtual bool OnReadMessageForRawChannel(
    220       const MessageInTransit::View& message_view);
    221 
    222   // Reads into |read_buffer()|.
    223   // This class guarantees that:
    224   // - the area indicated by |GetBuffer()| will stay valid until read completion
    225   //   (but please also see the comments for |OnShutdownNoLock()|);
    226   // - a second read is not started if there is a pending read;
    227   // - the method is called on the I/O thread WITHOUT |write_lock_| held.
    228   //
    229   // The implementing subclass must guarantee that:
    230   // - |bytes_read| is untouched unless |Read()| returns |IO_SUCCEEDED|;
    231   // - if the method returns |IO_PENDING|, |OnReadCompleted()| will be called on
    232   //   the I/O thread to report the result, unless |Shutdown()| is called.
    233   virtual IOResult Read(size_t* bytes_read) = 0;
    234   // Similar to |Read()|, except that the implementing subclass must also
    235   // guarantee that the method doesn't succeed synchronously, i.e., it only
    236   // returns |IO_FAILED| or |IO_PENDING|.
    237   virtual IOResult ScheduleRead() = 0;
    238 
    239   // Called by |OnReadCompleted()| to get the platform handles associated with
    240   // the given platform handle table (from a message). This should only be
    241   // called when |num_platform_handles| is nonzero. Returns null if the
    242   // |num_platform_handles| handles are not available. Only called on the I/O
    243   // thread (without |write_lock_| held).
    244   virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
    245       size_t num_platform_handles,
    246       const void* platform_handle_table) = 0;
    247 
    248   // Writes contents in |write_buffer_no_lock()|.
    249   // This class guarantees that:
    250   // - the |PlatformHandle|s given by |GetPlatformHandlesToSend()| and the
    251   //   buffer(s) given by |GetBuffers()| will remain valid until write
    252   //   completion (see also the comments for |OnShutdownNoLock()|);
    253   // - a second write is not started if there is a pending write;
    254   // - the method is called under |write_lock_|.
    255   //
    256   // The implementing subclass must guarantee that:
    257   // - |platform_handles_written| and |bytes_written| are untouched unless
    258   //   |WriteNoLock()| returns |IO_SUCCEEDED|;
    259   // - if the method returns |IO_PENDING|, |OnWriteCompleted()| will be called
    260   //   on the I/O thread to report the result, unless |Shutdown()| is called.
    261   virtual IOResult WriteNoLock(size_t* platform_handles_written,
    262                                size_t* bytes_written) = 0;
    263   // Similar to |WriteNoLock()|, except that the implementing subclass must also
    264   // guarantee that the method doesn't succeed synchronously, i.e., it only
    265   // returns |IO_FAILED| or |IO_PENDING|.
    266   virtual IOResult ScheduleWriteNoLock() = 0;
    267 
    268   // Must be called on the I/O thread WITHOUT |write_lock_| held.
    269   virtual bool OnInit() = 0;
    270   // On shutdown, passes the ownership of the buffers to subclasses, which may
    271   // want to preserve them if there are pending read/write. Must be called on
    272   // the I/O thread under |write_lock_|.
    273   virtual void OnShutdownNoLock(
    274       scoped_ptr<ReadBuffer> read_buffer,
    275       scoped_ptr<WriteBuffer> write_buffer) = 0;
    276 
    277  private:
    278   // Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O
    279   // thread WITHOUT |write_lock_| held.
    280   void CallOnFatalError(Delegate::FatalError fatal_error);
    281 
    282   // If |result| is true, updates the write buffer and schedules a write
    283   // operation to run later if there are more contents to write. If |result| is
    284   // false or any error occurs during the method execution, cancels pending
    285   // writes and returns false.
    286   // Must be called only if |write_stopped_| is false and under |write_lock_|.
    287   bool OnWriteCompletedNoLock(bool result,
    288                               size_t platform_handles_written,
    289                               size_t bytes_written);
    290 
    291   // Set in |Init()| and never changed (hence usable on any thread without
    292   // locking):
    293   base::MessageLoopForIO* message_loop_for_io_;
    294 
    295   // Only used on the I/O thread:
    296   Delegate* delegate_;
    297   bool read_stopped_;
    298   scoped_ptr<ReadBuffer> read_buffer_;
    299 
    300   base::Lock write_lock_;  // Protects the following members.
    301   bool write_stopped_;
    302   scoped_ptr<WriteBuffer> write_buffer_;
    303 
    304   // This is used for posting tasks from write threads to the I/O thread. It
    305   // must only be accessed under |write_lock_|. The weak pointers it produces
    306   // are only used/invalidated on the I/O thread.
    307   base::WeakPtrFactory<RawChannel> weak_ptr_factory_;
    308 
    309   DISALLOW_COPY_AND_ASSIGN(RawChannel);
    310 };
    311 
    312 }  // namespace system
    313 }  // namespace mojo
    314 
    315 #endif  // MOJO_SYSTEM_RAW_CHANNEL_H_
    316