Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/channel.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/bind.h"
     10 #include "base/compiler_specific.h"
     11 #include "base/logging.h"
     12 #include "base/macros.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "mojo/embedder/platform_handle_vector.h"
     15 #include "mojo/system/message_pipe_endpoint.h"
     16 #include "mojo/system/transport_data.h"
     17 
     18 namespace mojo {
     19 namespace system {
     20 
// Compile-time guard: the bootstrap endpoint ID must be distinct from the
// reserved "invalid" endpoint ID.
static_assert(Channel::kBootstrapEndpointId !=
                  MessageInTransit::kInvalidEndpointId,
              "kBootstrapEndpointId is invalid");

// Namespace-scope declaration of the |Channel::kBootstrapEndpointId| static
// constant.
// NOTE(review): |STATIC_CONST_MEMBER_DEFINITION| is not defined in this file;
// presumably it papers over per-compiler differences in how such definitions
// must be written -- confirm against base/compiler_specific.h.
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
    Channel::kBootstrapEndpointId;
     27 
// Constructs a channel that is neither running nor shutting down.
// |platform_support| is stored as given; nothing else is done with it here.
// Local endpoint ID allocation starts at |kBootstrapEndpointId|. The channel
// only becomes usable once |Init()| has succeeded.
Channel::Channel(embedder::PlatformSupport* platform_support)
    : platform_support_(platform_support),
      is_running_(false),
      is_shutting_down_(false),
      next_local_id_(kBootstrapEndpointId) {
}
     34 
     35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
     36   DCHECK(creation_thread_checker_.CalledOnValidThread());
     37   DCHECK(raw_channel);
     38 
     39   // No need to take |lock_|, since this must be called before this object
     40   // becomes thread-safe.
     41   DCHECK(!is_running_);
     42   raw_channel_ = raw_channel.Pass();
     43 
     44   if (!raw_channel_->Init(this)) {
     45     raw_channel_.reset();
     46     return false;
     47   }
     48 
     49   is_running_ = true;
     50   return true;
     51 }
     52 
     53 void Channel::Shutdown() {
     54   DCHECK(creation_thread_checker_.CalledOnValidThread());
     55 
     56   IdToEndpointMap to_destroy;
     57   {
     58     base::AutoLock locker(lock_);
     59     if (!is_running_)
     60       return;
     61 
     62     // Note: Don't reset |raw_channel_|, in case we're being called from within
     63     // |OnReadMessage()| or |OnError()|.
     64     raw_channel_->Shutdown();
     65     is_running_ = false;
     66 
     67     // We need to deal with it outside the lock.
     68     std::swap(to_destroy, local_id_to_endpoint_map_);
     69   }
     70 
     71   size_t num_live = 0;
     72   size_t num_zombies = 0;
     73   for (IdToEndpointMap::iterator it = to_destroy.begin();
     74        it != to_destroy.end();
     75        ++it) {
     76     if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
     77       it->second->message_pipe_->OnRemove(it->second->port_);
     78       num_live++;
     79     } else {
     80       DCHECK(!it->second->message_pipe_.get());
     81       num_zombies++;
     82     }
     83     it->second->DetachFromChannel();
     84   }
     85   DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
     86                                        << " live endpoints and " << num_zombies
     87                                        << " zombies";
     88 }
     89 
// Records, under |lock_|, that the channel is about to be shut down. In this
// file the flag is only consulted for logging: it triggers "while shutting
// down" debug warnings and suppresses the "connection broken" error log in
// |OnError()|.
void Channel::WillShutdownSoon() {
  base::AutoLock locker(lock_);
  is_shutting_down_ = true;
}
     94 
     95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
     96 // keeps the endpoint alive even after the lock is released. Otherwise, there's
     97 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
     98 // directly to this function, which wouldn't be sufficient for safety.
     99 MessageInTransit::EndpointId Channel::AttachEndpoint(
    100     scoped_refptr<ChannelEndpoint> endpoint) {
    101   DCHECK(endpoint.get());
    102 
    103   MessageInTransit::EndpointId local_id;
    104   {
    105     base::AutoLock locker(lock_);
    106 
    107     DLOG_IF(WARNING, is_shutting_down_)
    108         << "AttachEndpoint() while shutting down";
    109 
    110     while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
    111            local_id_to_endpoint_map_.find(next_local_id_) !=
    112                local_id_to_endpoint_map_.end())
    113       next_local_id_++;
    114 
    115     local_id = next_local_id_;
    116     next_local_id_++;
    117     local_id_to_endpoint_map_[local_id] = endpoint;
    118   }
    119 
    120   endpoint->AttachToChannel(this, local_id);
    121   return local_id;
    122 }
    123 
    124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
    125                                      MessageInTransit::EndpointId remote_id) {
    126   scoped_refptr<ChannelEndpoint> endpoint;
    127   ChannelEndpoint::State state;
    128   {
    129     base::AutoLock locker(lock_);
    130 
    131     DLOG_IF(WARNING, is_shutting_down_)
    132         << "RunMessagePipeEndpoint() while shutting down";
    133 
    134     IdToEndpointMap::const_iterator it =
    135         local_id_to_endpoint_map_.find(local_id);
    136     if (it == local_id_to_endpoint_map_.end())
    137       return false;
    138     endpoint = it->second;
    139     state = it->second->state_;
    140   }
    141 
    142   // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
    143   // and ignore it.
    144   if (state != ChannelEndpoint::STATE_NORMAL) {
    145     DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
    146                 "(local ID " << local_id << ", remote ID " << remote_id << ")";
    147     return true;
    148   }
    149 
    150   // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
    151   // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
    152   endpoint->Run(remote_id);
    153   return true;
    154 }
    155 
    156 void Channel::RunRemoteMessagePipeEndpoint(
    157     MessageInTransit::EndpointId local_id,
    158     MessageInTransit::EndpointId remote_id) {
    159 #if DCHECK_IS_ON
    160   {
    161     base::AutoLock locker(lock_);
    162     DCHECK(local_id_to_endpoint_map_.find(local_id) !=
    163            local_id_to_endpoint_map_.end());
    164   }
    165 #endif
    166 
    167   if (!SendControlMessage(
    168           MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
    169           local_id,
    170           remote_id)) {
    171     HandleLocalError(base::StringPrintf(
    172         "Failed to send message to run remote message pipe endpoint (local ID "
    173         "%u, remote ID %u)",
    174         static_cast<unsigned>(local_id),
    175         static_cast<unsigned>(remote_id)));
    176   }
    177 }
    178 
    179 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
    180   base::AutoLock locker(lock_);
    181   if (!is_running_) {
    182     // TODO(vtl): I think this is probably not an error condition, but I should
    183     // think about it (and the shutdown sequence) more carefully.
    184     LOG(WARNING) << "WriteMessage() after shutdown";
    185     return false;
    186   }
    187 
    188   DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
    189   return raw_channel_->WriteMessage(message.Pass());
    190 }
    191 
    192 bool Channel::IsWriteBufferEmpty() {
    193   base::AutoLock locker(lock_);
    194   if (!is_running_)
    195     return true;
    196   return raw_channel_->IsWriteBufferEmpty();
    197 }
    198 
    199 void Channel::DetachMessagePipeEndpoint(
    200     MessageInTransit::EndpointId local_id,
    201     MessageInTransit::EndpointId remote_id) {
    202   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
    203 
    204   // If this is non-null after the locked block, the endpoint should be detached
    205   // (and no remove message sent).
    206   scoped_refptr<ChannelEndpoint> endpoint_to_detach;
    207   {
    208     base::AutoLock locker_(lock_);
    209     if (!is_running_)
    210       return;
    211 
    212     IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
    213     DCHECK(it != local_id_to_endpoint_map_.end());
    214 
    215     switch (it->second->state_) {
    216       case ChannelEndpoint::STATE_NORMAL:
    217         it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
    218         it->second->message_pipe_ = nullptr;
    219         if (remote_id == MessageInTransit::kInvalidEndpointId)
    220           return;
    221         // We have to send a remove message (outside the lock).
    222         break;
    223       case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
    224         endpoint_to_detach = it->second;
    225         local_id_to_endpoint_map_.erase(it);
    226         // We have to detach (outside the lock).
    227         break;
    228       case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
    229         NOTREACHED();
    230         return;
    231     }
    232   }
    233   if (endpoint_to_detach.get()) {
    234     endpoint_to_detach->DetachFromChannel();
    235     return;
    236   }
    237 
    238   if (!SendControlMessage(
    239           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
    240           local_id,
    241           remote_id)) {
    242     HandleLocalError(base::StringPrintf(
    243         "Failed to send message to remove remote message pipe endpoint (local "
    244         "ID %u, remote ID %u)",
    245         static_cast<unsigned>(local_id),
    246         static_cast<unsigned>(remote_id)));
    247   }
    248 }
    249 
// Returns the raw channel's serialized platform handle size by forwarding the
// call to |raw_channel_|.
// NOTE(review): unlike |WriteMessage()| and |IsWriteBufferEmpty()|, this reads
// |raw_channel_| without taking |lock_| and without checking |is_running_|;
// presumably callers only use it after a successful |Init()| -- confirm.
size_t Channel::GetSerializedPlatformHandleSize() const {
  return raw_channel_->GetSerializedPlatformHandleSize();
}
    253 
// Destructor. Performs no cleanup of its own; it only checks (in DCHECK-enabled
// builds) that the channel is no longer running.
Channel::~Channel() {
  // The channel should have been shut down first.
  DCHECK(!is_running_);
}
    258 
    259 void Channel::OnReadMessage(
    260     const MessageInTransit::View& message_view,
    261     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
    262   DCHECK(creation_thread_checker_.CalledOnValidThread());
    263 
    264   switch (message_view.type()) {
    265     case MessageInTransit::kTypeMessagePipeEndpoint:
    266     case MessageInTransit::kTypeMessagePipe:
    267       OnReadMessageForDownstream(message_view, platform_handles.Pass());
    268       break;
    269     case MessageInTransit::kTypeChannel:
    270       OnReadMessageForChannel(message_view, platform_handles.Pass());
    271       break;
    272     default:
    273       HandleRemoteError(
    274           base::StringPrintf("Received message of invalid type %u",
    275                              static_cast<unsigned>(message_view.type())));
    276       break;
    277   }
    278 }
    279 
// Handles an error reported for the raw channel: logs it at a severity that
// depends on |error|, then shuts the channel down unconditionally (every case
// falls through to the |Shutdown()| call at the end). Must be called on the
// creation thread.
void Channel::OnError(Error error) {
  DCHECK(creation_thread_checker_.CalledOnValidThread());

  switch (error) {
    case ERROR_READ_SHUTDOWN:
      // The other side was cleanly closed, so this isn't actually an error.
      DVLOG(1) << "RawChannel read error (shutdown)";
      break;
    case ERROR_READ_BROKEN: {
      // |is_shutting_down_| is read under |lock_|; the error is only logged if
      // no shutdown was announced via |WillShutdownSoon()|. The braces scope
      // the lock so that it is released before |Shutdown()| below, which takes
      // |lock_| itself.
      base::AutoLock locker(lock_);
      LOG_IF(ERROR, !is_shutting_down_)
          << "RawChannel read error (connection broken)";
      break;
    }
    case ERROR_READ_BAD_MESSAGE:
      // Receiving a bad message means either a bug, data corruption, or
      // malicious attack (probably due to some other bug).
      LOG(ERROR) << "RawChannel read error (received bad message)";
      break;
    case ERROR_READ_UNKNOWN:
      LOG(ERROR) << "RawChannel read error (unknown)";
      break;
    case ERROR_WRITE:
      // Write errors are slightly notable: they probably shouldn't happen under
      // normal operation (but maybe the other side crashed).
      LOG(WARNING) << "RawChannel write error";
      break;
  }
  Shutdown();
}
    310 
    311 void Channel::OnReadMessageForDownstream(
    312     const MessageInTransit::View& message_view,
    313     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
    314   DCHECK(creation_thread_checker_.CalledOnValidThread());
    315   DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
    316          message_view.type() == MessageInTransit::kTypeMessagePipe);
    317 
    318   MessageInTransit::EndpointId local_id = message_view.destination_id();
    319   if (local_id == MessageInTransit::kInvalidEndpointId) {
    320     HandleRemoteError("Received message with no destination ID");
    321     return;
    322   }
    323 
    324   ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
    325   scoped_refptr<MessagePipe> message_pipe;
    326   unsigned port = ~0u;
    327   bool nonexistent_local_id_error = false;
    328   {
    329     base::AutoLock locker(lock_);
    330 
    331     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
    332     // be called from the creation thread, |raw_channel_| should never be null
    333     // here.
    334     DCHECK(is_running_);
    335 
    336     IdToEndpointMap::const_iterator it =
    337         local_id_to_endpoint_map_.find(local_id);
    338     if (it == local_id_to_endpoint_map_.end()) {
    339       nonexistent_local_id_error = true;
    340     } else {
    341       state = it->second->state_;
    342       message_pipe = it->second->message_pipe_;
    343       port = it->second->port_;
    344     }
    345   }
    346   if (nonexistent_local_id_error) {
    347     HandleRemoteError(base::StringPrintf(
    348         "Received a message for nonexistent local destination ID %u",
    349         static_cast<unsigned>(local_id)));
    350     // This is strongly indicative of some problem. However, it's not a fatal
    351     // error, since it may indicate a buggy (or hostile) remote process. Don't
    352     // die even for Debug builds, since handling this properly needs to be
    353     // tested (TODO(vtl)).
    354     DLOG(ERROR) << "This should not happen under normal operation.";
    355     return;
    356   }
    357 
    358   // Ignore messages for zombie endpoints (not an error).
    359   if (state != ChannelEndpoint::STATE_NORMAL) {
    360     DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
    361              << local_id << ", remote ID = " << message_view.source_id() << ")";
    362     return;
    363   }
    364 
    365   // We need to duplicate the message (data), because |EnqueueMessage()| will
    366   // take ownership of it.
    367   scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
    368   if (message_view.transport_data_buffer_size() > 0) {
    369     DCHECK(message_view.transport_data_buffer());
    370     message->SetDispatchers(TransportData::DeserializeDispatchers(
    371         message_view.transport_data_buffer(),
    372         message_view.transport_data_buffer_size(),
    373         platform_handles.Pass(),
    374         this));
    375   }
    376   MojoResult result = message_pipe->EnqueueMessage(
    377       MessagePipe::GetPeerPort(port), message.Pass());
    378   if (result != MOJO_RESULT_OK) {
    379     // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
    380     // has been closed (in an unavoidable race). This might also be a "remote"
    381     // error, e.g., if the remote side is sending invalid control messages (to
    382     // the message pipe).
    383     HandleLocalError(base::StringPrintf(
    384         "Failed to enqueue message to local ID %u (result %d)",
    385         static_cast<unsigned>(local_id),
    386         static_cast<int>(result)));
    387     return;
    388   }
    389 }
    390 
    391 void Channel::OnReadMessageForChannel(
    392     const MessageInTransit::View& message_view,
    393     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
    394   DCHECK(creation_thread_checker_.CalledOnValidThread());
    395   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
    396 
    397   // Currently, no channel messages take platform handles.
    398   if (platform_handles) {
    399     HandleRemoteError(
    400         "Received invalid channel message (has platform handles)");
    401     NOTREACHED();
    402     return;
    403   }
    404 
    405   switch (message_view.subtype()) {
    406     case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
    407       DVLOG(2) << "Handling channel message to run message pipe (local ID "
    408                << message_view.destination_id() << ", remote ID "
    409                << message_view.source_id() << ")";
    410       if (!RunMessagePipeEndpoint(message_view.destination_id(),
    411                                   message_view.source_id())) {
    412         HandleRemoteError(
    413             "Received invalid channel message to run message pipe");
    414       }
    415       break;
    416     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
    417       DVLOG(2) << "Handling channel message to remove message pipe (local ID "
    418                << message_view.destination_id() << ", remote ID "
    419                << message_view.source_id() << ")";
    420       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
    421                                      message_view.source_id())) {
    422         HandleRemoteError(
    423             "Received invalid channel message to remove message pipe");
    424       }
    425       break;
    426     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
    427       DVLOG(2) << "Handling channel message to ack remove message pipe (local "
    428                   "ID " << message_view.destination_id() << ", remote ID "
    429                << message_view.source_id() << ")";
    430       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
    431                                      message_view.source_id())) {
    432         HandleRemoteError(
    433             "Received invalid channel message to ack remove message pipe");
    434       }
    435       break;
    436     default:
    437       HandleRemoteError("Received invalid channel message");
    438       NOTREACHED();
    439       break;
    440   }
    441 }
    442 
    443 bool Channel::RemoveMessagePipeEndpoint(
    444     MessageInTransit::EndpointId local_id,
    445     MessageInTransit::EndpointId remote_id) {
    446   DCHECK(creation_thread_checker_.CalledOnValidThread());
    447 
    448   // If this is non-null after the locked block, the endpoint should be detached
    449   // (and no remove ack message sent).
    450   scoped_refptr<ChannelEndpoint> endpoint_to_detach;
    451   scoped_refptr<MessagePipe> message_pipe;
    452   unsigned port = ~0u;
    453   {
    454     base::AutoLock locker(lock_);
    455 
    456     IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
    457     if (it == local_id_to_endpoint_map_.end()) {
    458       DVLOG(2) << "Remove message pipe error: not found";
    459       return false;
    460     }
    461 
    462     switch (it->second->state_) {
    463       case ChannelEndpoint::STATE_NORMAL:
    464         it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
    465         message_pipe = it->second->message_pipe_;
    466         port = it->second->port_;
    467         it->second->message_pipe_ = nullptr;
    468         // We have to send a remove ack message (outside the lock).
    469         break;
    470       case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
    471         DVLOG(2) << "Remove message pipe error: wrong state";
    472         return false;
    473       case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
    474         endpoint_to_detach = it->second;
    475         local_id_to_endpoint_map_.erase(it);
    476         // We have to detach (outside the lock).
    477         break;
    478     }
    479   }
    480   if (endpoint_to_detach.get()) {
    481     endpoint_to_detach->DetachFromChannel();
    482     return true;
    483   }
    484 
    485   if (!SendControlMessage(
    486           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
    487           local_id,
    488           remote_id)) {
    489     HandleLocalError(base::StringPrintf(
    490         "Failed to send message to remove remote message pipe endpoint ack "
    491         "(local ID %u, remote ID %u)",
    492         static_cast<unsigned>(local_id),
    493         static_cast<unsigned>(remote_id)));
    494   }
    495 
    496   message_pipe->OnRemove(port);
    497 
    498   return true;
    499 }
    500 
    501 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
    502                                  MessageInTransit::EndpointId local_id,
    503                                  MessageInTransit::EndpointId remote_id) {
    504   DVLOG(2) << "Sending channel control message: subtype " << subtype
    505            << ", local ID " << local_id << ", remote ID " << remote_id;
    506   scoped_ptr<MessageInTransit> message(new MessageInTransit(
    507       MessageInTransit::kTypeChannel, subtype, 0, nullptr));
    508   message->set_source_id(local_id);
    509   message->set_destination_id(remote_id);
    510   return WriteMessage(message.Pass());
    511 }
    512 
// Reports an error attributed to the remote side of the channel (e.g. an
// invalid or unexpected incoming message). Currently this only logs
// |error_message| as a warning; the channel is left running.
void Channel::HandleRemoteError(const base::StringPiece& error_message) {
  // TODO(vtl): Is this how we really want to handle this? Probably we want to
  // terminate the connection, since it's spewing invalid stuff.
  LOG(WARNING) << error_message;
}
    518 
// Reports an error that arose on the local side (e.g. failing to send a
// control message or to enqueue a received message). Currently this only logs
// |error_message| as a warning; neither the channel nor any endpoint is
// notified.
void Channel::HandleLocalError(const base::StringPiece& error_message) {
  // TODO(vtl): Is this how we really want to handle this?
  // Sometimes we'll want to propagate the error back to the message pipe
  // (endpoint), and notify it that the remote is (effectively) closed.
  // Sometimes we'll want to kill the channel (and notify all the endpoints that
  // their remotes are dead.
  LOG(WARNING) << error_message;
}
    527 
    528 }  // namespace system
    529 }  // namespace mojo
    530