Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/channel.h"
      6 
      7 #include <errno.h>
      8 #include <sys/socket.h>
      9 
     10 #include <algorithm>
     11 #include <deque>
     12 #include <limits>
     13 #include <memory>
     14 
     15 #include "base/bind.h"
     16 #include "base/location.h"
     17 #include "base/macros.h"
     18 #include "base/memory/ref_counted.h"
     19 #include "base/message_loop/message_loop.h"
     20 #include "base/synchronization/lock.h"
     21 #include "base/task_runner.h"
     22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
     23 #include "mojo/edk/embedder/platform_handle_vector.h"
     24 
     25 #if !defined(OS_NACL)
     26 #include <sys/uio.h>
     27 #endif
     28 
     29 namespace mojo {
     30 namespace edk {
     31 
     32 namespace {
     33 
     34 const size_t kMaxBatchReadCapacity = 256 * 1024;
     35 
     36 // A view over a Channel::Message object. The write queue uses these since
     37 // large messages may need to be sent in chunks.
     38 class MessageView {
     39  public:
     40   // Owns |message|. |offset| indexes the first unsent byte in the message.
     41   MessageView(Channel::MessagePtr message, size_t offset)
     42       : message_(std::move(message)),
     43         offset_(offset),
     44         handles_(message_->TakeHandlesForTransport()) {
     45     DCHECK_GT(message_->data_num_bytes(), offset_);
     46   }
     47 
     48   MessageView(MessageView&& other) { *this = std::move(other); }
     49 
     50   MessageView& operator=(MessageView&& other) {
     51     message_ = std::move(other.message_);
     52     offset_ = other.offset_;
     53     handles_ = std::move(other.handles_);
     54     return *this;
     55   }
     56 
     57   ~MessageView() {}
     58 
     59   const void* data() const {
     60     return static_cast<const char*>(message_->data()) + offset_;
     61   }
     62 
     63   size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
     64 
     65   size_t data_offset() const { return offset_; }
     66   void advance_data_offset(size_t num_bytes) {
     67     DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
     68     offset_ += num_bytes;
     69   }
     70 
     71   ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
     72   Channel::MessagePtr TakeMessage() { return std::move(message_); }
     73 
     74   void SetHandles(ScopedPlatformHandleVectorPtr handles) {
     75     handles_ = std::move(handles);
     76   }
     77 
     78  private:
     79   Channel::MessagePtr message_;
     80   size_t offset_;
     81   ScopedPlatformHandleVectorPtr handles_;
     82 
     83   DISALLOW_COPY_AND_ASSIGN(MessageView);
     84 };
     85 
     86 class ChannelPosix : public Channel,
     87                      public base::MessageLoop::DestructionObserver,
     88                      public base::MessageLoopForIO::Watcher {
     89  public:
     90   ChannelPosix(Delegate* delegate,
     91                ConnectionParams connection_params,
     92                scoped_refptr<base::TaskRunner> io_task_runner)
     93       : Channel(delegate),
     94         self_(this),
     95         handle_(connection_params.TakeChannelHandle()),
     96         io_task_runner_(io_task_runner)
     97 #if defined(OS_MACOSX)
     98         ,
     99         handles_to_close_(new PlatformHandleVector)
    100 #endif
    101   {
    102     CHECK(handle_.is_valid());
    103   }
    104 
    105   void Start() override {
    106     if (io_task_runner_->RunsTasksOnCurrentThread()) {
    107       StartOnIOThread();
    108     } else {
    109       io_task_runner_->PostTask(
    110           FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
    111     }
    112   }
    113 
    114   void ShutDownImpl() override {
    115     // Always shut down asynchronously when called through the public interface.
    116     io_task_runner_->PostTask(
    117         FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
    118   }
    119 
    120   void Write(MessagePtr message) override {
    121     bool write_error = false;
    122     {
    123       base::AutoLock lock(write_lock_);
    124       if (reject_writes_)
    125         return;
    126       if (outgoing_messages_.empty()) {
    127         if (!WriteNoLock(MessageView(std::move(message), 0)))
    128           reject_writes_ = write_error = true;
    129       } else {
    130         outgoing_messages_.emplace_back(std::move(message), 0);
    131       }
    132     }
    133     if (write_error) {
    134       // Do not synchronously invoke OnError(). Write() may have been called by
    135       // the delegate and we don't want to re-enter it.
    136       io_task_runner_->PostTask(FROM_HERE,
    137                                 base::Bind(&ChannelPosix::OnError, this));
    138     }
    139   }
    140 
    141   void LeakHandle() override {
    142     DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    143     leak_handle_ = true;
    144   }
    145 
    146   bool GetReadPlatformHandles(
    147       size_t num_handles,
    148       const void* extra_header,
    149       size_t extra_header_size,
    150       ScopedPlatformHandleVectorPtr* handles) override {
    151     if (num_handles > std::numeric_limits<uint16_t>::max())
    152       return false;
    153 #if defined(OS_MACOSX) && !defined(OS_IOS)
    154     // On OSX, we can have mach ports which are located in the extra header
    155     // section.
    156     using MachPortsEntry = Channel::Message::MachPortsEntry;
    157     using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
    158     CHECK(extra_header_size >=
    159           sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry));
    160     const MachPortsExtraHeader* mach_ports_header =
    161         reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
    162     size_t num_mach_ports = mach_ports_header->num_ports;
    163     CHECK(num_mach_ports <= num_handles);
    164     if (incoming_platform_handles_.size() + num_mach_ports < num_handles) {
    165       handles->reset();
    166       return true;
    167     }
    168 
    169     handles->reset(new PlatformHandleVector(num_handles));
    170     const MachPortsEntry* mach_ports = mach_ports_header->entries;
    171     for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
    172       if (mach_port_index < num_mach_ports &&
    173           mach_ports[mach_port_index].index == i) {
    174         (*handles)->at(i) = PlatformHandle(
    175             static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port));
    176         CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH);
    177         // These are actually just Mach port names until they're resolved from
    178         // the remote process.
    179         (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME;
    180         mach_port_index++;
    181       } else {
    182         CHECK(!incoming_platform_handles_.empty());
    183         (*handles)->at(i) = incoming_platform_handles_.front();
    184         incoming_platform_handles_.pop_front();
    185       }
    186     }
    187 #else
    188     if (incoming_platform_handles_.size() < num_handles) {
    189       handles->reset();
    190       return true;
    191     }
    192 
    193     handles->reset(new PlatformHandleVector(num_handles));
    194     for (size_t i = 0; i < num_handles; ++i) {
    195       (*handles)->at(i) = incoming_platform_handles_.front();
    196       incoming_platform_handles_.pop_front();
    197     }
    198 #endif
    199 
    200     return true;
    201   }
    202 
    203  private:
    204   ~ChannelPosix() override {
    205     DCHECK(!read_watcher_);
    206     DCHECK(!write_watcher_);
    207     for (auto handle : incoming_platform_handles_)
    208       handle.CloseIfNecessary();
    209   }
    210 
    211   void StartOnIOThread() {
    212     DCHECK(!read_watcher_);
    213     DCHECK(!write_watcher_);
    214     read_watcher_.reset(
    215         new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE));
    216     base::MessageLoop::current()->AddDestructionObserver(this);
    217     if (handle_.get().needs_connection) {
    218       base::MessageLoopForIO::current()->WatchFileDescriptor(
    219           handle_.get().handle, false /* persistent */,
    220           base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
    221     } else {
    222       write_watcher_.reset(
    223           new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE));
    224       base::MessageLoopForIO::current()->WatchFileDescriptor(
    225           handle_.get().handle, true /* persistent */,
    226           base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
    227       base::AutoLock lock(write_lock_);
    228       FlushOutgoingMessagesNoLock();
    229     }
    230   }
    231 
    232   void WaitForWriteOnIOThread() {
    233     base::AutoLock lock(write_lock_);
    234     WaitForWriteOnIOThreadNoLock();
    235   }
    236 
    237   void WaitForWriteOnIOThreadNoLock() {
    238     if (pending_write_)
    239       return;
    240     if (!write_watcher_)
    241       return;
    242     if (io_task_runner_->RunsTasksOnCurrentThread()) {
    243       pending_write_ = true;
    244       base::MessageLoopForIO::current()->WatchFileDescriptor(
    245           handle_.get().handle, false /* persistent */,
    246           base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
    247     } else {
    248       io_task_runner_->PostTask(
    249           FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
    250     }
    251   }
    252 
    253   void ShutDownOnIOThread() {
    254     base::MessageLoop::current()->RemoveDestructionObserver(this);
    255 
    256     read_watcher_.reset();
    257     write_watcher_.reset();
    258     if (leak_handle_)
    259       ignore_result(handle_.release());
    260     handle_.reset();
    261 #if defined(OS_MACOSX)
    262     handles_to_close_.reset();
    263 #endif
    264 
    265     // May destroy the |this| if it was the last reference.
    266     self_ = nullptr;
    267   }
    268 
    269   // base::MessageLoop::DestructionObserver:
    270   void WillDestroyCurrentMessageLoop() override {
    271     DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    272     if (self_)
    273       ShutDownOnIOThread();
    274   }
    275 
    276   // base::MessageLoopForIO::Watcher:
    277   void OnFileCanReadWithoutBlocking(int fd) override {
    278     CHECK_EQ(fd, handle_.get().handle);
    279     if (handle_.get().needs_connection) {
    280 #if !defined(OS_NACL)
    281       read_watcher_.reset();
    282       base::MessageLoop::current()->RemoveDestructionObserver(this);
    283 
    284       ScopedPlatformHandle accept_fd;
    285       ServerAcceptConnection(handle_.get(), &accept_fd);
    286       if (!accept_fd.is_valid()) {
    287         OnError();
    288         return;
    289       }
    290       handle_ = std::move(accept_fd);
    291       StartOnIOThread();
    292 #else
    293       NOTREACHED();
    294 #endif
    295       return;
    296     }
    297 
    298     bool read_error = false;
    299     size_t next_read_size = 0;
    300     size_t buffer_capacity = 0;
    301     size_t total_bytes_read = 0;
    302     size_t bytes_read = 0;
    303     do {
    304       buffer_capacity = next_read_size;
    305       char* buffer = GetReadBuffer(&buffer_capacity);
    306       DCHECK_GT(buffer_capacity, 0u);
    307 
    308       ssize_t read_result = PlatformChannelRecvmsg(
    309           handle_.get(),
    310           buffer,
    311           buffer_capacity,
    312           &incoming_platform_handles_);
    313 
    314       if (read_result > 0) {
    315         bytes_read = static_cast<size_t>(read_result);
    316         total_bytes_read += bytes_read;
    317         if (!OnReadComplete(bytes_read, &next_read_size)) {
    318           read_error = true;
    319           break;
    320         }
    321       } else if (read_result == 0 ||
    322                  (errno != EAGAIN && errno != EWOULDBLOCK)) {
    323         read_error = true;
    324         break;
    325       }
    326     } while (bytes_read == buffer_capacity &&
    327              total_bytes_read < kMaxBatchReadCapacity &&
    328              next_read_size > 0);
    329     if (read_error) {
    330       // Stop receiving read notifications.
    331       read_watcher_.reset();
    332 
    333       OnError();
    334     }
    335   }
    336 
    337   void OnFileCanWriteWithoutBlocking(int fd) override {
    338     bool write_error = false;
    339     {
    340       base::AutoLock lock(write_lock_);
    341       pending_write_ = false;
    342       if (!FlushOutgoingMessagesNoLock())
    343         reject_writes_ = write_error = true;
    344     }
    345     if (write_error)
    346       OnError();
    347   }
    348 
    349   // Attempts to write a message directly to the channel. If the full message
    350   // cannot be written, it's queued and a wait is initiated to write the message
    351   // ASAP on the I/O thread.
    352   bool WriteNoLock(MessageView message_view) {
    353     if (handle_.get().needs_connection) {
    354       outgoing_messages_.emplace_front(std::move(message_view));
    355       return true;
    356     }
    357     size_t bytes_written = 0;
    358     do {
    359       message_view.advance_data_offset(bytes_written);
    360 
    361       ssize_t result;
    362       ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
    363       if (handles && handles->size()) {
    364         iovec iov = {
    365           const_cast<void*>(message_view.data()),
    366           message_view.data_num_bytes()
    367         };
    368         // TODO: Handle lots of handles.
    369         result = PlatformChannelSendmsgWithHandles(
    370             handle_.get(), &iov, 1, handles->data(), handles->size());
    371         if (result >= 0) {
    372 #if defined(OS_MACOSX)
    373           // There is a bug on OSX which makes it dangerous to close
    374           // a file descriptor while it is in transit. So instead we
    375           // store the file descriptor in a set and send a message to
    376           // the recipient, which is queued AFTER the message that
    377           // sent the FD. The recipient will reply to the message,
    378           // letting us know that it is now safe to close the file
    379           // descriptor. For more information, see:
    380           // http://crbug.com/298276
    381           std::vector<int> fds;
    382           for (auto& handle : *handles)
    383             fds.push_back(handle.handle);
    384           {
    385             base::AutoLock l(handles_to_close_lock_);
    386             for (auto& handle : *handles)
    387               handles_to_close_->push_back(handle);
    388           }
    389           MessagePtr fds_message(
    390               new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
    391                                    Message::MessageType::HANDLES_SENT));
    392           memcpy(fds_message->mutable_payload(), fds.data(),
    393                  sizeof(fds[0]) * fds.size());
    394           outgoing_messages_.emplace_back(std::move(fds_message), 0);
    395           handles->clear();
    396 #else
    397           handles.reset();
    398 #endif  // defined(OS_MACOSX)
    399         }
    400       } else {
    401         result = PlatformChannelWrite(handle_.get(), message_view.data(),
    402                                       message_view.data_num_bytes());
    403       }
    404 
    405       if (result < 0) {
    406         if (errno != EAGAIN && errno != EWOULDBLOCK
    407 #if defined(OS_MACOSX)
    408             // On OS X if sendmsg() is trying to send fds between processes and
    409             // there isn't enough room in the output buffer to send the fd
    410             // structure over atomically then EMSGSIZE is returned.
    411             //
    412             // EMSGSIZE presents a problem since the system APIs can only call
    413             // us when there's room in the socket buffer and not when there is
    414             // "enough" room.
    415             //
    416             // The current behavior is to return to the event loop when EMSGSIZE
    417             // is received and hopefull service another FD.  This is however
    418             // still technically a busy wait since the event loop will call us
    419             // right back until the receiver has read enough data to allow
    420             // passing the FD over atomically.
    421             && errno != EMSGSIZE
    422 #endif
    423             ) {
    424           return false;
    425         }
    426         message_view.SetHandles(std::move(handles));
    427         outgoing_messages_.emplace_front(std::move(message_view));
    428         WaitForWriteOnIOThreadNoLock();
    429         return true;
    430       }
    431 
    432       bytes_written = static_cast<size_t>(result);
    433     } while (bytes_written < message_view.data_num_bytes());
    434 
    435     return FlushOutgoingMessagesNoLock();
    436   }
    437 
    438   bool FlushOutgoingMessagesNoLock() {
    439     std::deque<MessageView> messages;
    440     std::swap(outgoing_messages_, messages);
    441 
    442     while (!messages.empty()) {
    443       if (!WriteNoLock(std::move(messages.front())))
    444         return false;
    445 
    446       messages.pop_front();
    447       if (!outgoing_messages_.empty()) {
    448         // The message was requeued by WriteNoLock(), so we have to wait for
    449         // pipe to become writable again. Repopulate the message queue and exit.
    450         // If sending the message triggered any control messages, they may be
    451         // in |outgoing_messages_| in addition to or instead of the message
    452         // being sent.
    453         std::swap(messages, outgoing_messages_);
    454         while (!messages.empty()) {
    455           outgoing_messages_.push_front(std::move(messages.back()));
    456           messages.pop_back();
    457         }
    458         return true;
    459       }
    460     }
    461 
    462     return true;
    463   }
    464 
    465 #if defined(OS_MACOSX)
    466   bool OnControlMessage(Message::MessageType message_type,
    467                         const void* payload,
    468                         size_t payload_size,
    469                         ScopedPlatformHandleVectorPtr handles) override {
    470     switch (message_type) {
    471       case Message::MessageType::HANDLES_SENT: {
    472         if (payload_size == 0)
    473           break;
    474         MessagePtr message(new Channel::Message(
    475             payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
    476         memcpy(message->mutable_payload(), payload, payload_size);
    477         Write(std::move(message));
    478         return true;
    479       }
    480 
    481       case Message::MessageType::HANDLES_SENT_ACK: {
    482         size_t num_fds = payload_size / sizeof(int);
    483         if (num_fds == 0 || payload_size % sizeof(int) != 0)
    484           break;
    485 
    486         const int* fds = reinterpret_cast<const int*>(payload);
    487         if (!CloseHandles(fds, num_fds))
    488           break;
    489         return true;
    490       }
    491 
    492       default:
    493         break;
    494     }
    495 
    496     return false;
    497   }
    498 
    499   // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
    500   // |fds| does not match a sequence of handles in |handles_to_close_|.
    501   bool CloseHandles(const int* fds, size_t num_fds) {
    502     base::AutoLock l(handles_to_close_lock_);
    503     if (!num_fds)
    504       return false;
    505 
    506     auto start =
    507         std::find_if(handles_to_close_->begin(), handles_to_close_->end(),
    508                      [&fds](const PlatformHandle& handle) {
    509                        return handle.handle == fds[0];
    510                      });
    511     if (start == handles_to_close_->end())
    512       return false;
    513 
    514     auto it = start;
    515     size_t i = 0;
    516     // The FDs in the message should match a sequence of handles in
    517     // |handles_to_close_|.
    518     for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) {
    519       if (it->handle != fds[i])
    520         return false;
    521 
    522       it->CloseIfNecessary();
    523     }
    524     if (i != num_fds)
    525       return false;
    526 
    527     handles_to_close_->erase(start, it);
    528     return true;
    529   }
    530 #endif  // defined(OS_MACOSX)
    531 
    532   // Keeps the Channel alive at least until explicit shutdown on the IO thread.
    533   scoped_refptr<Channel> self_;
    534 
    535   ScopedPlatformHandle handle_;
    536   scoped_refptr<base::TaskRunner> io_task_runner_;
    537 
    538   // These watchers must only be accessed on the IO thread.
    539   std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
    540   std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
    541 
    542   std::deque<PlatformHandle> incoming_platform_handles_;
    543 
    544   // Protects |pending_write_| and |outgoing_messages_|.
    545   base::Lock write_lock_;
    546   bool pending_write_ = false;
    547   bool reject_writes_ = false;
    548   std::deque<MessageView> outgoing_messages_;
    549 
    550   bool leak_handle_ = false;
    551 
    552 #if defined(OS_MACOSX)
    553   base::Lock handles_to_close_lock_;
    554   ScopedPlatformHandleVectorPtr handles_to_close_;
    555 #endif
    556 
    557   DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
    558 };
    559 
    560 }  // namespace
    561 
    562 // static
    563 scoped_refptr<Channel> Channel::Create(
    564     Delegate* delegate,
    565     ConnectionParams connection_params,
    566     scoped_refptr<base::TaskRunner> io_task_runner) {
    567   return new ChannelPosix(delegate, std::move(connection_params),
    568                           io_task_runner);
    569 }
    570 
    571 }  // namespace edk
    572 }  // namespace mojo
    573