Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/channel.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/basictypes.h"
     10 #include "base/bind.h"
     11 #include "base/compiler_specific.h"
     12 #include "base/logging.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "mojo/embedder/platform_handle_vector.h"
     15 #include "mojo/system/message_pipe_endpoint.h"
     16 #include "mojo/system/transport_data.h"
     17 
     18 namespace mojo {
     19 namespace system {
     20 
     21 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
     22                    MessageInTransit::kInvalidEndpointId,
     23                kBootstrapEndpointId_is_invalid);
     24 
     25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
     26     Channel::kBootstrapEndpointId;
     27 
     28 Channel::EndpointInfo::EndpointInfo()
     29     : state(STATE_NORMAL),
     30       port() {
     31 }
     32 
     33 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
     34                                     unsigned port)
     35     : state(STATE_NORMAL),
     36       message_pipe(message_pipe),
     37       port(port) {
     38 }
     39 
     40 Channel::EndpointInfo::~EndpointInfo() {
     41 }
     42 
     43 Channel::Channel()
     44     : is_running_(false),
     45       next_local_id_(kBootstrapEndpointId) {
     46 }
     47 
     48 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
     49   DCHECK(creation_thread_checker_.CalledOnValidThread());
     50   DCHECK(raw_channel);
     51 
     52   // No need to take |lock_|, since this must be called before this object
     53   // becomes thread-safe.
     54   DCHECK(!is_running_no_lock());
     55   raw_channel_ = raw_channel.Pass();
     56 
     57   if (!raw_channel_->Init(this)) {
     58     raw_channel_.reset();
     59     return false;
     60   }
     61 
     62   is_running_ = true;
     63   return true;
     64 }
     65 
     66 void Channel::Shutdown() {
     67   DCHECK(creation_thread_checker_.CalledOnValidThread());
     68 
     69   IdToEndpointInfoMap to_destroy;
     70   {
     71     base::AutoLock locker(lock_);
     72     if (!is_running_no_lock())
     73       return;
     74 
     75     // Note: Don't reset |raw_channel_|, in case we're being called from within
     76     // |OnReadMessage()| or |OnFatalError()|.
     77     raw_channel_->Shutdown();
     78     is_running_ = false;
     79 
     80     // We need to deal with it outside the lock.
     81     std::swap(to_destroy, local_id_to_endpoint_info_map_);
     82   }
     83 
     84   size_t num_live = 0;
     85   size_t num_zombies = 0;
     86   for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
     87        it != to_destroy.end();
     88        ++it) {
     89     if (it->second.state == EndpointInfo::STATE_NORMAL) {
     90       it->second.message_pipe->OnRemove(it->second.port);
     91       num_live++;
     92     } else {
     93       DCHECK(!it->second.message_pipe);
     94       num_zombies++;
     95     }
     96   }
     97   DVLOG_IF(2, num_live || num_zombies)
     98       << "Shut down Channel with " << num_live << " live endpoints and "
     99       << num_zombies << " zombies";
    100 }
    101 
    102 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
    103     scoped_refptr<MessagePipe> message_pipe,
    104     unsigned port) {
    105   DCHECK(message_pipe);
    106   DCHECK(port == 0 || port == 1);
    107 
    108   MessageInTransit::EndpointId local_id;
    109   {
    110     base::AutoLock locker(lock_);
    111 
    112     while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
    113            local_id_to_endpoint_info_map_.find(next_local_id_) !=
    114                local_id_to_endpoint_info_map_.end())
    115       next_local_id_++;
    116 
    117     local_id = next_local_id_;
    118     next_local_id_++;
    119 
    120     // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
    121     // some expensive reference count increment/decrements.) Once this is done,
    122     // we should be able to delete |EndpointInfo|'s default constructor.
    123     local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
    124   }
    125 
    126   // This might fail if that port got an |OnPeerClose()| before attaching.
    127   if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
    128     return local_id;
    129 
    130   // Note: If it failed, quite possibly the endpoint info was removed from that
    131   // map (there's a race between us adding it to the map above and calling
    132   // |Attach()|). And even if an entry exists for |local_id|, we need to check
    133   // that it's the one we added (and not some other one that was added since).
    134   {
    135     base::AutoLock locker(lock_);
    136     IdToEndpointInfoMap::iterator it =
    137         local_id_to_endpoint_info_map_.find(local_id);
    138     if (it != local_id_to_endpoint_info_map_.end() &&
    139         it->second.message_pipe.get() == message_pipe.get() &&
    140         it->second.port == port) {
    141       DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
    142       // TODO(vtl): FIXME -- This is wrong. We need to specify (to
    143       // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
    144       // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
    145       // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
    146       // run, then we'll get messages to an "invalid" local ID (for running, for
    147       // removal).
    148       local_id_to_endpoint_info_map_.erase(it);
    149     }
    150   }
    151   return MessageInTransit::kInvalidEndpointId;
    152 }
    153 
    154 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
    155                                      MessageInTransit::EndpointId remote_id) {
    156   EndpointInfo endpoint_info;
    157   {
    158     base::AutoLock locker(lock_);
    159 
    160     IdToEndpointInfoMap::const_iterator it =
    161         local_id_to_endpoint_info_map_.find(local_id);
    162     if (it == local_id_to_endpoint_info_map_.end())
    163       return false;
    164     endpoint_info = it->second;
    165   }
    166 
    167   // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
    168   // and ignore it.
    169   if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
    170     DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
    171                 "(local ID " << local_id << ", remote ID " << remote_id << ")";
    172     return true;
    173   }
    174 
    175   // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
    176   // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
    177   endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
    178   return true;
    179 }
    180 
    181 void Channel::RunRemoteMessagePipeEndpoint(
    182     MessageInTransit::EndpointId local_id,
    183     MessageInTransit::EndpointId remote_id) {
    184 #if DCHECK_IS_ON
    185   {
    186     base::AutoLock locker(lock_);
    187     DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
    188                local_id_to_endpoint_info_map_.end());
    189   }
    190 #endif
    191 
    192   if (!SendControlMessage(
    193            MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
    194            local_id, remote_id)) {
    195     HandleLocalError(base::StringPrintf(
    196         "Failed to send message to run remote message pipe endpoint (local ID "
    197         "%u, remote ID %u)",
    198         static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
    199   }
    200 }
    201 
    202 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
    203   base::AutoLock locker(lock_);
    204   if (!is_running_no_lock()) {
    205     // TODO(vtl): I think this is probably not an error condition, but I should
    206     // think about it (and the shutdown sequence) more carefully.
    207     LOG(WARNING) << "WriteMessage() after shutdown";
    208     return false;
    209   }
    210 
    211   return raw_channel_->WriteMessage(message.Pass());
    212 }
    213 
    214 bool Channel::IsWriteBufferEmpty() {
    215   base::AutoLock locker(lock_);
    216   if (!is_running_no_lock())
    217     return true;
    218   return raw_channel_->IsWriteBufferEmpty();
    219 }
    220 
    221 void Channel::DetachMessagePipeEndpoint(
    222     MessageInTransit::EndpointId local_id,
    223     MessageInTransit::EndpointId remote_id) {
    224   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
    225 
    226   bool should_send_remove_message = false;
    227   {
    228     base::AutoLock locker_(lock_);
    229     if (!is_running_no_lock())
    230       return;
    231 
    232     IdToEndpointInfoMap::iterator it =
    233         local_id_to_endpoint_info_map_.find(local_id);
    234     DCHECK(it != local_id_to_endpoint_info_map_.end());
    235 
    236     switch (it->second.state) {
    237       case EndpointInfo::STATE_NORMAL:
    238         it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
    239         it->second.message_pipe = NULL;
    240         should_send_remove_message =
    241             (remote_id != MessageInTransit::kInvalidEndpointId);
    242         break;
    243       case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
    244         local_id_to_endpoint_info_map_.erase(it);
    245         break;
    246       case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
    247         NOTREACHED();
    248         break;
    249       case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
    250         it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
    251         break;
    252     }
    253   }
    254   if (!should_send_remove_message)
    255     return;
    256 
    257   if (!SendControlMessage(
    258            MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
    259            local_id, remote_id)) {
    260     HandleLocalError(base::StringPrintf(
    261         "Failed to send message to remove remote message pipe endpoint (local "
    262         "ID %u, remote ID %u)",
    263         static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
    264   }
    265 }
    266 
    267 size_t Channel::GetSerializedPlatformHandleSize() const {
    268   return raw_channel_->GetSerializedPlatformHandleSize();
    269 }
    270 
    271 Channel::~Channel() {
    272   // The channel should have been shut down first.
    273   DCHECK(!is_running_no_lock());
    274 }
    275 
    276 void Channel::OnReadMessage(
    277     const MessageInTransit::View& message_view,
    278     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
    279   switch (message_view.type()) {
    280     case MessageInTransit::kTypeMessagePipeEndpoint:
    281     case MessageInTransit::kTypeMessagePipe:
    282       OnReadMessageForDownstream(message_view, platform_handles.Pass());
    283       break;
    284     case MessageInTransit::kTypeChannel:
    285       OnReadMessageForChannel(message_view, platform_handles.Pass());
    286       break;
    287     default:
    288       HandleRemoteError(base::StringPrintf(
    289           "Received message of invalid type %u",
    290           static_cast<unsigned>(message_view.type())));
    291       break;
    292   }
    293 }
    294 
    295 void Channel::OnFatalError(FatalError fatal_error) {
    296   switch (fatal_error) {
    297     case FATAL_ERROR_READ:
    298       // Most read errors aren't notable: they just reflect that the other side
    299       // tore down the channel.
    300       DVLOG(1) << "RawChannel fatal error (read)";
    301       break;
    302     case FATAL_ERROR_WRITE:
    303       // Write errors are slightly notable: they probably shouldn't happen under
    304       // normal operation (but maybe the other side crashed).
    305       LOG(WARNING) << "RawChannel fatal error (write)";
    306       break;
    307   }
    308   Shutdown();
    309 }
    310 
    311 void Channel::OnReadMessageForDownstream(
    312     const MessageInTransit::View& message_view,
    313     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
    314   DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
    315          message_view.type() == MessageInTransit::kTypeMessagePipe);
    316 
    317   MessageInTransit::EndpointId local_id = message_view.destination_id();
    318   if (local_id == MessageInTransit::kInvalidEndpointId) {
    319     HandleRemoteError("Received message with no destination ID");
    320     return;
    321   }
    322 
    323   EndpointInfo endpoint_info;
    324   {
    325     base::AutoLock locker(lock_);
    326 
    327     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
    328     // be called from the creation thread, |raw_channel_| should never be null
    329     // here.
    330     DCHECK(is_running_no_lock());
    331 
    332     IdToEndpointInfoMap::const_iterator it =
    333         local_id_to_endpoint_info_map_.find(local_id);
    334     if (it == local_id_to_endpoint_info_map_.end()) {
    335       HandleRemoteError(base::StringPrintf(
    336           "Received a message for nonexistent local destination ID %u",
    337           static_cast<unsigned>(local_id)));
    338       // This is strongly indicative of some problem. However, it's not a fatal
    339       // error, since it may indicate a bug (or hostile) remote process. Don't
    340       // die even for Debug builds, since handling this properly needs to be
    341       // tested (TODO(vtl)).
    342       DLOG(ERROR) << "This should not happen under normal operation.";
    343       return;
    344     }
    345     endpoint_info = it->second;
    346   }
    347 
    348   // Ignore messages for zombie endpoints (not an error).
    349   if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
    350     DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
    351              << local_id << ", remote ID = " << message_view.source_id() << ")";
    352     return;
    353   }
    354 
    355   // We need to duplicate the message (data), because |EnqueueMessage()| will
    356   // take ownership of it.
    357   scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
    358   if (message_view.transport_data_buffer_size() > 0) {
    359     DCHECK(message_view.transport_data_buffer());
    360     message->SetDispatchers(
    361         TransportData::DeserializeDispatchers(
    362             message_view.transport_data_buffer(),
    363             message_view.transport_data_buffer_size(),
    364             platform_handles.Pass(),
    365             this));
    366   }
    367   MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
    368       MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
    369   if (result != MOJO_RESULT_OK) {
    370     // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
    371     // has been closed (in an unavoidable race). This might also be a "remote"
    372     // error, e.g., if the remote side is sending invalid control messages (to
    373     // the message pipe).
    374     HandleLocalError(base::StringPrintf(
    375         "Failed to enqueue message to local ID %u (result %d)",
    376         static_cast<unsigned>(local_id), static_cast<int>(result)));
    377     return;
    378   }
    379 }
    380 
    381 void Channel::OnReadMessageForChannel(
    382     const MessageInTransit::View& message_view,
    383     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
    384   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
    385 
    386   // Currently, no channel messages take platform handles.
    387   if (platform_handles) {
    388     HandleRemoteError(
    389         "Received invalid channel message (has platform handles)");
    390     NOTREACHED();
    391     return;
    392   }
    393 
    394   switch (message_view.subtype()) {
    395     case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
    396       DVLOG(2) << "Handling channel message to run message pipe (local ID "
    397                << message_view.destination_id() << ", remote ID "
    398                << message_view.source_id() << ")";
    399       if (!RunMessagePipeEndpoint(message_view.destination_id(),
    400                                   message_view.source_id())) {
    401         HandleRemoteError(
    402             "Received invalid channel message to run message pipe");
    403       }
    404       break;
    405     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
    406       DVLOG(2) << "Handling channel message to remove message pipe (local ID "
    407                << message_view.destination_id() << ", remote ID "
    408                << message_view.source_id() << ")";
    409       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
    410                                      message_view.source_id())) {
    411         HandleRemoteError(
    412             "Received invalid channel message to remove message pipe");
    413       }
    414       break;
    415     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
    416       DVLOG(2) << "Handling channel message to ack remove message pipe (local "
    417                   "ID "
    418                << message_view.destination_id() << ", remote ID "
    419                << message_view.source_id() << ")";
    420       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
    421                                      message_view.source_id())) {
    422         HandleRemoteError(
    423             "Received invalid channel message to ack remove message pipe");
    424       }
    425       break;
    426     default:
    427       HandleRemoteError("Received invalid channel message");
    428       NOTREACHED();
    429       break;
    430   }
    431 }
    432 
    433 bool Channel::RemoveMessagePipeEndpoint(
    434     MessageInTransit::EndpointId local_id,
    435     MessageInTransit::EndpointId remote_id) {
    436   EndpointInfo endpoint_info;
    437   {
    438     base::AutoLock locker(lock_);
    439 
    440     IdToEndpointInfoMap::iterator it =
    441         local_id_to_endpoint_info_map_.find(local_id);
    442     if (it == local_id_to_endpoint_info_map_.end()) {
    443       DVLOG(2) << "Remove message pipe error: not found";
    444       return false;
    445     }
    446 
    447     // If it's waiting for the remove ack, just do it and return.
    448     if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
    449       local_id_to_endpoint_info_map_.erase(it);
    450       return true;
    451     }
    452 
    453     if (it->second.state != EndpointInfo::STATE_NORMAL) {
    454       DVLOG(2) << "Remove message pipe error: wrong state";
    455       return false;
    456     }
    457 
    458     it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
    459     endpoint_info = it->second;
    460     it->second.message_pipe = NULL;
    461   }
    462 
    463   if (!SendControlMessage(
    464            MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
    465            local_id, remote_id)) {
    466     HandleLocalError(base::StringPrintf(
    467         "Failed to send message to remove remote message pipe endpoint ack "
    468         "(local ID %u, remote ID %u)",
    469         static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
    470   }
    471 
    472   endpoint_info.message_pipe->OnRemove(endpoint_info.port);
    473 
    474   return true;
    475 }
    476 
    477 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
    478                                  MessageInTransit::EndpointId local_id,
    479                                  MessageInTransit::EndpointId remote_id) {
    480   DVLOG(2) << "Sending channel control message: subtype " << subtype
    481            << ", local ID " << local_id << ", remote ID " << remote_id;
    482   scoped_ptr<MessageInTransit> message(new MessageInTransit(
    483       MessageInTransit::kTypeChannel, subtype, 0, NULL));
    484   message->set_source_id(local_id);
    485   message->set_destination_id(remote_id);
    486   return WriteMessage(message.Pass());
    487 }
    488 
    489 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
    490   // TODO(vtl): Is this how we really want to handle this? Probably we want to
    491   // terminate the connection, since it's spewing invalid stuff.
    492   LOG(WARNING) << error_message;
    493 }
    494 
    495 void Channel::HandleLocalError(const base::StringPiece& error_message) {
    496   // TODO(vtl): Is this how we really want to handle this?
    497   // Sometimes we'll want to propagate the error back to the message pipe
    498   // (endpoint), and notify it that the remote is (effectively) closed.
    499   // Sometimes we'll want to kill the channel (and notify all the endpoints that
    500   // their remotes are dead.
    501   LOG(WARNING) << error_message;
    502 }
    503 
    504 }  // namespace system
    505 }  // namespace mojo
    506