Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_channel_proxy.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/compiler_specific.h"
      9 #include "base/location.h"
     10 #include "base/memory/ref_counted.h"
     11 #include "base/memory/scoped_ptr.h"
     12 #include "base/single_thread_task_runner.h"
     13 #include "base/thread_task_runner_handle.h"
     14 #include "ipc/ipc_channel_factory.h"
     15 #include "ipc/ipc_listener.h"
     16 #include "ipc/ipc_logging.h"
     17 #include "ipc/ipc_message_macros.h"
     18 #include "ipc/message_filter.h"
     19 #include "ipc/message_filter_router.h"
     20 
     21 namespace IPC {
     22 
     23 //------------------------------------------------------------------------------
     24 
     25 ChannelProxy::Context::Context(
     26     Listener* listener,
     27     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
     28     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
     29       listener_(listener),
     30       ipc_task_runner_(ipc_task_runner),
     31       channel_connected_called_(false),
     32       message_filter_router_(new MessageFilterRouter()),
     33       peer_pid_(base::kNullProcessId) {
     34   DCHECK(ipc_task_runner_.get());
     35   // The Listener thread where Messages are handled must be a separate thread
     36   // to avoid oversubscribing the IO thread. If you trigger this error, you
     37   // need to either:
     38   // 1) Create the ChannelProxy on a different thread, or
     39   // 2) Just use Channel
     40   // Note, we currently make an exception for a NULL listener. That usage
     41   // basically works, but is outside the intent of ChannelProxy. This support
     42   // will disappear, so please don't rely on it. See crbug.com/364241
     43   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
     44 }
     45 
     46 ChannelProxy::Context::~Context() {
     47 }
     48 
     49 void ChannelProxy::Context::ClearIPCTaskRunner() {
     50   ipc_task_runner_ = NULL;
     51 }
     52 
     53 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
     54   DCHECK(!channel_);
     55   channel_id_ = factory->GetName();
     56   channel_ = factory->BuildChannel(this);
     57 }
     58 
     59 bool ChannelProxy::Context::TryFilters(const Message& message) {
     60   DCHECK(message_filter_router_);
     61 #ifdef IPC_MESSAGE_LOG_ENABLED
     62   Logging* logger = Logging::GetInstance();
     63   if (logger->Enabled())
     64     logger->OnPreDispatchMessage(message);
     65 #endif
     66 
     67   if (message_filter_router_->TryFilters(message)) {
     68     if (message.dispatch_error()) {
     69       listener_task_runner_->PostTask(
     70           FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
     71     }
     72 #ifdef IPC_MESSAGE_LOG_ENABLED
     73     if (logger->Enabled())
     74       logger->OnPostDispatchMessage(message, channel_id_);
     75 #endif
     76     return true;
     77   }
     78   return false;
     79 }
     80 
     81 // Called on the IPC::Channel thread
     82 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
     83   // First give a chance to the filters to process this message.
     84   if (!TryFilters(message))
     85     OnMessageReceivedNoFilter(message);
     86   return true;
     87 }
     88 
     89 // Called on the IPC::Channel thread
     90 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
     91   listener_task_runner_->PostTask(
     92       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
     93   return true;
     94 }
     95 
     96 // Called on the IPC::Channel thread
     97 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
     98   // We cache off the peer_pid so it can be safely accessed from both threads.
     99   peer_pid_ = channel_->GetPeerPID();
    100 
    101   // Add any pending filters.  This avoids a race condition where someone
    102   // creates a ChannelProxy, calls AddFilter, and then right after starts the
    103   // peer process.  The IO thread could receive a message before the task to add
    104   // the filter is run on the IO thread.
    105   OnAddFilter();
    106 
    107   // See above comment about using listener_task_runner_ here.
    108   listener_task_runner_->PostTask(
    109       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
    110 }
    111 
    112 // Called on the IPC::Channel thread
    113 void ChannelProxy::Context::OnChannelError() {
    114   for (size_t i = 0; i < filters_.size(); ++i)
    115     filters_[i]->OnChannelError();
    116 
    117   // See above comment about using listener_task_runner_ here.
    118   listener_task_runner_->PostTask(
    119       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
    120 }
    121 
    122 // Called on the IPC::Channel thread
    123 void ChannelProxy::Context::OnChannelOpened() {
    124   DCHECK(channel_ != NULL);
    125 
    126   // Assume a reference to ourselves on behalf of this thread.  This reference
    127   // will be released when we are closed.
    128   AddRef();
    129 
    130   if (!channel_->Connect()) {
    131     OnChannelError();
    132     return;
    133   }
    134 
    135   for (size_t i = 0; i < filters_.size(); ++i)
    136     filters_[i]->OnFilterAdded(channel_.get());
    137 }
    138 
    139 // Called on the IPC::Channel thread
    140 void ChannelProxy::Context::OnChannelClosed() {
    141   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
    142   // would result in this branch being taken.
    143   if (!channel_)
    144     return;
    145 
    146   for (size_t i = 0; i < filters_.size(); ++i) {
    147     filters_[i]->OnChannelClosing();
    148     filters_[i]->OnFilterRemoved();
    149   }
    150 
    151   // We don't need the filters anymore.
    152   message_filter_router_->Clear();
    153   filters_.clear();
    154   // We don't need the lock, because at this point, the listener thread can't
    155   // access it any more.
    156   pending_filters_.clear();
    157 
    158   channel_.reset();
    159 
    160   // Balance with the reference taken during startup.  This may result in
    161   // self-destruction.
    162   Release();
    163 }
    164 
    165 void ChannelProxy::Context::Clear() {
    166   listener_ = NULL;
    167 }
    168 
    169 // Called on the IPC::Channel thread
    170 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
    171   if (!channel_) {
    172     OnChannelClosed();
    173     return;
    174   }
    175 
    176   if (!channel_->Send(message.release()))
    177     OnChannelError();
    178 }
    179 
    180 // Called on the IPC::Channel thread
    181 void ChannelProxy::Context::OnAddFilter() {
    182   // Our OnChannelConnected method has not yet been called, so we can't be
    183   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
    184   // it invokes OnAddFilter, so any pending filter(s) will be added at that
    185   // time.
    186   if (peer_pid_ == base::kNullProcessId)
    187     return;
    188 
    189   std::vector<scoped_refptr<MessageFilter> > new_filters;
    190   {
    191     base::AutoLock auto_lock(pending_filters_lock_);
    192     new_filters.swap(pending_filters_);
    193   }
    194 
    195   for (size_t i = 0; i < new_filters.size(); ++i) {
    196     filters_.push_back(new_filters[i]);
    197 
    198     message_filter_router_->AddFilter(new_filters[i].get());
    199 
    200     // The channel has already been created and connected, so we need to
    201     // inform the filters right now.
    202     new_filters[i]->OnFilterAdded(channel_.get());
    203     new_filters[i]->OnChannelConnected(peer_pid_);
    204   }
    205 }
    206 
    207 // Called on the IPC::Channel thread
    208 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
    209   if (peer_pid_ == base::kNullProcessId) {
    210     // The channel is not yet connected, so any filters are still pending.
    211     base::AutoLock auto_lock(pending_filters_lock_);
    212     for (size_t i = 0; i < pending_filters_.size(); ++i) {
    213       if (pending_filters_[i].get() == filter) {
    214         filter->OnFilterRemoved();
    215         pending_filters_.erase(pending_filters_.begin() + i);
    216         return;
    217       }
    218     }
    219     return;
    220   }
    221   if (!channel_)
    222     return;  // The filters have already been deleted.
    223 
    224   message_filter_router_->RemoveFilter(filter);
    225 
    226   for (size_t i = 0; i < filters_.size(); ++i) {
    227     if (filters_[i].get() == filter) {
    228       filter->OnFilterRemoved();
    229       filters_.erase(filters_.begin() + i);
    230       return;
    231     }
    232   }
    233 
    234   NOTREACHED() << "filter to be removed not found";
    235 }
    236 
    237 // Called on the listener's thread
    238 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
    239   base::AutoLock auto_lock(pending_filters_lock_);
    240   pending_filters_.push_back(make_scoped_refptr(filter));
    241   ipc_task_runner_->PostTask(
    242       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
    243 }
    244 
    245 // Called on the listener's thread
    246 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
    247 #ifdef IPC_MESSAGE_LOG_ENABLED
    248   Logging* logger = Logging::GetInstance();
    249   std::string name;
    250   logger->GetMessageText(message.type(), &name, &message, NULL);
    251   TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
    252                "name", name);
    253 #else
    254   TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
    255                "class", IPC_MESSAGE_ID_CLASS(message.type()),
    256                "line", IPC_MESSAGE_ID_LINE(message.type()));
    257 #endif
    258 
    259   if (!listener_)
    260     return;
    261 
    262   OnDispatchConnected();
    263 
    264 #ifdef IPC_MESSAGE_LOG_ENABLED
    265   if (message.type() == IPC_LOGGING_ID) {
    266     logger->OnReceivedLoggingMessage(message);
    267     return;
    268   }
    269 
    270   if (logger->Enabled())
    271     logger->OnPreDispatchMessage(message);
    272 #endif
    273 
    274   listener_->OnMessageReceived(message);
    275   if (message.dispatch_error())
    276     listener_->OnBadMessageReceived(message);
    277 
    278 #ifdef IPC_MESSAGE_LOG_ENABLED
    279   if (logger->Enabled())
    280     logger->OnPostDispatchMessage(message, channel_id_);
    281 #endif
    282 }
    283 
    284 // Called on the listener's thread
    285 void ChannelProxy::Context::OnDispatchConnected() {
    286   if (channel_connected_called_)
    287     return;
    288 
    289   channel_connected_called_ = true;
    290   if (listener_)
    291     listener_->OnChannelConnected(peer_pid_);
    292 }
    293 
    294 // Called on the listener's thread
    295 void ChannelProxy::Context::OnDispatchError() {
    296   if (listener_)
    297     listener_->OnChannelError();
    298 }
    299 
    300 // Called on the listener's thread
    301 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
    302   if (listener_)
    303     listener_->OnBadMessageReceived(message);
    304 }
    305 
    306 //-----------------------------------------------------------------------------
    307 
    308 // static
    309 scoped_ptr<ChannelProxy> ChannelProxy::Create(
    310     const IPC::ChannelHandle& channel_handle,
    311     Channel::Mode mode,
    312     Listener* listener,
    313     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
    314   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
    315   channel->Init(channel_handle, mode, true);
    316   return channel.Pass();
    317 }
    318 
    319 // static
    320 scoped_ptr<ChannelProxy> ChannelProxy::Create(
    321     scoped_ptr<ChannelFactory> factory,
    322     Listener* listener,
    323     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
    324   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
    325   channel->Init(factory.Pass(), true);
    326   return channel.Pass();
    327 }
    328 
    329 ChannelProxy::ChannelProxy(Context* context)
    330     : context_(context),
    331       did_init_(false) {
    332 }
    333 
    334 ChannelProxy::ChannelProxy(
    335     Listener* listener,
    336     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
    337     : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
    338 }
    339 
    340 ChannelProxy::~ChannelProxy() {
    341   DCHECK(CalledOnValidThread());
    342 
    343   Close();
    344 }
    345 
    346 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
    347                         Channel::Mode mode,
    348                         bool create_pipe_now) {
    349 #if defined(OS_POSIX)
    350   // When we are creating a server on POSIX, we need its file descriptor
    351   // to be created immediately so that it can be accessed and passed
    352   // to other processes. Forcing it to be created immediately avoids
    353   // race conditions that may otherwise arise.
    354   if (mode & Channel::MODE_SERVER_FLAG) {
    355     create_pipe_now = true;
    356   }
    357 #endif  // defined(OS_POSIX)
    358   Init(ChannelFactory::Create(channel_handle, mode),
    359        create_pipe_now);
    360 }
    361 
    362 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
    363                         bool create_pipe_now) {
    364   DCHECK(CalledOnValidThread());
    365   DCHECK(!did_init_);
    366 
    367   if (create_pipe_now) {
    368     // Create the channel immediately.  This effectively sets up the
    369     // low-level pipe so that the client can connect.  Without creating
    370     // the pipe immediately, it is possible for a listener to attempt
    371     // to connect and get an error since the pipe doesn't exist yet.
    372     context_->CreateChannel(factory.Pass());
    373   } else {
    374     context_->ipc_task_runner()->PostTask(
    375         FROM_HERE, base::Bind(&Context::CreateChannel,
    376                               context_.get(), Passed(factory.Pass())));
    377   }
    378 
    379   // complete initialization on the background thread
    380   context_->ipc_task_runner()->PostTask(
    381       FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
    382 
    383   did_init_ = true;
    384 }
    385 
    386 void ChannelProxy::Close() {
    387   DCHECK(CalledOnValidThread());
    388 
    389   // Clear the backpointer to the listener so that any pending calls to
    390   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
    391   // possible that the channel could be closed while it is receiving messages!
    392   context_->Clear();
    393 
    394   if (context_->ipc_task_runner()) {
    395     context_->ipc_task_runner()->PostTask(
    396         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
    397   }
    398 }
    399 
    400 bool ChannelProxy::Send(Message* message) {
    401   DCHECK(did_init_);
    402 
    403   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
    404   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
    405 
    406 #ifdef IPC_MESSAGE_LOG_ENABLED
    407   Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
    408 #endif
    409 
    410   context_->ipc_task_runner()->PostTask(
    411       FROM_HERE,
    412       base::Bind(&ChannelProxy::Context::OnSendMessage,
    413                  context_, base::Passed(scoped_ptr<Message>(message))));
    414   return true;
    415 }
    416 
    417 void ChannelProxy::AddFilter(MessageFilter* filter) {
    418   DCHECK(CalledOnValidThread());
    419 
    420   context_->AddFilter(filter);
    421 }
    422 
    423 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
    424   DCHECK(CalledOnValidThread());
    425 
    426   context_->ipc_task_runner()->PostTask(
    427       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
    428                             make_scoped_refptr(filter)));
    429 }
    430 
    431 void ChannelProxy::ClearIPCTaskRunner() {
    432   DCHECK(CalledOnValidThread());
    433 
    434   context()->ClearIPCTaskRunner();
    435 }
    436 
    437 #if defined(OS_POSIX) && !defined(OS_NACL)
    438 // See the TODO regarding lazy initialization of the channel in
    439 // ChannelProxy::Init().
    440 int ChannelProxy::GetClientFileDescriptor() {
    441   DCHECK(CalledOnValidThread());
    442 
    443   Channel* channel = context_.get()->channel_.get();
    444   // Channel must have been created first.
    445   DCHECK(channel) << context_.get()->channel_id_;
    446   return channel->GetClientFileDescriptor();
    447 }
    448 
    449 int ChannelProxy::TakeClientFileDescriptor() {
    450   DCHECK(CalledOnValidThread());
    451 
    452   Channel* channel = context_.get()->channel_.get();
    453   // Channel must have been created first.
    454   DCHECK(channel) << context_.get()->channel_id_;
    455   return channel->TakeClientFileDescriptor();
    456 }
    457 #endif
    458 
    459 //-----------------------------------------------------------------------------
    460 
    461 }  // namespace IPC
    462