1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/message_pipe.h" 6 7 #include "base/logging.h" 8 #include "mojo/system/channel.h" 9 #include "mojo/system/dispatcher.h" 10 #include "mojo/system/local_message_pipe_endpoint.h" 11 #include "mojo/system/message_in_transit.h" 12 #include "mojo/system/message_pipe_endpoint.h" 13 #include "mojo/system/proxy_message_pipe_endpoint.h" 14 15 namespace mojo { 16 namespace system { 17 18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, 19 scoped_ptr<MessagePipeEndpoint> endpoint_1) { 20 endpoints_[0].reset(endpoint_0.release()); 21 endpoints_[1].reset(endpoint_1.release()); 22 } 23 24 MessagePipe::MessagePipe() { 25 endpoints_[0].reset(new LocalMessagePipeEndpoint()); 26 endpoints_[1].reset(new LocalMessagePipeEndpoint()); 27 } 28 29 // static 30 unsigned MessagePipe::GetPeerPort(unsigned port) { 31 DCHECK(port == 0 || port == 1); 32 return port ^ 1; 33 } 34 35 void MessagePipe::CancelAllWaiters(unsigned port) { 36 DCHECK(port == 0 || port == 1); 37 38 base::AutoLock locker(lock_); 39 DCHECK(endpoints_[port].get()); 40 endpoints_[port]->CancelAllWaiters(); 41 } 42 43 void MessagePipe::Close(unsigned port) { 44 DCHECK(port == 0 || port == 1); 45 46 unsigned destination_port = GetPeerPort(port); 47 48 base::AutoLock locker(lock_); 49 DCHECK(endpoints_[port].get()); 50 51 endpoints_[port]->Close(); 52 bool should_destroy_destination = endpoints_[destination_port].get() ? 53 !endpoints_[destination_port]->OnPeerClose() : false; 54 55 endpoints_[port].reset(); 56 if (should_destroy_destination) { 57 endpoints_[destination_port]->Close(); 58 endpoints_[destination_port].reset(); 59 } 60 } 61 62 // TODO(vtl): Support sending handles. 63 // TODO(vtl): Handle flags. 64 MojoResult MessagePipe::WriteMessage( 65 unsigned port, 66 const void* bytes, uint32_t num_bytes, 67 const std::vector<Dispatcher*>* dispatchers, 68 MojoWriteMessageFlags flags) { 69 DCHECK(port == 0 || port == 1); 70 return EnqueueMessage( 71 GetPeerPort(port), 72 MessageInTransit::Create( 73 MessageInTransit::kTypeMessagePipeEndpoint, 74 MessageInTransit::kSubtypeMessagePipeEndpointData, 75 bytes, num_bytes), 76 dispatchers); 77 } 78 79 MojoResult MessagePipe::ReadMessage( 80 unsigned port, 81 void* bytes, uint32_t* num_bytes, 82 std::vector<scoped_refptr<Dispatcher> >* dispatchers, 83 uint32_t* num_dispatchers, 84 MojoReadMessageFlags flags) { 85 DCHECK(port == 0 || port == 1); 86 87 base::AutoLock locker(lock_); 88 DCHECK(endpoints_[port].get()); 89 90 return endpoints_[port]->ReadMessage(bytes, num_bytes, 91 dispatchers, num_dispatchers, 92 flags); 93 } 94 95 MojoResult MessagePipe::AddWaiter(unsigned port, 96 Waiter* waiter, 97 MojoWaitFlags flags, 98 MojoResult wake_result) { 99 DCHECK(port == 0 || port == 1); 100 101 base::AutoLock locker(lock_); 102 DCHECK(endpoints_[port].get()); 103 104 return endpoints_[port]->AddWaiter(waiter, flags, wake_result); 105 } 106 107 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { 108 DCHECK(port == 0 || port == 1); 109 110 base::AutoLock locker(lock_); 111 DCHECK(endpoints_[port].get()); 112 113 endpoints_[port]->RemoveWaiter(waiter); 114 } 115 116 MojoResult MessagePipe::EnqueueMessage( 117 unsigned port, 118 MessageInTransit* message, 119 const std::vector<Dispatcher*>* dispatchers) { 120 DCHECK(port == 0 || port == 1); 121 DCHECK(message); 122 123 if (message->type() == MessageInTransit::kTypeMessagePipe) { 124 DCHECK(!dispatchers); 125 return HandleControlMessage(port, message); 126 } 127 128 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); 129 130 base::AutoLock locker(lock_); 131 DCHECK(endpoints_[GetPeerPort(port)].get()); 132 133 // The destination port need not be open, unlike the source port. 134 if (!endpoints_[port].get()) { 135 message->Destroy(); 136 return MOJO_RESULT_FAILED_PRECONDITION; 137 } 138 139 MojoResult result = endpoints_[port]->CanEnqueueMessage(message, dispatchers); 140 if (result != MOJO_RESULT_OK) { 141 message->Destroy(); 142 return result; 143 } 144 145 if (dispatchers) { 146 DCHECK(!dispatchers->empty()); 147 148 std::vector<scoped_refptr<Dispatcher> > replacement_dispatchers; 149 for (size_t i = 0; i < dispatchers->size(); i++) { 150 replacement_dispatchers.push_back( 151 (*dispatchers)[i]->CreateEquivalentDispatcherAndCloseNoLock()); 152 } 153 154 endpoints_[port]->EnqueueMessage(message, &replacement_dispatchers); 155 } else { 156 endpoints_[port]->EnqueueMessage(message, NULL); 157 } 158 159 return MOJO_RESULT_OK; 160 } 161 162 void MessagePipe::Attach(unsigned port, 163 scoped_refptr<Channel> channel, 164 MessageInTransit::EndpointId local_id) { 165 DCHECK(port == 0 || port == 1); 166 DCHECK(channel.get()); 167 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 168 169 base::AutoLock locker(lock_); 170 DCHECK(endpoints_[port].get()); 171 172 endpoints_[port]->Attach(channel, local_id); 173 } 174 175 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { 176 DCHECK(port == 0 || port == 1); 177 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); 178 179 base::AutoLock locker(lock_); 180 DCHECK(endpoints_[port].get()); 181 182 if (!endpoints_[port]->Run(remote_id)) { 183 endpoints_[port]->Close(); 184 endpoints_[port].reset(); 185 } 186 } 187 188 MessagePipe::~MessagePipe() { 189 // Owned by the dispatchers. The owning dispatchers should only release us via 190 // their |Close()| method, which should inform us of being closed via our 191 // |Close()|. Thus these should already be null. 192 DCHECK(!endpoints_[0].get()); 193 DCHECK(!endpoints_[1].get()); 194 } 195 196 MojoResult MessagePipe::HandleControlMessage(unsigned port, 197 MessageInTransit* message) { 198 DCHECK(port == 0 || port == 1); 199 DCHECK(message); 200 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe); 201 202 MojoResult rv = MOJO_RESULT_OK; 203 switch (message->subtype()) { 204 case MessageInTransit::kSubtypeMessagePipePeerClosed: { 205 unsigned source_port = GetPeerPort(port); 206 207 base::AutoLock locker(lock_); 208 DCHECK(endpoints_[source_port].get()); 209 210 endpoints_[source_port]->Close(); 211 if (endpoints_[port].get()) 212 endpoints_[port]->OnPeerClose(); 213 214 endpoints_[source_port].reset(); 215 break; 216 } 217 default: 218 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " 219 << message->subtype(); 220 rv = MOJO_RESULT_UNKNOWN; 221 break; 222 } 223 224 message->Destroy(); 225 return rv; 226 } 227 228 } // namespace system 229 } // namespace mojo 230