Home | History | Annotate | Download | only in child
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/child/webmessageportchannel_impl.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/message_loop/message_loop_proxy.h"
      9 #include "content/child/child_process.h"
     10 #include "content/child/child_thread.h"
     11 #include "content/common/worker_messages.h"
     12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
     13 #include "third_party/WebKit/public/platform/WebString.h"
     14 
     15 using WebKit::WebMessagePortChannel;
     16 using WebKit::WebMessagePortChannelArray;
     17 using WebKit::WebMessagePortChannelClient;
     18 using WebKit::WebString;
     19 
     20 namespace content {
     21 
     22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
     23     base::MessageLoopProxy* child_thread_loop)
     24     : client_(NULL),
     25       route_id_(MSG_ROUTING_NONE),
     26       message_port_id_(MSG_ROUTING_NONE),
     27       child_thread_loop_(child_thread_loop) {
     28   AddRef();
     29   Init();
     30 }
     31 
     32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
     33     int route_id,
     34     int message_port_id,
     35     base::MessageLoopProxy* child_thread_loop)
     36     : client_(NULL),
     37       route_id_(route_id),
     38       message_port_id_(message_port_id),
     39       child_thread_loop_(child_thread_loop) {
     40   AddRef();
     41   Init();
     42 }
     43 
     44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
     45   // If we have any queued messages with attached ports, manually destroy them.
     46   while (!message_queue_.empty()) {
     47     const std::vector<WebMessagePortChannelImpl*>& channel_array =
     48         message_queue_.front().ports;
     49     for (size_t i = 0; i < channel_array.size(); i++) {
     50       channel_array[i]->destroy();
     51     }
     52     message_queue_.pop();
     53   }
     54 
     55   if (message_port_id_ != MSG_ROUTING_NONE)
     56     Send(new WorkerProcessHostMsg_DestroyMessagePort(message_port_id_));
     57 
     58   if (route_id_ != MSG_ROUTING_NONE)
     59     ChildThread::current()->RemoveRoute(route_id_);
     60 }
     61 
     62 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
     63   // Must lock here since client_ is called on the main thread.
     64   base::AutoLock auto_lock(lock_);
     65   client_ = client;
     66 }
     67 
     68 void WebMessagePortChannelImpl::destroy() {
     69   setClient(NULL);
     70 
     71   // Release the object on the main thread, since the destructor might want to
     72   // send an IPC, and that has to happen on the main thread.
     73   child_thread_loop_->ReleaseSoon(FROM_HERE, this);
     74 }
     75 
     76 void WebMessagePortChannelImpl::entangle(WebMessagePortChannel* channel) {
     77   // The message port ids might not be set up yet, if this channel wasn't
     78   // created on the main thread.  So need to wait until we're on the main thread
     79   // before getting the other message port id.
     80   scoped_refptr<WebMessagePortChannelImpl> webchannel(
     81       static_cast<WebMessagePortChannelImpl*>(channel));
     82   Entangle(webchannel);
     83 }
     84 
     85 void WebMessagePortChannelImpl::postMessage(
     86     const WebString& message,
     87     WebMessagePortChannelArray* channels) {
     88   if (!child_thread_loop_->BelongsToCurrentThread()) {
     89     child_thread_loop_->PostTask(
     90         FROM_HERE,
     91         base::Bind(
     92             &WebMessagePortChannelImpl::postMessage, this, message, channels));
     93     return;
     94   }
     95 
     96   std::vector<int> message_port_ids(channels ? channels->size() : 0);
     97   if (channels) {
     98     // Extract the port IDs from the source array, then free it.
     99     for (size_t i = 0; i < channels->size(); ++i) {
    100       WebMessagePortChannelImpl* webchannel =
    101           static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
    102       message_port_ids[i] = webchannel->message_port_id();
    103       webchannel->QueueMessages();
    104       DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
    105     }
    106     delete channels;
    107   }
    108 
    109   IPC::Message* msg = new WorkerProcessHostMsg_PostMessage(
    110       message_port_id_, message, message_port_ids);
    111   Send(msg);
    112 }
    113 
    114 bool WebMessagePortChannelImpl::tryGetMessage(
    115     WebString* message,
    116     WebMessagePortChannelArray& channels) {
    117   base::AutoLock auto_lock(lock_);
    118   if (message_queue_.empty())
    119     return false;
    120 
    121   *message = message_queue_.front().message;
    122   const std::vector<WebMessagePortChannelImpl*>& channel_array =
    123       message_queue_.front().ports;
    124   WebMessagePortChannelArray result_ports(channel_array.size());
    125   for (size_t i = 0; i < channel_array.size(); i++) {
    126     result_ports[i] = channel_array[i];
    127   }
    128 
    129   channels.swap(result_ports);
    130   message_queue_.pop();
    131   return true;
    132 }
    133 
    134 void WebMessagePortChannelImpl::Init() {
    135   if (!child_thread_loop_->BelongsToCurrentThread()) {
    136     child_thread_loop_->PostTask(
    137         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
    138     return;
    139   }
    140 
    141   if (route_id_ == MSG_ROUTING_NONE) {
    142     DCHECK(message_port_id_ == MSG_ROUTING_NONE);
    143     Send(new WorkerProcessHostMsg_CreateMessagePort(
    144         &route_id_, &message_port_id_));
    145   }
    146 
    147   ChildThread::current()->AddRoute(route_id_, this);
    148 }
    149 
    150 void WebMessagePortChannelImpl::Entangle(
    151     scoped_refptr<WebMessagePortChannelImpl> channel) {
    152   if (!child_thread_loop_->BelongsToCurrentThread()) {
    153     child_thread_loop_->PostTask(
    154         FROM_HERE,
    155         base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
    156     return;
    157   }
    158 
    159   Send(new WorkerProcessHostMsg_Entangle(
    160       message_port_id_, channel->message_port_id()));
    161 }
    162 
    163 void WebMessagePortChannelImpl::QueueMessages() {
    164   if (!child_thread_loop_->BelongsToCurrentThread()) {
    165     child_thread_loop_->PostTask(
    166         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
    167     return;
    168   }
    169   // This message port is being sent elsewhere (perhaps to another process).
    170   // The new endpoint needs to receive the queued messages, including ones that
    171   // could still be in-flight.  So we tell the browser to queue messages, and it
    172   // sends us an ack, whose receipt we know means that no more messages are
    173   // in-flight.  We then send the queued messages to the browser, which prepends
    174   // them to the ones it queued and it sends them to the new endpoint.
    175   Send(new WorkerProcessHostMsg_QueueMessages(message_port_id_));
    176 
    177   // The process could potentially go away while we're still waiting for
    178   // in-flight messages.  Ensure it stays alive.
    179   ChildProcess::current()->AddRefProcess();
    180 }
    181 
    182 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
    183   if (!child_thread_loop_->BelongsToCurrentThread()) {
    184     DCHECK(!message->is_sync());
    185     child_thread_loop_->PostTask(
    186         FROM_HERE,
    187         base::Bind(&WebMessagePortChannelImpl::Send, this, message));
    188     return;
    189   }
    190 
    191   ChildThread::current()->Send(message);
    192 }
    193 
    194 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
    195   bool handled = true;
    196   IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
    197     IPC_MESSAGE_HANDLER(WorkerProcessMsg_Message, OnMessage)
    198     IPC_MESSAGE_HANDLER(WorkerProcessMsg_MessagesQueued, OnMessagedQueued)
    199     IPC_MESSAGE_UNHANDLED(handled = false)
    200   IPC_END_MESSAGE_MAP()
    201   return handled;
    202 }
    203 
    204 void WebMessagePortChannelImpl::OnMessage(
    205     const string16& message,
    206     const std::vector<int>& sent_message_port_ids,
    207     const std::vector<int>& new_routing_ids) {
    208   base::AutoLock auto_lock(lock_);
    209   Message msg;
    210   msg.message = message;
    211   if (!sent_message_port_ids.empty()) {
    212     msg.ports.resize(sent_message_port_ids.size());
    213     for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    214       msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
    215                                                    sent_message_port_ids[i],
    216                                                    child_thread_loop_.get());
    217     }
    218   }
    219 
    220   bool was_empty = message_queue_.empty();
    221   message_queue_.push(msg);
    222   if (client_ && was_empty)
    223     client_->messageAvailable();
    224 }
    225 
    226 void WebMessagePortChannelImpl::OnMessagedQueued() {
    227   std::vector<QueuedMessage> queued_messages;
    228 
    229   {
    230     base::AutoLock auto_lock(lock_);
    231     queued_messages.reserve(message_queue_.size());
    232     while (!message_queue_.empty()) {
    233       string16 message = message_queue_.front().message;
    234       const std::vector<WebMessagePortChannelImpl*>& channel_array =
    235           message_queue_.front().ports;
    236       std::vector<int> port_ids(channel_array.size());
    237       for (size_t i = 0; i < channel_array.size(); ++i) {
    238         port_ids[i] = channel_array[i]->message_port_id();
    239       }
    240       queued_messages.push_back(std::make_pair(message, port_ids));
    241       message_queue_.pop();
    242     }
    243   }
    244 
    245   Send(new WorkerProcessHostMsg_SendQueuedMessages(
    246       message_port_id_, queued_messages));
    247 
    248   message_port_id_ = MSG_ROUTING_NONE;
    249 
    250   Release();
    251   ChildProcess::current()->ReleaseProcess();
    252 }
    253 
    254 WebMessagePortChannelImpl::Message::Message() {}
    255 
    256 WebMessagePortChannelImpl::Message::~Message() {}
    257 
    258 }  // namespace content
    259