Home | History | Annotate | Download | only in worker_host
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/browser/worker_host/message_port_service.h"
      6 
      7 #include "content/browser/worker_host/worker_message_filter.h"
      8 #include "content/common/worker_messages.h"
      9 
     10 namespace content {
     11 
     12 struct MessagePortService::MessagePort {
     13   // |filter| and |route_id| are what we need to send messages to the port.
     14   // |filter| is just a weak pointer since we get notified when its process has
     15   // gone away and remove it.
     16   WorkerMessageFilter* filter;
     17   int route_id;
     18   // A globally unique id for this message port.
     19   int message_port_id;
     20   // The globally unique id of the entangled message port.
     21   int entangled_message_port_id;
     22   // If true, all messages to this message port are queued and not delivered.
     23   bool queue_messages;
     24   QueuedMessages queued_messages;
     25 };
     26 
     27 MessagePortService* MessagePortService::GetInstance() {
     28   return Singleton<MessagePortService>::get();
     29 }
     30 
     31 MessagePortService::MessagePortService()
     32     : next_message_port_id_(0) {
     33 }
     34 
     35 MessagePortService::~MessagePortService() {
     36 }
     37 
     38 void MessagePortService::UpdateMessagePort(
     39     int message_port_id,
     40     WorkerMessageFilter* filter,
     41     int routing_id) {
     42   if (!message_ports_.count(message_port_id)) {
     43     NOTREACHED();
     44     return;
     45   }
     46 
     47   MessagePort& port = message_ports_[message_port_id];
     48   port.filter = filter;
     49   port.route_id = routing_id;
     50 }
     51 
     52 void MessagePortService::OnWorkerMessageFilterClosing(
     53     WorkerMessageFilter* filter) {
     54   // Check if the (possibly) crashed process had any message ports.
     55   for (MessagePorts::iterator iter = message_ports_.begin();
     56        iter != message_ports_.end();) {
     57     MessagePorts::iterator cur_item = iter++;
     58     if (cur_item->second.filter == filter) {
     59       Erase(cur_item->first);
     60     }
     61   }
     62 }
     63 
     64 void MessagePortService::Create(int route_id,
     65                                 WorkerMessageFilter* filter,
     66                                 int* message_port_id) {
     67   *message_port_id = ++next_message_port_id_;
     68 
     69   MessagePort port;
     70   port.filter = filter;
     71   port.route_id = route_id;
     72   port.message_port_id = *message_port_id;
     73   port.entangled_message_port_id = MSG_ROUTING_NONE;
     74   port.queue_messages = false;
     75   message_ports_[*message_port_id] = port;
     76 }
     77 
     78 void MessagePortService::Destroy(int message_port_id) {
     79   if (!message_ports_.count(message_port_id)) {
     80     NOTREACHED();
     81     return;
     82   }
     83 
     84   DCHECK(message_ports_[message_port_id].queued_messages.empty());
     85   Erase(message_port_id);
     86 }
     87 
     88 void MessagePortService::Entangle(int local_message_port_id,
     89                                   int remote_message_port_id) {
     90   if (!message_ports_.count(local_message_port_id) ||
     91       !message_ports_.count(remote_message_port_id)) {
     92     NOTREACHED();
     93     return;
     94   }
     95 
     96   DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
     97       MSG_ROUTING_NONE);
     98   message_ports_[remote_message_port_id].entangled_message_port_id =
     99       local_message_port_id;
    100 }
    101 
    102 void MessagePortService::PostMessage(
    103     int sender_message_port_id,
    104     const string16& message,
    105     const std::vector<int>& sent_message_port_ids) {
    106   if (!message_ports_.count(sender_message_port_id)) {
    107     NOTREACHED();
    108     return;
    109   }
    110 
    111   int entangled_message_port_id =
    112       message_ports_[sender_message_port_id].entangled_message_port_id;
    113   if (entangled_message_port_id == MSG_ROUTING_NONE)
    114     return;  // Process could have crashed.
    115 
    116   if (!message_ports_.count(entangled_message_port_id)) {
    117     NOTREACHED();
    118     return;
    119   }
    120 
    121   PostMessageTo(entangled_message_port_id, message, sent_message_port_ids);
    122 }
    123 
    124 void MessagePortService::PostMessageTo(
    125     int message_port_id,
    126     const string16& message,
    127     const std::vector<int>& sent_message_port_ids) {
    128   if (!message_ports_.count(message_port_id)) {
    129     NOTREACHED();
    130     return;
    131   }
    132   for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    133     if (!message_ports_.count(sent_message_port_ids[i])) {
    134       NOTREACHED();
    135       return;
    136     }
    137   }
    138 
    139   MessagePort& entangled_port = message_ports_[message_port_id];
    140 
    141   std::vector<MessagePort*> sent_ports(sent_message_port_ids.size());
    142   for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    143     sent_ports[i] = &message_ports_[sent_message_port_ids[i]];
    144     sent_ports[i]->queue_messages = true;
    145   }
    146 
    147   if (entangled_port.queue_messages) {
    148     entangled_port.queued_messages.push_back(
    149         std::make_pair(message, sent_message_port_ids));
    150     return;
    151   }
    152 
    153   if (!entangled_port.filter) {
    154     NOTREACHED();
    155     return;
    156   }
    157 
    158   // If a message port was sent around, the new location will need a routing
    159   // id.  Instead of having the created port send us a sync message to get it,
    160   // send along with the message.
    161   std::vector<int> new_routing_ids(sent_message_port_ids.size());
    162   for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    163     new_routing_ids[i] = entangled_port.filter->GetNextRoutingID();
    164     sent_ports[i]->filter = entangled_port.filter;
    165 
    166     // Update the entry for the sent port as it can be in a different process.
    167     sent_ports[i]->route_id = new_routing_ids[i];
    168   }
    169 
    170   // Now send the message to the entangled port.
    171   entangled_port.filter->Send(new WorkerProcessMsg_Message(
    172       entangled_port.route_id, message, sent_message_port_ids,
    173       new_routing_ids));
    174 }
    175 
    176 void MessagePortService::QueueMessages(int message_port_id) {
    177   if (!message_ports_.count(message_port_id)) {
    178     NOTREACHED();
    179     return;
    180   }
    181 
    182   MessagePort& port = message_ports_[message_port_id];
    183   if (port.filter) {
    184     port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id));
    185     port.queue_messages = true;
    186     port.filter = NULL;
    187   }
    188 }
    189 
    190 void MessagePortService::SendQueuedMessages(
    191     int message_port_id,
    192     const QueuedMessages& queued_messages) {
    193   if (!message_ports_.count(message_port_id)) {
    194     NOTREACHED();
    195     return;
    196   }
    197 
    198   // Send the queued messages to the port again.  This time they'll reach the
    199   // new location.
    200   MessagePort& port = message_ports_[message_port_id];
    201   port.queue_messages = false;
    202   port.queued_messages.insert(port.queued_messages.begin(),
    203                               queued_messages.begin(),
    204                               queued_messages.end());
    205   SendQueuedMessagesIfPossible(message_port_id);
    206 }
    207 
    208 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
    209   if (!message_ports_.count(message_port_id)) {
    210     NOTREACHED();
    211     return;
    212   }
    213 
    214   MessagePort& port = message_ports_[message_port_id];
    215   if (port.queue_messages || !port.filter)
    216     return;
    217 
    218   for (QueuedMessages::iterator iter = port.queued_messages.begin();
    219        iter != port.queued_messages.end(); ++iter) {
    220     PostMessageTo(message_port_id, iter->first, iter->second);
    221   }
    222   port.queued_messages.clear();
    223 }
    224 
    225 void MessagePortService::Erase(int message_port_id) {
    226   MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
    227   DCHECK(erase_item != message_ports_.end());
    228 
    229   int entangled_id = erase_item->second.entangled_message_port_id;
    230   if (entangled_id != MSG_ROUTING_NONE) {
    231     // Do the disentanglement (and be paranoid about the other side existing
    232     // just in case something unusual happened during entanglement).
    233     if (message_ports_.count(entangled_id)) {
    234       message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
    235     }
    236   }
    237   message_ports_.erase(erase_item);
    238 }
    239 
    240 }  // namespace content
    241