1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/embedder/embedder.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/logging.h" 10 #include "base/memory/scoped_ptr.h" 11 #include "mojo/embedder/platform_support.h" 12 #include "mojo/system/channel.h" 13 #include "mojo/system/channel_endpoint.h" 14 #include "mojo/system/core.h" 15 #include "mojo/system/entrypoints.h" 16 #include "mojo/system/message_in_transit.h" 17 #include "mojo/system/message_pipe_dispatcher.h" 18 #include "mojo/system/platform_handle_dispatcher.h" 19 #include "mojo/system/raw_channel.h" 20 21 namespace mojo { 22 namespace embedder { 23 24 // This is defined here (instead of a header file), since it's opaque to the 25 // outside world. But we need to define it before our (internal-only) functions 26 // that use it. 27 struct ChannelInfo { 28 ChannelInfo() {} 29 ~ChannelInfo() {} 30 31 scoped_refptr<system::Channel> channel; 32 33 // May be null, in which case |DestroyChannelOnIOThread()| must be used (from 34 // the IO thread), instead of |DestroyChannel()|. 35 scoped_refptr<base::TaskRunner> io_thread_task_runner; 36 }; 37 38 namespace { 39 40 // Helper for |CreateChannel...()|. (Note: May return null for some failures.) 41 scoped_refptr<system::Channel> MakeChannel( 42 system::Core* core, 43 ScopedPlatformHandle platform_handle, 44 scoped_refptr<system::ChannelEndpoint> channel_endpoint) { 45 DCHECK(platform_handle.is_valid()); 46 47 // Create and initialize a |system::Channel|. 48 scoped_refptr<system::Channel> channel = 49 new system::Channel(core->platform_support()); 50 if (!channel->Init(system::RawChannel::Create(platform_handle.Pass()))) { 51 // This is very unusual (e.g., maybe |platform_handle| was invalid or we 52 // reached some system resource limit). 53 LOG(ERROR) << "Channel::Init() failed"; 54 // Return null, since |Shutdown()| shouldn't be called in this case. 55 return scoped_refptr<system::Channel>(); 56 } 57 // Once |Init()| has succeeded, we have to return |channel| (since 58 // |Shutdown()| will have to be called on it). 59 60 // Attach the endpoint. 61 system::MessageInTransit::EndpointId endpoint_id = 62 channel->AttachEndpoint(channel_endpoint); 63 if (endpoint_id == system::MessageInTransit::kInvalidEndpointId) { 64 // This means that, e.g., the other endpoint of the message pipe was closed 65 // first. But it's not necessarily an error per se. 66 DVLOG(2) << "Channel::AttachEndpoint() failed"; 67 return channel; 68 } 69 CHECK_EQ(endpoint_id, system::Channel::kBootstrapEndpointId); 70 71 if (!channel->RunMessagePipeEndpoint(system::Channel::kBootstrapEndpointId, 72 system::Channel::kBootstrapEndpointId)) { 73 // Currently, there's no reason for this to fail. 74 NOTREACHED() << "Channel::RunMessagePipeEndpoint() failed"; 75 return channel; 76 } 77 78 return channel; 79 } 80 81 void CreateChannelHelper( 82 system::Core* core, 83 ScopedPlatformHandle platform_handle, 84 scoped_ptr<ChannelInfo> channel_info, 85 scoped_refptr<system::ChannelEndpoint> channel_endpoint, 86 DidCreateChannelCallback callback, 87 scoped_refptr<base::TaskRunner> callback_thread_task_runner) { 88 channel_info->channel = 89 MakeChannel(core, platform_handle.Pass(), channel_endpoint); 90 91 // Hand the channel back to the embedder. 92 if (callback_thread_task_runner.get()) { 93 callback_thread_task_runner->PostTask( 94 FROM_HERE, base::Bind(callback, channel_info.release())); 95 } else { 96 callback.Run(channel_info.release()); 97 } 98 } 99 100 } // namespace 101 102 void Init(scoped_ptr<PlatformSupport> platform_support) { 103 system::entrypoints::SetCore(new system::Core(platform_support.Pass())); 104 } 105 106 // TODO(vtl): Write tests for this. 107 ScopedMessagePipeHandle CreateChannelOnIOThread( 108 ScopedPlatformHandle platform_handle, 109 ChannelInfo** channel_info) { 110 DCHECK(platform_handle.is_valid()); 111 DCHECK(channel_info); 112 113 scoped_refptr<system::ChannelEndpoint> channel_endpoint; 114 scoped_refptr<system::MessagePipeDispatcher> dispatcher = 115 system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint); 116 117 system::Core* core = system::entrypoints::GetCore(); 118 DCHECK(core); 119 ScopedMessagePipeHandle rv( 120 MessagePipeHandle(core->AddDispatcher(dispatcher))); 121 122 *channel_info = new ChannelInfo(); 123 (*channel_info)->channel = 124 MakeChannel(core, platform_handle.Pass(), channel_endpoint); 125 126 return rv.Pass(); 127 } 128 129 ScopedMessagePipeHandle CreateChannel( 130 ScopedPlatformHandle platform_handle, 131 scoped_refptr<base::TaskRunner> io_thread_task_runner, 132 DidCreateChannelCallback callback, 133 scoped_refptr<base::TaskRunner> callback_thread_task_runner) { 134 DCHECK(platform_handle.is_valid()); 135 136 scoped_refptr<system::ChannelEndpoint> channel_endpoint; 137 scoped_refptr<system::MessagePipeDispatcher> dispatcher = 138 system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint); 139 140 system::Core* core = system::entrypoints::GetCore(); 141 DCHECK(core); 142 ScopedMessagePipeHandle rv( 143 MessagePipeHandle(core->AddDispatcher(dispatcher))); 144 145 scoped_ptr<ChannelInfo> channel_info(new ChannelInfo()); 146 channel_info->io_thread_task_runner = io_thread_task_runner; 147 148 if (rv.is_valid()) { 149 io_thread_task_runner->PostTask(FROM_HERE, 150 base::Bind(&CreateChannelHelper, 151 base::Unretained(core), 152 base::Passed(&platform_handle), 153 base::Passed(&channel_info), 154 channel_endpoint, 155 callback, 156 callback_thread_task_runner)); 157 } else { 158 (callback_thread_task_runner.get() ? callback_thread_task_runner 159 : io_thread_task_runner) 160 ->PostTask(FROM_HERE, base::Bind(callback, channel_info.release())); 161 } 162 163 return rv.Pass(); 164 } 165 166 void DestroyChannelOnIOThread(ChannelInfo* channel_info) { 167 DCHECK(channel_info); 168 if (!channel_info->channel.get()) { 169 // Presumably, |Init()| on the channel failed. 170 return; 171 } 172 173 channel_info->channel->Shutdown(); 174 delete channel_info; 175 } 176 177 // TODO(vtl): Write tests for this. 178 void DestroyChannel(ChannelInfo* channel_info) { 179 DCHECK(channel_info); 180 DCHECK(channel_info->io_thread_task_runner.get()); 181 182 if (!channel_info->channel.get()) { 183 // Presumably, |Init()| on the channel failed. 184 return; 185 } 186 187 channel_info->channel->WillShutdownSoon(); 188 channel_info->io_thread_task_runner->PostTask( 189 FROM_HERE, base::Bind(&DestroyChannelOnIOThread, channel_info)); 190 } 191 192 void WillDestroyChannelSoon(ChannelInfo* channel_info) { 193 DCHECK(channel_info); 194 channel_info->channel->WillShutdownSoon(); 195 } 196 197 MojoResult CreatePlatformHandleWrapper( 198 ScopedPlatformHandle platform_handle, 199 MojoHandle* platform_handle_wrapper_handle) { 200 DCHECK(platform_handle_wrapper_handle); 201 202 scoped_refptr<system::Dispatcher> dispatcher( 203 new system::PlatformHandleDispatcher(platform_handle.Pass())); 204 205 system::Core* core = system::entrypoints::GetCore(); 206 DCHECK(core); 207 MojoHandle h = core->AddDispatcher(dispatcher); 208 if (h == MOJO_HANDLE_INVALID) { 209 LOG(ERROR) << "Handle table full"; 210 dispatcher->Close(); 211 return MOJO_RESULT_RESOURCE_EXHAUSTED; 212 } 213 214 *platform_handle_wrapper_handle = h; 215 return MOJO_RESULT_OK; 216 } 217 218 MojoResult PassWrappedPlatformHandle(MojoHandle platform_handle_wrapper_handle, 219 ScopedPlatformHandle* platform_handle) { 220 DCHECK(platform_handle); 221 222 system::Core* core = system::entrypoints::GetCore(); 223 DCHECK(core); 224 scoped_refptr<system::Dispatcher> dispatcher( 225 core->GetDispatcher(platform_handle_wrapper_handle)); 226 if (!dispatcher.get()) 227 return MOJO_RESULT_INVALID_ARGUMENT; 228 229 if (dispatcher->GetType() != system::Dispatcher::kTypePlatformHandle) 230 return MOJO_RESULT_INVALID_ARGUMENT; 231 232 *platform_handle = 233 static_cast<system::PlatformHandleDispatcher*>(dispatcher.get()) 234 ->PassPlatformHandle() 235 .Pass(); 236 return MOJO_RESULT_OK; 237 } 238 239 } // namespace embedder 240 } // namespace mojo 241