Home | History | Annotate | Download | only in system
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/message_pipe_dispatcher.h"
      6 
      7 #include <limits>
      8 #include <memory>
      9 
     10 #include "base/logging.h"
     11 #include "base/macros.h"
     12 #include "base/memory/ref_counted.h"
     13 #include "mojo/edk/embedder/embedder_internal.h"
     14 #include "mojo/edk/system/core.h"
     15 #include "mojo/edk/system/message_for_transit.h"
     16 #include "mojo/edk/system/node_controller.h"
     17 #include "mojo/edk/system/ports/message_filter.h"
     18 #include "mojo/edk/system/ports_message.h"
     19 #include "mojo/edk/system/request_context.h"
     20 
     21 namespace mojo {
     22 namespace edk {
     23 
     24 namespace {
     25 
     26 using DispatcherHeader = MessageForTransit::DispatcherHeader;
     27 using MessageHeader = MessageForTransit::MessageHeader;
     28 
     29 #pragma pack(push, 1)
     30 
     31 struct SerializedState {
     32   uint64_t pipe_id;
     33   int8_t endpoint;
     34   char padding[7];
     35 };
     36 
     37 static_assert(sizeof(SerializedState) % 8 == 0,
     38               "Invalid SerializedState size.");
     39 
     40 #pragma pack(pop)
     41 
     42 }  // namespace
     43 
     44 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
     45 // reference to the MPD to ensure it lives as long as the observed port.
     46 class MessagePipeDispatcher::PortObserverThunk
     47     : public NodeController::PortObserver {
     48  public:
     49   explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
     50       : dispatcher_(dispatcher) {}
     51 
     52  private:
     53   ~PortObserverThunk() override {}
     54 
     55   // NodeController::PortObserver:
     56   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
     57 
     58   scoped_refptr<MessagePipeDispatcher> dispatcher_;
     59 
     60   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
     61 };
     62 
     63 // A MessageFilter used by ReadMessage to determine whether a message should
     64 // actually be consumed yet.
     65 class ReadMessageFilter : public ports::MessageFilter {
     66  public:
     67   // Creates a new ReadMessageFilter which captures and potentially modifies
     68   // various (unowned) local state within MessagePipeDispatcher::ReadMessage.
     69   ReadMessageFilter(bool read_any_size,
     70                     bool may_discard,
     71                     uint32_t* num_bytes,
     72                     uint32_t* num_handles,
     73                     bool* no_space,
     74                     bool* invalid_message)
     75       : read_any_size_(read_any_size),
     76         may_discard_(may_discard),
     77         num_bytes_(num_bytes),
     78         num_handles_(num_handles),
     79         no_space_(no_space),
     80         invalid_message_(invalid_message) {}
     81 
     82   ~ReadMessageFilter() override {}
     83 
     84   // ports::MessageFilter:
     85   bool Match(const ports::Message& m) override {
     86     const PortsMessage& message = static_cast<const PortsMessage&>(m);
     87     if (message.num_payload_bytes() < sizeof(MessageHeader)) {
     88       *invalid_message_ = true;
     89       return true;
     90     }
     91 
     92     const MessageHeader* header =
     93         static_cast<const MessageHeader*>(message.payload_bytes());
     94     if (header->header_size > message.num_payload_bytes()) {
     95       *invalid_message_ = true;
     96       return true;
     97     }
     98 
     99     uint32_t bytes_to_read = 0;
    100     uint32_t bytes_available =
    101         static_cast<uint32_t>(message.num_payload_bytes()) -
    102         header->header_size;
    103     if (num_bytes_) {
    104       bytes_to_read = std::min(*num_bytes_, bytes_available);
    105       *num_bytes_ = bytes_available;
    106     }
    107 
    108     uint32_t handles_to_read = 0;
    109     uint32_t handles_available = header->num_dispatchers;
    110     if (num_handles_) {
    111       handles_to_read = std::min(*num_handles_, handles_available);
    112       *num_handles_ = handles_available;
    113     }
    114 
    115     if (handles_to_read < handles_available ||
    116         (!read_any_size_ && bytes_to_read < bytes_available)) {
    117       *no_space_ = true;
    118       return may_discard_;
    119     }
    120 
    121     return true;
    122   }
    123 
    124  private:
    125   const bool read_any_size_;
    126   const bool may_discard_;
    127   uint32_t* const num_bytes_;
    128   uint32_t* const num_handles_;
    129   bool* const no_space_;
    130   bool* const invalid_message_;
    131 
    132   DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter);
    133 };
    134 
    135 #if DCHECK_IS_ON()
    136 
    137 // A MessageFilter which never matches a message. Used to peek at the size of
    138 // the next available message on a port, for debug logging only.
    139 class PeekSizeMessageFilter : public ports::MessageFilter {
    140  public:
    141   PeekSizeMessageFilter() {}
    142   ~PeekSizeMessageFilter() override {}
    143 
    144   // ports::MessageFilter:
    145   bool Match(const ports::Message& message) override {
    146     message_size_ = message.num_payload_bytes();
    147     return false;
    148   }
    149 
    150   size_t message_size() const { return message_size_; }
    151 
    152  private:
    153   size_t message_size_ = 0;
    154 
    155   DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
    156 };
    157 
    158 #endif  // DCHECK_IS_ON()
    159 
    160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
    161                                              const ports::PortRef& port,
    162                                              uint64_t pipe_id,
    163                                              int endpoint)
    164     : node_controller_(node_controller),
    165       port_(port),
    166       pipe_id_(pipe_id),
    167       endpoint_(endpoint) {
    168   DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
    169            << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
    170 
    171   node_controller_->SetPortObserver(
    172       port_,
    173       make_scoped_refptr(new PortObserverThunk(this)));
    174 }
    175 
    176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
    177   node_controller_->SetPortObserver(port_, nullptr);
    178   node_controller_->SetPortObserver(other->port_, nullptr);
    179 
    180   ports::PortRef port0;
    181   {
    182     base::AutoLock lock(signal_lock_);
    183     port0 = port_;
    184     port_closed_.Set(true);
    185     awakables_.CancelAll();
    186   }
    187 
    188   ports::PortRef port1;
    189   {
    190     base::AutoLock lock(other->signal_lock_);
    191     port1 = other->port_;
    192     other->port_closed_.Set(true);
    193     other->awakables_.CancelAll();
    194   }
    195 
    196   // Both ports are always closed by this call.
    197   int rv = node_controller_->MergeLocalPorts(port0, port1);
    198   return rv == ports::OK;
    199 }
    200 
    201 Dispatcher::Type MessagePipeDispatcher::GetType() const {
    202   return Type::MESSAGE_PIPE;
    203 }
    204 
    205 MojoResult MessagePipeDispatcher::Close() {
    206   base::AutoLock lock(signal_lock_);
    207   DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
    208            << " [port=" << port_.name() << "]";
    209   return CloseNoLock();
    210 }
    211 
    212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
    213                                         const Watcher::WatchCallback& callback,
    214                                         uintptr_t context) {
    215   base::AutoLock lock(signal_lock_);
    216 
    217   if (port_closed_ || in_transit_)
    218     return MOJO_RESULT_INVALID_ARGUMENT;
    219 
    220   return awakables_.AddWatcher(
    221       signals, callback, context, GetHandleSignalsStateNoLock());
    222 }
    223 
    224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
    225   base::AutoLock lock(signal_lock_);
    226 
    227   if (port_closed_ || in_transit_)
    228     return MOJO_RESULT_INVALID_ARGUMENT;
    229 
    230   return awakables_.RemoveWatcher(context);
    231 }
    232 
    233 MojoResult MessagePipeDispatcher::WriteMessage(
    234     std::unique_ptr<MessageForTransit> message,
    235     MojoWriteMessageFlags flags) {
    236   if (port_closed_ || in_transit_)
    237     return MOJO_RESULT_INVALID_ARGUMENT;
    238 
    239   size_t num_bytes = message->num_bytes();
    240   int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
    241 
    242   DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
    243            << " [port=" << port_.name() << "; rv=" << rv
    244            << "; num_bytes=" << num_bytes << "]";
    245 
    246   if (rv != ports::OK) {
    247     if (rv == ports::ERROR_PORT_UNKNOWN ||
    248         rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
    249         rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
    250       return MOJO_RESULT_INVALID_ARGUMENT;
    251     } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
    252       return MOJO_RESULT_FAILED_PRECONDITION;
    253     }
    254 
    255     NOTREACHED();
    256     return MOJO_RESULT_UNKNOWN;
    257   }
    258 
    259   return MOJO_RESULT_OK;
    260 }
    261 
    262 MojoResult MessagePipeDispatcher::ReadMessage(
    263     std::unique_ptr<MessageForTransit>* message,
    264     uint32_t* num_bytes,
    265     MojoHandle* handles,
    266     uint32_t* num_handles,
    267     MojoReadMessageFlags flags,
    268     bool read_any_size) {
    269   // We can't read from a port that's closed or in transit!
    270   if (port_closed_ || in_transit_)
    271     return MOJO_RESULT_INVALID_ARGUMENT;
    272 
    273   bool no_space = false;
    274   bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
    275   bool invalid_message = false;
    276 
    277   // Grab a message if the provided handles buffer is large enough. If the input
    278   // |num_bytes| is provided and |read_any_size| is false, we also ensure
    279   // that it specifies a size at least as large as the next available payload.
    280   //
    281   // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
    282   // This flag exists to support both new and old API behavior.
    283 
    284   ports::ScopedMessage ports_message;
    285   ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles,
    286                            &no_space, &invalid_message);
    287   int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter);
    288 
    289   if (invalid_message)
    290     return MOJO_RESULT_UNKNOWN;
    291 
    292   if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
    293     if (rv == ports::ERROR_PORT_UNKNOWN ||
    294         rv == ports::ERROR_PORT_STATE_UNEXPECTED)
    295       return MOJO_RESULT_INVALID_ARGUMENT;
    296 
    297     NOTREACHED();
    298     return MOJO_RESULT_UNKNOWN;  // TODO: Add a better error code here?
    299   }
    300 
    301   if (no_space) {
    302     // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
    303     // sufficient to hold this message's data. The message will still be in
    304     // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
    305     return MOJO_RESULT_RESOURCE_EXHAUSTED;
    306   }
    307 
    308   if (!ports_message) {
    309     // No message was available in queue.
    310 
    311     if (rv == ports::OK)
    312       return MOJO_RESULT_SHOULD_WAIT;
    313 
    314     // Peer is closed and there are no more messages to read.
    315     DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
    316     return MOJO_RESULT_FAILED_PRECONDITION;
    317   }
    318 
    319   // Alright! We have a message and the caller has provided sufficient storage
    320   // in which to receive it.
    321 
    322   std::unique_ptr<PortsMessage> msg(
    323       static_cast<PortsMessage*>(ports_message.release()));
    324 
    325   const MessageHeader* header =
    326       static_cast<const MessageHeader*>(msg->payload_bytes());
    327   const DispatcherHeader* dispatcher_headers =
    328       reinterpret_cast<const DispatcherHeader*>(header + 1);
    329 
    330   if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
    331     return MOJO_RESULT_UNKNOWN;
    332 
    333   // Deserialize dispatchers.
    334   if (header->num_dispatchers > 0) {
    335     CHECK(handles);
    336     std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
    337     size_t data_payload_index = sizeof(MessageHeader) +
    338         header->num_dispatchers * sizeof(DispatcherHeader);
    339     if (data_payload_index > header->header_size)
    340       return MOJO_RESULT_UNKNOWN;
    341     const char* dispatcher_data = reinterpret_cast<const char*>(
    342         dispatcher_headers + header->num_dispatchers);
    343     size_t port_index = 0;
    344     size_t platform_handle_index = 0;
    345     ScopedPlatformHandleVectorPtr msg_handles = msg->TakeHandles();
    346     const size_t num_msg_handles = msg_handles ? msg_handles->size() : 0;
    347     for (size_t i = 0; i < header->num_dispatchers; ++i) {
    348       const DispatcherHeader& dh = dispatcher_headers[i];
    349       Type type = static_cast<Type>(dh.type);
    350 
    351       size_t next_payload_index = data_payload_index + dh.num_bytes;
    352       if (msg->num_payload_bytes() < next_payload_index ||
    353           next_payload_index < data_payload_index) {
    354         return MOJO_RESULT_UNKNOWN;
    355       }
    356 
    357       size_t next_port_index = port_index + dh.num_ports;
    358       if (msg->num_ports() < next_port_index || next_port_index < port_index)
    359         return MOJO_RESULT_UNKNOWN;
    360 
    361       size_t next_platform_handle_index =
    362           platform_handle_index + dh.num_platform_handles;
    363       if (num_msg_handles < next_platform_handle_index ||
    364           next_platform_handle_index < platform_handle_index) {
    365         return MOJO_RESULT_UNKNOWN;
    366       }
    367 
    368       PlatformHandle* out_handles =
    369           num_msg_handles ? msg_handles->data() + platform_handle_index
    370                           : nullptr;
    371       dispatchers[i].dispatcher = Dispatcher::Deserialize(
    372           type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
    373           dh.num_ports, out_handles, dh.num_platform_handles);
    374       if (!dispatchers[i].dispatcher)
    375         return MOJO_RESULT_UNKNOWN;
    376 
    377       dispatcher_data += dh.num_bytes;
    378       data_payload_index = next_payload_index;
    379       port_index = next_port_index;
    380       platform_handle_index = next_platform_handle_index;
    381     }
    382 
    383     if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
    384                                                              handles))
    385       return MOJO_RESULT_UNKNOWN;
    386   }
    387 
    388   CHECK(msg);
    389   *message = MessageForTransit::WrapPortsMessage(std::move(msg));
    390   return MOJO_RESULT_OK;
    391 }
    392 
    393 HandleSignalsState
    394 MessagePipeDispatcher::GetHandleSignalsState() const {
    395   base::AutoLock lock(signal_lock_);
    396   return GetHandleSignalsStateNoLock();
    397 }
    398 
    399 MojoResult MessagePipeDispatcher::AddAwakable(
    400     Awakable* awakable,
    401     MojoHandleSignals signals,
    402     uintptr_t context,
    403     HandleSignalsState* signals_state) {
    404   base::AutoLock lock(signal_lock_);
    405 
    406   if (port_closed_ || in_transit_) {
    407     if (signals_state)
    408       *signals_state = HandleSignalsState();
    409     return MOJO_RESULT_INVALID_ARGUMENT;
    410   }
    411 
    412   HandleSignalsState state = GetHandleSignalsStateNoLock();
    413 
    414   DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
    415            << endpoint_ << " [awakable=" << awakable << "; port="
    416            << port_.name() << "; signals=" << signals << "; satisfied="
    417            << state.satisfied_signals << "; satisfiable="
    418            << state.satisfiable_signals << "]";
    419 
    420   if (state.satisfies(signals)) {
    421     if (signals_state)
    422       *signals_state = state;
    423     DVLOG(2) << "Signals already set for " << port_.name();
    424     return MOJO_RESULT_ALREADY_EXISTS;
    425   }
    426   if (!state.can_satisfy(signals)) {
    427     if (signals_state)
    428       *signals_state = state;
    429     DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
    430     return MOJO_RESULT_FAILED_PRECONDITION;
    431   }
    432 
    433   DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
    434            << endpoint_ << " [awakable=" << awakable << "; port="
    435            << port_.name() << "; signals=" << signals << "]";
    436 
    437   awakables_.Add(awakable, signals, context);
    438   return MOJO_RESULT_OK;
    439 }
    440 
    441 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
    442                                            HandleSignalsState* signals_state) {
    443   base::AutoLock lock(signal_lock_);
    444   if (port_closed_ || in_transit_) {
    445     if (signals_state)
    446       *signals_state = HandleSignalsState();
    447   } else if (signals_state) {
    448     *signals_state = GetHandleSignalsStateNoLock();
    449   }
    450 
    451   DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
    452            << endpoint_ << " [awakable=" << awakable << "; port="
    453            << port_.name() << "]";
    454 
    455   awakables_.Remove(awakable);
    456 }
    457 
    458 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
    459                                            uint32_t* num_ports,
    460                                            uint32_t* num_handles) {
    461   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
    462   *num_ports = 1;
    463   *num_handles = 0;
    464 }
    465 
    466 bool MessagePipeDispatcher::EndSerialize(void* destination,
    467                                          ports::PortName* ports,
    468                                          PlatformHandle* handles) {
    469   SerializedState* state = static_cast<SerializedState*>(destination);
    470   state->pipe_id = pipe_id_;
    471   state->endpoint = static_cast<int8_t>(endpoint_);
    472   memset(state->padding, 0, sizeof(state->padding));
    473   ports[0] = port_.name();
    474   return true;
    475 }
    476 
    477 bool MessagePipeDispatcher::BeginTransit() {
    478   base::AutoLock lock(signal_lock_);
    479   if (in_transit_ || port_closed_)
    480     return false;
    481   in_transit_.Set(true);
    482   return in_transit_;
    483 }
    484 
    485 void MessagePipeDispatcher::CompleteTransitAndClose() {
    486   node_controller_->SetPortObserver(port_, nullptr);
    487 
    488   base::AutoLock lock(signal_lock_);
    489   port_transferred_ = true;
    490   in_transit_.Set(false);
    491   CloseNoLock();
    492 }
    493 
    494 void MessagePipeDispatcher::CancelTransit() {
    495   base::AutoLock lock(signal_lock_);
    496   in_transit_.Set(false);
    497 
    498   // Something may have happened while we were waiting for potential transit.
    499   awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
    500 }
    501 
    502 // static
    503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
    504     const void* data,
    505     size_t num_bytes,
    506     const ports::PortName* ports,
    507     size_t num_ports,
    508     PlatformHandle* handles,
    509     size_t num_handles) {
    510   if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
    511     return nullptr;
    512 
    513   const SerializedState* state = static_cast<const SerializedState*>(data);
    514 
    515   ports::PortRef port;
    516   CHECK_EQ(
    517       ports::OK,
    518       internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
    519 
    520   return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
    521                                    state->pipe_id, state->endpoint);
    522 }
    523 
    524 MessagePipeDispatcher::~MessagePipeDispatcher() {
    525   DCHECK(port_closed_ && !in_transit_);
    526 }
    527 
    528 MojoResult MessagePipeDispatcher::CloseNoLock() {
    529   signal_lock_.AssertAcquired();
    530   if (port_closed_ || in_transit_)
    531     return MOJO_RESULT_INVALID_ARGUMENT;
    532 
    533   port_closed_.Set(true);
    534   awakables_.CancelAll();
    535 
    536   if (!port_transferred_) {
    537     base::AutoUnlock unlock(signal_lock_);
    538     node_controller_->ClosePort(port_);
    539   }
    540 
    541   return MOJO_RESULT_OK;
    542 }
    543 
    544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
    545   HandleSignalsState rv;
    546 
    547   ports::PortStatus port_status;
    548   if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
    549     CHECK(in_transit_ || port_transferred_ || port_closed_);
    550     return HandleSignalsState();
    551   }
    552 
    553   if (port_status.has_messages) {
    554     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    555     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    556   }
    557   if (port_status.receiving_messages)
    558     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    559   if (!port_status.peer_closed) {
    560     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
    561     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
    562     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    563   } else {
    564     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    565   }
    566   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    567   return rv;
    568 }
    569 
    570 void MessagePipeDispatcher::OnPortStatusChanged() {
    571   DCHECK(RequestContext::current());
    572 
    573   base::AutoLock lock(signal_lock_);
    574 
    575   // We stop observing our port as soon as it's transferred, but this can race
    576   // with events which are raised right before that happens. This is fine to
    577   // ignore.
    578   if (port_transferred_)
    579     return;
    580 
    581 #if DCHECK_IS_ON()
    582   ports::PortStatus port_status;
    583   if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
    584     if (port_status.has_messages) {
    585       ports::ScopedMessage unused;
    586       PeekSizeMessageFilter filter;
    587       node_controller_->node()->GetMessage(port_, &unused, &filter);
    588       DVLOG(4) << "New message detected on message pipe " << pipe_id_
    589                << " endpoint " << endpoint_ << " [port=" << port_.name()
    590                << "; size=" << filter.message_size() << "]";
    591     }
    592     if (port_status.peer_closed) {
    593       DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
    594                << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
    595     }
    596   }
    597 #endif
    598 
    599   awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
    600 }
    601 
    602 }  // namespace edk
    603 }  // namespace mojo
    604