1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/message_pipe.h" 6 7 #include "base/logging.h" 8 #include "mojo/system/channel.h" 9 #include "mojo/system/local_message_pipe_endpoint.h" 10 #include "mojo/system/message_in_transit.h" 11 #include "mojo/system/message_pipe_dispatcher.h" 12 #include "mojo/system/message_pipe_endpoint.h" 13 #include "mojo/system/proxy_message_pipe_endpoint.h" 14 15 namespace mojo { 16 namespace system { 17 18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0, 19 scoped_ptr<MessagePipeEndpoint> endpoint1) { 20 endpoints_[0].reset(endpoint0.release()); 21 endpoints_[1].reset(endpoint1.release()); 22 } 23 24 MessagePipe::MessagePipe() { 25 endpoints_[0].reset(new LocalMessagePipeEndpoint()); 26 endpoints_[1].reset(new LocalMessagePipeEndpoint()); 27 } 28 29 // static 30 unsigned MessagePipe::GetPeerPort(unsigned port) { 31 DCHECK(port == 0 || port == 1); 32 return port ^ 1; 33 } 34 35 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { 36 DCHECK(port == 0 || port == 1); 37 base::AutoLock locker(lock_); 38 DCHECK(endpoints_[port]); 39 40 return endpoints_[port]->GetType(); 41 } 42 43 void MessagePipe::CancelAllWaiters(unsigned port) { 44 DCHECK(port == 0 || port == 1); 45 46 base::AutoLock locker(lock_); 47 DCHECK(endpoints_[port]); 48 endpoints_[port]->CancelAllWaiters(); 49 } 50 51 void MessagePipe::Close(unsigned port) { 52 DCHECK(port == 0 || port == 1); 53 54 unsigned destination_port = GetPeerPort(port); 55 56 base::AutoLock locker(lock_); 57 DCHECK(endpoints_[port]); 58 59 endpoints_[port]->Close(); 60 if (endpoints_[destination_port]) { 61 if (!endpoints_[destination_port]->OnPeerClose()) 62 endpoints_[destination_port].reset(); 63 } 64 endpoints_[port].reset(); 65 } 66 67 // TODO(vtl): Handle flags. 68 MojoResult MessagePipe::WriteMessage( 69 unsigned port, 70 const void* bytes, 71 uint32_t num_bytes, 72 std::vector<DispatcherTransport>* transports, 73 MojoWriteMessageFlags flags) { 74 DCHECK(port == 0 || port == 1); 75 return EnqueueMessageInternal( 76 GetPeerPort(port), 77 make_scoped_ptr(new MessageInTransit( 78 MessageInTransit::kTypeMessagePipeEndpoint, 79 MessageInTransit::kSubtypeMessagePipeEndpointData, 80 num_bytes, 81 bytes)), 82 transports); 83 } 84 85 MojoResult MessagePipe::ReadMessage(unsigned port, 86 void* bytes, 87 uint32_t* num_bytes, 88 DispatcherVector* dispatchers, 89 uint32_t* num_dispatchers, 90 MojoReadMessageFlags flags) { 91 DCHECK(port == 0 || port == 1); 92 93 base::AutoLock locker(lock_); 94 DCHECK(endpoints_[port]); 95 96 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers, 97 num_dispatchers, flags); 98 } 99 100 MojoResult MessagePipe::AddWaiter(unsigned port, 101 Waiter* waiter, 102 MojoHandleSignals signals, 103 uint32_t context) { 104 DCHECK(port == 0 || port == 1); 105 106 base::AutoLock locker(lock_); 107 DCHECK(endpoints_[port]); 108 109 return endpoints_[port]->AddWaiter(waiter, signals, context); 110 } 111 112 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { 113 DCHECK(port == 0 || port == 1); 114 115 base::AutoLock locker(lock_); 116 DCHECK(endpoints_[port]); 117 118 endpoints_[port]->RemoveWaiter(waiter); 119 } 120 121 void MessagePipe::ConvertLocalToProxy(unsigned port) { 122 DCHECK(port == 0 || port == 1); 123 124 base::AutoLock locker(lock_); 125 DCHECK(endpoints_[port]); 126 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); 127 128 bool is_peer_open = !!endpoints_[GetPeerPort(port)]; 129 130 // TODO(vtl): Hopefully this will work if the peer has been closed and when 131 // the peer is local. If the peer is remote, we should do something more 132 // sophisticated. 133 DCHECK(!is_peer_open || 134 endpoints_[GetPeerPort(port)]->GetType() == 135 MessagePipeEndpoint::kTypeLocal); 136 137 scoped_ptr<MessagePipeEndpoint> replacement_endpoint( 138 new ProxyMessagePipeEndpoint( 139 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()), 140 is_peer_open)); 141 endpoints_[port].swap(replacement_endpoint); 142 } 143 144 MojoResult MessagePipe::EnqueueMessage( 145 unsigned port, 146 scoped_ptr<MessageInTransit> message) { 147 return EnqueueMessageInternal(port, message.Pass(), NULL); 148 } 149 150 bool MessagePipe::Attach(unsigned port, 151 scoped_refptr<Channel> channel, 152 MessageInTransit::EndpointId local_id) { 153 DCHECK(port == 0 || port == 1); 154 DCHECK(channel); 155 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 156 157 base::AutoLock locker(lock_); 158 if (!endpoints_[port]) 159 return false; 160 161 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); 162 endpoints_[port]->Attach(channel, local_id); 163 return true; 164 } 165 166 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { 167 DCHECK(port == 0 || port == 1); 168 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); 169 170 base::AutoLock locker(lock_); 171 DCHECK(endpoints_[port]); 172 if (!endpoints_[port]->Run(remote_id)) 173 endpoints_[port].reset(); 174 } 175 176 void MessagePipe::OnRemove(unsigned port) { 177 unsigned destination_port = GetPeerPort(port); 178 179 base::AutoLock locker(lock_); 180 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. 181 if (!endpoints_[port]) 182 return; 183 184 endpoints_[port]->OnRemove(); 185 if (endpoints_[destination_port]) { 186 if (!endpoints_[destination_port]->OnPeerClose()) 187 endpoints_[destination_port].reset(); 188 } 189 endpoints_[port].reset(); 190 } 191 192 MessagePipe::~MessagePipe() { 193 // Owned by the dispatchers. The owning dispatchers should only release us via 194 // their |Close()| method, which should inform us of being closed via our 195 // |Close()|. Thus these should already be null. 196 DCHECK(!endpoints_[0]); 197 DCHECK(!endpoints_[1]); 198 } 199 200 MojoResult MessagePipe::EnqueueMessageInternal( 201 unsigned port, 202 scoped_ptr<MessageInTransit> message, 203 std::vector<DispatcherTransport>* transports) { 204 DCHECK(port == 0 || port == 1); 205 DCHECK(message); 206 207 if (message->type() == MessageInTransit::kTypeMessagePipe) { 208 DCHECK(!transports); 209 return HandleControlMessage(port, message.Pass()); 210 } 211 212 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); 213 214 base::AutoLock locker(lock_); 215 DCHECK(endpoints_[GetPeerPort(port)]); 216 217 // The destination port need not be open, unlike the source port. 218 if (!endpoints_[port]) 219 return MOJO_RESULT_FAILED_PRECONDITION; 220 221 if (transports) { 222 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); 223 if (result != MOJO_RESULT_OK) 224 return result; 225 } 226 227 // The endpoint's |EnqueueMessage()| may not report failure. 228 endpoints_[port]->EnqueueMessage(message.Pass()); 229 return MOJO_RESULT_OK; 230 } 231 232 MojoResult MessagePipe::AttachTransportsNoLock( 233 unsigned port, 234 MessageInTransit* message, 235 std::vector<DispatcherTransport>* transports) { 236 DCHECK(!message->has_dispatchers()); 237 238 // You're not allowed to send either handle to a message pipe over the message 239 // pipe, so check for this. (The case of trying to write a handle to itself is 240 // taken care of by |Core|. That case kind of makes sense, but leads to 241 // complications if, e.g., both sides try to do the same thing with their 242 // respective handles simultaneously. The other case, of trying to write the 243 // peer handle to a handle, doesn't make sense -- since no handle will be 244 // available to read the message from.) 245 for (size_t i = 0; i < transports->size(); i++) { 246 if (!(*transports)[i].is_valid()) 247 continue; 248 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) { 249 MessagePipeDispatcherTransport mp_transport((*transports)[i]); 250 if (mp_transport.GetMessagePipe() == this) { 251 // The other case should have been disallowed by |Core|. (Note: |port| 252 // is the peer port of the handle given to |WriteMessage()|.) 253 DCHECK_EQ(mp_transport.GetPort(), port); 254 return MOJO_RESULT_INVALID_ARGUMENT; 255 } 256 } 257 } 258 259 // Clone the dispatchers and attach them to the message. (This must be done as 260 // a separate loop, since we want to leave the dispatchers alone on failure.) 261 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 262 dispatchers->reserve(transports->size()); 263 for (size_t i = 0; i < transports->size(); i++) { 264 if ((*transports)[i].is_valid()) { 265 dispatchers->push_back( 266 (*transports)[i].CreateEquivalentDispatcherAndClose()); 267 } else { 268 LOG(WARNING) << "Enqueueing null dispatcher"; 269 dispatchers->push_back(scoped_refptr<Dispatcher>()); 270 } 271 } 272 message->SetDispatchers(dispatchers.Pass()); 273 return MOJO_RESULT_OK; 274 } 275 276 MojoResult MessagePipe::HandleControlMessage( 277 unsigned /*port*/, 278 scoped_ptr<MessageInTransit> message) { 279 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " 280 << message->subtype(); 281 return MOJO_RESULT_UNKNOWN; 282 } 283 284 } // namespace system 285 } // namespace mojo 286