Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/message_pipe.h"
      6 
      7 #include "base/logging.h"
      8 #include "mojo/system/channel.h"
      9 #include "mojo/system/dispatcher.h"
     10 #include "mojo/system/local_message_pipe_endpoint.h"
     11 #include "mojo/system/message_in_transit.h"
     12 #include "mojo/system/message_pipe_endpoint.h"
     13 #include "mojo/system/proxy_message_pipe_endpoint.h"
     14 
     15 namespace mojo {
     16 namespace system {
     17 
     18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
     19                          scoped_ptr<MessagePipeEndpoint> endpoint_1) {
     20   endpoints_[0].reset(endpoint_0.release());
     21   endpoints_[1].reset(endpoint_1.release());
     22 }
     23 
     24 MessagePipe::MessagePipe() {
     25   endpoints_[0].reset(new LocalMessagePipeEndpoint());
     26   endpoints_[1].reset(new LocalMessagePipeEndpoint());
     27 }
     28 
     29 // static
     30 unsigned MessagePipe::GetPeerPort(unsigned port) {
     31   DCHECK(port == 0 || port == 1);
     32   return port ^ 1;
     33 }
     34 
     35 void MessagePipe::CancelAllWaiters(unsigned port) {
     36   DCHECK(port == 0 || port == 1);
     37 
     38   base::AutoLock locker(lock_);
     39   DCHECK(endpoints_[port].get());
     40   endpoints_[port]->CancelAllWaiters();
     41 }
     42 
     43 void MessagePipe::Close(unsigned port) {
     44   DCHECK(port == 0 || port == 1);
     45 
     46   unsigned destination_port = GetPeerPort(port);
     47 
     48   base::AutoLock locker(lock_);
     49   DCHECK(endpoints_[port].get());
     50 
     51   endpoints_[port]->Close();
     52   bool should_destroy_destination = endpoints_[destination_port].get() ?
     53       !endpoints_[destination_port]->OnPeerClose() : false;
     54 
     55   endpoints_[port].reset();
     56   if (should_destroy_destination) {
     57     endpoints_[destination_port]->Close();
     58     endpoints_[destination_port].reset();
     59   }
     60 }
     61 
     62 // TODO(vtl): Support sending handles.
     63 // TODO(vtl): Handle flags.
     64 MojoResult MessagePipe::WriteMessage(
     65     unsigned port,
     66     const void* bytes, uint32_t num_bytes,
     67     const std::vector<Dispatcher*>* dispatchers,
     68     MojoWriteMessageFlags flags) {
     69   DCHECK(port == 0 || port == 1);
     70   return EnqueueMessage(
     71       GetPeerPort(port),
     72       MessageInTransit::Create(
     73           MessageInTransit::kTypeMessagePipeEndpoint,
     74           MessageInTransit::kSubtypeMessagePipeEndpointData,
     75           bytes, num_bytes),
     76       dispatchers);
     77 }
     78 
     79 MojoResult MessagePipe::ReadMessage(
     80     unsigned port,
     81     void* bytes, uint32_t* num_bytes,
     82     std::vector<scoped_refptr<Dispatcher> >* dispatchers,
     83     uint32_t* num_dispatchers,
     84     MojoReadMessageFlags flags) {
     85   DCHECK(port == 0 || port == 1);
     86 
     87   base::AutoLock locker(lock_);
     88   DCHECK(endpoints_[port].get());
     89 
     90   return endpoints_[port]->ReadMessage(bytes, num_bytes,
     91                                        dispatchers, num_dispatchers,
     92                                        flags);
     93 }
     94 
     95 MojoResult MessagePipe::AddWaiter(unsigned port,
     96                                   Waiter* waiter,
     97                                   MojoWaitFlags flags,
     98                                   MojoResult wake_result) {
     99   DCHECK(port == 0 || port == 1);
    100 
    101   base::AutoLock locker(lock_);
    102   DCHECK(endpoints_[port].get());
    103 
    104   return endpoints_[port]->AddWaiter(waiter, flags, wake_result);
    105 }
    106 
    107 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
    108   DCHECK(port == 0 || port == 1);
    109 
    110   base::AutoLock locker(lock_);
    111   DCHECK(endpoints_[port].get());
    112 
    113   endpoints_[port]->RemoveWaiter(waiter);
    114 }
    115 
    116 MojoResult MessagePipe::EnqueueMessage(
    117     unsigned port,
    118     MessageInTransit* message,
    119     const std::vector<Dispatcher*>* dispatchers) {
    120   DCHECK(port == 0 || port == 1);
    121   DCHECK(message);
    122 
    123   if (message->type() == MessageInTransit::kTypeMessagePipe) {
    124     DCHECK(!dispatchers);
    125     return HandleControlMessage(port, message);
    126   }
    127 
    128   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
    129 
    130   base::AutoLock locker(lock_);
    131   DCHECK(endpoints_[GetPeerPort(port)].get());
    132 
    133   // The destination port need not be open, unlike the source port.
    134   if (!endpoints_[port].get()) {
    135     message->Destroy();
    136     return MOJO_RESULT_FAILED_PRECONDITION;
    137   }
    138 
    139   MojoResult result = endpoints_[port]->CanEnqueueMessage(message, dispatchers);
    140   if (result != MOJO_RESULT_OK) {
    141     message->Destroy();
    142     return result;
    143   }
    144 
    145   if (dispatchers) {
    146     DCHECK(!dispatchers->empty());
    147 
    148     std::vector<scoped_refptr<Dispatcher> > replacement_dispatchers;
    149     for (size_t i = 0; i < dispatchers->size(); i++) {
    150       replacement_dispatchers.push_back(
    151           (*dispatchers)[i]->CreateEquivalentDispatcherAndCloseNoLock());
    152     }
    153 
    154     endpoints_[port]->EnqueueMessage(message, &replacement_dispatchers);
    155   } else {
    156     endpoints_[port]->EnqueueMessage(message, NULL);
    157   }
    158 
    159   return MOJO_RESULT_OK;
    160 }
    161 
    162 void MessagePipe::Attach(unsigned port,
    163                          scoped_refptr<Channel> channel,
    164                          MessageInTransit::EndpointId local_id) {
    165   DCHECK(port == 0 || port == 1);
    166   DCHECK(channel.get());
    167   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
    168 
    169   base::AutoLock locker(lock_);
    170   DCHECK(endpoints_[port].get());
    171 
    172   endpoints_[port]->Attach(channel, local_id);
    173 }
    174 
    175 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
    176   DCHECK(port == 0 || port == 1);
    177   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
    178 
    179   base::AutoLock locker(lock_);
    180   DCHECK(endpoints_[port].get());
    181 
    182   if (!endpoints_[port]->Run(remote_id)) {
    183     endpoints_[port]->Close();
    184     endpoints_[port].reset();
    185   }
    186 }
    187 
    188 MessagePipe::~MessagePipe() {
    189   // Owned by the dispatchers. The owning dispatchers should only release us via
    190   // their |Close()| method, which should inform us of being closed via our
    191   // |Close()|. Thus these should already be null.
    192   DCHECK(!endpoints_[0].get());
    193   DCHECK(!endpoints_[1].get());
    194 }
    195 
    196 MojoResult MessagePipe::HandleControlMessage(unsigned port,
    197                                              MessageInTransit* message) {
    198   DCHECK(port == 0 || port == 1);
    199   DCHECK(message);
    200   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe);
    201 
    202   MojoResult rv = MOJO_RESULT_OK;
    203   switch (message->subtype()) {
    204     case MessageInTransit::kSubtypeMessagePipePeerClosed: {
    205       unsigned source_port = GetPeerPort(port);
    206 
    207       base::AutoLock locker(lock_);
    208       DCHECK(endpoints_[source_port].get());
    209 
    210       endpoints_[source_port]->Close();
    211       if (endpoints_[port].get())
    212         endpoints_[port]->OnPeerClose();
    213 
    214       endpoints_[source_port].reset();
    215       break;
    216     }
    217     default:
    218       LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
    219                    << message->subtype();
    220       rv = MOJO_RESULT_UNKNOWN;
    221       break;
    222   }
    223 
    224   message->Destroy();
    225   return rv;
    226 }
    227 
    228 }  // namespace system
    229 }  // namespace mojo
    230