1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/child/webmessageportchannel_impl.h" 6 7 #include "base/bind.h" 8 #include "base/message_loop/message_loop_proxy.h" 9 #include "content/child/child_process.h" 10 #include "content/child/child_thread.h" 11 #include "content/common/worker_messages.h" 12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" 13 #include "third_party/WebKit/public/platform/WebString.h" 14 15 using WebKit::WebMessagePortChannel; 16 using WebKit::WebMessagePortChannelArray; 17 using WebKit::WebMessagePortChannelClient; 18 using WebKit::WebString; 19 20 namespace content { 21 22 WebMessagePortChannelImpl::WebMessagePortChannelImpl( 23 base::MessageLoopProxy* child_thread_loop) 24 : client_(NULL), 25 route_id_(MSG_ROUTING_NONE), 26 message_port_id_(MSG_ROUTING_NONE), 27 child_thread_loop_(child_thread_loop) { 28 AddRef(); 29 Init(); 30 } 31 32 WebMessagePortChannelImpl::WebMessagePortChannelImpl( 33 int route_id, 34 int message_port_id, 35 base::MessageLoopProxy* child_thread_loop) 36 : client_(NULL), 37 route_id_(route_id), 38 message_port_id_(message_port_id), 39 child_thread_loop_(child_thread_loop) { 40 AddRef(); 41 Init(); 42 } 43 44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { 45 // If we have any queued messages with attached ports, manually destroy them. 46 while (!message_queue_.empty()) { 47 const std::vector<WebMessagePortChannelImpl*>& channel_array = 48 message_queue_.front().ports; 49 for (size_t i = 0; i < channel_array.size(); i++) { 50 channel_array[i]->destroy(); 51 } 52 message_queue_.pop(); 53 } 54 55 if (message_port_id_ != MSG_ROUTING_NONE) 56 Send(new WorkerProcessHostMsg_DestroyMessagePort(message_port_id_)); 57 58 if (route_id_ != MSG_ROUTING_NONE) 59 ChildThread::current()->RemoveRoute(route_id_); 60 } 61 62 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { 63 // Must lock here since client_ is called on the main thread. 64 base::AutoLock auto_lock(lock_); 65 client_ = client; 66 } 67 68 void WebMessagePortChannelImpl::destroy() { 69 setClient(NULL); 70 71 // Release the object on the main thread, since the destructor might want to 72 // send an IPC, and that has to happen on the main thread. 73 child_thread_loop_->ReleaseSoon(FROM_HERE, this); 74 } 75 76 void WebMessagePortChannelImpl::entangle(WebMessagePortChannel* channel) { 77 // The message port ids might not be set up yet, if this channel wasn't 78 // created on the main thread. So need to wait until we're on the main thread 79 // before getting the other message port id. 80 scoped_refptr<WebMessagePortChannelImpl> webchannel( 81 static_cast<WebMessagePortChannelImpl*>(channel)); 82 Entangle(webchannel); 83 } 84 85 void WebMessagePortChannelImpl::postMessage( 86 const WebString& message, 87 WebMessagePortChannelArray* channels) { 88 if (!child_thread_loop_->BelongsToCurrentThread()) { 89 child_thread_loop_->PostTask( 90 FROM_HERE, 91 base::Bind( 92 &WebMessagePortChannelImpl::postMessage, this, message, channels)); 93 return; 94 } 95 96 std::vector<int> message_port_ids(channels ? channels->size() : 0); 97 if (channels) { 98 // Extract the port IDs from the source array, then free it. 99 for (size_t i = 0; i < channels->size(); ++i) { 100 WebMessagePortChannelImpl* webchannel = 101 static_cast<WebMessagePortChannelImpl*>((*channels)[i]); 102 message_port_ids[i] = webchannel->message_port_id(); 103 webchannel->QueueMessages(); 104 DCHECK(message_port_ids[i] != MSG_ROUTING_NONE); 105 } 106 delete channels; 107 } 108 109 IPC::Message* msg = new WorkerProcessHostMsg_PostMessage( 110 message_port_id_, message, message_port_ids); 111 Send(msg); 112 } 113 114 bool WebMessagePortChannelImpl::tryGetMessage( 115 WebString* message, 116 WebMessagePortChannelArray& channels) { 117 base::AutoLock auto_lock(lock_); 118 if (message_queue_.empty()) 119 return false; 120 121 *message = message_queue_.front().message; 122 const std::vector<WebMessagePortChannelImpl*>& channel_array = 123 message_queue_.front().ports; 124 WebMessagePortChannelArray result_ports(channel_array.size()); 125 for (size_t i = 0; i < channel_array.size(); i++) { 126 result_ports[i] = channel_array[i]; 127 } 128 129 channels.swap(result_ports); 130 message_queue_.pop(); 131 return true; 132 } 133 134 void WebMessagePortChannelImpl::Init() { 135 if (!child_thread_loop_->BelongsToCurrentThread()) { 136 child_thread_loop_->PostTask( 137 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this)); 138 return; 139 } 140 141 if (route_id_ == MSG_ROUTING_NONE) { 142 DCHECK(message_port_id_ == MSG_ROUTING_NONE); 143 Send(new WorkerProcessHostMsg_CreateMessagePort( 144 &route_id_, &message_port_id_)); 145 } 146 147 ChildThread::current()->AddRoute(route_id_, this); 148 } 149 150 void WebMessagePortChannelImpl::Entangle( 151 scoped_refptr<WebMessagePortChannelImpl> channel) { 152 if (!child_thread_loop_->BelongsToCurrentThread()) { 153 child_thread_loop_->PostTask( 154 FROM_HERE, 155 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel)); 156 return; 157 } 158 159 Send(new WorkerProcessHostMsg_Entangle( 160 message_port_id_, channel->message_port_id())); 161 } 162 163 void WebMessagePortChannelImpl::QueueMessages() { 164 if (!child_thread_loop_->BelongsToCurrentThread()) { 165 child_thread_loop_->PostTask( 166 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this)); 167 return; 168 } 169 // This message port is being sent elsewhere (perhaps to another process). 170 // The new endpoint needs to receive the queued messages, including ones that 171 // could still be in-flight. So we tell the browser to queue messages, and it 172 // sends us an ack, whose receipt we know means that no more messages are 173 // in-flight. We then send the queued messages to the browser, which prepends 174 // them to the ones it queued and it sends them to the new endpoint. 175 Send(new WorkerProcessHostMsg_QueueMessages(message_port_id_)); 176 177 // The process could potentially go away while we're still waiting for 178 // in-flight messages. Ensure it stays alive. 179 ChildProcess::current()->AddRefProcess(); 180 } 181 182 void WebMessagePortChannelImpl::Send(IPC::Message* message) { 183 if (!child_thread_loop_->BelongsToCurrentThread()) { 184 DCHECK(!message->is_sync()); 185 child_thread_loop_->PostTask( 186 FROM_HERE, 187 base::Bind(&WebMessagePortChannelImpl::Send, this, message)); 188 return; 189 } 190 191 ChildThread::current()->Send(message); 192 } 193 194 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) { 195 bool handled = true; 196 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message) 197 IPC_MESSAGE_HANDLER(WorkerProcessMsg_Message, OnMessage) 198 IPC_MESSAGE_HANDLER(WorkerProcessMsg_MessagesQueued, OnMessagedQueued) 199 IPC_MESSAGE_UNHANDLED(handled = false) 200 IPC_END_MESSAGE_MAP() 201 return handled; 202 } 203 204 void WebMessagePortChannelImpl::OnMessage( 205 const string16& message, 206 const std::vector<int>& sent_message_port_ids, 207 const std::vector<int>& new_routing_ids) { 208 base::AutoLock auto_lock(lock_); 209 Message msg; 210 msg.message = message; 211 if (!sent_message_port_ids.empty()) { 212 msg.ports.resize(sent_message_port_ids.size()); 213 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 214 msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i], 215 sent_message_port_ids[i], 216 child_thread_loop_.get()); 217 } 218 } 219 220 bool was_empty = message_queue_.empty(); 221 message_queue_.push(msg); 222 if (client_ && was_empty) 223 client_->messageAvailable(); 224 } 225 226 void WebMessagePortChannelImpl::OnMessagedQueued() { 227 std::vector<QueuedMessage> queued_messages; 228 229 { 230 base::AutoLock auto_lock(lock_); 231 queued_messages.reserve(message_queue_.size()); 232 while (!message_queue_.empty()) { 233 string16 message = message_queue_.front().message; 234 const std::vector<WebMessagePortChannelImpl*>& channel_array = 235 message_queue_.front().ports; 236 std::vector<int> port_ids(channel_array.size()); 237 for (size_t i = 0; i < channel_array.size(); ++i) { 238 port_ids[i] = channel_array[i]->message_port_id(); 239 } 240 queued_messages.push_back(std::make_pair(message, port_ids)); 241 message_queue_.pop(); 242 } 243 } 244 245 Send(new WorkerProcessHostMsg_SendQueuedMessages( 246 message_port_id_, queued_messages)); 247 248 message_port_id_ = MSG_ROUTING_NONE; 249 250 Release(); 251 ChildProcess::current()->ReleaseProcess(); 252 } 253 254 WebMessagePortChannelImpl::Message::Message() {} 255 256 WebMessagePortChannelImpl::Message::~Message() {} 257 258 } // namespace content 259