Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/raw_channel.h"
      6 
      7 #include <errno.h>
      8 #include <string.h>
      9 #include <unistd.h>
     10 
     11 #include <algorithm>
     12 #include <deque>
     13 #include <vector>
     14 
     15 #include "base/basictypes.h"
     16 #include "base/bind.h"
     17 #include "base/compiler_specific.h"
     18 #include "base/location.h"
     19 #include "base/logging.h"
     20 #include "base/memory/scoped_ptr.h"
     21 #include "base/memory/weak_ptr.h"
     22 #include "base/message_loop/message_loop.h"
     23 #include "base/posix/eintr_wrapper.h"
     24 #include "base/synchronization/lock.h"
     25 #include "mojo/system/message_in_transit.h"
     26 #include "mojo/system/platform_channel_handle.h"
     27 
     28 namespace mojo {
     29 namespace system {
     30 
     31 namespace {
     32 
     33 const size_t kReadSize = 4096;
     34 
     35 class RawChannelPosix : public RawChannel,
     36                         public base::MessageLoopForIO::Watcher {
     37  public:
     38   RawChannelPosix(const PlatformChannelHandle& handle,
     39                   Delegate* delegate,
     40                   base::MessageLoop* message_loop);
     41   virtual ~RawChannelPosix();
     42 
     43   // |RawChannel| implementation:
     44   virtual bool Init() OVERRIDE;
     45   virtual void Shutdown() OVERRIDE;
     46   virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
     47 
     48  private:
     49   // |base::MessageLoopForIO::Watcher| implementation:
     50   virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
     51   virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
     52 
     53   // Watches for |fd_| to become writable. Must be called on the I/O thread.
     54   void WaitToWrite();
     55 
     56   // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
     57   // thread WITHOUT |write_lock_| held.
     58   void CallOnFatalError(Delegate::FatalError fatal_error);
     59 
     60   // Writes the message at the front of |write_message_queue_|, starting at
     61   // |write_message_offset_|. It removes and destroys if the write completes and
     62   // otherwise updates |write_message_offset_|. Returns true on success. Must be
     63   // called under |write_lock_|.
     64   bool WriteFrontMessageNoLock();
     65 
     66   // Cancels all pending writes and destroys the contents of
     67   // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
     68   // |is_dead_| to true. Must be called under |write_lock_|.
     69   void CancelPendingWritesNoLock();
     70 
     71   base::MessageLoopForIO* message_loop_for_io() {
     72     return static_cast<base::MessageLoopForIO*>(message_loop());
     73   }
     74 
     75   int fd_;
     76 
     77   // Only used on the I/O thread:
     78   scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
     79   scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
     80 
     81   // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
     82   // is always aligned with a message boundary (we will copy memory to ensure
     83   // this), but |read_buffer_| may be larger than the actual number of bytes we
     84   // have.
     85   std::vector<char> read_buffer_;
     86   size_t read_buffer_num_valid_bytes_;
     87 
     88   base::Lock write_lock_;  // Protects the following members.
     89   bool is_dead_;
     90   std::deque<MessageInTransit*> write_message_queue_;
     91   size_t write_message_offset_;
     92   // This is used for posting tasks from write threads to the I/O thread. It
     93   // must only be accessed under |write_lock_|. The weak pointers it produces
     94   // are only used/invalidated on the I/O thread.
     95   base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
     96 
     97   DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
     98 };
     99 
    100 RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
    101                                  Delegate* delegate,
    102                                  base::MessageLoop* message_loop)
    103     : RawChannel(delegate, message_loop),
    104       fd_(handle.fd),
    105       read_buffer_num_valid_bytes_(0),
    106       is_dead_(false),
    107       write_message_offset_(0),
    108       weak_ptr_factory_(this) {
    109   CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
    110   DCHECK_NE(fd_, -1);
    111 }
    112 
    113 RawChannelPosix::~RawChannelPosix() {
    114   DCHECK(is_dead_);
    115   DCHECK_EQ(fd_, -1);
    116 
    117   // No need to take the |write_lock_| here -- if there are still weak pointers
    118   // outstanding, then we're hosed anyway (since we wouldn't be able to
    119   // invalidate them cleanly, since we might not be on the I/O thread).
    120   DCHECK(!weak_ptr_factory_.HasWeakPtrs());
    121 
    122   // These must have been shut down/destroyed on the I/O thread.
    123   DCHECK(!read_watcher_.get());
    124   DCHECK(!write_watcher_.get());
    125 }
    126 
    127 bool RawChannelPosix::Init() {
    128   DCHECK_EQ(base::MessageLoop::current(), message_loop());
    129 
    130   DCHECK(!read_watcher_.get());
    131   read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
    132   DCHECK(!write_watcher_.get());
    133   write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
    134 
    135   // No need to take the lock. No one should be using us yet.
    136   DCHECK(write_message_queue_.empty());
    137 
    138   if (!message_loop_for_io()->WatchFileDescriptor(fd_, true,
    139           base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
    140     // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
    141     // (in the sense of returning the message loop's state to what it was before
    142     // it was called).
    143     read_watcher_.reset();
    144     write_watcher_.reset();
    145     return false;
    146   }
    147 
    148   return true;
    149 }
    150 
    151 void RawChannelPosix::Shutdown() {
    152   DCHECK_EQ(base::MessageLoop::current(), message_loop());
    153 
    154   base::AutoLock locker(write_lock_);
    155   if (!is_dead_)
    156     CancelPendingWritesNoLock();
    157 
    158   DCHECK_NE(fd_, -1);
    159   if (close(fd_) != 0)
    160     PLOG(ERROR) << "close";
    161   fd_ = -1;
    162 
    163   weak_ptr_factory_.InvalidateWeakPtrs();
    164 
    165   read_watcher_.reset();  // This will stop watching (if necessary).
    166   write_watcher_.reset();  // This will stop watching (if necessary).
    167 }
    168 
    169 // Reminder: This must be thread-safe, and takes ownership of |message| on
    170 // success.
    171 bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
    172   base::AutoLock locker(write_lock_);
    173   if (is_dead_) {
    174     message->Destroy();
    175     return false;
    176   }
    177 
    178   if (!write_message_queue_.empty()) {
    179     write_message_queue_.push_back(message);
    180     return true;
    181   }
    182 
    183   write_message_queue_.push_front(message);
    184   DCHECK_EQ(write_message_offset_, 0u);
    185   bool result = WriteFrontMessageNoLock();
    186   DCHECK(result || write_message_queue_.empty());
    187 
    188   if (!result) {
    189     // Even if we're on the I/O thread, don't call |OnFatalError()| in the
    190     // nested context.
    191     message_loop()->PostTask(FROM_HERE,
    192                              base::Bind(&RawChannelPosix::CallOnFatalError,
    193                                         weak_ptr_factory_.GetWeakPtr(),
    194                                         Delegate::FATAL_ERROR_FAILED_WRITE));
    195   } else if (!write_message_queue_.empty()) {
    196     // Set up to wait for the FD to become writable. If we're not on the I/O
    197     // thread, we have to post a task to do this.
    198     if (base::MessageLoop::current() == message_loop()) {
    199       WaitToWrite();
    200     } else {
    201       message_loop()->PostTask(FROM_HERE,
    202                                base::Bind(&RawChannelPosix::WaitToWrite,
    203                                           weak_ptr_factory_.GetWeakPtr()));
    204     }
    205   }
    206 
    207   return result;
    208 }
    209 
    210 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
    211   DCHECK_EQ(fd, fd_);
    212   DCHECK_EQ(base::MessageLoop::current(), message_loop());
    213 
    214   bool did_dispatch_message = false;
    215   // Tracks the offset of the first undispatched message in |read_buffer_|.
    216   // Currently, we copy data to ensure that this is zero at the beginning.
    217   size_t read_buffer_start = 0;
    218   for (;;) {
    219     if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
    220             < kReadSize) {
    221       // Use power-of-2 buffer sizes.
    222       // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
    223       // maximum message size to whatever extent necessary).
    224       // TODO(vtl): We may often be able to peek at the header and get the real
    225       // required extra space (which may be much bigger than |kReadSize|).
    226       size_t new_size = std::max(read_buffer_.size(), kReadSize);
    227       while (new_size <
    228                  read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
    229         new_size *= 2;
    230 
    231       // TODO(vtl): It's suboptimal to zero out the fresh memory.
    232       read_buffer_.resize(new_size, 0);
    233     }
    234 
    235     ssize_t bytes_read = HANDLE_EINTR(
    236         read(fd_,
    237              &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
    238              kReadSize));
    239     if (bytes_read < 0) {
    240       if (errno != EAGAIN && errno != EWOULDBLOCK) {
    241         PLOG(ERROR) << "read";
    242         {
    243           base::AutoLock locker(write_lock_);
    244           CancelPendingWritesNoLock();
    245         }
    246         CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
    247         return;
    248       }
    249 
    250       break;
    251     }
    252 
    253     read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
    254 
    255     // Dispatch all the messages that we can.
    256     while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) {
    257       const MessageInTransit* message =
    258           reinterpret_cast<const MessageInTransit*>(
    259               &read_buffer_[read_buffer_start]);
    260       DCHECK_EQ(reinterpret_cast<size_t>(message) %
    261                     MessageInTransit::kMessageAlignment, 0u);
    262       // If we have the header, not the whole message....
    263       if (read_buffer_num_valid_bytes_ <
    264               message->size_with_header_and_padding())
    265         break;
    266 
    267       // Dispatch the message.
    268       delegate()->OnReadMessage(*message);
    269       if (!read_watcher_.get()) {
    270         // |Shutdown()| was called in |OnReadMessage()|.
    271         // TODO(vtl): Add test for this case.
    272         return;
    273       }
    274       did_dispatch_message = true;
    275 
    276       // Update our state.
    277       read_buffer_start += message->size_with_header_and_padding();
    278       read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding();
    279     }
    280 
    281     // If we dispatched any messages, stop reading for now (and let the message
    282     // loop do its thing for another round).
    283     // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
    284     // a single message. Risks: slower, more complex if we want to avoid lots of
    285     // copying. ii. Keep reading until there's no more data and dispatch all the
    286     // messages we can. Risks: starvation of other users of the message loop.)
    287     if (did_dispatch_message)
    288       break;
    289 
    290     // If we didn't max out |kReadSize|, stop reading for now.
    291     if (static_cast<size_t>(bytes_read) < kReadSize)
    292       break;
    293 
    294     // Else try to read some more....
    295   }
    296 
    297   // Move data back to start.
    298   if (read_buffer_start > 0) {
    299     memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
    300             read_buffer_num_valid_bytes_);
    301     read_buffer_start = 0;
    302   }
    303 }
    304 
    305 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
    306   DCHECK_EQ(fd, fd_);
    307   DCHECK_EQ(base::MessageLoop::current(), message_loop());
    308 
    309   bool did_fail = false;
    310   {
    311     base::AutoLock locker(write_lock_);
    312     DCHECK(!is_dead_);
    313     DCHECK(!write_message_queue_.empty());
    314 
    315     bool result = WriteFrontMessageNoLock();
    316     DCHECK(result || write_message_queue_.empty());
    317 
    318     if (!result)
    319       did_fail = true;
    320     else if (!write_message_queue_.empty())
    321       WaitToWrite();
    322   }
    323   if (did_fail)
    324     CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
    325 }
    326 
    327 void RawChannelPosix::WaitToWrite() {
    328   DCHECK_EQ(base::MessageLoop::current(), message_loop());
    329 
    330   DCHECK(write_watcher_.get());
    331   bool result = message_loop_for_io()->WatchFileDescriptor(
    332       fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(),
    333       this);
    334   DCHECK(result);
    335 }
    336 
    337 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
    338   DCHECK_EQ(base::MessageLoop::current(), message_loop());
    339   // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
    340   delegate()->OnFatalError(fatal_error);
    341 }
    342 
    343 bool RawChannelPosix::WriteFrontMessageNoLock() {
    344   write_lock_.AssertAcquired();
    345 
    346   DCHECK(!is_dead_);
    347   DCHECK(!write_message_queue_.empty());
    348 
    349   MessageInTransit* message = write_message_queue_.front();
    350   DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
    351   size_t bytes_to_write =
    352       message->size_with_header_and_padding() - write_message_offset_;
    353   ssize_t bytes_written = HANDLE_EINTR(
    354       write(fd_,
    355             reinterpret_cast<char*>(message) + write_message_offset_,
    356             bytes_to_write));
    357   if (bytes_written < 0) {
    358     if (errno != EAGAIN && errno != EWOULDBLOCK) {
    359       PLOG(ERROR) << "write of size " << bytes_to_write;
    360       CancelPendingWritesNoLock();
    361       return false;
    362     }
    363 
    364     // We simply failed to write since we'd block. The logic is the same as if
    365     // we got a partial write.
    366     bytes_written = 0;
    367   }
    368 
    369   DCHECK_GE(bytes_written, 0);
    370   if (static_cast<size_t>(bytes_written) < bytes_to_write) {
    371     // Partial (or no) write.
    372     write_message_offset_ += static_cast<size_t>(bytes_written);
    373   } else {
    374     // Complete write.
    375     DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
    376     write_message_queue_.pop_front();
    377     write_message_offset_ = 0;
    378     message->Destroy();
    379   }
    380 
    381   return true;
    382 }
    383 
    384 void RawChannelPosix::CancelPendingWritesNoLock() {
    385   write_lock_.AssertAcquired();
    386   DCHECK(!is_dead_);
    387 
    388   is_dead_ = true;
    389   for (std::deque<MessageInTransit*>::iterator it =
    390            write_message_queue_.begin(); it != write_message_queue_.end();
    391        ++it) {
    392     (*it)->Destroy();
    393   }
    394   write_message_queue_.clear();
    395 }
    396 
    397 }  // namespace
    398 
    399 // -----------------------------------------------------------------------------
    400 
    401 // Static factory method declared in raw_channel.h.
    402 // static
    403 RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
    404                                Delegate* delegate,
    405                                base::MessageLoop* message_loop) {
    406   return new RawChannelPosix(handle, delegate, message_loop);
    407 }
    408 
    409 }  // namespace system
    410 }  // namespace mojo
    411