Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_channel_proxy.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <utility>
     11 
     12 #include "base/bind.h"
     13 #include "base/compiler_specific.h"
     14 #include "base/location.h"
     15 #include "base/memory/ptr_util.h"
     16 #include "base/memory/ref_counted.h"
     17 #include "base/single_thread_task_runner.h"
     18 #include "base/threading/thread_task_runner_handle.h"
     19 #include "build/build_config.h"
     20 #include "ipc/ipc_channel_factory.h"
     21 #include "ipc/ipc_listener.h"
     22 #include "ipc/ipc_logging.h"
     23 #include "ipc/ipc_message_macros.h"
     24 #include "ipc/message_filter.h"
     25 #include "ipc/message_filter_router.h"
     26 
     27 namespace IPC {
     28 
     29 //------------------------------------------------------------------------------
     30 
     31 ChannelProxy::Context::Context(
     32     Listener* listener,
     33     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
     34     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
     35     : listener_task_runner_(listener_task_runner),
     36       listener_(listener),
     37       ipc_task_runner_(ipc_task_runner),
     38       channel_connected_called_(false),
     39       message_filter_router_(new MessageFilterRouter()),
     40       peer_pid_(base::kNullProcessId) {
     41   DCHECK(ipc_task_runner_.get());
     42   // The Listener thread where Messages are handled must be a separate thread
     43   // to avoid oversubscribing the IO thread. If you trigger this error, you
     44   // need to either:
     45   // 1) Create the ChannelProxy on a different thread, or
     46   // 2) Just use Channel
     47   // Note, we currently make an exception for a NULL listener. That usage
     48   // basically works, but is outside the intent of ChannelProxy. This support
     49   // will disappear, so please don't rely on it. See crbug.com/364241
     50   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
     51 }
     52 
     53 ChannelProxy::Context::~Context() = default;
     54 
     55 void ChannelProxy::Context::ClearIPCTaskRunner() {
     56   ipc_task_runner_ = NULL;
     57 }
     58 
     59 void ChannelProxy::Context::CreateChannel(
     60     std::unique_ptr<ChannelFactory> factory) {
     61   base::AutoLock l(channel_lifetime_lock_);
     62   DCHECK(!channel_);
     63   DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_);
     64   channel_ = factory->BuildChannel(this);
     65 
     66   Channel::AssociatedInterfaceSupport* support =
     67       channel_->GetAssociatedInterfaceSupport();
     68   if (support) {
     69     thread_safe_channel_ = support->CreateThreadSafeChannel();
     70 
     71     base::AutoLock l(pending_filters_lock_);
     72     for (auto& entry : pending_io_thread_interfaces_)
     73       support->AddGenericAssociatedInterface(entry.first, entry.second);
     74     pending_io_thread_interfaces_.clear();
     75   }
     76 }
     77 
     78 bool ChannelProxy::Context::TryFilters(const Message& message) {
     79   DCHECK(message_filter_router_);
     80 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
     81   Logging* logger = Logging::GetInstance();
     82   if (logger->Enabled())
     83     logger->OnPreDispatchMessage(message);
     84 #endif
     85 
     86   if (message_filter_router_->TryFilters(message)) {
     87     if (message.dispatch_error()) {
     88       listener_task_runner_->PostTask(
     89           FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
     90     }
     91 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
     92     if (logger->Enabled())
     93       logger->OnPostDispatchMessage(message);
     94 #endif
     95     return true;
     96   }
     97   return false;
     98 }
     99 
    100 // Called on the IPC::Channel thread
    101 void ChannelProxy::Context::PauseChannel() {
    102   DCHECK(channel_);
    103   channel_->Pause();
    104 }
    105 
    106 // Called on the IPC::Channel thread
    107 void ChannelProxy::Context::UnpauseChannel(bool flush) {
    108   DCHECK(channel_);
    109   channel_->Unpause(flush);
    110 }
    111 
    112 // Called on the IPC::Channel thread
    113 void ChannelProxy::Context::FlushChannel() {
    114   DCHECK(channel_);
    115   channel_->Flush();
    116 }
    117 
    118 // Called on the IPC::Channel thread
    119 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
    120   // First give a chance to the filters to process this message.
    121   if (!TryFilters(message))
    122     OnMessageReceivedNoFilter(message);
    123   return true;
    124 }
    125 
    126 // Called on the IPC::Channel thread
    127 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
    128   listener_task_runner_->PostTask(
    129       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
    130   return true;
    131 }
    132 
    133 // Called on the IPC::Channel thread
    134 void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) {
    135   // We cache off the peer_pid so it can be safely accessed from both threads.
    136   {
    137     base::AutoLock l(peer_pid_lock_);
    138     peer_pid_ = peer_pid;
    139   }
    140 
    141   // Add any pending filters.  This avoids a race condition where someone
    142   // creates a ChannelProxy, calls AddFilter, and then right after starts the
    143   // peer process.  The IO thread could receive a message before the task to add
    144   // the filter is run on the IO thread.
    145   OnAddFilter();
    146 
    147   // See above comment about using listener_task_runner_ here.
    148   listener_task_runner_->PostTask(
    149       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
    150 }
    151 
    152 // Called on the IPC::Channel thread
    153 void ChannelProxy::Context::OnChannelError() {
    154   for (size_t i = 0; i < filters_.size(); ++i)
    155     filters_[i]->OnChannelError();
    156 
    157   // See above comment about using listener_task_runner_ here.
    158   listener_task_runner_->PostTask(
    159       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
    160 }
    161 
    162 // Called on the IPC::Channel thread
    163 void ChannelProxy::Context::OnAssociatedInterfaceRequest(
    164     const std::string& interface_name,
    165     mojo::ScopedInterfaceEndpointHandle handle) {
    166   listener_task_runner_->PostTask(
    167       FROM_HERE, base::Bind(&Context::OnDispatchAssociatedInterfaceRequest,
    168                             this, interface_name, base::Passed(&handle)));
    169 }
    170 
    171 // Called on the IPC::Channel thread
    172 void ChannelProxy::Context::OnChannelOpened() {
    173   DCHECK(channel_ != NULL);
    174 
    175   // Assume a reference to ourselves on behalf of this thread.  This reference
    176   // will be released when we are closed.
    177   AddRef();
    178 
    179   if (!channel_->Connect()) {
    180     OnChannelError();
    181     return;
    182   }
    183 
    184   for (size_t i = 0; i < filters_.size(); ++i)
    185     filters_[i]->OnFilterAdded(channel_.get());
    186 }
    187 
    188 // Called on the IPC::Channel thread
    189 void ChannelProxy::Context::OnChannelClosed() {
    190   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
    191   // would result in this branch being taken.
    192   if (!channel_)
    193     return;
    194 
    195   for (auto& filter : pending_filters_) {
    196     filter->OnChannelClosing();
    197     filter->OnFilterRemoved();
    198   }
    199   for (auto& filter : filters_) {
    200     filter->OnChannelClosing();
    201     filter->OnFilterRemoved();
    202   }
    203 
    204   // We don't need the filters anymore.
    205   message_filter_router_->Clear();
    206   filters_.clear();
    207   // We don't need the lock, because at this point, the listener thread can't
    208   // access it any more.
    209   pending_filters_.clear();
    210 
    211   ClearChannel();
    212 
    213   // Balance with the reference taken during startup.  This may result in
    214   // self-destruction.
    215   Release();
    216 }
    217 
    218 void ChannelProxy::Context::Clear() {
    219   listener_ = NULL;
    220 }
    221 
    222 // Called on the IPC::Channel thread
    223 void ChannelProxy::Context::OnSendMessage(std::unique_ptr<Message> message) {
    224   if (!channel_) {
    225     OnChannelClosed();
    226     return;
    227   }
    228 
    229   if (!channel_->Send(message.release()))
    230     OnChannelError();
    231 }
    232 
    233 // Called on the IPC::Channel thread
    234 void ChannelProxy::Context::OnAddFilter() {
    235   // Our OnChannelConnected method has not yet been called, so we can't be
    236   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
    237   // it invokes OnAddFilter, so any pending filter(s) will be added at that
    238   // time.
    239   // No lock necessary for |peer_pid_| because it is only modified on this
    240   // thread.
    241   if (peer_pid_ == base::kNullProcessId)
    242     return;
    243 
    244   std::vector<scoped_refptr<MessageFilter> > new_filters;
    245   {
    246     base::AutoLock auto_lock(pending_filters_lock_);
    247     new_filters.swap(pending_filters_);
    248   }
    249 
    250   for (size_t i = 0; i < new_filters.size(); ++i) {
    251     filters_.push_back(new_filters[i]);
    252 
    253     message_filter_router_->AddFilter(new_filters[i].get());
    254 
    255     // The channel has already been created and connected, so we need to
    256     // inform the filters right now.
    257     new_filters[i]->OnFilterAdded(channel_.get());
    258     new_filters[i]->OnChannelConnected(peer_pid_);
    259   }
    260 }
    261 
    262 // Called on the IPC::Channel thread
    263 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
    264   // No lock necessary for |peer_pid_| because it is only modified on this
    265   // thread.
    266   if (peer_pid_ == base::kNullProcessId) {
    267     // The channel is not yet connected, so any filters are still pending.
    268     base::AutoLock auto_lock(pending_filters_lock_);
    269     for (size_t i = 0; i < pending_filters_.size(); ++i) {
    270       if (pending_filters_[i].get() == filter) {
    271         filter->OnFilterRemoved();
    272         pending_filters_.erase(pending_filters_.begin() + i);
    273         return;
    274       }
    275     }
    276     return;
    277   }
    278   if (!channel_)
    279     return;  // The filters have already been deleted.
    280 
    281   message_filter_router_->RemoveFilter(filter);
    282 
    283   for (size_t i = 0; i < filters_.size(); ++i) {
    284     if (filters_[i].get() == filter) {
    285       filter->OnFilterRemoved();
    286       filters_.erase(filters_.begin() + i);
    287       return;
    288     }
    289   }
    290 
    291   NOTREACHED() << "filter to be removed not found";
    292 }
    293 
    294 // Called on the listener's thread
    295 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
    296   base::AutoLock auto_lock(pending_filters_lock_);
    297   pending_filters_.push_back(base::WrapRefCounted(filter));
    298   ipc_task_runner_->PostTask(
    299       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
    300 }
    301 
    302 // Called on the listener's thread
    303 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
    304   if (!listener_)
    305     return;
    306 
    307   OnDispatchConnected();
    308 
    309 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
    310   Logging* logger = Logging::GetInstance();
    311   if (message.type() == IPC_LOGGING_ID) {
    312     logger->OnReceivedLoggingMessage(message);
    313     return;
    314   }
    315 
    316   if (logger->Enabled())
    317     logger->OnPreDispatchMessage(message);
    318 #endif
    319 
    320   listener_->OnMessageReceived(message);
    321   if (message.dispatch_error())
    322     listener_->OnBadMessageReceived(message);
    323 
    324 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
    325   if (logger->Enabled())
    326     logger->OnPostDispatchMessage(message);
    327 #endif
    328 }
    329 
    330 // Called on the listener's thread
    331 void ChannelProxy::Context::OnDispatchConnected() {
    332   if (channel_connected_called_)
    333     return;
    334 
    335   base::ProcessId peer_pid;
    336   {
    337     base::AutoLock l(peer_pid_lock_);
    338     peer_pid = peer_pid_;
    339   }
    340   channel_connected_called_ = true;
    341   if (listener_)
    342     listener_->OnChannelConnected(peer_pid);
    343 }
    344 
    345 // Called on the listener's thread
    346 void ChannelProxy::Context::OnDispatchError() {
    347   if (listener_)
    348     listener_->OnChannelError();
    349 }
    350 
    351 // Called on the listener's thread
    352 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
    353   if (listener_)
    354     listener_->OnBadMessageReceived(message);
    355 }
    356 
    357 // Called on the listener's thread
    358 void ChannelProxy::Context::OnDispatchAssociatedInterfaceRequest(
    359     const std::string& interface_name,
    360     mojo::ScopedInterfaceEndpointHandle handle) {
    361   if (listener_)
    362     listener_->OnAssociatedInterfaceRequest(interface_name, std::move(handle));
    363 }
    364 
    365 void ChannelProxy::Context::ClearChannel() {
    366   base::AutoLock l(channel_lifetime_lock_);
    367   channel_.reset();
    368 }
    369 
    370 void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread(
    371     const std::string& name,
    372     const GenericAssociatedInterfaceFactory& factory) {
    373   base::AutoLock l(channel_lifetime_lock_);
    374   if (!channel_) {
    375     base::AutoLock l(pending_filters_lock_);
    376     pending_io_thread_interfaces_.emplace_back(name, factory);
    377     return;
    378   }
    379   Channel::AssociatedInterfaceSupport* support =
    380       channel_->GetAssociatedInterfaceSupport();
    381   if (support)
    382     support->AddGenericAssociatedInterface(name, factory);
    383 }
    384 
    385 void ChannelProxy::Context::Send(Message* message) {
    386   ipc_task_runner()->PostTask(
    387       FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this,
    388                             base::Passed(base::WrapUnique(message))));
    389 }
    390 
    391 //-----------------------------------------------------------------------------
    392 
    393 // static
    394 std::unique_ptr<ChannelProxy> ChannelProxy::Create(
    395     const IPC::ChannelHandle& channel_handle,
    396     Channel::Mode mode,
    397     Listener* listener,
    398     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    399     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) {
    400   std::unique_ptr<ChannelProxy> channel(
    401       new ChannelProxy(listener, ipc_task_runner, listener_task_runner));
    402   channel->Init(channel_handle, mode, true);
    403   return channel;
    404 }
    405 
    406 // static
    407 std::unique_ptr<ChannelProxy> ChannelProxy::Create(
    408     std::unique_ptr<ChannelFactory> factory,
    409     Listener* listener,
    410     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    411     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) {
    412   std::unique_ptr<ChannelProxy> channel(
    413       new ChannelProxy(listener, ipc_task_runner, listener_task_runner));
    414   channel->Init(std::move(factory), true);
    415   return channel;
    416 }
    417 
    418 ChannelProxy::ChannelProxy(Context* context)
    419     : context_(context), did_init_(false) {
    420 #if defined(ENABLE_IPC_FUZZER)
    421   outgoing_message_filter_ = NULL;
    422 #endif
    423 }
    424 
    425 ChannelProxy::ChannelProxy(
    426     Listener* listener,
    427     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    428     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
    429     : context_(new Context(listener, ipc_task_runner, listener_task_runner)),
    430       did_init_(false) {
    431 #if defined(ENABLE_IPC_FUZZER)
    432   outgoing_message_filter_ = NULL;
    433 #endif
    434 }
    435 
    436 ChannelProxy::~ChannelProxy() {
    437   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    438 
    439   Close();
    440 }
    441 
    442 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
    443                         Channel::Mode mode,
    444                         bool create_pipe_now) {
    445 #if defined(OS_POSIX) || defined(OS_FUCHSIA)
    446   // When we are creating a server on POSIX, we need its file descriptor
    447   // to be created immediately so that it can be accessed and passed
    448   // to other processes. Forcing it to be created immediately avoids
    449   // race conditions that may otherwise arise.
    450   if (mode & Channel::MODE_SERVER_FLAG) {
    451     create_pipe_now = true;
    452   }
    453 #endif  // defined(OS_POSIX) || defined(OS_FUCHSIA)
    454   Init(
    455       ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()),
    456       create_pipe_now);
    457 }
    458 
    459 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory,
    460                         bool create_pipe_now) {
    461   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    462   DCHECK(!did_init_);
    463 
    464   if (create_pipe_now) {
    465     // Create the channel immediately.  This effectively sets up the
    466     // low-level pipe so that the client can connect.  Without creating
    467     // the pipe immediately, it is possible for a listener to attempt
    468     // to connect and get an error since the pipe doesn't exist yet.
    469     context_->CreateChannel(std::move(factory));
    470   } else {
    471     context_->ipc_task_runner()->PostTask(
    472         FROM_HERE, base::Bind(&Context::CreateChannel, context_,
    473                               base::Passed(&factory)));
    474   }
    475 
    476   // complete initialization on the background thread
    477   context_->ipc_task_runner()->PostTask(
    478       FROM_HERE,
    479       base::Bind(&Context::OnChannelOpened, context_));
    480 
    481   did_init_ = true;
    482   OnChannelInit();
    483 }
    484 
    485 void ChannelProxy::Pause() {
    486   context_->ipc_task_runner()->PostTask(
    487       FROM_HERE, base::Bind(&Context::PauseChannel, context_));
    488 }
    489 
    490 void ChannelProxy::Unpause(bool flush) {
    491   context_->ipc_task_runner()->PostTask(
    492       FROM_HERE, base::Bind(&Context::UnpauseChannel, context_, flush));
    493 }
    494 
    495 void ChannelProxy::Flush() {
    496   context_->ipc_task_runner()->PostTask(
    497       FROM_HERE, base::Bind(&Context::FlushChannel, context_));
    498 }
    499 
    500 void ChannelProxy::Close() {
    501   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    502 
    503   // Clear the backpointer to the listener so that any pending calls to
    504   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
    505   // possible that the channel could be closed while it is receiving messages!
    506   context_->Clear();
    507 
    508   if (context_->ipc_task_runner()) {
    509     context_->ipc_task_runner()->PostTask(
    510         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_));
    511   }
    512 }
    513 
    514 bool ChannelProxy::Send(Message* message) {
    515   DCHECK(!message->is_sync()) << "Need to use IPC::SyncChannel";
    516   SendInternal(message);
    517   return true;
    518 }
    519 
    520 void ChannelProxy::SendInternal(Message* message) {
    521   DCHECK(did_init_);
    522 
    523   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
    524   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
    525 
    526 #ifdef ENABLE_IPC_FUZZER
    527   // In IPC fuzzing builds, it is possible to define a filter to apply to
    528   // outgoing messages. It will either rewrite the message and return a new
    529   // one, freeing the original, or return the message unchanged.
    530   if (outgoing_message_filter())
    531     message = outgoing_message_filter()->Rewrite(message);
    532 #endif
    533 
    534 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
    535   Logging::GetInstance()->OnSendMessage(message);
    536 #endif
    537 
    538   // See https://crbug.com/766032. This is to ensure that senders of oversized
    539   // messages can be caught more easily in the wild.
    540   CHECK_LE(message->size(), Channel::kMaximumMessageSize);
    541 
    542   context_->Send(message);
    543 }
    544 
    545 void ChannelProxy::AddFilter(MessageFilter* filter) {
    546   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    547 
    548   context_->AddFilter(filter);
    549 }
    550 
    551 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
    552   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    553 
    554   context_->ipc_task_runner()->PostTask(
    555       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_,
    556                             base::RetainedRef(filter)));
    557 }
    558 
    559 void ChannelProxy::AddGenericAssociatedInterfaceForIOThread(
    560     const std::string& name,
    561     const GenericAssociatedInterfaceFactory& factory) {
    562   context()->AddGenericAssociatedInterfaceForIOThread(name, factory);
    563 }
    564 
    565 void ChannelProxy::GetGenericRemoteAssociatedInterface(
    566     const std::string& name,
    567     mojo::ScopedInterfaceEndpointHandle handle) {
    568   DCHECK(did_init_);
    569   context()->thread_safe_channel().GetAssociatedInterface(
    570       name, mojom::GenericInterfaceAssociatedRequest(std::move(handle)));
    571 }
    572 
    573 void ChannelProxy::ClearIPCTaskRunner() {
    574   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    575   context()->ClearIPCTaskRunner();
    576 }
    577 
    578 void ChannelProxy::OnChannelInit() {
    579 }
    580 
    581 //-----------------------------------------------------------------------------
    582 
    583 }  // namespace IPC
    584