Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/channel.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/bind.h"
      9 #include "base/compiler_specific.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "base/strings/stringprintf.h"
     13 
     14 namespace mojo {
     15 namespace system {
     16 
// Compile-time guard: the bootstrap endpoint ID must differ from the reserved
// invalid endpoint ID. (|Channel::Channel()| seeds |next_local_id_| with the
// bootstrap ID.)
COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
                   MessageInTransit::kInvalidEndpointId,
               kBootstrapEndpointId_is_invalid);

// Out-of-line definition of the |Channel::kBootstrapEndpointId| static member.
// NOTE(review): its value is presumably given at the in-class declaration in
// channel.h, which is not visible here -- confirm.
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
    Channel::kBootstrapEndpointId;
     23 
     24 Channel::EndpointInfo::EndpointInfo() {
     25 }
     26 
     27 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
     28                                     unsigned port)
     29     : message_pipe(message_pipe),
     30       port(port) {
     31 }
     32 
     33 Channel::EndpointInfo::~EndpointInfo() {
     34 }
     35 
     36 Channel::Channel()
     37     : next_local_id_(kBootstrapEndpointId) {
     38 }
     39 
     40 bool Channel::Init(const PlatformChannelHandle& handle) {
     41   DCHECK(creation_thread_checker_.CalledOnValidThread());
     42 
     43   // No need to take |lock_|, since this must be called before this object
     44   // becomes thread-safe.
     45   DCHECK(!raw_channel_.get());
     46 
     47   raw_channel_.reset(
     48       RawChannel::Create(handle, this, base::MessageLoop::current()));
     49   if (!raw_channel_->Init()) {
     50     raw_channel_.reset();
     51     return false;
     52   }
     53 
     54   return true;
     55 }
     56 
     57 void Channel::Shutdown() {
     58   DCHECK(creation_thread_checker_.CalledOnValidThread());
     59 
     60   base::AutoLock locker(lock_);
     61   DCHECK(raw_channel_.get());
     62   raw_channel_->Shutdown();
     63   raw_channel_.reset();
     64 
     65   // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
     66   // it's empty?
     67 }
     68 
     69 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
     70     scoped_refptr<MessagePipe> message_pipe, unsigned port) {
     71   MessageInTransit::EndpointId local_id;
     72   {
     73     base::AutoLock locker(lock_);
     74 
     75     while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
     76            local_id_to_endpoint_info_map_.find(next_local_id_) !=
     77                local_id_to_endpoint_info_map_.end())
     78       next_local_id_++;
     79 
     80     local_id = next_local_id_;
     81     next_local_id_++;
     82 
     83     // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
     84     // some expensive reference count increment/decrements.) Once this is done,
     85     // we should be able to delete |EndpointInfo|'s default constructor.
     86     local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
     87   }
     88 
     89   message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
     90   return local_id;
     91 }
     92 
     93 void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
     94                                      MessageInTransit::EndpointId remote_id) {
     95   EndpointInfo endpoint_info;
     96   {
     97     base::AutoLock locker(lock_);
     98 
     99     IdToEndpointInfoMap::const_iterator it =
    100         local_id_to_endpoint_info_map_.find(local_id);
    101     CHECK(it != local_id_to_endpoint_info_map_.end());
    102     endpoint_info = it->second;
    103   }
    104 
    105   endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
    106 }
    107 
    108 bool Channel::WriteMessage(MessageInTransit* message) {
    109   base::AutoLock locker(lock_);
    110   if (!raw_channel_.get()) {
    111     // TODO(vtl): I think this is probably not an error condition, but I should
    112     // think about it (and the shutdown sequence) more carefully.
    113     LOG(INFO) << "WriteMessage() after shutdown";
    114     return false;
    115   }
    116 
    117   return raw_channel_->WriteMessage(message);
    118 }
    119 
    120 void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
    121   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
    122 
    123   base::AutoLock locker_(lock_);
    124   local_id_to_endpoint_info_map_.erase(local_id);
    125 }
    126 
    127 Channel::~Channel() {
    128   // The channel should have been shut down first.
    129   DCHECK(!raw_channel_.get());
    130 }
    131 
    132 void Channel::OnReadMessage(const MessageInTransit& message) {
    133   switch (message.type()) {
    134     case MessageInTransit::kTypeMessagePipeEndpoint:
    135     case MessageInTransit::kTypeMessagePipe:
    136       OnReadMessageForDownstream(message);
    137       break;
    138     case MessageInTransit::TYPE_CHANNEL:
    139       OnReadMessageForChannel(message);
    140       break;
    141     default:
    142       HandleRemoteError(base::StringPrintf(
    143           "Received message of invalid type %u",
    144           static_cast<unsigned>(message.type())));
    145       break;
    146   }
    147 }
    148 
    149 void Channel::OnFatalError(FatalError fatal_error) {
    150   // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
    151   NOTIMPLEMENTED();
    152 }
    153 
    154 void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
    155   DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
    156          message.type() == MessageInTransit::kTypeMessagePipe);
    157 
    158   MessageInTransit::EndpointId local_id = message.destination_id();
    159   if (local_id == MessageInTransit::kInvalidEndpointId) {
    160     HandleRemoteError("Received message with no destination ID");
    161     return;
    162   }
    163 
    164   EndpointInfo endpoint_info;
    165   {
    166     base::AutoLock locker(lock_);
    167 
    168     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
    169     // be called from the creation thread, |raw_channel_| should never be null
    170     // here.
    171     DCHECK(raw_channel_.get());
    172 
    173     IdToEndpointInfoMap::const_iterator it =
    174         local_id_to_endpoint_info_map_.find(local_id);
    175     if (it == local_id_to_endpoint_info_map_.end()) {
    176       HandleRemoteError(base::StringPrintf(
    177           "Received a message for nonexistent local destination ID %u",
    178           static_cast<unsigned>(local_id)));
    179       return;
    180     }
    181     endpoint_info = it->second;
    182   }
    183 
    184   // We need to duplicate the message, because |EnqueueMessage()| will take
    185   // ownership of it.
    186   MessageInTransit* own_message = MessageInTransit::Create(
    187       message.type(), message.subtype(), message.data(), message.data_size());
    188   if (endpoint_info.message_pipe->EnqueueMessage(
    189           MessagePipe::GetPeerPort(endpoint_info.port), own_message, NULL) !=
    190               MOJO_RESULT_OK) {
    191     HandleLocalError(base::StringPrintf(
    192         "Failed to enqueue message to local destination ID %u",
    193         static_cast<unsigned>(local_id)));
    194     return;
    195   }
    196 }
    197 
    198 void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
    199   // TODO(vtl): Currently no channel-only messages yet.
    200   HandleRemoteError("Received invalid channel message");
    201   NOTREACHED();
    202 }
    203 
    204 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
    205   // TODO(vtl): Is this how we really want to handle this?
    206   LOG(INFO) << error_message;
    207 }
    208 
    209 void Channel::HandleLocalError(const base::StringPiece& error_message) {
    210   // TODO(vtl): Is this how we really want to handle this?
    211   LOG(FATAL) << error_message;
    212 }
    213 
    214 }  // namespace system
    215 }  // namespace mojo
    216