Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/proxy_message_pipe_endpoint.h"
      6 
      7 #include <string.h>
      8 
      9 #include "base/logging.h"
     10 #include "mojo/system/channel.h"
     11 #include "mojo/system/local_message_pipe_endpoint.h"
     12 #include "mojo/system/message_pipe_dispatcher.h"
     13 
     14 namespace mojo {
     15 namespace system {
     16 
     17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
     18     : local_id_(MessageInTransit::kInvalidEndpointId),
     19       remote_id_(MessageInTransit::kInvalidEndpointId),
     20       is_peer_open_(true) {
     21 }
     22 
     23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
     24     LocalMessagePipeEndpoint* local_message_pipe_endpoint,
     25     bool is_peer_open)
     26     : local_id_(MessageInTransit::kInvalidEndpointId),
     27       remote_id_(MessageInTransit::kInvalidEndpointId),
     28       is_peer_open_(is_peer_open),
     29       paused_message_queue_(MessageInTransitQueue::PassContents(),
     30                             local_message_pipe_endpoint->message_queue()) {
     31   local_message_pipe_endpoint->Close();
     32 }
     33 
     34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
     35   DCHECK(!is_running());
     36   DCHECK(!is_attached());
     37   AssertConsistentState();
     38   DCHECK(paused_message_queue_.IsEmpty());
     39 }
     40 
     41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const {
     42   return kTypeProxy;
     43 }
     44 
     45 bool ProxyMessagePipeEndpoint::OnPeerClose() {
     46   DCHECK(is_peer_open_);
     47 
     48   is_peer_open_ = false;
     49 
     50   // If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
     51   if (!paused_message_queue_.IsEmpty())
     52     return true;
     53 
     54   if (is_attached()) {
     55     if (!is_running()) {
     56       // If we're not running yet, we can't be destroyed yet, because we're
     57       // still waiting for the "run" message from the other side.
     58       return true;
     59     }
     60 
     61     Detach();
     62   }
     63 
     64   return false;
     65 }
     66 
     67 // Note: We may have to enqueue messages even when our (local) peer isn't open
     68 // -- it may have been written to and closed immediately, before we were ready.
     69 // This case is handled in |Run()| (which will call us).
     70 void ProxyMessagePipeEndpoint::EnqueueMessage(
     71     scoped_ptr<MessageInTransit> message) {
     72   if (is_running()) {
     73     message->SerializeAndCloseDispatchers(channel_.get());
     74 
     75     message->set_source_id(local_id_);
     76     message->set_destination_id(remote_id_);
     77     if (!channel_->WriteMessage(message.Pass()))
     78       LOG(WARNING) << "Failed to write message to channel";
     79   } else {
     80     paused_message_queue_.AddMessage(message.Pass());
     81   }
     82 }
     83 
     84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
     85                                       MessageInTransit::EndpointId local_id) {
     86   DCHECK(channel);
     87   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
     88 
     89   DCHECK(!is_attached());
     90 
     91   AssertConsistentState();
     92   channel_ = channel;
     93   local_id_ = local_id;
     94   AssertConsistentState();
     95 }
     96 
     97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
     98   // Assertions about arguments:
     99   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
    100 
    101   // Assertions about current state:
    102   DCHECK(is_attached());
    103   DCHECK(!is_running());
    104 
    105   AssertConsistentState();
    106   remote_id_ = remote_id;
    107   AssertConsistentState();
    108 
    109   while (!paused_message_queue_.IsEmpty())
    110     EnqueueMessage(paused_message_queue_.GetMessage());
    111 
    112   if (is_peer_open_)
    113     return true;  // Stay alive.
    114 
    115   // We were just waiting to die.
    116   Detach();
    117   return false;
    118 }
    119 
    120 void ProxyMessagePipeEndpoint::OnRemove() {
    121   Detach();
    122 }
    123 
    124 void ProxyMessagePipeEndpoint::Detach() {
    125   DCHECK(is_attached());
    126 
    127   AssertConsistentState();
    128   channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
    129   channel_ = NULL;
    130   local_id_ = MessageInTransit::kInvalidEndpointId;
    131   remote_id_ = MessageInTransit::kInvalidEndpointId;
    132   paused_message_queue_.Clear();
    133   AssertConsistentState();
    134 }
    135 
    136 #ifndef NDEBUG
    137 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
    138   if (is_attached()) {
    139     DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
    140   } else {  // Not attached.
    141     DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
    142     DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
    143   }
    144 }
    145 #endif
    146 
    147 }  // namespace system
    148 }  // namespace mojo
    149