1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/bind.h" 6 #include "base/compiler_specific.h" 7 #include "base/debug/trace_event.h" 8 #include "base/location.h" 9 #include "base/memory/ref_counted.h" 10 #include "base/memory/scoped_ptr.h" 11 #include "base/single_thread_task_runner.h" 12 #include "base/thread_task_runner_handle.h" 13 #include "ipc/ipc_channel_proxy.h" 14 #include "ipc/ipc_listener.h" 15 #include "ipc/ipc_logging.h" 16 #include "ipc/ipc_message_macros.h" 17 #include "ipc/ipc_message_utils.h" 18 19 namespace IPC { 20 21 //------------------------------------------------------------------------------ 22 23 ChannelProxy::MessageFilter::MessageFilter() {} 24 25 void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {} 26 27 void ChannelProxy::MessageFilter::OnFilterRemoved() {} 28 29 void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {} 30 31 void ChannelProxy::MessageFilter::OnChannelError() {} 32 33 void ChannelProxy::MessageFilter::OnChannelClosing() {} 34 35 bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) { 36 return false; 37 } 38 39 void ChannelProxy::MessageFilter::OnDestruct() const { 40 delete this; 41 } 42 43 ChannelProxy::MessageFilter::~MessageFilter() {} 44 45 //------------------------------------------------------------------------------ 46 47 ChannelProxy::Context::Context(Listener* listener, 48 base::SingleThreadTaskRunner* ipc_task_runner) 49 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 50 listener_(listener), 51 ipc_task_runner_(ipc_task_runner), 52 channel_connected_called_(false), 53 peer_pid_(base::kNullProcessId) { 54 DCHECK(ipc_task_runner_.get()); 55 } 56 57 ChannelProxy::Context::~Context() { 58 } 59 60 void ChannelProxy::Context::ClearIPCTaskRunner() { 61 ipc_task_runner_ = NULL; 62 } 63 64 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, 65 const Channel::Mode& mode) { 66 DCHECK(channel_.get() == NULL); 67 channel_id_ = handle.name; 68 channel_.reset(new Channel(handle, mode, this)); 69 } 70 71 bool ChannelProxy::Context::TryFilters(const Message& message) { 72 #ifdef IPC_MESSAGE_LOG_ENABLED 73 Logging* logger = Logging::GetInstance(); 74 if (logger->Enabled()) 75 logger->OnPreDispatchMessage(message); 76 #endif 77 78 for (size_t i = 0; i < filters_.size(); ++i) { 79 if (filters_[i]->OnMessageReceived(message)) { 80 #ifdef IPC_MESSAGE_LOG_ENABLED 81 if (logger->Enabled()) 82 logger->OnPostDispatchMessage(message, channel_id_); 83 #endif 84 return true; 85 } 86 } 87 return false; 88 } 89 90 // Called on the IPC::Channel thread 91 bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 92 // First give a chance to the filters to process this message. 93 if (!TryFilters(message)) 94 OnMessageReceivedNoFilter(message); 95 return true; 96 } 97 98 // Called on the IPC::Channel thread 99 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 100 // NOTE: This code relies on the listener's message loop not going away while 101 // this thread is active. That should be a reasonable assumption, but it 102 // feels risky. We may want to invent some more indirect way of referring to 103 // a MessageLoop if this becomes a problem. 104 listener_task_runner_->PostTask( 105 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 106 return true; 107 } 108 109 // Called on the IPC::Channel thread 110 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 111 // Add any pending filters. This avoids a race condition where someone 112 // creates a ChannelProxy, calls AddFilter, and then right after starts the 113 // peer process. The IO thread could receive a message before the task to add 114 // the filter is run on the IO thread. 115 OnAddFilter(); 116 117 // We cache off the peer_pid so it can be safely accessed from both threads. 118 peer_pid_ = channel_->peer_pid(); 119 for (size_t i = 0; i < filters_.size(); ++i) 120 filters_[i]->OnChannelConnected(peer_pid); 121 122 // See above comment about using listener_task_runner_ here. 123 listener_task_runner_->PostTask( 124 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 125 } 126 127 // Called on the IPC::Channel thread 128 void ChannelProxy::Context::OnChannelError() { 129 for (size_t i = 0; i < filters_.size(); ++i) 130 filters_[i]->OnChannelError(); 131 132 // See above comment about using listener_task_runner_ here. 133 listener_task_runner_->PostTask( 134 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 135 } 136 137 // Called on the IPC::Channel thread 138 void ChannelProxy::Context::OnChannelOpened() { 139 DCHECK(channel_ != NULL); 140 141 // Assume a reference to ourselves on behalf of this thread. This reference 142 // will be released when we are closed. 143 AddRef(); 144 145 if (!channel_->Connect()) { 146 OnChannelError(); 147 return; 148 } 149 150 for (size_t i = 0; i < filters_.size(); ++i) 151 filters_[i]->OnFilterAdded(channel_.get()); 152 } 153 154 // Called on the IPC::Channel thread 155 void ChannelProxy::Context::OnChannelClosed() { 156 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 157 // would result in this branch being taken. 158 if (!channel_.get()) 159 return; 160 161 for (size_t i = 0; i < filters_.size(); ++i) { 162 filters_[i]->OnChannelClosing(); 163 filters_[i]->OnFilterRemoved(); 164 } 165 166 // We don't need the filters anymore. 167 filters_.clear(); 168 169 channel_.reset(); 170 171 // Balance with the reference taken during startup. This may result in 172 // self-destruction. 173 Release(); 174 } 175 176 void ChannelProxy::Context::Clear() { 177 listener_ = NULL; 178 } 179 180 // Called on the IPC::Channel thread 181 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { 182 if (!channel_.get()) { 183 OnChannelClosed(); 184 return; 185 } 186 if (!channel_->Send(message.release())) 187 OnChannelError(); 188 } 189 190 // Called on the IPC::Channel thread 191 void ChannelProxy::Context::OnAddFilter() { 192 std::vector<scoped_refptr<MessageFilter> > new_filters; 193 { 194 base::AutoLock auto_lock(pending_filters_lock_); 195 new_filters.swap(pending_filters_); 196 } 197 198 for (size_t i = 0; i < new_filters.size(); ++i) { 199 filters_.push_back(new_filters[i]); 200 201 // If the channel has already been created, then we need to send this 202 // message so that the filter gets access to the Channel. 203 if (channel_.get()) 204 new_filters[i]->OnFilterAdded(channel_.get()); 205 // Ditto for if the channel has been connected. 206 if (peer_pid_) 207 new_filters[i]->OnChannelConnected(peer_pid_); 208 } 209 } 210 211 // Called on the IPC::Channel thread 212 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 213 if (!channel_.get()) 214 return; // The filters have already been deleted. 215 216 for (size_t i = 0; i < filters_.size(); ++i) { 217 if (filters_[i].get() == filter) { 218 filter->OnFilterRemoved(); 219 filters_.erase(filters_.begin() + i); 220 return; 221 } 222 } 223 224 NOTREACHED() << "filter to be removed not found"; 225 } 226 227 // Called on the listener's thread 228 void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 229 base::AutoLock auto_lock(pending_filters_lock_); 230 pending_filters_.push_back(make_scoped_refptr(filter)); 231 ipc_task_runner_->PostTask( 232 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 233 } 234 235 // Called on the listener's thread 236 void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 237 #ifdef IPC_MESSAGE_LOG_ENABLED 238 Logging* logger = Logging::GetInstance(); 239 std::string name; 240 logger->GetMessageText(message.type(), &name, &message, NULL); 241 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage", 242 "name", name); 243 #else 244 TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage", 245 "class", IPC_MESSAGE_ID_CLASS(message.type()), 246 "line", IPC_MESSAGE_ID_LINE(message.type())); 247 #endif 248 249 if (!listener_) 250 return; 251 252 OnDispatchConnected(); 253 254 #ifdef IPC_MESSAGE_LOG_ENABLED 255 if (message.type() == IPC_LOGGING_ID) { 256 logger->OnReceivedLoggingMessage(message); 257 return; 258 } 259 260 if (logger->Enabled()) 261 logger->OnPreDispatchMessage(message); 262 #endif 263 264 listener_->OnMessageReceived(message); 265 266 #ifdef IPC_MESSAGE_LOG_ENABLED 267 if (logger->Enabled()) 268 logger->OnPostDispatchMessage(message, channel_id_); 269 #endif 270 } 271 272 // Called on the listener's thread 273 void ChannelProxy::Context::OnDispatchConnected() { 274 if (channel_connected_called_) 275 return; 276 277 channel_connected_called_ = true; 278 if (listener_) 279 listener_->OnChannelConnected(peer_pid_); 280 } 281 282 // Called on the listener's thread 283 void ChannelProxy::Context::OnDispatchError() { 284 if (listener_) 285 listener_->OnChannelError(); 286 } 287 288 //----------------------------------------------------------------------------- 289 290 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, 291 Channel::Mode mode, 292 Listener* listener, 293 base::SingleThreadTaskRunner* ipc_task_runner) 294 : context_(new Context(listener, ipc_task_runner)), 295 outgoing_message_filter_(NULL), 296 did_init_(false) { 297 Init(channel_handle, mode, true); 298 } 299 300 ChannelProxy::ChannelProxy(Context* context) 301 : context_(context), 302 outgoing_message_filter_(NULL), 303 did_init_(false) { 304 } 305 306 ChannelProxy::~ChannelProxy() { 307 DCHECK(CalledOnValidThread()); 308 309 Close(); 310 } 311 312 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 313 Channel::Mode mode, 314 bool create_pipe_now) { 315 DCHECK(CalledOnValidThread()); 316 DCHECK(!did_init_); 317 #if defined(OS_POSIX) 318 // When we are creating a server on POSIX, we need its file descriptor 319 // to be created immediately so that it can be accessed and passed 320 // to other processes. Forcing it to be created immediately avoids 321 // race conditions that may otherwise arise. 322 if (mode & Channel::MODE_SERVER_FLAG) { 323 create_pipe_now = true; 324 } 325 #endif // defined(OS_POSIX) 326 327 if (create_pipe_now) { 328 // Create the channel immediately. This effectively sets up the 329 // low-level pipe so that the client can connect. Without creating 330 // the pipe immediately, it is possible for a listener to attempt 331 // to connect and get an error since the pipe doesn't exist yet. 332 context_->CreateChannel(channel_handle, mode); 333 } else { 334 context_->ipc_task_runner()->PostTask( 335 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), 336 channel_handle, mode)); 337 } 338 339 // complete initialization on the background thread 340 context_->ipc_task_runner()->PostTask( 341 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 342 343 did_init_ = true; 344 } 345 346 void ChannelProxy::Close() { 347 DCHECK(CalledOnValidThread()); 348 349 // Clear the backpointer to the listener so that any pending calls to 350 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 351 // possible that the channel could be closed while it is receiving messages! 352 context_->Clear(); 353 354 if (context_->ipc_task_runner()) { 355 context_->ipc_task_runner()->PostTask( 356 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 357 } 358 } 359 360 bool ChannelProxy::Send(Message* message) { 361 DCHECK(did_init_); 362 363 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 364 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 365 if (outgoing_message_filter()) 366 message = outgoing_message_filter()->Rewrite(message); 367 368 #ifdef IPC_MESSAGE_LOG_ENABLED 369 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 370 #endif 371 372 context_->ipc_task_runner()->PostTask( 373 FROM_HERE, 374 base::Bind(&ChannelProxy::Context::OnSendMessage, 375 context_, base::Passed(scoped_ptr<Message>(message)))); 376 return true; 377 } 378 379 void ChannelProxy::AddFilter(MessageFilter* filter) { 380 DCHECK(CalledOnValidThread()); 381 382 context_->AddFilter(filter); 383 } 384 385 void ChannelProxy::RemoveFilter(MessageFilter* filter) { 386 DCHECK(CalledOnValidThread()); 387 388 context_->ipc_task_runner()->PostTask( 389 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 390 make_scoped_refptr(filter))); 391 } 392 393 void ChannelProxy::ClearIPCTaskRunner() { 394 DCHECK(CalledOnValidThread()); 395 396 context()->ClearIPCTaskRunner(); 397 } 398 399 #if defined(OS_POSIX) && !defined(OS_NACL) 400 // See the TODO regarding lazy initialization of the channel in 401 // ChannelProxy::Init(). 402 int ChannelProxy::GetClientFileDescriptor() { 403 DCHECK(CalledOnValidThread()); 404 405 Channel* channel = context_.get()->channel_.get(); 406 // Channel must have been created first. 407 DCHECK(channel) << context_.get()->channel_id_; 408 return channel->GetClientFileDescriptor(); 409 } 410 411 int ChannelProxy::TakeClientFileDescriptor() { 412 DCHECK(CalledOnValidThread()); 413 414 Channel* channel = context_.get()->channel_.get(); 415 // Channel must have been created first. 416 DCHECK(channel) << context_.get()->channel_id_; 417 return channel->TakeClientFileDescriptor(); 418 } 419 420 bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const { 421 DCHECK(CalledOnValidThread()); 422 423 Channel* channel = context_.get()->channel_.get(); 424 // Channel must have been created first. 425 DCHECK(channel) << context_.get()->channel_id_; 426 return channel->GetPeerEuid(peer_euid); 427 } 428 #endif 429 430 //----------------------------------------------------------------------------- 431 432 } // namespace IPC 433