1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/message_pipe.h" 6 7 #include "base/logging.h" 8 #include "mojo/system/channel_endpoint.h" 9 #include "mojo/system/local_message_pipe_endpoint.h" 10 #include "mojo/system/message_in_transit.h" 11 #include "mojo/system/message_pipe_dispatcher.h" 12 #include "mojo/system/message_pipe_endpoint.h" 13 #include "mojo/system/proxy_message_pipe_endpoint.h" 14 15 namespace mojo { 16 namespace system { 17 18 // static 19 MessagePipe* MessagePipe::CreateLocalLocal() { 20 MessagePipe* message_pipe = new MessagePipe(); 21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 23 return message_pipe; 24 } 25 26 // static 27 MessagePipe* MessagePipe::CreateLocalProxy( 28 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. 30 MessagePipe* message_pipe = new MessagePipe(); 31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); 33 message_pipe->endpoints_[1].reset( 34 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 35 return message_pipe; 36 } 37 38 // static 39 MessagePipe* MessagePipe::CreateProxyLocal( 40 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. 42 MessagePipe* message_pipe = new MessagePipe(); 43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); 44 message_pipe->endpoints_[0].reset( 45 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 47 return message_pipe; 48 } 49 50 // static 51 unsigned MessagePipe::GetPeerPort(unsigned port) { 52 DCHECK(port == 0 || port == 1); 53 return port ^ 1; 54 } 55 56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { 57 DCHECK(port == 0 || port == 1); 58 base::AutoLock locker(lock_); 59 DCHECK(endpoints_[port]); 60 61 return endpoints_[port]->GetType(); 62 } 63 64 void MessagePipe::CancelAllWaiters(unsigned port) { 65 DCHECK(port == 0 || port == 1); 66 67 base::AutoLock locker(lock_); 68 DCHECK(endpoints_[port]); 69 endpoints_[port]->CancelAllWaiters(); 70 } 71 72 void MessagePipe::Close(unsigned port) { 73 DCHECK(port == 0 || port == 1); 74 75 unsigned destination_port = GetPeerPort(port); 76 77 base::AutoLock locker(lock_); 78 DCHECK(endpoints_[port]); 79 80 endpoints_[port]->Close(); 81 if (endpoints_[destination_port]) { 82 if (!endpoints_[destination_port]->OnPeerClose()) 83 endpoints_[destination_port].reset(); 84 } 85 endpoints_[port].reset(); 86 } 87 88 // TODO(vtl): Handle flags. 89 MojoResult MessagePipe::WriteMessage( 90 unsigned port, 91 UserPointer<const void> bytes, 92 uint32_t num_bytes, 93 std::vector<DispatcherTransport>* transports, 94 MojoWriteMessageFlags flags) { 95 DCHECK(port == 0 || port == 1); 96 return EnqueueMessageInternal( 97 GetPeerPort(port), 98 make_scoped_ptr(new MessageInTransit( 99 MessageInTransit::kTypeMessagePipeEndpoint, 100 MessageInTransit::kSubtypeMessagePipeEndpointData, 101 num_bytes, 102 bytes)), 103 transports); 104 } 105 106 MojoResult MessagePipe::ReadMessage(unsigned port, 107 UserPointer<void> bytes, 108 UserPointer<uint32_t> num_bytes, 109 DispatcherVector* dispatchers, 110 uint32_t* num_dispatchers, 111 MojoReadMessageFlags flags) { 112 DCHECK(port == 0 || port == 1); 113 114 base::AutoLock locker(lock_); 115 DCHECK(endpoints_[port]); 116 117 return endpoints_[port]->ReadMessage( 118 bytes, num_bytes, dispatchers, num_dispatchers, flags); 119 } 120 121 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { 122 DCHECK(port == 0 || port == 1); 123 124 base::AutoLock locker(const_cast<base::Lock&>(lock_)); 125 DCHECK(endpoints_[port]); 126 127 return endpoints_[port]->GetHandleSignalsState(); 128 } 129 130 MojoResult MessagePipe::AddWaiter(unsigned port, 131 Waiter* waiter, 132 MojoHandleSignals signals, 133 uint32_t context, 134 HandleSignalsState* signals_state) { 135 DCHECK(port == 0 || port == 1); 136 137 base::AutoLock locker(lock_); 138 DCHECK(endpoints_[port]); 139 140 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); 141 } 142 143 void MessagePipe::RemoveWaiter(unsigned port, 144 Waiter* waiter, 145 HandleSignalsState* signals_state) { 146 DCHECK(port == 0 || port == 1); 147 148 base::AutoLock locker(lock_); 149 DCHECK(endpoints_[port]); 150 151 endpoints_[port]->RemoveWaiter(waiter, signals_state); 152 } 153 154 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { 155 DCHECK(port == 0 || port == 1); 156 157 base::AutoLock locker(lock_); 158 DCHECK(endpoints_[port]); 159 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); 160 161 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a 162 // |MessagePipe| with two proxy endpoints, which will then act as a proxy 163 // (rather than trying to connect the two ends directly). 164 DLOG_IF(WARNING, 165 !!endpoints_[GetPeerPort(port)] && 166 endpoints_[GetPeerPort(port)]->GetType() != 167 MessagePipeEndpoint::kTypeLocal) 168 << "Direct message pipe passing across multiple channels not yet " 169 "implemented; will proxy"; 170 171 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); 172 scoped_refptr<ChannelEndpoint> channel_endpoint( 173 new ChannelEndpoint(this, port)); 174 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); 175 channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>( 176 old_endpoint.get())->message_queue()); 177 old_endpoint->Close(); 178 179 return channel_endpoint; 180 } 181 182 MojoResult MessagePipe::EnqueueMessage(unsigned port, 183 scoped_ptr<MessageInTransit> message) { 184 return EnqueueMessageInternal(port, message.Pass(), nullptr); 185 } 186 187 void MessagePipe::OnRemove(unsigned port) { 188 unsigned destination_port = GetPeerPort(port); 189 190 base::AutoLock locker(lock_); 191 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. 192 if (!endpoints_[port]) 193 return; 194 195 if (endpoints_[destination_port]) { 196 if (!endpoints_[destination_port]->OnPeerClose()) 197 endpoints_[destination_port].reset(); 198 } 199 endpoints_[port].reset(); 200 } 201 202 MessagePipe::MessagePipe() { 203 } 204 205 MessagePipe::~MessagePipe() { 206 // Owned by the dispatchers. The owning dispatchers should only release us via 207 // their |Close()| method, which should inform us of being closed via our 208 // |Close()|. Thus these should already be null. 209 DCHECK(!endpoints_[0]); 210 DCHECK(!endpoints_[1]); 211 } 212 213 MojoResult MessagePipe::EnqueueMessageInternal( 214 unsigned port, 215 scoped_ptr<MessageInTransit> message, 216 std::vector<DispatcherTransport>* transports) { 217 DCHECK(port == 0 || port == 1); 218 DCHECK(message); 219 220 if (message->type() == MessageInTransit::kTypeMessagePipe) { 221 DCHECK(!transports); 222 return HandleControlMessage(port, message.Pass()); 223 } 224 225 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); 226 227 base::AutoLock locker(lock_); 228 DCHECK(endpoints_[GetPeerPort(port)]); 229 230 // The destination port need not be open, unlike the source port. 231 if (!endpoints_[port]) 232 return MOJO_RESULT_FAILED_PRECONDITION; 233 234 if (transports) { 235 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); 236 if (result != MOJO_RESULT_OK) 237 return result; 238 } 239 240 // The endpoint's |EnqueueMessage()| may not report failure. 241 endpoints_[port]->EnqueueMessage(message.Pass()); 242 return MOJO_RESULT_OK; 243 } 244 245 MojoResult MessagePipe::AttachTransportsNoLock( 246 unsigned port, 247 MessageInTransit* message, 248 std::vector<DispatcherTransport>* transports) { 249 DCHECK(!message->has_dispatchers()); 250 251 // You're not allowed to send either handle to a message pipe over the message 252 // pipe, so check for this. (The case of trying to write a handle to itself is 253 // taken care of by |Core|. That case kind of makes sense, but leads to 254 // complications if, e.g., both sides try to do the same thing with their 255 // respective handles simultaneously. The other case, of trying to write the 256 // peer handle to a handle, doesn't make sense -- since no handle will be 257 // available to read the message from.) 258 for (size_t i = 0; i < transports->size(); i++) { 259 if (!(*transports)[i].is_valid()) 260 continue; 261 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) { 262 MessagePipeDispatcherTransport mp_transport((*transports)[i]); 263 if (mp_transport.GetMessagePipe() == this) { 264 // The other case should have been disallowed by |Core|. (Note: |port| 265 // is the peer port of the handle given to |WriteMessage()|.) 266 DCHECK_EQ(mp_transport.GetPort(), port); 267 return MOJO_RESULT_INVALID_ARGUMENT; 268 } 269 } 270 } 271 272 // Clone the dispatchers and attach them to the message. (This must be done as 273 // a separate loop, since we want to leave the dispatchers alone on failure.) 274 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 275 dispatchers->reserve(transports->size()); 276 for (size_t i = 0; i < transports->size(); i++) { 277 if ((*transports)[i].is_valid()) { 278 dispatchers->push_back( 279 (*transports)[i].CreateEquivalentDispatcherAndClose()); 280 } else { 281 LOG(WARNING) << "Enqueueing null dispatcher"; 282 dispatchers->push_back(scoped_refptr<Dispatcher>()); 283 } 284 } 285 message->SetDispatchers(dispatchers.Pass()); 286 return MOJO_RESULT_OK; 287 } 288 289 MojoResult MessagePipe::HandleControlMessage( 290 unsigned /*port*/, 291 scoped_ptr<MessageInTransit> message) { 292 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " 293 << message->subtype(); 294 return MOJO_RESULT_UNKNOWN; 295 } 296 297 } // namespace system 298 } // namespace mojo 299