Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_channel_proxy.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/compiler_specific.h"
      9 #include "base/location.h"
     10 #include "base/memory/ref_counted.h"
     11 #include "base/memory/scoped_ptr.h"
     12 #include "base/single_thread_task_runner.h"
     13 #include "base/thread_task_runner_handle.h"
     14 #include "ipc/ipc_listener.h"
     15 #include "ipc/ipc_logging.h"
     16 #include "ipc/ipc_message_macros.h"
     17 #include "ipc/message_filter.h"
     18 #include "ipc/message_filter_router.h"
     19 
     20 namespace IPC {
     21 
     22 //------------------------------------------------------------------------------
     23 
     24 ChannelProxy::Context::Context(Listener* listener,
     25                                base::SingleThreadTaskRunner* ipc_task_runner)
     26     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
     27       listener_(listener),
     28       ipc_task_runner_(ipc_task_runner),
     29       channel_connected_called_(false),
     30       message_filter_router_(new MessageFilterRouter()),
     31       peer_pid_(base::kNullProcessId) {
     32   DCHECK(ipc_task_runner_.get());
     33   // The Listener thread where Messages are handled must be a separate thread
     34   // to avoid oversubscribing the IO thread. If you trigger this error, you
     35   // need to either:
     36   // 1) Create the ChannelProxy on a different thread, or
     37   // 2) Just use Channel
     38   // Note, we currently make an exception for a NULL listener. That usage
     39   // basically works, but is outside the intent of ChannelProxy. This support
     40   // will disappear, so please don't rely on it. See crbug.com/364241
     41   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
     42 }
     43 
     44 ChannelProxy::Context::~Context() {
     45 }
     46 
     47 void ChannelProxy::Context::ClearIPCTaskRunner() {
     48   ipc_task_runner_ = NULL;
     49 }
     50 
     51 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
     52                                           const Channel::Mode& mode) {
     53   DCHECK(!channel_);
     54   channel_id_ = handle.name;
     55   channel_ = Channel::Create(handle, mode, this);
     56 }
     57 
     58 bool ChannelProxy::Context::TryFilters(const Message& message) {
     59   DCHECK(message_filter_router_);
     60 #ifdef IPC_MESSAGE_LOG_ENABLED
     61   Logging* logger = Logging::GetInstance();
     62   if (logger->Enabled())
     63     logger->OnPreDispatchMessage(message);
     64 #endif
     65 
     66   if (message_filter_router_->TryFilters(message)) {
     67     if (message.dispatch_error()) {
     68       listener_task_runner_->PostTask(
     69           FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
     70     }
     71 #ifdef IPC_MESSAGE_LOG_ENABLED
     72     if (logger->Enabled())
     73       logger->OnPostDispatchMessage(message, channel_id_);
     74 #endif
     75     return true;
     76   }
     77   return false;
     78 }
     79 
     80 // Called on the IPC::Channel thread
     81 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
     82   // First give a chance to the filters to process this message.
     83   if (!TryFilters(message))
     84     OnMessageReceivedNoFilter(message);
     85   return true;
     86 }
     87 
     88 // Called on the IPC::Channel thread
     89 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
     90   listener_task_runner_->PostTask(
     91       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
     92   return true;
     93 }
     94 
     95 // Called on the IPC::Channel thread
     96 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
     97   // We cache off the peer_pid so it can be safely accessed from both threads.
     98   peer_pid_ = channel_->GetPeerPID();
     99 
    100   // Add any pending filters.  This avoids a race condition where someone
    101   // creates a ChannelProxy, calls AddFilter, and then right after starts the
    102   // peer process.  The IO thread could receive a message before the task to add
    103   // the filter is run on the IO thread.
    104   OnAddFilter();
    105 
    106   // See above comment about using listener_task_runner_ here.
    107   listener_task_runner_->PostTask(
    108       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
    109 }
    110 
    111 // Called on the IPC::Channel thread
    112 void ChannelProxy::Context::OnChannelError() {
    113   for (size_t i = 0; i < filters_.size(); ++i)
    114     filters_[i]->OnChannelError();
    115 
    116   // See above comment about using listener_task_runner_ here.
    117   listener_task_runner_->PostTask(
    118       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
    119 }
    120 
    121 // Called on the IPC::Channel thread
    122 void ChannelProxy::Context::OnChannelOpened() {
    123   DCHECK(channel_ != NULL);
    124 
    125   // Assume a reference to ourselves on behalf of this thread.  This reference
    126   // will be released when we are closed.
    127   AddRef();
    128 
    129   if (!channel_->Connect()) {
    130     OnChannelError();
    131     return;
    132   }
    133 
    134   for (size_t i = 0; i < filters_.size(); ++i)
    135     filters_[i]->OnFilterAdded(channel_.get());
    136 }
    137 
    138 // Called on the IPC::Channel thread
    139 void ChannelProxy::Context::OnChannelClosed() {
    140   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
    141   // would result in this branch being taken.
    142   if (!channel_)
    143     return;
    144 
    145   for (size_t i = 0; i < filters_.size(); ++i) {
    146     filters_[i]->OnChannelClosing();
    147     filters_[i]->OnFilterRemoved();
    148   }
    149 
    150   // We don't need the filters anymore.
    151   message_filter_router_->Clear();
    152   filters_.clear();
    153   // We don't need the lock, because at this point, the listener thread can't
    154   // access it any more.
    155   pending_filters_.clear();
    156 
    157   channel_.reset();
    158 
    159   // Balance with the reference taken during startup.  This may result in
    160   // self-destruction.
    161   Release();
    162 }
    163 
    164 void ChannelProxy::Context::Clear() {
    165   listener_ = NULL;
    166 }
    167 
    168 // Called on the IPC::Channel thread
    169 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
    170   if (!channel_) {
    171     OnChannelClosed();
    172     return;
    173   }
    174 
    175   if (!channel_->Send(message.release()))
    176     OnChannelError();
    177 }
    178 
    179 // Called on the IPC::Channel thread
    180 void ChannelProxy::Context::OnAddFilter() {
    181   // Our OnChannelConnected method has not yet been called, so we can't be
    182   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
    183   // it invokes OnAddFilter, so any pending filter(s) will be added at that
    184   // time.
    185   if (peer_pid_ == base::kNullProcessId)
    186     return;
    187 
    188   std::vector<scoped_refptr<MessageFilter> > new_filters;
    189   {
    190     base::AutoLock auto_lock(pending_filters_lock_);
    191     new_filters.swap(pending_filters_);
    192   }
    193 
    194   for (size_t i = 0; i < new_filters.size(); ++i) {
    195     filters_.push_back(new_filters[i]);
    196 
    197     message_filter_router_->AddFilter(new_filters[i].get());
    198 
    199     // The channel has already been created and connected, so we need to
    200     // inform the filters right now.
    201     new_filters[i]->OnFilterAdded(channel_.get());
    202     new_filters[i]->OnChannelConnected(peer_pid_);
    203   }
    204 }
    205 
    206 // Called on the IPC::Channel thread
    207 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
    208   if (peer_pid_ == base::kNullProcessId) {
    209     // The channel is not yet connected, so any filters are still pending.
    210     base::AutoLock auto_lock(pending_filters_lock_);
    211     for (size_t i = 0; i < pending_filters_.size(); ++i) {
    212       if (pending_filters_[i].get() == filter) {
    213         filter->OnFilterRemoved();
    214         pending_filters_.erase(pending_filters_.begin() + i);
    215         return;
    216       }
    217     }
    218     return;
    219   }
    220   if (!channel_)
    221     return;  // The filters have already been deleted.
    222 
    223   message_filter_router_->RemoveFilter(filter);
    224 
    225   for (size_t i = 0; i < filters_.size(); ++i) {
    226     if (filters_[i].get() == filter) {
    227       filter->OnFilterRemoved();
    228       filters_.erase(filters_.begin() + i);
    229       return;
    230     }
    231   }
    232 
    233   NOTREACHED() << "filter to be removed not found";
    234 }
    235 
    236 // Called on the listener's thread
    237 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
    238   base::AutoLock auto_lock(pending_filters_lock_);
    239   pending_filters_.push_back(make_scoped_refptr(filter));
    240   ipc_task_runner_->PostTask(
    241       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
    242 }
    243 
    244 // Called on the listener's thread
    245 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
    246 #ifdef IPC_MESSAGE_LOG_ENABLED
    247   Logging* logger = Logging::GetInstance();
    248   std::string name;
    249   logger->GetMessageText(message.type(), &name, &message, NULL);
    250   TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
    251                "name", name);
    252 #else
    253   TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
    254                "class", IPC_MESSAGE_ID_CLASS(message.type()),
    255                "line", IPC_MESSAGE_ID_LINE(message.type()));
    256 #endif
    257 
    258   if (!listener_)
    259     return;
    260 
    261   OnDispatchConnected();
    262 
    263 #ifdef IPC_MESSAGE_LOG_ENABLED
    264   if (message.type() == IPC_LOGGING_ID) {
    265     logger->OnReceivedLoggingMessage(message);
    266     return;
    267   }
    268 
    269   if (logger->Enabled())
    270     logger->OnPreDispatchMessage(message);
    271 #endif
    272 
    273   listener_->OnMessageReceived(message);
    274   if (message.dispatch_error())
    275     listener_->OnBadMessageReceived(message);
    276 
    277 #ifdef IPC_MESSAGE_LOG_ENABLED
    278   if (logger->Enabled())
    279     logger->OnPostDispatchMessage(message, channel_id_);
    280 #endif
    281 }
    282 
    283 // Called on the listener's thread
    284 void ChannelProxy::Context::OnDispatchConnected() {
    285   if (channel_connected_called_)
    286     return;
    287 
    288   channel_connected_called_ = true;
    289   if (listener_)
    290     listener_->OnChannelConnected(peer_pid_);
    291 }
    292 
    293 // Called on the listener's thread
    294 void ChannelProxy::Context::OnDispatchError() {
    295   if (listener_)
    296     listener_->OnChannelError();
    297 }
    298 
    299 // Called on the listener's thread
    300 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
    301   if (listener_)
    302     listener_->OnBadMessageReceived(message);
    303 }
    304 
    305 //-----------------------------------------------------------------------------
    306 
    307 // static
    308 scoped_ptr<ChannelProxy> ChannelProxy::Create(
    309     const IPC::ChannelHandle& channel_handle,
    310     Channel::Mode mode,
    311     Listener* listener,
    312     base::SingleThreadTaskRunner* ipc_task_runner) {
    313   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
    314   channel->Init(channel_handle, mode, true);
    315   return channel.Pass();
    316 }
    317 
    318 ChannelProxy::ChannelProxy(Context* context)
    319     : context_(context),
    320       did_init_(false) {
    321 }
    322 
    323 ChannelProxy::ChannelProxy(Listener* listener,
    324                            base::SingleThreadTaskRunner* ipc_task_runner)
    325     : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
    326 }
    327 
    328 ChannelProxy::~ChannelProxy() {
    329   DCHECK(CalledOnValidThread());
    330 
    331   Close();
    332 }
    333 
    334 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
    335                         Channel::Mode mode,
    336                         bool create_pipe_now) {
    337   DCHECK(CalledOnValidThread());
    338   DCHECK(!did_init_);
    339 #if defined(OS_POSIX)
    340   // When we are creating a server on POSIX, we need its file descriptor
    341   // to be created immediately so that it can be accessed and passed
    342   // to other processes. Forcing it to be created immediately avoids
    343   // race conditions that may otherwise arise.
    344   if (mode & Channel::MODE_SERVER_FLAG) {
    345     create_pipe_now = true;
    346   }
    347 #endif  // defined(OS_POSIX)
    348 
    349   if (create_pipe_now) {
    350     // Create the channel immediately.  This effectively sets up the
    351     // low-level pipe so that the client can connect.  Without creating
    352     // the pipe immediately, it is possible for a listener to attempt
    353     // to connect and get an error since the pipe doesn't exist yet.
    354     context_->CreateChannel(channel_handle, mode);
    355   } else {
    356     context_->ipc_task_runner()->PostTask(
    357         FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
    358                               channel_handle, mode));
    359   }
    360 
    361   // complete initialization on the background thread
    362   context_->ipc_task_runner()->PostTask(
    363       FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
    364 
    365   did_init_ = true;
    366 }
    367 
    368 void ChannelProxy::Close() {
    369   DCHECK(CalledOnValidThread());
    370 
    371   // Clear the backpointer to the listener so that any pending calls to
    372   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
    373   // possible that the channel could be closed while it is receiving messages!
    374   context_->Clear();
    375 
    376   if (context_->ipc_task_runner()) {
    377     context_->ipc_task_runner()->PostTask(
    378         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
    379   }
    380 }
    381 
    382 bool ChannelProxy::Send(Message* message) {
    383   DCHECK(did_init_);
    384 
    385   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
    386   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
    387 
    388 #ifdef IPC_MESSAGE_LOG_ENABLED
    389   Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
    390 #endif
    391 
    392   context_->ipc_task_runner()->PostTask(
    393       FROM_HERE,
    394       base::Bind(&ChannelProxy::Context::OnSendMessage,
    395                  context_, base::Passed(scoped_ptr<Message>(message))));
    396   return true;
    397 }
    398 
    399 void ChannelProxy::AddFilter(MessageFilter* filter) {
    400   DCHECK(CalledOnValidThread());
    401 
    402   context_->AddFilter(filter);
    403 }
    404 
    405 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
    406   DCHECK(CalledOnValidThread());
    407 
    408   context_->ipc_task_runner()->PostTask(
    409       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
    410                             make_scoped_refptr(filter)));
    411 }
    412 
    413 void ChannelProxy::ClearIPCTaskRunner() {
    414   DCHECK(CalledOnValidThread());
    415 
    416   context()->ClearIPCTaskRunner();
    417 }
    418 
    419 #if defined(OS_POSIX) && !defined(OS_NACL)
    420 // See the TODO regarding lazy initialization of the channel in
    421 // ChannelProxy::Init().
    422 int ChannelProxy::GetClientFileDescriptor() {
    423   DCHECK(CalledOnValidThread());
    424 
    425   Channel* channel = context_.get()->channel_.get();
    426   // Channel must have been created first.
    427   DCHECK(channel) << context_.get()->channel_id_;
    428   return channel->GetClientFileDescriptor();
    429 }
    430 
    431 int ChannelProxy::TakeClientFileDescriptor() {
    432   DCHECK(CalledOnValidThread());
    433 
    434   Channel* channel = context_.get()->channel_.get();
    435   // Channel must have been created first.
    436   DCHECK(channel) << context_.get()->channel_id_;
    437   return channel->TakeClientFileDescriptor();
    438 }
    439 #endif
    440 
    441 //-----------------------------------------------------------------------------
    442 
    443 }  // namespace IPC
    444