1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/proxy_message_pipe_endpoint.h" 6 7 #include <string.h> 8 9 #include "base/logging.h" 10 #include "mojo/system/channel.h" 11 #include "mojo/system/local_message_pipe_endpoint.h" 12 #include "mojo/system/message_pipe_dispatcher.h" 13 14 namespace mojo { 15 namespace system { 16 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() 18 : local_id_(MessageInTransit::kInvalidEndpointId), 19 remote_id_(MessageInTransit::kInvalidEndpointId), 20 is_peer_open_(true) { 21 } 22 23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( 24 LocalMessagePipeEndpoint* local_message_pipe_endpoint, 25 bool is_peer_open) 26 : local_id_(MessageInTransit::kInvalidEndpointId), 27 remote_id_(MessageInTransit::kInvalidEndpointId), 28 is_peer_open_(is_peer_open), 29 paused_message_queue_(MessageInTransitQueue::PassContents(), 30 local_message_pipe_endpoint->message_queue()) { 31 local_message_pipe_endpoint->Close(); 32 } 33 34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { 35 DCHECK(!is_running()); 36 DCHECK(!is_attached()); 37 AssertConsistentState(); 38 DCHECK(paused_message_queue_.IsEmpty()); 39 } 40 41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { 42 return kTypeProxy; 43 } 44 45 bool ProxyMessagePipeEndpoint::OnPeerClose() { 46 DCHECK(is_peer_open_); 47 48 is_peer_open_ = false; 49 50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet. 51 if (!paused_message_queue_.IsEmpty()) 52 return true; 53 54 if (is_attached()) { 55 if (!is_running()) { 56 // If we're not running yet, we can't be destroyed yet, because we're 57 // still waiting for the "run" message from the other side. 58 return true; 59 } 60 61 Detach(); 62 } 63 64 return false; 65 } 66 67 // Note: We may have to enqueue messages even when our (local) peer isn't open 68 // -- it may have been written to and closed immediately, before we were ready. 69 // This case is handled in |Run()| (which will call us). 70 void ProxyMessagePipeEndpoint::EnqueueMessage( 71 scoped_ptr<MessageInTransit> message) { 72 if (is_running()) { 73 message->SerializeAndCloseDispatchers(channel_.get()); 74 75 message->set_source_id(local_id_); 76 message->set_destination_id(remote_id_); 77 if (!channel_->WriteMessage(message.Pass())) 78 LOG(WARNING) << "Failed to write message to channel"; 79 } else { 80 paused_message_queue_.AddMessage(message.Pass()); 81 } 82 } 83 84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, 85 MessageInTransit::EndpointId local_id) { 86 DCHECK(channel); 87 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 88 89 DCHECK(!is_attached()); 90 91 AssertConsistentState(); 92 channel_ = channel; 93 local_id_ = local_id; 94 AssertConsistentState(); 95 } 96 97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { 98 // Assertions about arguments: 99 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); 100 101 // Assertions about current state: 102 DCHECK(is_attached()); 103 DCHECK(!is_running()); 104 105 AssertConsistentState(); 106 remote_id_ = remote_id; 107 AssertConsistentState(); 108 109 while (!paused_message_queue_.IsEmpty()) 110 EnqueueMessage(paused_message_queue_.GetMessage()); 111 112 if (is_peer_open_) 113 return true; // Stay alive. 114 115 // We were just waiting to die. 116 Detach(); 117 return false; 118 } 119 120 void ProxyMessagePipeEndpoint::OnRemove() { 121 Detach(); 122 } 123 124 void ProxyMessagePipeEndpoint::Detach() { 125 DCHECK(is_attached()); 126 127 AssertConsistentState(); 128 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); 129 channel_ = NULL; 130 local_id_ = MessageInTransit::kInvalidEndpointId; 131 remote_id_ = MessageInTransit::kInvalidEndpointId; 132 paused_message_queue_.Clear(); 133 AssertConsistentState(); 134 } 135 136 #ifndef NDEBUG 137 void ProxyMessagePipeEndpoint::AssertConsistentState() const { 138 if (is_attached()) { 139 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); 140 } else { // Not attached. 141 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); 142 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); 143 } 144 } 145 #endif 146 147 } // namespace system 148 } // namespace mojo 149