Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/message_pipe.h"
      6 
      7 #include "base/logging.h"
      8 #include "mojo/system/channel_endpoint.h"
      9 #include "mojo/system/local_message_pipe_endpoint.h"
     10 #include "mojo/system/message_in_transit.h"
     11 #include "mojo/system/message_pipe_dispatcher.h"
     12 #include "mojo/system/message_pipe_endpoint.h"
     13 #include "mojo/system/proxy_message_pipe_endpoint.h"
     14 
     15 namespace mojo {
     16 namespace system {
     17 
     18 // static
     19 MessagePipe* MessagePipe::CreateLocalLocal() {
     20   MessagePipe* message_pipe = new MessagePipe();
     21   message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
     22   message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
     23   return message_pipe;
     24 }
     25 
     26 // static
     27 MessagePipe* MessagePipe::CreateLocalProxy(
     28     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
     29   DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
     30   MessagePipe* message_pipe = new MessagePipe();
     31   message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
     32   *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
     33   message_pipe->endpoints_[1].reset(
     34       new ProxyMessagePipeEndpoint(channel_endpoint->get()));
     35   return message_pipe;
     36 }
     37 
     38 // static
     39 MessagePipe* MessagePipe::CreateProxyLocal(
     40     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
     41   DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
     42   MessagePipe* message_pipe = new MessagePipe();
     43   *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
     44   message_pipe->endpoints_[0].reset(
     45       new ProxyMessagePipeEndpoint(channel_endpoint->get()));
     46   message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
     47   return message_pipe;
     48 }
     49 
     50 // static
     51 unsigned MessagePipe::GetPeerPort(unsigned port) {
     52   DCHECK(port == 0 || port == 1);
     53   return port ^ 1;
     54 }
     55 
     56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
     57   DCHECK(port == 0 || port == 1);
     58   base::AutoLock locker(lock_);
     59   DCHECK(endpoints_[port]);
     60 
     61   return endpoints_[port]->GetType();
     62 }
     63 
     64 void MessagePipe::CancelAllWaiters(unsigned port) {
     65   DCHECK(port == 0 || port == 1);
     66 
     67   base::AutoLock locker(lock_);
     68   DCHECK(endpoints_[port]);
     69   endpoints_[port]->CancelAllWaiters();
     70 }
     71 
     72 void MessagePipe::Close(unsigned port) {
     73   DCHECK(port == 0 || port == 1);
     74 
     75   unsigned destination_port = GetPeerPort(port);
     76 
     77   base::AutoLock locker(lock_);
     78   DCHECK(endpoints_[port]);
     79 
     80   endpoints_[port]->Close();
     81   if (endpoints_[destination_port]) {
     82     if (!endpoints_[destination_port]->OnPeerClose())
     83       endpoints_[destination_port].reset();
     84   }
     85   endpoints_[port].reset();
     86 }
     87 
     88 // TODO(vtl): Handle flags.
     89 MojoResult MessagePipe::WriteMessage(
     90     unsigned port,
     91     UserPointer<const void> bytes,
     92     uint32_t num_bytes,
     93     std::vector<DispatcherTransport>* transports,
     94     MojoWriteMessageFlags flags) {
     95   DCHECK(port == 0 || port == 1);
     96   return EnqueueMessageInternal(
     97       GetPeerPort(port),
     98       make_scoped_ptr(new MessageInTransit(
     99           MessageInTransit::kTypeMessagePipeEndpoint,
    100           MessageInTransit::kSubtypeMessagePipeEndpointData,
    101           num_bytes,
    102           bytes)),
    103       transports);
    104 }
    105 
    106 MojoResult MessagePipe::ReadMessage(unsigned port,
    107                                     UserPointer<void> bytes,
    108                                     UserPointer<uint32_t> num_bytes,
    109                                     DispatcherVector* dispatchers,
    110                                     uint32_t* num_dispatchers,
    111                                     MojoReadMessageFlags flags) {
    112   DCHECK(port == 0 || port == 1);
    113 
    114   base::AutoLock locker(lock_);
    115   DCHECK(endpoints_[port]);
    116 
    117   return endpoints_[port]->ReadMessage(
    118       bytes, num_bytes, dispatchers, num_dispatchers, flags);
    119 }
    120 
    121 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
    122   DCHECK(port == 0 || port == 1);
    123 
    124   base::AutoLock locker(const_cast<base::Lock&>(lock_));
    125   DCHECK(endpoints_[port]);
    126 
    127   return endpoints_[port]->GetHandleSignalsState();
    128 }
    129 
    130 MojoResult MessagePipe::AddWaiter(unsigned port,
    131                                   Waiter* waiter,
    132                                   MojoHandleSignals signals,
    133                                   uint32_t context,
    134                                   HandleSignalsState* signals_state) {
    135   DCHECK(port == 0 || port == 1);
    136 
    137   base::AutoLock locker(lock_);
    138   DCHECK(endpoints_[port]);
    139 
    140   return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
    141 }
    142 
    143 void MessagePipe::RemoveWaiter(unsigned port,
    144                                Waiter* waiter,
    145                                HandleSignalsState* signals_state) {
    146   DCHECK(port == 0 || port == 1);
    147 
    148   base::AutoLock locker(lock_);
    149   DCHECK(endpoints_[port]);
    150 
    151   endpoints_[port]->RemoveWaiter(waiter, signals_state);
    152 }
    153 
    154 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
    155   DCHECK(port == 0 || port == 1);
    156 
    157   base::AutoLock locker(lock_);
    158   DCHECK(endpoints_[port]);
    159   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
    160 
    161   // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
    162   // |MessagePipe| with two proxy endpoints, which will then act as a proxy
    163   // (rather than trying to connect the two ends directly).
    164   DLOG_IF(WARNING,
    165           !!endpoints_[GetPeerPort(port)] &&
    166               endpoints_[GetPeerPort(port)]->GetType() !=
    167                   MessagePipeEndpoint::kTypeLocal)
    168       << "Direct message pipe passing across multiple channels not yet "
    169          "implemented; will proxy";
    170 
    171   scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
    172   scoped_refptr<ChannelEndpoint> channel_endpoint(
    173       new ChannelEndpoint(this, port));
    174   endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
    175   channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
    176                                      old_endpoint.get())->message_queue());
    177   old_endpoint->Close();
    178 
    179   return channel_endpoint;
    180 }
    181 
    182 MojoResult MessagePipe::EnqueueMessage(unsigned port,
    183                                        scoped_ptr<MessageInTransit> message) {
    184   return EnqueueMessageInternal(port, message.Pass(), nullptr);
    185 }
    186 
    187 void MessagePipe::OnRemove(unsigned port) {
    188   unsigned destination_port = GetPeerPort(port);
    189 
    190   base::AutoLock locker(lock_);
    191   // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
    192   if (!endpoints_[port])
    193     return;
    194 
    195   if (endpoints_[destination_port]) {
    196     if (!endpoints_[destination_port]->OnPeerClose())
    197       endpoints_[destination_port].reset();
    198   }
    199   endpoints_[port].reset();
    200 }
    201 
    202 MessagePipe::MessagePipe() {
    203 }
    204 
    205 MessagePipe::~MessagePipe() {
    206   // Owned by the dispatchers. The owning dispatchers should only release us via
    207   // their |Close()| method, which should inform us of being closed via our
    208   // |Close()|. Thus these should already be null.
    209   DCHECK(!endpoints_[0]);
    210   DCHECK(!endpoints_[1]);
    211 }
    212 
    213 MojoResult MessagePipe::EnqueueMessageInternal(
    214     unsigned port,
    215     scoped_ptr<MessageInTransit> message,
    216     std::vector<DispatcherTransport>* transports) {
    217   DCHECK(port == 0 || port == 1);
    218   DCHECK(message);
    219 
    220   if (message->type() == MessageInTransit::kTypeMessagePipe) {
    221     DCHECK(!transports);
    222     return HandleControlMessage(port, message.Pass());
    223   }
    224 
    225   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
    226 
    227   base::AutoLock locker(lock_);
    228   DCHECK(endpoints_[GetPeerPort(port)]);
    229 
    230   // The destination port need not be open, unlike the source port.
    231   if (!endpoints_[port])
    232     return MOJO_RESULT_FAILED_PRECONDITION;
    233 
    234   if (transports) {
    235     MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
    236     if (result != MOJO_RESULT_OK)
    237       return result;
    238   }
    239 
    240   // The endpoint's |EnqueueMessage()| may not report failure.
    241   endpoints_[port]->EnqueueMessage(message.Pass());
    242   return MOJO_RESULT_OK;
    243 }
    244 
    245 MojoResult MessagePipe::AttachTransportsNoLock(
    246     unsigned port,
    247     MessageInTransit* message,
    248     std::vector<DispatcherTransport>* transports) {
    249   DCHECK(!message->has_dispatchers());
    250 
    251   // You're not allowed to send either handle to a message pipe over the message
    252   // pipe, so check for this. (The case of trying to write a handle to itself is
    253   // taken care of by |Core|. That case kind of makes sense, but leads to
    254   // complications if, e.g., both sides try to do the same thing with their
    255   // respective handles simultaneously. The other case, of trying to write the
    256   // peer handle to a handle, doesn't make sense -- since no handle will be
    257   // available to read the message from.)
    258   for (size_t i = 0; i < transports->size(); i++) {
    259     if (!(*transports)[i].is_valid())
    260       continue;
    261     if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
    262       MessagePipeDispatcherTransport mp_transport((*transports)[i]);
    263       if (mp_transport.GetMessagePipe() == this) {
    264         // The other case should have been disallowed by |Core|. (Note: |port|
    265         // is the peer port of the handle given to |WriteMessage()|.)
    266         DCHECK_EQ(mp_transport.GetPort(), port);
    267         return MOJO_RESULT_INVALID_ARGUMENT;
    268       }
    269     }
    270   }
    271 
    272   // Clone the dispatchers and attach them to the message. (This must be done as
    273   // a separate loop, since we want to leave the dispatchers alone on failure.)
    274   scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
    275   dispatchers->reserve(transports->size());
    276   for (size_t i = 0; i < transports->size(); i++) {
    277     if ((*transports)[i].is_valid()) {
    278       dispatchers->push_back(
    279           (*transports)[i].CreateEquivalentDispatcherAndClose());
    280     } else {
    281       LOG(WARNING) << "Enqueueing null dispatcher";
    282       dispatchers->push_back(scoped_refptr<Dispatcher>());
    283     }
    284   }
    285   message->SetDispatchers(dispatchers.Pass());
    286   return MOJO_RESULT_OK;
    287 }
    288 
    289 MojoResult MessagePipe::HandleControlMessage(
    290     unsigned /*port*/,
    291     scoped_ptr<MessageInTransit> message) {
    292   LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
    293                << message->subtype();
    294   return MOJO_RESULT_UNKNOWN;
    295 }
    296 
    297 }  // namespace system
    298 }  // namespace mojo
    299