Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/message_pipe.h"
      6 
      7 #include "base/logging.h"
      8 #include "mojo/system/channel.h"
      9 #include "mojo/system/local_message_pipe_endpoint.h"
     10 #include "mojo/system/message_in_transit.h"
     11 #include "mojo/system/message_pipe_dispatcher.h"
     12 #include "mojo/system/message_pipe_endpoint.h"
     13 #include "mojo/system/proxy_message_pipe_endpoint.h"
     14 
     15 namespace mojo {
     16 namespace system {
     17 
     18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
     19                          scoped_ptr<MessagePipeEndpoint> endpoint1) {
     20   endpoints_[0].reset(endpoint0.release());
     21   endpoints_[1].reset(endpoint1.release());
     22 }
     23 
     24 MessagePipe::MessagePipe() {
     25   endpoints_[0].reset(new LocalMessagePipeEndpoint());
     26   endpoints_[1].reset(new LocalMessagePipeEndpoint());
     27 }
     28 
     29 // static
     30 unsigned MessagePipe::GetPeerPort(unsigned port) {
     31   DCHECK(port == 0 || port == 1);
     32   return port ^ 1;
     33 }
     34 
     35 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
     36   DCHECK(port == 0 || port == 1);
     37   base::AutoLock locker(lock_);
     38   DCHECK(endpoints_[port]);
     39 
     40   return endpoints_[port]->GetType();
     41 }
     42 
     43 void MessagePipe::CancelAllWaiters(unsigned port) {
     44   DCHECK(port == 0 || port == 1);
     45 
     46   base::AutoLock locker(lock_);
     47   DCHECK(endpoints_[port]);
     48   endpoints_[port]->CancelAllWaiters();
     49 }
     50 
     51 void MessagePipe::Close(unsigned port) {
     52   DCHECK(port == 0 || port == 1);
     53 
     54   unsigned destination_port = GetPeerPort(port);
     55 
     56   base::AutoLock locker(lock_);
     57   DCHECK(endpoints_[port]);
     58 
     59   endpoints_[port]->Close();
     60   if (endpoints_[destination_port]) {
     61     if (!endpoints_[destination_port]->OnPeerClose())
     62       endpoints_[destination_port].reset();
     63   }
     64   endpoints_[port].reset();
     65 }
     66 
     67 // TODO(vtl): Handle flags.
     68 MojoResult MessagePipe::WriteMessage(
     69     unsigned port,
     70     const void* bytes,
     71     uint32_t num_bytes,
     72     std::vector<DispatcherTransport>* transports,
     73     MojoWriteMessageFlags flags) {
     74   DCHECK(port == 0 || port == 1);
     75   return EnqueueMessageInternal(
     76       GetPeerPort(port),
     77       make_scoped_ptr(new MessageInTransit(
     78           MessageInTransit::kTypeMessagePipeEndpoint,
     79           MessageInTransit::kSubtypeMessagePipeEndpointData,
     80           num_bytes,
     81           bytes)),
     82       transports);
     83 }
     84 
     85 MojoResult MessagePipe::ReadMessage(unsigned port,
     86                                     void* bytes,
     87                                     uint32_t* num_bytes,
     88                                     DispatcherVector* dispatchers,
     89                                     uint32_t* num_dispatchers,
     90                                     MojoReadMessageFlags flags) {
     91   DCHECK(port == 0 || port == 1);
     92 
     93   base::AutoLock locker(lock_);
     94   DCHECK(endpoints_[port]);
     95 
     96   return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
     97                                        num_dispatchers, flags);
     98 }
     99 
    100 MojoResult MessagePipe::AddWaiter(unsigned port,
    101                                   Waiter* waiter,
    102                                   MojoHandleSignals signals,
    103                                   uint32_t context) {
    104   DCHECK(port == 0 || port == 1);
    105 
    106   base::AutoLock locker(lock_);
    107   DCHECK(endpoints_[port]);
    108 
    109   return endpoints_[port]->AddWaiter(waiter, signals, context);
    110 }
    111 
    112 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
    113   DCHECK(port == 0 || port == 1);
    114 
    115   base::AutoLock locker(lock_);
    116   DCHECK(endpoints_[port]);
    117 
    118   endpoints_[port]->RemoveWaiter(waiter);
    119 }
    120 
    121 void MessagePipe::ConvertLocalToProxy(unsigned port) {
    122   DCHECK(port == 0 || port == 1);
    123 
    124   base::AutoLock locker(lock_);
    125   DCHECK(endpoints_[port]);
    126   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
    127 
    128   bool is_peer_open = !!endpoints_[GetPeerPort(port)];
    129 
    130   // TODO(vtl): Hopefully this will work if the peer has been closed and when
    131   // the peer is local. If the peer is remote, we should do something more
    132   // sophisticated.
    133   DCHECK(!is_peer_open ||
    134          endpoints_[GetPeerPort(port)]->GetType() ==
    135              MessagePipeEndpoint::kTypeLocal);
    136 
    137   scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
    138       new ProxyMessagePipeEndpoint(
    139           static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
    140           is_peer_open));
    141   endpoints_[port].swap(replacement_endpoint);
    142 }
    143 
    144 MojoResult MessagePipe::EnqueueMessage(
    145     unsigned port,
    146     scoped_ptr<MessageInTransit> message) {
    147   return EnqueueMessageInternal(port, message.Pass(), NULL);
    148 }
    149 
    150 bool MessagePipe::Attach(unsigned port,
    151                          scoped_refptr<Channel> channel,
    152                          MessageInTransit::EndpointId local_id) {
    153   DCHECK(port == 0 || port == 1);
    154   DCHECK(channel);
    155   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
    156 
    157   base::AutoLock locker(lock_);
    158   if (!endpoints_[port])
    159     return false;
    160 
    161   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
    162   endpoints_[port]->Attach(channel, local_id);
    163   return true;
    164 }
    165 
    166 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
    167   DCHECK(port == 0 || port == 1);
    168   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
    169 
    170   base::AutoLock locker(lock_);
    171   DCHECK(endpoints_[port]);
    172   if (!endpoints_[port]->Run(remote_id))
    173     endpoints_[port].reset();
    174 }
    175 
    176 void MessagePipe::OnRemove(unsigned port) {
    177   unsigned destination_port = GetPeerPort(port);
    178 
    179   base::AutoLock locker(lock_);
    180   // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
    181   if (!endpoints_[port])
    182     return;
    183 
    184   endpoints_[port]->OnRemove();
    185   if (endpoints_[destination_port]) {
    186     if (!endpoints_[destination_port]->OnPeerClose())
    187       endpoints_[destination_port].reset();
    188   }
    189   endpoints_[port].reset();
    190 }
    191 
    192 MessagePipe::~MessagePipe() {
    193   // Owned by the dispatchers. The owning dispatchers should only release us via
    194   // their |Close()| method, which should inform us of being closed via our
    195   // |Close()|. Thus these should already be null.
    196   DCHECK(!endpoints_[0]);
    197   DCHECK(!endpoints_[1]);
    198 }
    199 
    200 MojoResult MessagePipe::EnqueueMessageInternal(
    201     unsigned port,
    202     scoped_ptr<MessageInTransit> message,
    203     std::vector<DispatcherTransport>* transports) {
    204   DCHECK(port == 0 || port == 1);
    205   DCHECK(message);
    206 
    207   if (message->type() == MessageInTransit::kTypeMessagePipe) {
    208     DCHECK(!transports);
    209     return HandleControlMessage(port, message.Pass());
    210   }
    211 
    212   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
    213 
    214   base::AutoLock locker(lock_);
    215   DCHECK(endpoints_[GetPeerPort(port)]);
    216 
    217   // The destination port need not be open, unlike the source port.
    218   if (!endpoints_[port])
    219     return MOJO_RESULT_FAILED_PRECONDITION;
    220 
    221   if (transports) {
    222     MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
    223     if (result != MOJO_RESULT_OK)
    224       return result;
    225   }
    226 
    227   // The endpoint's |EnqueueMessage()| may not report failure.
    228   endpoints_[port]->EnqueueMessage(message.Pass());
    229   return MOJO_RESULT_OK;
    230 }
    231 
    232 MojoResult MessagePipe::AttachTransportsNoLock(
    233     unsigned port,
    234     MessageInTransit* message,
    235     std::vector<DispatcherTransport>* transports) {
    236   DCHECK(!message->has_dispatchers());
    237 
    238   // You're not allowed to send either handle to a message pipe over the message
    239   // pipe, so check for this. (The case of trying to write a handle to itself is
    240   // taken care of by |Core|. That case kind of makes sense, but leads to
    241   // complications if, e.g., both sides try to do the same thing with their
    242   // respective handles simultaneously. The other case, of trying to write the
    243   // peer handle to a handle, doesn't make sense -- since no handle will be
    244   // available to read the message from.)
    245   for (size_t i = 0; i < transports->size(); i++) {
    246     if (!(*transports)[i].is_valid())
    247       continue;
    248     if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
    249       MessagePipeDispatcherTransport mp_transport((*transports)[i]);
    250       if (mp_transport.GetMessagePipe() == this) {
    251         // The other case should have been disallowed by |Core|. (Note: |port|
    252         // is the peer port of the handle given to |WriteMessage()|.)
    253         DCHECK_EQ(mp_transport.GetPort(), port);
    254         return MOJO_RESULT_INVALID_ARGUMENT;
    255       }
    256     }
    257   }
    258 
    259   // Clone the dispatchers and attach them to the message. (This must be done as
    260   // a separate loop, since we want to leave the dispatchers alone on failure.)
    261   scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
    262   dispatchers->reserve(transports->size());
    263   for (size_t i = 0; i < transports->size(); i++) {
    264     if ((*transports)[i].is_valid()) {
    265       dispatchers->push_back(
    266           (*transports)[i].CreateEquivalentDispatcherAndClose());
    267     } else {
    268       LOG(WARNING) << "Enqueueing null dispatcher";
    269       dispatchers->push_back(scoped_refptr<Dispatcher>());
    270     }
    271   }
    272   message->SetDispatchers(dispatchers.Pass());
    273   return MOJO_RESULT_OK;
    274 }
    275 
    276 MojoResult MessagePipe::HandleControlMessage(
    277     unsigned /*port*/,
    278     scoped_ptr<MessageInTransit> message) {
    279   LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
    280                << message->subtype();
    281   return MOJO_RESULT_UNKNOWN;
    282 }
    283 
    284 }  // namespace system
    285 }  // namespace mojo
    286