Home | History | Annotate | Download | only in child
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/child/webmessageportchannel_impl.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/message_loop/message_loop_proxy.h"
      9 #include "content/child/child_process.h"
     10 #include "content/child/child_thread.h"
     11 #include "content/common/message_port_messages.h"
     12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
     13 #include "third_party/WebKit/public/platform/WebString.h"
     14 
     15 using blink::WebMessagePortChannel;
     16 using blink::WebMessagePortChannelArray;
     17 using blink::WebMessagePortChannelClient;
     18 using blink::WebString;
     19 
     20 namespace content {
     21 
     22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
     23     const scoped_refptr<base::MessageLoopProxy>& child_thread_loop)
     24     : client_(NULL),
     25       route_id_(MSG_ROUTING_NONE),
     26       message_port_id_(MSG_ROUTING_NONE),
     27       child_thread_loop_(child_thread_loop) {
     28   AddRef();
     29   Init();
     30 }
     31 
     32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
     33     int route_id,
     34     int message_port_id,
     35     const scoped_refptr<base::MessageLoopProxy>& child_thread_loop)
     36     : client_(NULL),
     37       route_id_(route_id),
     38       message_port_id_(message_port_id),
     39       child_thread_loop_(child_thread_loop) {
     40   AddRef();
     41   Init();
     42 }
     43 
     44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
     45   // If we have any queued messages with attached ports, manually destroy them.
     46   while (!message_queue_.empty()) {
     47     const std::vector<WebMessagePortChannelImpl*>& channel_array =
     48         message_queue_.front().ports;
     49     for (size_t i = 0; i < channel_array.size(); i++) {
     50       channel_array[i]->destroy();
     51     }
     52     message_queue_.pop();
     53   }
     54 
     55   if (message_port_id_ != MSG_ROUTING_NONE)
     56     Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
     57 
     58   if (route_id_ != MSG_ROUTING_NONE)
     59     ChildThread::current()->GetRouter()->RemoveRoute(route_id_);
     60 }
     61 
     62 // static
     63 void WebMessagePortChannelImpl::CreatePair(
     64     const scoped_refptr<base::MessageLoopProxy>& child_thread_loop,
     65     blink::WebMessagePortChannel** channel1,
     66     blink::WebMessagePortChannel** channel2) {
     67   WebMessagePortChannelImpl* impl1 =
     68       new WebMessagePortChannelImpl(child_thread_loop);
     69   WebMessagePortChannelImpl* impl2 =
     70       new WebMessagePortChannelImpl(child_thread_loop);
     71 
     72   impl1->Entangle(impl2);
     73   impl2->Entangle(impl1);
     74 
     75   *channel1 = impl1;
     76   *channel2 = impl2;
     77 }
     78 
     79 // static
     80 std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
     81     WebMessagePortChannelArray* channels) {
     82   std::vector<int> message_port_ids;
     83   if (channels) {
     84     message_port_ids.resize(channels->size());
     85     // Extract the port IDs from the source array, then free it.
     86     for (size_t i = 0; i < channels->size(); ++i) {
     87       WebMessagePortChannelImpl* webchannel =
     88           static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
     89       // The message port ids might not be set up yet if this channel
     90       // wasn't created on the main thread.
     91       DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread());
     92       message_port_ids[i] = webchannel->message_port_id();
     93       webchannel->QueueMessages();
     94       DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
     95     }
     96     delete channels;
     97   }
     98   return message_port_ids;
     99 }
    100 
    101 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
    102   // Must lock here since client_ is called on the main thread.
    103   base::AutoLock auto_lock(lock_);
    104   client_ = client;
    105 }
    106 
    107 void WebMessagePortChannelImpl::destroy() {
    108   setClient(NULL);
    109 
    110   // Release the object on the main thread, since the destructor might want to
    111   // send an IPC, and that has to happen on the main thread.
    112   child_thread_loop_->ReleaseSoon(FROM_HERE, this);
    113 }
    114 
    115 void WebMessagePortChannelImpl::postMessage(
    116     const WebString& message,
    117     WebMessagePortChannelArray* channels) {
    118   if (!child_thread_loop_->BelongsToCurrentThread()) {
    119     child_thread_loop_->PostTask(
    120         FROM_HERE,
    121         base::Bind(
    122             &WebMessagePortChannelImpl::PostMessage, this,
    123             static_cast<base::string16>(message), channels));
    124   } else {
    125     PostMessage(message, channels);
    126   }
    127 }
    128 
    129 void WebMessagePortChannelImpl::PostMessage(
    130     const base::string16& message,
    131     WebMessagePortChannelArray* channels) {
    132   IPC::Message* msg = new MessagePortHostMsg_PostMessage(
    133       message_port_id_, message, ExtractMessagePortIDs(channels));
    134   Send(msg);
    135 }
    136 
    137 bool WebMessagePortChannelImpl::tryGetMessage(
    138     WebString* message,
    139     WebMessagePortChannelArray& channels) {
    140   base::AutoLock auto_lock(lock_);
    141   if (message_queue_.empty())
    142     return false;
    143 
    144   *message = message_queue_.front().message;
    145   const std::vector<WebMessagePortChannelImpl*>& channel_array =
    146       message_queue_.front().ports;
    147   WebMessagePortChannelArray result_ports(channel_array.size());
    148   for (size_t i = 0; i < channel_array.size(); i++) {
    149     result_ports[i] = channel_array[i];
    150   }
    151 
    152   channels.swap(result_ports);
    153   message_queue_.pop();
    154   return true;
    155 }
    156 
    157 void WebMessagePortChannelImpl::Init() {
    158   if (!child_thread_loop_->BelongsToCurrentThread()) {
    159     child_thread_loop_->PostTask(
    160         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
    161     return;
    162   }
    163 
    164   if (route_id_ == MSG_ROUTING_NONE) {
    165     DCHECK(message_port_id_ == MSG_ROUTING_NONE);
    166     Send(new MessagePortHostMsg_CreateMessagePort(
    167         &route_id_, &message_port_id_));
    168   }
    169 
    170   ChildThread::current()->GetRouter()->AddRoute(route_id_, this);
    171 }
    172 
    173 void WebMessagePortChannelImpl::Entangle(
    174     scoped_refptr<WebMessagePortChannelImpl> channel) {
    175   // The message port ids might not be set up yet, if this channel wasn't
    176   // created on the main thread.  So need to wait until we're on the main thread
    177   // before getting the other message port id.
    178   if (!child_thread_loop_->BelongsToCurrentThread()) {
    179     child_thread_loop_->PostTask(
    180         FROM_HERE,
    181         base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
    182     return;
    183   }
    184 
    185   Send(new MessagePortHostMsg_Entangle(
    186       message_port_id_, channel->message_port_id()));
    187 }
    188 
    189 void WebMessagePortChannelImpl::QueueMessages() {
    190   if (!child_thread_loop_->BelongsToCurrentThread()) {
    191     child_thread_loop_->PostTask(
    192         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
    193     return;
    194   }
    195   // This message port is being sent elsewhere (perhaps to another process).
    196   // The new endpoint needs to receive the queued messages, including ones that
    197   // could still be in-flight.  So we tell the browser to queue messages, and it
    198   // sends us an ack, whose receipt we know means that no more messages are
    199   // in-flight.  We then send the queued messages to the browser, which prepends
    200   // them to the ones it queued and it sends them to the new endpoint.
    201   Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
    202 
    203   // The process could potentially go away while we're still waiting for
    204   // in-flight messages.  Ensure it stays alive.
    205   ChildProcess::current()->AddRefProcess();
    206 }
    207 
    208 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
    209   if (!child_thread_loop_->BelongsToCurrentThread()) {
    210     DCHECK(!message->is_sync());
    211     child_thread_loop_->PostTask(
    212         FROM_HERE,
    213         base::Bind(&WebMessagePortChannelImpl::Send, this, message));
    214     return;
    215   }
    216 
    217   ChildThread::current()->GetRouter()->Send(message);
    218 }
    219 
    220 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
    221   bool handled = true;
    222   IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
    223     IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
    224     IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
    225     IPC_MESSAGE_UNHANDLED(handled = false)
    226   IPC_END_MESSAGE_MAP()
    227   return handled;
    228 }
    229 
    230 void WebMessagePortChannelImpl::OnMessage(
    231     const base::string16& message,
    232     const std::vector<int>& sent_message_port_ids,
    233     const std::vector<int>& new_routing_ids) {
    234   base::AutoLock auto_lock(lock_);
    235   Message msg;
    236   msg.message = message;
    237   if (!sent_message_port_ids.empty()) {
    238     msg.ports.resize(sent_message_port_ids.size());
    239     for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    240       msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
    241                                                    sent_message_port_ids[i],
    242                                                    child_thread_loop_.get());
    243     }
    244   }
    245 
    246   bool was_empty = message_queue_.empty();
    247   message_queue_.push(msg);
    248   if (client_ && was_empty)
    249     client_->messageAvailable();
    250 }
    251 
    252 void WebMessagePortChannelImpl::OnMessagesQueued() {
    253   std::vector<QueuedMessage> queued_messages;
    254 
    255   {
    256     base::AutoLock auto_lock(lock_);
    257     queued_messages.reserve(message_queue_.size());
    258     while (!message_queue_.empty()) {
    259       base::string16 message = message_queue_.front().message;
    260       const std::vector<WebMessagePortChannelImpl*>& channel_array =
    261           message_queue_.front().ports;
    262       std::vector<int> port_ids(channel_array.size());
    263       for (size_t i = 0; i < channel_array.size(); ++i) {
    264         port_ids[i] = channel_array[i]->message_port_id();
    265         channel_array[i]->QueueMessages();
    266       }
    267       queued_messages.push_back(std::make_pair(message, port_ids));
    268       message_queue_.pop();
    269     }
    270   }
    271 
    272   Send(new MessagePortHostMsg_SendQueuedMessages(
    273       message_port_id_, queued_messages));
    274 
    275   message_port_id_ = MSG_ROUTING_NONE;
    276 
    277   Release();
    278   ChildProcess::current()->ReleaseProcess();
    279 }
    280 
    281 WebMessagePortChannelImpl::Message::Message() {}
    282 
    283 WebMessagePortChannelImpl::Message::~Message() {}
    284 
    285 }  // namespace content
    286