1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_channel_mojo.h" 6 7 #include <stddef.h> 8 #include <stdint.h> 9 10 #include <memory> 11 #include <utility> 12 13 #include "base/bind.h" 14 #include "base/bind_helpers.h" 15 #include "base/command_line.h" 16 #include "base/lazy_instance.h" 17 #include "base/macros.h" 18 #include "base/memory/ptr_util.h" 19 #include "base/process/process_handle.h" 20 #include "base/threading/thread_task_runner_handle.h" 21 #include "build/build_config.h" 22 #include "ipc/ipc_listener.h" 23 #include "ipc/ipc_logging.h" 24 #include "ipc/ipc_message_attachment_set.h" 25 #include "ipc/ipc_message_macros.h" 26 #include "ipc/ipc_mojo_bootstrap.h" 27 #include "ipc/ipc_mojo_handle_attachment.h" 28 #include "ipc/native_handle_type_converters.h" 29 #include "mojo/public/cpp/bindings/binding.h" 30 #include "mojo/public/cpp/system/platform_handle.h" 31 32 namespace IPC { 33 34 namespace { 35 36 class MojoChannelFactory : public ChannelFactory { 37 public: 38 MojoChannelFactory( 39 mojo::ScopedMessagePipeHandle handle, 40 Channel::Mode mode, 41 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 42 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) 43 : handle_(std::move(handle)), 44 mode_(mode), 45 ipc_task_runner_(ipc_task_runner), 46 proxy_task_runner_(proxy_task_runner) {} 47 48 std::unique_ptr<Channel> BuildChannel(Listener* listener) override { 49 return ChannelMojo::Create(std::move(handle_), mode_, listener, 50 ipc_task_runner_, proxy_task_runner_); 51 } 52 53 scoped_refptr<base::SingleThreadTaskRunner> GetIPCTaskRunner() override { 54 return ipc_task_runner_; 55 } 56 57 private: 58 mojo::ScopedMessagePipeHandle handle_; 59 const Channel::Mode mode_; 60 scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_; 61 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; 62 63 DISALLOW_COPY_AND_ASSIGN(MojoChannelFactory); 64 }; 65 66 base::ProcessId GetSelfPID() { 67 #if defined(OS_LINUX) 68 if (int global_pid = Channel::GetGlobalPid()) 69 return global_pid; 70 #endif // OS_LINUX 71 #if defined(OS_NACL) 72 return -1; 73 #else 74 return base::GetCurrentProcId(); 75 #endif // defined(OS_NACL) 76 } 77 78 } // namespace 79 80 //------------------------------------------------------------------------------ 81 82 // static 83 std::unique_ptr<ChannelMojo> ChannelMojo::Create( 84 mojo::ScopedMessagePipeHandle handle, 85 Mode mode, 86 Listener* listener, 87 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 88 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) { 89 return base::WrapUnique(new ChannelMojo(std::move(handle), mode, listener, 90 ipc_task_runner, proxy_task_runner)); 91 } 92 93 // static 94 std::unique_ptr<ChannelFactory> ChannelMojo::CreateServerFactory( 95 mojo::ScopedMessagePipeHandle handle, 96 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 97 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) { 98 return std::make_unique<MojoChannelFactory>( 99 std::move(handle), Channel::MODE_SERVER, ipc_task_runner, 100 proxy_task_runner); 101 } 102 103 // static 104 std::unique_ptr<ChannelFactory> ChannelMojo::CreateClientFactory( 105 mojo::ScopedMessagePipeHandle handle, 106 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 107 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) { 108 return std::make_unique<MojoChannelFactory>( 109 std::move(handle), Channel::MODE_CLIENT, ipc_task_runner, 110 proxy_task_runner); 111 } 112 113 ChannelMojo::ChannelMojo( 114 mojo::ScopedMessagePipeHandle handle, 115 Mode mode, 116 Listener* listener, 117 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 118 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) 119 : task_runner_(ipc_task_runner), 120 pipe_(handle.get()), 121 listener_(listener), 122 weak_factory_(this) { 123 weak_ptr_ = weak_factory_.GetWeakPtr(); 124 bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, ipc_task_runner, 125 proxy_task_runner); 126 } 127 128 void ChannelMojo::ForwardMessageFromThreadSafePtr(mojo::Message message) { 129 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 130 if (!message_reader_ || !message_reader_->sender().is_bound()) 131 return; 132 message_reader_->sender().internal_state()->ForwardMessage( 133 std::move(message)); 134 } 135 136 void ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr( 137 mojo::Message message, 138 std::unique_ptr<mojo::MessageReceiver> responder) { 139 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 140 if (!message_reader_ || !message_reader_->sender().is_bound()) 141 return; 142 message_reader_->sender().internal_state()->ForwardMessageWithResponder( 143 std::move(message), std::move(responder)); 144 } 145 146 ChannelMojo::~ChannelMojo() { 147 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 148 Close(); 149 } 150 151 bool ChannelMojo::Connect() { 152 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 153 154 WillConnect(); 155 156 mojom::ChannelAssociatedPtr sender; 157 mojom::ChannelAssociatedRequest receiver; 158 bootstrap_->Connect(&sender, &receiver); 159 160 DCHECK(!message_reader_); 161 sender->SetPeerPid(GetSelfPID()); 162 message_reader_.reset(new internal::MessagePipeReader( 163 pipe_, std::move(sender), std::move(receiver), this)); 164 return true; 165 } 166 167 void ChannelMojo::Pause() { 168 bootstrap_->Pause(); 169 } 170 171 void ChannelMojo::Unpause(bool flush) { 172 bootstrap_->Unpause(); 173 if (flush) 174 Flush(); 175 } 176 177 void ChannelMojo::Flush() { 178 bootstrap_->Flush(); 179 } 180 181 void ChannelMojo::Close() { 182 // NOTE: The MessagePipeReader's destructor may re-enter this function. Use 183 // caution when changing this method. 184 std::unique_ptr<internal::MessagePipeReader> reader = 185 std::move(message_reader_); 186 reader.reset(); 187 188 base::AutoLock lock(associated_interface_lock_); 189 associated_interfaces_.clear(); 190 } 191 192 void ChannelMojo::OnPipeError() { 193 DCHECK(task_runner_); 194 if (task_runner_->RunsTasksInCurrentSequence()) { 195 listener_->OnChannelError(); 196 } else { 197 task_runner_->PostTask(FROM_HERE, 198 base::Bind(&ChannelMojo::OnPipeError, weak_ptr_)); 199 } 200 } 201 202 void ChannelMojo::OnAssociatedInterfaceRequest( 203 const std::string& name, 204 mojo::ScopedInterfaceEndpointHandle handle) { 205 GenericAssociatedInterfaceFactory factory; 206 { 207 base::AutoLock locker(associated_interface_lock_); 208 auto iter = associated_interfaces_.find(name); 209 if (iter != associated_interfaces_.end()) 210 factory = iter->second; 211 } 212 213 if (!factory.is_null()) 214 factory.Run(std::move(handle)); 215 else 216 listener_->OnAssociatedInterfaceRequest(name, std::move(handle)); 217 } 218 219 bool ChannelMojo::Send(Message* message) { 220 DVLOG(2) << "sending message @" << message << " on channel @" << this 221 << " with type " << message->type(); 222 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) 223 Logging::GetInstance()->OnSendMessage(message); 224 #endif 225 226 std::unique_ptr<Message> scoped_message = base::WrapUnique(message); 227 if (!message_reader_) 228 return false; 229 230 // Comment copied from ipc_channel_posix.cc: 231 // We can't close the pipe here, because calling OnChannelError may destroy 232 // this object, and that would be bad if we are called from Send(). Instead, 233 // we return false and hope the caller will close the pipe. If they do not, 234 // the pipe will still be closed next time OnFileCanReadWithoutBlocking is 235 // called. 236 // 237 // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the 238 // pipe's connection error handler will be invoked in its place. 239 return message_reader_->Send(std::move(scoped_message)); 240 } 241 242 Channel::AssociatedInterfaceSupport* 243 ChannelMojo::GetAssociatedInterfaceSupport() { return this; } 244 245 std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>> 246 ChannelMojo::CreateThreadSafeChannel() { 247 return std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>( 248 task_runner_, 249 base::Bind(&ChannelMojo::ForwardMessageFromThreadSafePtr, weak_ptr_), 250 base::Bind(&ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr, 251 weak_ptr_), 252 *bootstrap_->GetAssociatedGroup()); 253 } 254 255 void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) { 256 listener_->OnChannelConnected(peer_pid); 257 } 258 259 void ChannelMojo::OnMessageReceived(const Message& message) { 260 TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived", 261 "class", IPC_MESSAGE_ID_CLASS(message.type()), 262 "line", IPC_MESSAGE_ID_LINE(message.type())); 263 listener_->OnMessageReceived(message); 264 if (message.dispatch_error()) 265 listener_->OnBadMessageReceived(message); 266 } 267 268 void ChannelMojo::OnBrokenDataReceived() { 269 listener_->OnBadMessageReceived(Message()); 270 } 271 272 // static 273 MojoResult ChannelMojo::ReadFromMessageAttachmentSet( 274 Message* message, 275 base::Optional<std::vector<mojo::native::SerializedHandlePtr>>* handles) { 276 DCHECK(!*handles); 277 278 MojoResult result = MOJO_RESULT_OK; 279 if (!message->HasAttachments()) 280 return result; 281 282 std::vector<mojo::native::SerializedHandlePtr> output_handles; 283 MessageAttachmentSet* set = message->attachment_set(); 284 285 for (unsigned i = 0; result == MOJO_RESULT_OK && i < set->size(); ++i) { 286 auto attachment = set->GetAttachmentAt(i); 287 auto serialized_handle = mojo::native::SerializedHandle::New(); 288 serialized_handle->the_handle = attachment->TakeMojoHandle(); 289 serialized_handle->type = 290 mojo::ConvertTo<mojo::native::SerializedHandle::Type>( 291 attachment->GetType()); 292 output_handles.emplace_back(std::move(serialized_handle)); 293 } 294 set->CommitAllDescriptors(); 295 296 if (!output_handles.empty()) 297 *handles = std::move(output_handles); 298 299 return result; 300 } 301 302 // static 303 MojoResult ChannelMojo::WriteToMessageAttachmentSet( 304 base::Optional<std::vector<mojo::native::SerializedHandlePtr>> handles, 305 Message* message) { 306 if (!handles) 307 return MOJO_RESULT_OK; 308 for (size_t i = 0; i < handles->size(); ++i) { 309 auto& handle = handles->at(i); 310 scoped_refptr<MessageAttachment> unwrapped_attachment = 311 MessageAttachment::CreateFromMojoHandle( 312 std::move(handle->the_handle), 313 mojo::ConvertTo<MessageAttachment::Type>(handle->type)); 314 if (!unwrapped_attachment) { 315 DLOG(WARNING) << "Pipe failed to unwrap handles."; 316 return MOJO_RESULT_UNKNOWN; 317 } 318 319 bool ok = message->attachment_set()->AddAttachment( 320 std::move(unwrapped_attachment)); 321 DCHECK(ok); 322 if (!ok) { 323 LOG(ERROR) << "Failed to add new Mojo handle."; 324 return MOJO_RESULT_UNKNOWN; 325 } 326 } 327 return MOJO_RESULT_OK; 328 } 329 330 void ChannelMojo::AddGenericAssociatedInterface( 331 const std::string& name, 332 const GenericAssociatedInterfaceFactory& factory) { 333 base::AutoLock locker(associated_interface_lock_); 334 auto result = associated_interfaces_.insert({ name, factory }); 335 DCHECK(result.second); 336 } 337 338 void ChannelMojo::GetGenericRemoteAssociatedInterface( 339 const std::string& name, 340 mojo::ScopedInterfaceEndpointHandle handle) { 341 if (message_reader_) { 342 message_reader_->GetRemoteInterface(name, std::move(handle)); 343 } else { 344 // Attach the associated interface to a disconnected pipe, so that the 345 // associated interface pointer can be used to make calls (which are 346 // dropped). 347 mojo::AssociateWithDisconnectedPipe(std::move(handle)); 348 } 349 } 350 351 } // namespace IPC 352