Home | History | Annotate | Download | only in core
      1 // Copyright 2017 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/core/channel.h"
      6 
      7 #include <lib/fdio/limits.h>
      8 #include <lib/fdio/util.h>
      9 #include <lib/zx/channel.h>
     10 #include <lib/zx/handle.h>
     11 #include <zircon/processargs.h>
     12 #include <zircon/status.h>
     13 #include <zircon/syscalls.h>
     14 #include <algorithm>
     15 
     16 #include "base/bind.h"
     17 #include "base/containers/circular_deque.h"
     18 #include "base/files/scoped_file.h"
     19 #include "base/fuchsia/fuchsia_logging.h"
     20 #include "base/location.h"
     21 #include "base/macros.h"
     22 #include "base/memory/ref_counted.h"
     23 #include "base/message_loop/message_loop_current.h"
     24 #include "base/message_loop/message_pump_for_io.h"
     25 #include "base/stl_util.h"
     26 #include "base/synchronization/lock.h"
     27 #include "base/task_runner.h"
     28 #include "mojo/core/platform_handle_in_transit.h"
     29 
     30 namespace mojo {
     31 namespace core {
     32 
     33 namespace {
     34 
// Upper bound on bytes drained in one OnZxHandleSignalled() dispatch; the
// read loop exits once this much has been consumed, returning control to the
// message pump (presumably to avoid monopolizing the IO thread — see the
// loop condition in OnZxHandleSignalled()).
const size_t kMaxBatchReadCapacity = 256 * 1024;
     36 
// Converts |handle| into one or more raw Zircon handles appended to
// |handles_out|, writing FDIO type metadata for the group into |info_out|.
// Non-FD handles pass through unchanged with a zeroed info entry; FDIO file
// descriptors are un-wrapped into their underlying native handle(s).
// Returns false if an FD could not be transferred or cloned.
bool UnwrapPlatformHandle(PlatformHandleInTransit handle,
                          Channel::Message::HandleInfoEntry* info_out,
                          std::vector<PlatformHandleInTransit>* handles_out) {
  DCHECK(handle.handle().is_valid());

  // Native Zircon handles need no translation; a {0, 0} info entry marks them
  // as pass-through for WrapPlatformHandles() on the receiving side.
  if (!handle.handle().is_valid_fd()) {
    *info_out = {0u, 0u};
    handles_out->emplace_back(std::move(handle));
    return true;
  }

  // Each FDIO file descriptor is implemented using one or more native resources
  // and can be un-wrapped into a set of |handle| and |info| pairs, with |info|
  // consisting of an FDIO-defined type & arguments (see zircon/processargs.h).
  //
  // We try to transfer the FD, but if that fails (for example if the file has
  // already been dup()d into another FD) we may need to clone.
  zx_handle_t handles[FDIO_MAX_HANDLES] = {};
  uint32_t info[FDIO_MAX_HANDLES] = {};
  // A positive return value is the number of handles produced.
  zx_status_t result =
      fdio_transfer_fd(handle.handle().GetFD().get(), 0, handles, info);
  if (result > 0) {
    // On success, the fd in |handle| has been transferred and is no longer
    // valid. Release from the PlatformHandle to avoid close()ing an invalid
    // handle.
    handle.CompleteTransit();
  } else if (result == ZX_ERR_UNAVAILABLE) {
    // No luck, try cloning instead.
    result = fdio_clone_fd(handle.handle().GetFD().get(), 0, handles, info);
  }

  if (result <= 0) {
    // NOTE(review): this message names fdio_transfer_fd() even when the
    // failure came from the fdio_clone_fd() fallback above.
    ZX_DLOG(ERROR, result) << "fdio_transfer_fd("
                           << handle.handle().GetFD().get() << ")";
    return false;
  }
  DCHECK_LE(result, FDIO_MAX_HANDLES);

  // We assume here that only the |PA_HND_TYPE| of the |info| really matters,
  // and that that is the same for all the underlying handles.
  *info_out = {PA_HND_TYPE(info[0]), result};
  for (int i = 0; i < result; ++i) {
    DCHECK_EQ(PA_HND_TYPE(info[0]), PA_HND_TYPE(info[i]));
    DCHECK_EQ(0u, PA_HND_SUBTYPE(info[i]));
    handles_out->emplace_back(
        PlatformHandleInTransit(PlatformHandle(zx::handle(handles[i]))));
  }

  return true;
}
     87 
// Inverse of UnwrapPlatformHandle(): consumes handle(s) from the front of
// |handles| as described by |info|. A zero |info.type| means the next handle
// passes through directly; otherwise |info.count| raw handles are re-wrapped
// into a single FDIO file descriptor. Returns an invalid PlatformHandle on
// failure, in which case no entries are removed from |handles|.
PlatformHandle WrapPlatformHandles(Channel::Message::HandleInfoEntry info,
                                   base::circular_deque<zx::handle>* handles) {
  PlatformHandle out_handle;
  if (!info.type) {
    out_handle = PlatformHandle(std::move(handles->front()));
    handles->pop_front();
  } else {
    // Reject malformed info rather than over-reading the fixed-size arrays.
    if (info.count > FDIO_MAX_HANDLES)
      return PlatformHandle();

    // Fetch the required number of handles from |handles| and set up type info.
    zx_handle_t fd_handles[FDIO_MAX_HANDLES] = {};
    uint32_t fd_infos[FDIO_MAX_HANDLES] = {};
    for (int i = 0; i < info.count; ++i) {
      fd_handles[i] = (*handles)[i].get();
      fd_infos[i] = PA_HND(info.type, 0);
    }

    // Try to wrap the handles into an FDIO file descriptor.
    base::ScopedFD out_fd;
    zx_status_t result =
        fdio_create_fd(fd_handles, fd_infos, info.count, out_fd.receive());
    if (result != ZX_OK) {
      ZX_DLOG(ERROR, result) << "fdio_create_fd";
      return PlatformHandle();
    }

    // The handles are owned by FDIO now, so |release()| them before removing
    // the entries from |handles|.
    for (int i = 0; i < info.count; ++i) {
      ignore_result(handles->front().release());
      handles->pop_front();
    }

    out_handle = PlatformHandle(std::move(out_fd));
  }
  return out_handle;
}
    126 
    127 // A view over a Channel::Message object. The write queue uses these since
    128 // large messages may need to be sent in chunks.
    129 class MessageView {
    130  public:
    131   // Owns |message|. |offset| indexes the first unsent byte in the message.
    132   MessageView(Channel::MessagePtr message, size_t offset)
    133       : message_(std::move(message)),
    134         offset_(offset),
    135         handles_(message_->TakeHandlesForTransport()) {
    136     DCHECK_GT(message_->data_num_bytes(), offset_);
    137   }
    138 
    139   MessageView(MessageView&& other) { *this = std::move(other); }
    140 
    141   MessageView& operator=(MessageView&& other) {
    142     message_ = std::move(other.message_);
    143     offset_ = other.offset_;
    144     handles_ = std::move(other.handles_);
    145     return *this;
    146   }
    147 
    148   ~MessageView() {}
    149 
    150   const void* data() const {
    151     return static_cast<const char*>(message_->data()) + offset_;
    152   }
    153 
    154   size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
    155 
    156   size_t data_offset() const { return offset_; }
    157   void advance_data_offset(size_t num_bytes) {
    158     DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
    159     offset_ += num_bytes;
    160   }
    161 
    162   std::vector<PlatformHandleInTransit> TakeHandles() {
    163     if (handles_.empty())
    164       return std::vector<PlatformHandleInTransit>();
    165 
    166     // We can only pass Fuchsia handles via IPC, so unwrap any FDIO file-
    167     // descriptors in |handles_| into the underlying handles, and serialize the
    168     // metadata, if any, into the extra header.
    169     auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>(
    170         message_->mutable_extra_header());
    171     memset(handles_info, 0, message_->extra_header_size());
    172 
    173     std::vector<PlatformHandleInTransit> in_handles = std::move(handles_);
    174     handles_.reserve(in_handles.size());
    175     for (size_t i = 0; i < in_handles.size(); i++) {
    176       if (!UnwrapPlatformHandle(std::move(in_handles[i]), &handles_info[i],
    177                                 &handles_))
    178         return std::vector<PlatformHandleInTransit>();
    179     }
    180     return std::move(handles_);
    181   }
    182 
    183  private:
    184   Channel::MessagePtr message_;
    185   size_t offset_;
    186   std::vector<PlatformHandleInTransit> handles_;
    187 
    188   DISALLOW_COPY_AND_ASSIGN(MessageView);
    189 };
    190 
// Channel implementation backed by a Zircon channel handle. Reads are driven
// on the IO thread by the message pump's ZxHandleWatcher; writes may be
// issued from any thread and are serialized under |write_lock_|.
class ChannelFuchsia : public Channel,
                       public base::MessageLoopCurrent::DestructionObserver,
                       public base::MessagePumpForIO::ZxHandleWatcher {
 public:
  ChannelFuchsia(Delegate* delegate,
                 ConnectionParams connection_params,
                 scoped_refptr<base::TaskRunner> io_task_runner)
      : Channel(delegate),
        self_(this),
        handle_(
            connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()),
        io_task_runner_(io_task_runner) {
    CHECK(handle_.is_valid());
  }

  // Channel:
  void Start() override {
    // Handle-watch registration must happen on the IO thread; hop if needed.
    if (io_task_runner_->RunsTasksInCurrentSequence()) {
      StartOnIOThread();
    } else {
      io_task_runner_->PostTask(
          FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this));
    }
  }

  void ShutDownImpl() override {
    // Always shut down asynchronously when called through the public interface.
    io_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this));
  }

  void Write(MessagePtr message) override {
    bool write_error = false;
    {
      base::AutoLock lock(write_lock_);
      if (reject_writes_)
        return;
      // A failed write permanently poisons the channel for writing;
      // subsequent Write() calls are silently dropped above.
      if (!WriteNoLock(MessageView(std::move(message), 0)))
        reject_writes_ = write_error = true;
    }
    if (write_error) {
      // Do not synchronously invoke OnWriteError(). Write() may have been
      // called by the delegate and we don't want to re-enter it.
      io_task_runner_->PostTask(
          FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this,
                                    Error::kDisconnected));
    }
  }

  void LeakHandle() override {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    // Consumed by ShutDownOnIOThread(), which release()s instead of closing.
    leak_handle_ = true;
  }

  // Converts the accumulated raw Zircon handles back into |num_handles|
  // PlatformHandles using the HandleInfoEntry metadata in |extra_header|,
  // re-wrapping FDIO file descriptors where indicated. Returns true with
  // |handles| left empty if not all raw handles have been received yet;
  // false on malformed metadata.
  bool GetReadPlatformHandles(const void* payload,
                              size_t payload_size,
                              size_t num_handles,
                              const void* extra_header,
                              size_t extra_header_size,
                              std::vector<PlatformHandle>* handles,
                              bool* deferred) override {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    if (num_handles > std::numeric_limits<uint16_t>::max())
      return false;

    // Locate the handle info and verify there is enough of it.
    if (!extra_header)
      return false;
    const auto* handles_info =
        reinterpret_cast<const Channel::Message::HandleInfoEntry*>(
            extra_header);
    size_t handles_info_size = sizeof(handles_info[0]) * num_handles;
    if (handles_info_size > extra_header_size)
      return false;

    // Some caller-supplied handles may be FDIO file-descriptors, which were
    // un-wrapped to more than one native platform resource handle for transfer.
    // We may therefore need to expect more than |num_handles| handles to have
    // been accumulated in |incoming_handles_|, based on the handle info.
    size_t num_raw_handles = 0u;
    for (size_t i = 0; i < num_handles; ++i)
      num_raw_handles += handles_info[i].type ? handles_info[i].count : 1;

    // If there are too few handles then we're not ready yet, so return true
    // indicating things are OK, but leave |handles| empty.
    if (incoming_handles_.size() < num_raw_handles)
      return true;

    handles->reserve(num_handles);
    for (size_t i = 0; i < num_handles; ++i) {
      handles->emplace_back(
          WrapPlatformHandles(handles_info[i], &incoming_handles_));
    }
    return true;
  }

 private:
  ~ChannelFuchsia() override { DCHECK(!read_watch_); }

  // Begins watching |handle_| for readability / peer closure. IO thread only.
  void StartOnIOThread() {
    DCHECK(!read_watch_);

    // Observe message-loop destruction so the watch can be torn down even if
    // ShutDownOnIOThread() never runs.
    base::MessageLoopCurrent::Get()->AddDestructionObserver(this);

    read_watch_.reset(
        new base::MessagePumpForIO::ZxHandleWatchController(FROM_HERE));
    base::MessageLoopCurrentForIO::Get()->WatchZxHandle(
        handle_.get(), true /* persistent */,
        ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this);
  }

  // Stops watching and closes (or leaks) the channel handle. IO thread only.
  void ShutDownOnIOThread() {
    base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);

    read_watch_.reset();
    if (leak_handle_)
      ignore_result(handle_.release());
    handle_.reset();

    // May destroy the |this| if it was the last reference.
    self_ = nullptr;
  }

  // base::MessageLoopCurrent::DestructionObserver:
  void WillDestroyCurrentMessageLoop() override {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    if (self_)
      ShutDownOnIOThread();
  }

  // base::MessagePumpForIO::ZxHandleWatcher:
  void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    CHECK_EQ(handle, handle_.get());
    DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals);

    // We always try to read message(s), even if ZX_CHANNEL_PEER_CLOSED, since
    // the peer may have closed while messages were still unread, in the pipe.

    bool validation_error = false;
    bool read_error = false;
    size_t next_read_size = 0;
    size_t buffer_capacity = 0;
    size_t total_bytes_read = 0;
    do {
      buffer_capacity = next_read_size;
      char* buffer = GetReadBuffer(&buffer_capacity);
      DCHECK_GT(buffer_capacity, 0u);

      uint32_t bytes_read = 0;
      uint32_t handles_read = 0;
      zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};

      zx_status_t read_result =
          handle_.read(0, buffer, buffer_capacity, &bytes_read, handles,
                       base::size(handles), &handles_read);
      if (read_result == ZX_OK) {
        // Stash received raw handles; GetReadPlatformHandles() consumes them
        // once enough have arrived for a complete message.
        for (size_t i = 0; i < handles_read; ++i) {
          incoming_handles_.emplace_back(handles[i]);
        }
        total_bytes_read += bytes_read;
        if (!OnReadComplete(bytes_read, &next_read_size)) {
          read_error = true;
          validation_error = true;
          break;
        }
      } else if (read_result == ZX_ERR_BUFFER_TOO_SMALL) {
        // |bytes_read| holds the size required; retry with a larger buffer.
        DCHECK_LE(handles_read, base::size(handles));
        next_read_size = bytes_read;
      } else if (read_result == ZX_ERR_SHOULD_WAIT) {
        // Nothing more to read; wait for the next signal.
        break;
      } else {
        ZX_DLOG_IF(ERROR, read_result != ZX_ERR_PEER_CLOSED, read_result)
            << "zx_channel_read";
        read_error = true;
        break;
      }
    } while (total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0);
    if (read_error) {
      // Stop receiving read notifications.
      read_watch_.reset();
      if (validation_error)
        OnError(Error::kReceivedMalformedData);
      else
        OnError(Error::kDisconnected);
    }
  }

  // Attempts to write a message directly to the channel. If the full message
  // cannot be written, it's queued and a wait is initiated to write the message
  // ASAP on the I/O thread.
  bool WriteNoLock(MessageView message_view) {
    uint32_t write_bytes = 0;
    do {
      message_view.advance_data_offset(write_bytes);

      // Handles (if any) are extracted on the first iteration only, since
      // TakeHandles() leaves the view empty afterwards.
      std::vector<PlatformHandleInTransit> outgoing_handles =
          message_view.TakeHandles();
      zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
      size_t handles_count = outgoing_handles.size();

      DCHECK_LE(handles_count, base::size(handles));
      for (size_t i = 0; i < handles_count; ++i) {
        DCHECK(outgoing_handles[i].handle().is_valid());
        handles[i] = outgoing_handles[i].handle().GetHandle().get();
      }

      // Messages larger than the kernel's per-message byte limit are sent in
      // multiple chunks.
      write_bytes = std::min(message_view.data_num_bytes(),
                             static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES));
      zx_status_t result = handle_.write(0, message_view.data(), write_bytes,
                                         handles, handles_count);
      // zx_channel_write() consumes |handles| whether or not it succeeds, so
      // release() our copies now, to avoid them being double-closed.
      for (auto& outgoing_handle : outgoing_handles)
        outgoing_handle.CompleteTransit();

      if (result != ZX_OK) {
        // TODO(fuchsia): Handle ZX_ERR_SHOULD_WAIT flow-control errors, once
        // the platform starts generating them. See https://crbug.com/754084.
        ZX_DLOG_IF(ERROR, result != ZX_ERR_PEER_CLOSED, result)
            << "WriteNoLock(zx_channel_write)";
        return false;
      }

    } while (write_bytes < message_view.data_num_bytes());

    return true;
  }

  // Handles a write failure previously recorded by Write(). IO thread only.
  void OnWriteError(Error error) {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    DCHECK(reject_writes_);

    if (error == Error::kDisconnected) {
      // If we can't write because the pipe is disconnected then continue
      // reading to fetch any in-flight messages, relying on end-of-stream to
      // signal the actual disconnection.
      if (read_watch_) {
        // TODO: When we add flow-control for writes, we also need to reset the
        // write-watcher here.
        return;
      }
    }

    OnError(error);
  }

  // Keeps the Channel alive at least until explicit shutdown on the IO thread.
  scoped_refptr<Channel> self_;

  zx::channel handle_;
  scoped_refptr<base::TaskRunner> io_task_runner_;

  // These members are only used on the IO thread.
  std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_;
  base::circular_deque<zx::handle> incoming_handles_;
  bool leak_handle_ = false;

  base::Lock write_lock_;
  bool reject_writes_ = false;

  DISALLOW_COPY_AND_ASSIGN(ChannelFuchsia);
};
    453 
    454 }  // namespace
    455 
    456 // static
    457 scoped_refptr<Channel> Channel::Create(
    458     Delegate* delegate,
    459     ConnectionParams connection_params,
    460     scoped_refptr<base::TaskRunner> io_task_runner) {
    461   return new ChannelFuchsia(delegate, std::move(connection_params),
    462                             std::move(io_task_runner));
    463 }
    464 
    465 }  // namespace core
    466 }  // namespace mojo
    467