Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/bind.h"
      6 #include "base/compiler_specific.h"
      7 #include "base/debug/trace_event.h"
      8 #include "base/location.h"
      9 #include "base/memory/ref_counted.h"
     10 #include "base/memory/scoped_ptr.h"
     11 #include "base/single_thread_task_runner.h"
     12 #include "base/thread_task_runner_handle.h"
     13 #include "ipc/ipc_channel_proxy.h"
     14 #include "ipc/ipc_listener.h"
     15 #include "ipc/ipc_logging.h"
     16 #include "ipc/ipc_message_macros.h"
     17 #include "ipc/ipc_message_utils.h"
     18 
     19 namespace IPC {
     20 
     21 //------------------------------------------------------------------------------
     22 
     23 ChannelProxy::MessageFilter::MessageFilter() {}
     24 
     25 void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
     26 
     27 void ChannelProxy::MessageFilter::OnFilterRemoved() {}
     28 
     29 void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
     30 
     31 void ChannelProxy::MessageFilter::OnChannelError() {}
     32 
     33 void ChannelProxy::MessageFilter::OnChannelClosing() {}
     34 
     35 bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
     36   return false;
     37 }
     38 
     39 void ChannelProxy::MessageFilter::OnDestruct() const {
     40   delete this;
     41 }
     42 
     43 ChannelProxy::MessageFilter::~MessageFilter() {}
     44 
     45 //------------------------------------------------------------------------------
     46 
     47 ChannelProxy::Context::Context(Listener* listener,
     48                                base::SingleThreadTaskRunner* ipc_task_runner)
     49     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
     50       listener_(listener),
     51       ipc_task_runner_(ipc_task_runner),
     52       channel_connected_called_(false),
     53       peer_pid_(base::kNullProcessId) {
     54   DCHECK(ipc_task_runner_.get());
     55 }
     56 
     57 ChannelProxy::Context::~Context() {
     58 }
     59 
     60 void ChannelProxy::Context::ClearIPCTaskRunner() {
     61   ipc_task_runner_ = NULL;
     62 }
     63 
     64 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
     65                                           const Channel::Mode& mode) {
     66   DCHECK(channel_.get() == NULL);
     67   channel_id_ = handle.name;
     68   channel_.reset(new Channel(handle, mode, this));
     69 }
     70 
     71 bool ChannelProxy::Context::TryFilters(const Message& message) {
     72 #ifdef IPC_MESSAGE_LOG_ENABLED
     73   Logging* logger = Logging::GetInstance();
     74   if (logger->Enabled())
     75     logger->OnPreDispatchMessage(message);
     76 #endif
     77 
     78   for (size_t i = 0; i < filters_.size(); ++i) {
     79     if (filters_[i]->OnMessageReceived(message)) {
     80 #ifdef IPC_MESSAGE_LOG_ENABLED
     81       if (logger->Enabled())
     82         logger->OnPostDispatchMessage(message, channel_id_);
     83 #endif
     84       return true;
     85     }
     86   }
     87   return false;
     88 }
     89 
     90 // Called on the IPC::Channel thread
     91 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
     92   // First give a chance to the filters to process this message.
     93   if (!TryFilters(message))
     94     OnMessageReceivedNoFilter(message);
     95   return true;
     96 }
     97 
     98 // Called on the IPC::Channel thread
     99 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
    100   // NOTE: This code relies on the listener's message loop not going away while
    101   // this thread is active.  That should be a reasonable assumption, but it
    102   // feels risky.  We may want to invent some more indirect way of referring to
    103   // a MessageLoop if this becomes a problem.
    104   listener_task_runner_->PostTask(
    105       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
    106   return true;
    107 }
    108 
    109 // Called on the IPC::Channel thread
    110 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
    111   // Add any pending filters.  This avoids a race condition where someone
    112   // creates a ChannelProxy, calls AddFilter, and then right after starts the
    113   // peer process.  The IO thread could receive a message before the task to add
    114   // the filter is run on the IO thread.
    115   OnAddFilter();
    116 
    117   // We cache off the peer_pid so it can be safely accessed from both threads.
    118   peer_pid_ = channel_->peer_pid();
    119   for (size_t i = 0; i < filters_.size(); ++i)
    120     filters_[i]->OnChannelConnected(peer_pid);
    121 
    122   // See above comment about using listener_task_runner_ here.
    123   listener_task_runner_->PostTask(
    124       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
    125 }
    126 
    127 // Called on the IPC::Channel thread
    128 void ChannelProxy::Context::OnChannelError() {
    129   for (size_t i = 0; i < filters_.size(); ++i)
    130     filters_[i]->OnChannelError();
    131 
    132   // See above comment about using listener_task_runner_ here.
    133   listener_task_runner_->PostTask(
    134       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
    135 }
    136 
    137 // Called on the IPC::Channel thread
    138 void ChannelProxy::Context::OnChannelOpened() {
    139   DCHECK(channel_ != NULL);
    140 
    141   // Assume a reference to ourselves on behalf of this thread.  This reference
    142   // will be released when we are closed.
    143   AddRef();
    144 
    145   if (!channel_->Connect()) {
    146     OnChannelError();
    147     return;
    148   }
    149 
    150   for (size_t i = 0; i < filters_.size(); ++i)
    151     filters_[i]->OnFilterAdded(channel_.get());
    152 }
    153 
    154 // Called on the IPC::Channel thread
    155 void ChannelProxy::Context::OnChannelClosed() {
    156   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
    157   // would result in this branch being taken.
    158   if (!channel_.get())
    159     return;
    160 
    161   for (size_t i = 0; i < filters_.size(); ++i) {
    162     filters_[i]->OnChannelClosing();
    163     filters_[i]->OnFilterRemoved();
    164   }
    165 
    166   // We don't need the filters anymore.
    167   filters_.clear();
    168 
    169   channel_.reset();
    170 
    171   // Balance with the reference taken during startup.  This may result in
    172   // self-destruction.
    173   Release();
    174 }
    175 
    176 void ChannelProxy::Context::Clear() {
    177   listener_ = NULL;
    178 }
    179 
    180 // Called on the IPC::Channel thread
    181 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
    182   if (!channel_.get()) {
    183     OnChannelClosed();
    184     return;
    185   }
    186   if (!channel_->Send(message.release()))
    187     OnChannelError();
    188 }
    189 
    190 // Called on the IPC::Channel thread
    191 void ChannelProxy::Context::OnAddFilter() {
    192   std::vector<scoped_refptr<MessageFilter> > new_filters;
    193   {
    194     base::AutoLock auto_lock(pending_filters_lock_);
    195     new_filters.swap(pending_filters_);
    196   }
    197 
    198   for (size_t i = 0; i < new_filters.size(); ++i) {
    199     filters_.push_back(new_filters[i]);
    200 
    201     // If the channel has already been created, then we need to send this
    202     // message so that the filter gets access to the Channel.
    203     if (channel_.get())
    204       new_filters[i]->OnFilterAdded(channel_.get());
    205     // Ditto for if the channel has been connected.
    206     if (peer_pid_)
    207       new_filters[i]->OnChannelConnected(peer_pid_);
    208   }
    209 }
    210 
    211 // Called on the IPC::Channel thread
    212 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
    213   if (!channel_.get())
    214     return;  // The filters have already been deleted.
    215 
    216   for (size_t i = 0; i < filters_.size(); ++i) {
    217     if (filters_[i].get() == filter) {
    218       filter->OnFilterRemoved();
    219       filters_.erase(filters_.begin() + i);
    220       return;
    221     }
    222   }
    223 
    224   NOTREACHED() << "filter to be removed not found";
    225 }
    226 
    227 // Called on the listener's thread
    228 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
    229   base::AutoLock auto_lock(pending_filters_lock_);
    230   pending_filters_.push_back(make_scoped_refptr(filter));
    231   ipc_task_runner_->PostTask(
    232       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
    233 }
    234 
    235 // Called on the listener's thread
    236 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
    237 #ifdef IPC_MESSAGE_LOG_ENABLED
    238   Logging* logger = Logging::GetInstance();
    239   std::string name;
    240   logger->GetMessageText(message.type(), &name, &message, NULL);
    241   TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
    242                "name", name);
    243 #else
    244   TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage",
    245                "class", IPC_MESSAGE_ID_CLASS(message.type()),
    246                "line", IPC_MESSAGE_ID_LINE(message.type()));
    247 #endif
    248 
    249   if (!listener_)
    250     return;
    251 
    252   OnDispatchConnected();
    253 
    254 #ifdef IPC_MESSAGE_LOG_ENABLED
    255   if (message.type() == IPC_LOGGING_ID) {
    256     logger->OnReceivedLoggingMessage(message);
    257     return;
    258   }
    259 
    260   if (logger->Enabled())
    261     logger->OnPreDispatchMessage(message);
    262 #endif
    263 
    264   listener_->OnMessageReceived(message);
    265 
    266 #ifdef IPC_MESSAGE_LOG_ENABLED
    267   if (logger->Enabled())
    268     logger->OnPostDispatchMessage(message, channel_id_);
    269 #endif
    270 }
    271 
    272 // Called on the listener's thread
    273 void ChannelProxy::Context::OnDispatchConnected() {
    274   if (channel_connected_called_)
    275     return;
    276 
    277   channel_connected_called_ = true;
    278   if (listener_)
    279     listener_->OnChannelConnected(peer_pid_);
    280 }
    281 
    282 // Called on the listener's thread
    283 void ChannelProxy::Context::OnDispatchError() {
    284   if (listener_)
    285     listener_->OnChannelError();
    286 }
    287 
    288 //-----------------------------------------------------------------------------
    289 
    290 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
    291                            Channel::Mode mode,
    292                            Listener* listener,
    293                            base::SingleThreadTaskRunner* ipc_task_runner)
    294     : context_(new Context(listener, ipc_task_runner)),
    295       outgoing_message_filter_(NULL),
    296       did_init_(false) {
    297   Init(channel_handle, mode, true);
    298 }
    299 
    300 ChannelProxy::ChannelProxy(Context* context)
    301     : context_(context),
    302       outgoing_message_filter_(NULL),
    303       did_init_(false) {
    304 }
    305 
    306 ChannelProxy::~ChannelProxy() {
    307   DCHECK(CalledOnValidThread());
    308 
    309   Close();
    310 }
    311 
    312 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
    313                         Channel::Mode mode,
    314                         bool create_pipe_now) {
    315   DCHECK(CalledOnValidThread());
    316   DCHECK(!did_init_);
    317 #if defined(OS_POSIX)
    318   // When we are creating a server on POSIX, we need its file descriptor
    319   // to be created immediately so that it can be accessed and passed
    320   // to other processes. Forcing it to be created immediately avoids
    321   // race conditions that may otherwise arise.
    322   if (mode & Channel::MODE_SERVER_FLAG) {
    323     create_pipe_now = true;
    324   }
    325 #endif  // defined(OS_POSIX)
    326 
    327   if (create_pipe_now) {
    328     // Create the channel immediately.  This effectively sets up the
    329     // low-level pipe so that the client can connect.  Without creating
    330     // the pipe immediately, it is possible for a listener to attempt
    331     // to connect and get an error since the pipe doesn't exist yet.
    332     context_->CreateChannel(channel_handle, mode);
    333   } else {
    334     context_->ipc_task_runner()->PostTask(
    335         FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
    336                               channel_handle, mode));
    337   }
    338 
    339   // complete initialization on the background thread
    340   context_->ipc_task_runner()->PostTask(
    341       FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
    342 
    343   did_init_ = true;
    344 }
    345 
    346 void ChannelProxy::Close() {
    347   DCHECK(CalledOnValidThread());
    348 
    349   // Clear the backpointer to the listener so that any pending calls to
    350   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
    351   // possible that the channel could be closed while it is receiving messages!
    352   context_->Clear();
    353 
    354   if (context_->ipc_task_runner()) {
    355     context_->ipc_task_runner()->PostTask(
    356         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
    357   }
    358 }
    359 
    360 bool ChannelProxy::Send(Message* message) {
    361   DCHECK(did_init_);
    362 
    363   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
    364   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
    365   if (outgoing_message_filter())
    366     message = outgoing_message_filter()->Rewrite(message);
    367 
    368 #ifdef IPC_MESSAGE_LOG_ENABLED
    369   Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
    370 #endif
    371 
    372   context_->ipc_task_runner()->PostTask(
    373       FROM_HERE,
    374       base::Bind(&ChannelProxy::Context::OnSendMessage,
    375                  context_, base::Passed(scoped_ptr<Message>(message))));
    376   return true;
    377 }
    378 
    379 void ChannelProxy::AddFilter(MessageFilter* filter) {
    380   DCHECK(CalledOnValidThread());
    381 
    382   context_->AddFilter(filter);
    383 }
    384 
    385 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
    386   DCHECK(CalledOnValidThread());
    387 
    388   context_->ipc_task_runner()->PostTask(
    389       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
    390                             make_scoped_refptr(filter)));
    391 }
    392 
    393 void ChannelProxy::ClearIPCTaskRunner() {
    394   DCHECK(CalledOnValidThread());
    395 
    396   context()->ClearIPCTaskRunner();
    397 }
    398 
    399 #if defined(OS_POSIX) && !defined(OS_NACL)
    400 // See the TODO regarding lazy initialization of the channel in
    401 // ChannelProxy::Init().
    402 int ChannelProxy::GetClientFileDescriptor() {
    403   DCHECK(CalledOnValidThread());
    404 
    405   Channel* channel = context_.get()->channel_.get();
    406   // Channel must have been created first.
    407   DCHECK(channel) << context_.get()->channel_id_;
    408   return channel->GetClientFileDescriptor();
    409 }
    410 
    411 int ChannelProxy::TakeClientFileDescriptor() {
    412   DCHECK(CalledOnValidThread());
    413 
    414   Channel* channel = context_.get()->channel_.get();
    415   // Channel must have been created first.
    416   DCHECK(channel) << context_.get()->channel_id_;
    417   return channel->TakeClientFileDescriptor();
    418 }
    419 
    420 bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const {
    421   DCHECK(CalledOnValidThread());
    422 
    423   Channel* channel = context_.get()->channel_.get();
    424   // Channel must have been created first.
    425   DCHECK(channel) << context_.get()->channel_id_;
    426   return channel->GetPeerEuid(peer_euid);
    427 }
    428 #endif
    429 
    430 //-----------------------------------------------------------------------------
    431 
    432 }  // namespace IPC
    433