Home | History | Annotate | Download | only in child
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/child/webmessageportchannel_impl.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/message_loop/message_loop_proxy.h"
      9 #include "content/child/child_process.h"
     10 #include "content/child/child_thread.h"
     11 #include "content/common/message_port_messages.h"
     12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
     13 #include "third_party/WebKit/public/platform/WebString.h"
     14 
     15 using blink::WebMessagePortChannel;
     16 using blink::WebMessagePortChannelArray;
     17 using blink::WebMessagePortChannelClient;
     18 using blink::WebString;
     19 
     20 namespace content {
     21 
     22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
     23     base::MessageLoopProxy* child_thread_loop)
     24     : client_(NULL),
     25       route_id_(MSG_ROUTING_NONE),
     26       message_port_id_(MSG_ROUTING_NONE),
     27       child_thread_loop_(child_thread_loop) {
     28   AddRef();
     29   Init();
     30 }
     31 
     32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
     33     int route_id,
     34     int message_port_id,
     35     base::MessageLoopProxy* child_thread_loop)
     36     : client_(NULL),
     37       route_id_(route_id),
     38       message_port_id_(message_port_id),
     39       child_thread_loop_(child_thread_loop) {
     40   AddRef();
     41   Init();
     42 }
     43 
     44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
     45   // If we have any queued messages with attached ports, manually destroy them.
     46   while (!message_queue_.empty()) {
     47     const std::vector<WebMessagePortChannelImpl*>& channel_array =
     48         message_queue_.front().ports;
     49     for (size_t i = 0; i < channel_array.size(); i++) {
     50       channel_array[i]->destroy();
     51     }
     52     message_queue_.pop();
     53   }
     54 
     55   if (message_port_id_ != MSG_ROUTING_NONE)
     56     Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
     57 
     58   if (route_id_ != MSG_ROUTING_NONE)
     59     ChildThread::current()->GetRouter()->RemoveRoute(route_id_);
     60 }
     61 
     62 // static
     63 void WebMessagePortChannelImpl::CreatePair(
     64     base::MessageLoopProxy* child_thread_loop,
     65     blink::WebMessagePortChannel** channel1,
     66     blink::WebMessagePortChannel** channel2) {
     67   WebMessagePortChannelImpl* impl1 =
     68       new WebMessagePortChannelImpl(child_thread_loop);
     69   WebMessagePortChannelImpl* impl2 =
     70       new WebMessagePortChannelImpl(child_thread_loop);
     71 
     72   impl1->Entangle(impl2);
     73   impl2->Entangle(impl1);
     74 
     75   *channel1 = impl1;
     76   *channel2 = impl2;
     77 }
     78 
     79 // static
     80 std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
     81     WebMessagePortChannelArray* channels) {
     82   std::vector<int> message_port_ids;
     83   if (channels) {
     84     message_port_ids.resize(channels->size());
     85     // Extract the port IDs from the source array, then free it.
     86     for (size_t i = 0; i < channels->size(); ++i) {
     87       WebMessagePortChannelImpl* webchannel =
     88           static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
     89       // The message port ids might not be set up yet if this channel
     90       // wasn't created on the main thread.
     91       DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread());
     92       message_port_ids[i] = webchannel->message_port_id();
     93       webchannel->QueueMessages();
     94       DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
     95     }
     96     delete channels;
     97   }
     98   return message_port_ids;
     99 }
    100 
    101 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
    102   // Must lock here since client_ is called on the main thread.
    103   base::AutoLock auto_lock(lock_);
    104   client_ = client;
    105 }
    106 
    107 void WebMessagePortChannelImpl::destroy() {
    108   setClient(NULL);
    109 
    110   // Release the object on the main thread, since the destructor might want to
    111   // send an IPC, and that has to happen on the main thread.
    112   child_thread_loop_->ReleaseSoon(FROM_HERE, this);
    113 }
    114 
    115 void WebMessagePortChannelImpl::postMessage(
    116     const WebString& message,
    117     WebMessagePortChannelArray* channels) {
    118   if (!child_thread_loop_->BelongsToCurrentThread()) {
    119     child_thread_loop_->PostTask(
    120         FROM_HERE,
    121         base::Bind(
    122             &WebMessagePortChannelImpl::PostMessage, this, message, channels));
    123   } else {
    124     PostMessage(message, channels);
    125   }
    126 }
    127 
    128 void WebMessagePortChannelImpl::PostMessage(
    129     const base::string16& message,
    130     WebMessagePortChannelArray* channels) {
    131   IPC::Message* msg = new MessagePortHostMsg_PostMessage(
    132       message_port_id_, message, ExtractMessagePortIDs(channels));
    133   Send(msg);
    134 }
    135 
    136 bool WebMessagePortChannelImpl::tryGetMessage(
    137     WebString* message,
    138     WebMessagePortChannelArray& channels) {
    139   base::AutoLock auto_lock(lock_);
    140   if (message_queue_.empty())
    141     return false;
    142 
    143   *message = message_queue_.front().message;
    144   const std::vector<WebMessagePortChannelImpl*>& channel_array =
    145       message_queue_.front().ports;
    146   WebMessagePortChannelArray result_ports(channel_array.size());
    147   for (size_t i = 0; i < channel_array.size(); i++) {
    148     result_ports[i] = channel_array[i];
    149   }
    150 
    151   channels.swap(result_ports);
    152   message_queue_.pop();
    153   return true;
    154 }
    155 
    156 void WebMessagePortChannelImpl::Init() {
    157   if (!child_thread_loop_->BelongsToCurrentThread()) {
    158     child_thread_loop_->PostTask(
    159         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
    160     return;
    161   }
    162 
    163   if (route_id_ == MSG_ROUTING_NONE) {
    164     DCHECK(message_port_id_ == MSG_ROUTING_NONE);
    165     Send(new MessagePortHostMsg_CreateMessagePort(
    166         &route_id_, &message_port_id_));
    167   }
    168 
    169   ChildThread::current()->GetRouter()->AddRoute(route_id_, this);
    170 }
    171 
    172 void WebMessagePortChannelImpl::Entangle(
    173     scoped_refptr<WebMessagePortChannelImpl> channel) {
    174   // The message port ids might not be set up yet, if this channel wasn't
    175   // created on the main thread.  So need to wait until we're on the main thread
    176   // before getting the other message port id.
    177   if (!child_thread_loop_->BelongsToCurrentThread()) {
    178     child_thread_loop_->PostTask(
    179         FROM_HERE,
    180         base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
    181     return;
    182   }
    183 
    184   Send(new MessagePortHostMsg_Entangle(
    185       message_port_id_, channel->message_port_id()));
    186 }
    187 
    188 void WebMessagePortChannelImpl::QueueMessages() {
    189   if (!child_thread_loop_->BelongsToCurrentThread()) {
    190     child_thread_loop_->PostTask(
    191         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
    192     return;
    193   }
    194   // This message port is being sent elsewhere (perhaps to another process).
    195   // The new endpoint needs to receive the queued messages, including ones that
    196   // could still be in-flight.  So we tell the browser to queue messages, and it
    197   // sends us an ack, whose receipt we know means that no more messages are
    198   // in-flight.  We then send the queued messages to the browser, which prepends
    199   // them to the ones it queued and it sends them to the new endpoint.
    200   Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
    201 
    202   // The process could potentially go away while we're still waiting for
    203   // in-flight messages.  Ensure it stays alive.
    204   ChildProcess::current()->AddRefProcess();
    205 }
    206 
    207 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
    208   if (!child_thread_loop_->BelongsToCurrentThread()) {
    209     DCHECK(!message->is_sync());
    210     child_thread_loop_->PostTask(
    211         FROM_HERE,
    212         base::Bind(&WebMessagePortChannelImpl::Send, this, message));
    213     return;
    214   }
    215 
    216   ChildThread::current()->GetRouter()->Send(message);
    217 }
    218 
    219 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
    220   bool handled = true;
    221   IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
    222     IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
    223     IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
    224     IPC_MESSAGE_UNHANDLED(handled = false)
    225   IPC_END_MESSAGE_MAP()
    226   return handled;
    227 }
    228 
    229 void WebMessagePortChannelImpl::OnMessage(
    230     const base::string16& message,
    231     const std::vector<int>& sent_message_port_ids,
    232     const std::vector<int>& new_routing_ids) {
    233   base::AutoLock auto_lock(lock_);
    234   Message msg;
    235   msg.message = message;
    236   if (!sent_message_port_ids.empty()) {
    237     msg.ports.resize(sent_message_port_ids.size());
    238     for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    239       msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
    240                                                    sent_message_port_ids[i],
    241                                                    child_thread_loop_.get());
    242     }
    243   }
    244 
    245   bool was_empty = message_queue_.empty();
    246   message_queue_.push(msg);
    247   if (client_ && was_empty)
    248     client_->messageAvailable();
    249 }
    250 
    251 void WebMessagePortChannelImpl::OnMessagesQueued() {
    252   std::vector<QueuedMessage> queued_messages;
    253 
    254   {
    255     base::AutoLock auto_lock(lock_);
    256     queued_messages.reserve(message_queue_.size());
    257     while (!message_queue_.empty()) {
    258       base::string16 message = message_queue_.front().message;
    259       const std::vector<WebMessagePortChannelImpl*>& channel_array =
    260           message_queue_.front().ports;
    261       std::vector<int> port_ids(channel_array.size());
    262       for (size_t i = 0; i < channel_array.size(); ++i) {
    263         port_ids[i] = channel_array[i]->message_port_id();
    264         channel_array[i]->QueueMessages();
    265       }
    266       queued_messages.push_back(std::make_pair(message, port_ids));
    267       message_queue_.pop();
    268     }
    269   }
    270 
    271   Send(new MessagePortHostMsg_SendQueuedMessages(
    272       message_port_id_, queued_messages));
    273 
    274   message_port_id_ = MSG_ROUTING_NONE;
    275 
    276   Release();
    277   ChildProcess::current()->ReleaseProcess();
    278 }
    279 
    280 WebMessagePortChannelImpl::Message::Message() {}
    281 
    282 WebMessagePortChannelImpl::Message::~Message() {}
    283 
    284 }  // namespace content
    285