Home | History | Annotate | Download | only in ipc
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_channel_mojo.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <memory>
     11 #include <utility>
     12 
     13 #include "base/bind.h"
     14 #include "base/bind_helpers.h"
     15 #include "base/command_line.h"
     16 #include "base/lazy_instance.h"
     17 #include "base/macros.h"
     18 #include "base/memory/ptr_util.h"
     19 #include "base/process/process_handle.h"
     20 #include "base/threading/thread_task_runner_handle.h"
     21 #include "build/build_config.h"
     22 #include "ipc/ipc_listener.h"
     23 #include "ipc/ipc_logging.h"
     24 #include "ipc/ipc_message_attachment_set.h"
     25 #include "ipc/ipc_message_macros.h"
     26 #include "ipc/ipc_mojo_bootstrap.h"
     27 #include "ipc/ipc_mojo_handle_attachment.h"
     28 #include "ipc/native_handle_type_converters.h"
     29 #include "mojo/public/cpp/bindings/binding.h"
     30 #include "mojo/public/cpp/system/platform_handle.h"
     31 
     32 namespace IPC {
     33 
     34 namespace {
     35 
     36 class MojoChannelFactory : public ChannelFactory {
     37  public:
     38   MojoChannelFactory(
     39       mojo::ScopedMessagePipeHandle handle,
     40       Channel::Mode mode,
     41       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
     42       const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
     43       : handle_(std::move(handle)),
     44         mode_(mode),
     45         ipc_task_runner_(ipc_task_runner),
     46         proxy_task_runner_(proxy_task_runner) {}
     47 
     48   std::unique_ptr<Channel> BuildChannel(Listener* listener) override {
     49     return ChannelMojo::Create(std::move(handle_), mode_, listener,
     50                                ipc_task_runner_, proxy_task_runner_);
     51   }
     52 
     53   scoped_refptr<base::SingleThreadTaskRunner> GetIPCTaskRunner() override {
     54     return ipc_task_runner_;
     55   }
     56 
     57  private:
     58   mojo::ScopedMessagePipeHandle handle_;
     59   const Channel::Mode mode_;
     60   scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
     61   scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
     62 
     63   DISALLOW_COPY_AND_ASSIGN(MojoChannelFactory);
     64 };
     65 
     66 base::ProcessId GetSelfPID() {
     67 #if defined(OS_LINUX)
     68   if (int global_pid = Channel::GetGlobalPid())
     69     return global_pid;
     70 #endif  // OS_LINUX
     71 #if defined(OS_NACL)
     72   return -1;
     73 #else
     74   return base::GetCurrentProcId();
     75 #endif  // defined(OS_NACL)
     76 }
     77 
     78 }  // namespace
     79 
     80 //------------------------------------------------------------------------------
     81 
     82 // static
     83 std::unique_ptr<ChannelMojo> ChannelMojo::Create(
     84     mojo::ScopedMessagePipeHandle handle,
     85     Mode mode,
     86     Listener* listener,
     87     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
     88     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
     89   return base::WrapUnique(new ChannelMojo(std::move(handle), mode, listener,
     90                                           ipc_task_runner, proxy_task_runner));
     91 }
     92 
     93 // static
     94 std::unique_ptr<ChannelFactory> ChannelMojo::CreateServerFactory(
     95     mojo::ScopedMessagePipeHandle handle,
     96     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
     97     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
     98   return std::make_unique<MojoChannelFactory>(
     99       std::move(handle), Channel::MODE_SERVER, ipc_task_runner,
    100       proxy_task_runner);
    101 }
    102 
    103 // static
    104 std::unique_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
    105     mojo::ScopedMessagePipeHandle handle,
    106     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    107     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
    108   return std::make_unique<MojoChannelFactory>(
    109       std::move(handle), Channel::MODE_CLIENT, ipc_task_runner,
    110       proxy_task_runner);
    111 }
    112 
    113 ChannelMojo::ChannelMojo(
    114     mojo::ScopedMessagePipeHandle handle,
    115     Mode mode,
    116     Listener* listener,
    117     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    118     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
    119     : task_runner_(ipc_task_runner),
    120       pipe_(handle.get()),
    121       listener_(listener),
    122       weak_factory_(this) {
    123   weak_ptr_ = weak_factory_.GetWeakPtr();
    124   bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, ipc_task_runner,
    125                                      proxy_task_runner);
    126 }
    127 
    128 void ChannelMojo::ForwardMessageFromThreadSafePtr(mojo::Message message) {
    129   DCHECK(task_runner_->RunsTasksInCurrentSequence());
    130   if (!message_reader_ || !message_reader_->sender().is_bound())
    131     return;
    132   message_reader_->sender().internal_state()->ForwardMessage(
    133       std::move(message));
    134 }
    135 
    136 void ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr(
    137     mojo::Message message,
    138     std::unique_ptr<mojo::MessageReceiver> responder) {
    139   DCHECK(task_runner_->RunsTasksInCurrentSequence());
    140   if (!message_reader_ || !message_reader_->sender().is_bound())
    141     return;
    142   message_reader_->sender().internal_state()->ForwardMessageWithResponder(
    143       std::move(message), std::move(responder));
    144 }
    145 
    146 ChannelMojo::~ChannelMojo() {
    147   DCHECK(task_runner_->RunsTasksInCurrentSequence());
    148   Close();
    149 }
    150 
    151 bool ChannelMojo::Connect() {
    152   DCHECK(task_runner_->RunsTasksInCurrentSequence());
    153 
    154   WillConnect();
    155 
    156   mojom::ChannelAssociatedPtr sender;
    157   mojom::ChannelAssociatedRequest receiver;
    158   bootstrap_->Connect(&sender, &receiver);
    159 
    160   DCHECK(!message_reader_);
    161   sender->SetPeerPid(GetSelfPID());
    162   message_reader_.reset(new internal::MessagePipeReader(
    163       pipe_, std::move(sender), std::move(receiver), this));
    164   return true;
    165 }
    166 
    167 void ChannelMojo::Pause() {
    168   bootstrap_->Pause();
    169 }
    170 
    171 void ChannelMojo::Unpause(bool flush) {
    172   bootstrap_->Unpause();
    173   if (flush)
    174     Flush();
    175 }
    176 
    177 void ChannelMojo::Flush() {
    178   bootstrap_->Flush();
    179 }
    180 
    181 void ChannelMojo::Close() {
    182   // NOTE: The MessagePipeReader's destructor may re-enter this function. Use
    183   // caution when changing this method.
    184   std::unique_ptr<internal::MessagePipeReader> reader =
    185       std::move(message_reader_);
    186   reader.reset();
    187 
    188   base::AutoLock lock(associated_interface_lock_);
    189   associated_interfaces_.clear();
    190 }
    191 
    192 void ChannelMojo::OnPipeError() {
    193   DCHECK(task_runner_);
    194   if (task_runner_->RunsTasksInCurrentSequence()) {
    195     listener_->OnChannelError();
    196   } else {
    197     task_runner_->PostTask(FROM_HERE,
    198                            base::Bind(&ChannelMojo::OnPipeError, weak_ptr_));
    199   }
    200 }
    201 
    202 void ChannelMojo::OnAssociatedInterfaceRequest(
    203     const std::string& name,
    204     mojo::ScopedInterfaceEndpointHandle handle) {
    205   GenericAssociatedInterfaceFactory factory;
    206   {
    207     base::AutoLock locker(associated_interface_lock_);
    208     auto iter = associated_interfaces_.find(name);
    209     if (iter != associated_interfaces_.end())
    210       factory = iter->second;
    211   }
    212 
    213   if (!factory.is_null())
    214     factory.Run(std::move(handle));
    215   else
    216     listener_->OnAssociatedInterfaceRequest(name, std::move(handle));
    217 }
    218 
    219 bool ChannelMojo::Send(Message* message) {
    220   DVLOG(2) << "sending message @" << message << " on channel @" << this
    221            << " with type " << message->type();
    222 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
    223   Logging::GetInstance()->OnSendMessage(message);
    224 #endif
    225 
    226   std::unique_ptr<Message> scoped_message = base::WrapUnique(message);
    227   if (!message_reader_)
    228     return false;
    229 
    230   // Comment copied from ipc_channel_posix.cc:
    231   // We can't close the pipe here, because calling OnChannelError may destroy
    232   // this object, and that would be bad if we are called from Send(). Instead,
    233   // we return false and hope the caller will close the pipe. If they do not,
    234   // the pipe will still be closed next time OnFileCanReadWithoutBlocking is
    235   // called.
    236   //
    237   // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the
    238   // pipe's connection error handler will be invoked in its place.
    239   return message_reader_->Send(std::move(scoped_message));
    240 }
    241 
    242 Channel::AssociatedInterfaceSupport*
    243 ChannelMojo::GetAssociatedInterfaceSupport() { return this; }
    244 
    245 std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
    246 ChannelMojo::CreateThreadSafeChannel() {
    247   return std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
    248       task_runner_,
    249       base::Bind(&ChannelMojo::ForwardMessageFromThreadSafePtr, weak_ptr_),
    250       base::Bind(&ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr,
    251                  weak_ptr_),
    252       *bootstrap_->GetAssociatedGroup());
    253 }
    254 
    255 void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) {
    256   listener_->OnChannelConnected(peer_pid);
    257 }
    258 
    259 void ChannelMojo::OnMessageReceived(const Message& message) {
    260   TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
    261                "class", IPC_MESSAGE_ID_CLASS(message.type()),
    262                "line", IPC_MESSAGE_ID_LINE(message.type()));
    263   listener_->OnMessageReceived(message);
    264   if (message.dispatch_error())
    265     listener_->OnBadMessageReceived(message);
    266 }
    267 
    268 void ChannelMojo::OnBrokenDataReceived() {
    269   listener_->OnBadMessageReceived(Message());
    270 }
    271 
    272 // static
    273 MojoResult ChannelMojo::ReadFromMessageAttachmentSet(
    274     Message* message,
    275     base::Optional<std::vector<mojo::native::SerializedHandlePtr>>* handles) {
    276   DCHECK(!*handles);
    277 
    278   MojoResult result = MOJO_RESULT_OK;
    279   if (!message->HasAttachments())
    280     return result;
    281 
    282   std::vector<mojo::native::SerializedHandlePtr> output_handles;
    283   MessageAttachmentSet* set = message->attachment_set();
    284 
    285   for (unsigned i = 0; result == MOJO_RESULT_OK && i < set->size(); ++i) {
    286     auto attachment = set->GetAttachmentAt(i);
    287     auto serialized_handle = mojo::native::SerializedHandle::New();
    288     serialized_handle->the_handle = attachment->TakeMojoHandle();
    289     serialized_handle->type =
    290         mojo::ConvertTo<mojo::native::SerializedHandle::Type>(
    291             attachment->GetType());
    292     output_handles.emplace_back(std::move(serialized_handle));
    293   }
    294   set->CommitAllDescriptors();
    295 
    296   if (!output_handles.empty())
    297     *handles = std::move(output_handles);
    298 
    299   return result;
    300 }
    301 
    302 // static
    303 MojoResult ChannelMojo::WriteToMessageAttachmentSet(
    304     base::Optional<std::vector<mojo::native::SerializedHandlePtr>> handles,
    305     Message* message) {
    306   if (!handles)
    307     return MOJO_RESULT_OK;
    308   for (size_t i = 0; i < handles->size(); ++i) {
    309     auto& handle = handles->at(i);
    310     scoped_refptr<MessageAttachment> unwrapped_attachment =
    311         MessageAttachment::CreateFromMojoHandle(
    312             std::move(handle->the_handle),
    313             mojo::ConvertTo<MessageAttachment::Type>(handle->type));
    314     if (!unwrapped_attachment) {
    315       DLOG(WARNING) << "Pipe failed to unwrap handles.";
    316       return MOJO_RESULT_UNKNOWN;
    317     }
    318 
    319     bool ok = message->attachment_set()->AddAttachment(
    320         std::move(unwrapped_attachment));
    321     DCHECK(ok);
    322     if (!ok) {
    323       LOG(ERROR) << "Failed to add new Mojo handle.";
    324       return MOJO_RESULT_UNKNOWN;
    325     }
    326   }
    327   return MOJO_RESULT_OK;
    328 }
    329 
    330 void ChannelMojo::AddGenericAssociatedInterface(
    331     const std::string& name,
    332     const GenericAssociatedInterfaceFactory& factory) {
    333   base::AutoLock locker(associated_interface_lock_);
    334   auto result = associated_interfaces_.insert({ name, factory });
    335   DCHECK(result.second);
    336 }
    337 
    338 void ChannelMojo::GetGenericRemoteAssociatedInterface(
    339     const std::string& name,
    340     mojo::ScopedInterfaceEndpointHandle handle) {
    341   if (message_reader_) {
    342     message_reader_->GetRemoteInterface(name, std::move(handle));
    343   } else {
    344     // Attach the associated interface to a disconnected pipe, so that the
    345     // associated interface pointer can be used to make calls (which are
    346     // dropped).
    347     mojo::AssociateWithDisconnectedPipe(std::move(handle));
    348   }
    349 }
    350 
    351 }  // namespace IPC
    352