1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/child/webmessageportchannel_impl.h" 6 7 #include "base/bind.h" 8 #include "base/message_loop/message_loop_proxy.h" 9 #include "content/child/child_process.h" 10 #include "content/child/child_thread.h" 11 #include "content/common/message_port_messages.h" 12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" 13 #include "third_party/WebKit/public/platform/WebString.h" 14 15 using blink::WebMessagePortChannel; 16 using blink::WebMessagePortChannelArray; 17 using blink::WebMessagePortChannelClient; 18 using blink::WebString; 19 20 namespace content { 21 22 WebMessagePortChannelImpl::WebMessagePortChannelImpl( 23 const scoped_refptr<base::MessageLoopProxy>& child_thread_loop) 24 : client_(NULL), 25 route_id_(MSG_ROUTING_NONE), 26 message_port_id_(MSG_ROUTING_NONE), 27 child_thread_loop_(child_thread_loop) { 28 AddRef(); 29 Init(); 30 } 31 32 WebMessagePortChannelImpl::WebMessagePortChannelImpl( 33 int route_id, 34 int message_port_id, 35 const scoped_refptr<base::MessageLoopProxy>& child_thread_loop) 36 : client_(NULL), 37 route_id_(route_id), 38 message_port_id_(message_port_id), 39 child_thread_loop_(child_thread_loop) { 40 AddRef(); 41 Init(); 42 } 43 44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { 45 // If we have any queued messages with attached ports, manually destroy them. 46 while (!message_queue_.empty()) { 47 const std::vector<WebMessagePortChannelImpl*>& channel_array = 48 message_queue_.front().ports; 49 for (size_t i = 0; i < channel_array.size(); i++) { 50 channel_array[i]->destroy(); 51 } 52 message_queue_.pop(); 53 } 54 55 if (message_port_id_ != MSG_ROUTING_NONE) 56 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_)); 57 58 if (route_id_ != MSG_ROUTING_NONE) 59 ChildThread::current()->GetRouter()->RemoveRoute(route_id_); 60 } 61 62 // static 63 void WebMessagePortChannelImpl::CreatePair( 64 const scoped_refptr<base::MessageLoopProxy>& child_thread_loop, 65 blink::WebMessagePortChannel** channel1, 66 blink::WebMessagePortChannel** channel2) { 67 WebMessagePortChannelImpl* impl1 = 68 new WebMessagePortChannelImpl(child_thread_loop); 69 WebMessagePortChannelImpl* impl2 = 70 new WebMessagePortChannelImpl(child_thread_loop); 71 72 impl1->Entangle(impl2); 73 impl2->Entangle(impl1); 74 75 *channel1 = impl1; 76 *channel2 = impl2; 77 } 78 79 // static 80 std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs( 81 WebMessagePortChannelArray* channels) { 82 std::vector<int> message_port_ids; 83 if (channels) { 84 message_port_ids.resize(channels->size()); 85 // Extract the port IDs from the source array, then free it. 86 for (size_t i = 0; i < channels->size(); ++i) { 87 WebMessagePortChannelImpl* webchannel = 88 static_cast<WebMessagePortChannelImpl*>((*channels)[i]); 89 // The message port ids might not be set up yet if this channel 90 // wasn't created on the main thread. 91 DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread()); 92 message_port_ids[i] = webchannel->message_port_id(); 93 webchannel->QueueMessages(); 94 DCHECK(message_port_ids[i] != MSG_ROUTING_NONE); 95 } 96 delete channels; 97 } 98 return message_port_ids; 99 } 100 101 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { 102 // Must lock here since client_ is called on the main thread. 103 base::AutoLock auto_lock(lock_); 104 client_ = client; 105 } 106 107 void WebMessagePortChannelImpl::destroy() { 108 setClient(NULL); 109 110 // Release the object on the main thread, since the destructor might want to 111 // send an IPC, and that has to happen on the main thread. 112 child_thread_loop_->ReleaseSoon(FROM_HERE, this); 113 } 114 115 void WebMessagePortChannelImpl::postMessage( 116 const WebString& message, 117 WebMessagePortChannelArray* channels) { 118 if (!child_thread_loop_->BelongsToCurrentThread()) { 119 child_thread_loop_->PostTask( 120 FROM_HERE, 121 base::Bind( 122 &WebMessagePortChannelImpl::PostMessage, this, 123 static_cast<base::string16>(message), channels)); 124 } else { 125 PostMessage(message, channels); 126 } 127 } 128 129 void WebMessagePortChannelImpl::PostMessage( 130 const base::string16& message, 131 WebMessagePortChannelArray* channels) { 132 IPC::Message* msg = new MessagePortHostMsg_PostMessage( 133 message_port_id_, message, ExtractMessagePortIDs(channels)); 134 Send(msg); 135 } 136 137 bool WebMessagePortChannelImpl::tryGetMessage( 138 WebString* message, 139 WebMessagePortChannelArray& channels) { 140 base::AutoLock auto_lock(lock_); 141 if (message_queue_.empty()) 142 return false; 143 144 *message = message_queue_.front().message; 145 const std::vector<WebMessagePortChannelImpl*>& channel_array = 146 message_queue_.front().ports; 147 WebMessagePortChannelArray result_ports(channel_array.size()); 148 for (size_t i = 0; i < channel_array.size(); i++) { 149 result_ports[i] = channel_array[i]; 150 } 151 152 channels.swap(result_ports); 153 message_queue_.pop(); 154 return true; 155 } 156 157 void WebMessagePortChannelImpl::Init() { 158 if (!child_thread_loop_->BelongsToCurrentThread()) { 159 child_thread_loop_->PostTask( 160 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this)); 161 return; 162 } 163 164 if (route_id_ == MSG_ROUTING_NONE) { 165 DCHECK(message_port_id_ == MSG_ROUTING_NONE); 166 Send(new MessagePortHostMsg_CreateMessagePort( 167 &route_id_, &message_port_id_)); 168 } 169 170 ChildThread::current()->GetRouter()->AddRoute(route_id_, this); 171 } 172 173 void WebMessagePortChannelImpl::Entangle( 174 scoped_refptr<WebMessagePortChannelImpl> channel) { 175 // The message port ids might not be set up yet, if this channel wasn't 176 // created on the main thread. So need to wait until we're on the main thread 177 // before getting the other message port id. 178 if (!child_thread_loop_->BelongsToCurrentThread()) { 179 child_thread_loop_->PostTask( 180 FROM_HERE, 181 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel)); 182 return; 183 } 184 185 Send(new MessagePortHostMsg_Entangle( 186 message_port_id_, channel->message_port_id())); 187 } 188 189 void WebMessagePortChannelImpl::QueueMessages() { 190 if (!child_thread_loop_->BelongsToCurrentThread()) { 191 child_thread_loop_->PostTask( 192 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this)); 193 return; 194 } 195 // This message port is being sent elsewhere (perhaps to another process). 196 // The new endpoint needs to receive the queued messages, including ones that 197 // could still be in-flight. So we tell the browser to queue messages, and it 198 // sends us an ack, whose receipt we know means that no more messages are 199 // in-flight. We then send the queued messages to the browser, which prepends 200 // them to the ones it queued and it sends them to the new endpoint. 201 Send(new MessagePortHostMsg_QueueMessages(message_port_id_)); 202 203 // The process could potentially go away while we're still waiting for 204 // in-flight messages. Ensure it stays alive. 205 ChildProcess::current()->AddRefProcess(); 206 } 207 208 void WebMessagePortChannelImpl::Send(IPC::Message* message) { 209 if (!child_thread_loop_->BelongsToCurrentThread()) { 210 DCHECK(!message->is_sync()); 211 child_thread_loop_->PostTask( 212 FROM_HERE, 213 base::Bind(&WebMessagePortChannelImpl::Send, this, message)); 214 return; 215 } 216 217 ChildThread::current()->GetRouter()->Send(message); 218 } 219 220 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) { 221 bool handled = true; 222 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message) 223 IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage) 224 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued) 225 IPC_MESSAGE_UNHANDLED(handled = false) 226 IPC_END_MESSAGE_MAP() 227 return handled; 228 } 229 230 void WebMessagePortChannelImpl::OnMessage( 231 const base::string16& message, 232 const std::vector<int>& sent_message_port_ids, 233 const std::vector<int>& new_routing_ids) { 234 base::AutoLock auto_lock(lock_); 235 Message msg; 236 msg.message = message; 237 if (!sent_message_port_ids.empty()) { 238 msg.ports.resize(sent_message_port_ids.size()); 239 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 240 msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i], 241 sent_message_port_ids[i], 242 child_thread_loop_.get()); 243 } 244 } 245 246 bool was_empty = message_queue_.empty(); 247 message_queue_.push(msg); 248 if (client_ && was_empty) 249 client_->messageAvailable(); 250 } 251 252 void WebMessagePortChannelImpl::OnMessagesQueued() { 253 std::vector<QueuedMessage> queued_messages; 254 255 { 256 base::AutoLock auto_lock(lock_); 257 queued_messages.reserve(message_queue_.size()); 258 while (!message_queue_.empty()) { 259 base::string16 message = message_queue_.front().message; 260 const std::vector<WebMessagePortChannelImpl*>& channel_array = 261 message_queue_.front().ports; 262 std::vector<int> port_ids(channel_array.size()); 263 for (size_t i = 0; i < channel_array.size(); ++i) { 264 port_ids[i] = channel_array[i]->message_port_id(); 265 channel_array[i]->QueueMessages(); 266 } 267 queued_messages.push_back(std::make_pair(message, port_ids)); 268 message_queue_.pop(); 269 } 270 } 271 272 Send(new MessagePortHostMsg_SendQueuedMessages( 273 message_port_id_, queued_messages)); 274 275 message_port_id_ = MSG_ROUTING_NONE; 276 277 Release(); 278 ChildProcess::current()->ReleaseProcess(); 279 } 280 281 WebMessagePortChannelImpl::Message::Message() {} 282 283 WebMessagePortChannelImpl::Message::~Message() {} 284 285 } // namespace content 286