1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_channel_proxy.h" 6 7 #include <stddef.h> 8 #include <stdint.h> 9 10 #include <utility> 11 12 #include "base/bind.h" 13 #include "base/compiler_specific.h" 14 #include "base/location.h" 15 #include "base/memory/ptr_util.h" 16 #include "base/memory/ref_counted.h" 17 #include "base/single_thread_task_runner.h" 18 #include "base/threading/thread_task_runner_handle.h" 19 #include "build/build_config.h" 20 #include "ipc/ipc_channel_factory.h" 21 #include "ipc/ipc_listener.h" 22 #include "ipc/ipc_logging.h" 23 #include "ipc/ipc_message_macros.h" 24 #include "ipc/message_filter.h" 25 #include "ipc/message_filter_router.h" 26 27 namespace IPC { 28 29 //------------------------------------------------------------------------------ 30 31 ChannelProxy::Context::Context( 32 Listener* listener, 33 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 34 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) 35 : listener_task_runner_(listener_task_runner), 36 listener_(listener), 37 ipc_task_runner_(ipc_task_runner), 38 channel_connected_called_(false), 39 message_filter_router_(new MessageFilterRouter()), 40 peer_pid_(base::kNullProcessId) { 41 DCHECK(ipc_task_runner_.get()); 42 // The Listener thread where Messages are handled must be a separate thread 43 // to avoid oversubscribing the IO thread. If you trigger this error, you 44 // need to either: 45 // 1) Create the ChannelProxy on a different thread, or 46 // 2) Just use Channel 47 // Note, we currently make an exception for a NULL listener. That usage 48 // basically works, but is outside the intent of ChannelProxy. This support 49 // will disappear, so please don't rely on it. See crbug.com/364241 50 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get())); 51 } 52 53 ChannelProxy::Context::~Context() = default; 54 55 void ChannelProxy::Context::ClearIPCTaskRunner() { 56 ipc_task_runner_ = NULL; 57 } 58 59 void ChannelProxy::Context::CreateChannel( 60 std::unique_ptr<ChannelFactory> factory) { 61 base::AutoLock l(channel_lifetime_lock_); 62 DCHECK(!channel_); 63 DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_); 64 channel_ = factory->BuildChannel(this); 65 66 Channel::AssociatedInterfaceSupport* support = 67 channel_->GetAssociatedInterfaceSupport(); 68 if (support) { 69 thread_safe_channel_ = support->CreateThreadSafeChannel(); 70 71 base::AutoLock l(pending_filters_lock_); 72 for (auto& entry : pending_io_thread_interfaces_) 73 support->AddGenericAssociatedInterface(entry.first, entry.second); 74 pending_io_thread_interfaces_.clear(); 75 } 76 } 77 78 bool ChannelProxy::Context::TryFilters(const Message& message) { 79 DCHECK(message_filter_router_); 80 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) 81 Logging* logger = Logging::GetInstance(); 82 if (logger->Enabled()) 83 logger->OnPreDispatchMessage(message); 84 #endif 85 86 if (message_filter_router_->TryFilters(message)) { 87 if (message.dispatch_error()) { 88 listener_task_runner_->PostTask( 89 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message)); 90 } 91 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) 92 if (logger->Enabled()) 93 logger->OnPostDispatchMessage(message); 94 #endif 95 return true; 96 } 97 return false; 98 } 99 100 // Called on the IPC::Channel thread 101 void ChannelProxy::Context::PauseChannel() { 102 DCHECK(channel_); 103 channel_->Pause(); 104 } 105 106 // Called on the IPC::Channel thread 107 void ChannelProxy::Context::UnpauseChannel(bool flush) { 108 DCHECK(channel_); 109 channel_->Unpause(flush); 110 } 111 112 // Called on the IPC::Channel thread 113 void ChannelProxy::Context::FlushChannel() { 114 DCHECK(channel_); 115 channel_->Flush(); 116 } 117 118 // Called on the IPC::Channel thread 119 bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 120 // First give a chance to the filters to process this message. 121 if (!TryFilters(message)) 122 OnMessageReceivedNoFilter(message); 123 return true; 124 } 125 126 // Called on the IPC::Channel thread 127 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 128 listener_task_runner_->PostTask( 129 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 130 return true; 131 } 132 133 // Called on the IPC::Channel thread 134 void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) { 135 // We cache off the peer_pid so it can be safely accessed from both threads. 136 { 137 base::AutoLock l(peer_pid_lock_); 138 peer_pid_ = peer_pid; 139 } 140 141 // Add any pending filters. This avoids a race condition where someone 142 // creates a ChannelProxy, calls AddFilter, and then right after starts the 143 // peer process. The IO thread could receive a message before the task to add 144 // the filter is run on the IO thread. 145 OnAddFilter(); 146 147 // See above comment about using listener_task_runner_ here. 148 listener_task_runner_->PostTask( 149 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 150 } 151 152 // Called on the IPC::Channel thread 153 void ChannelProxy::Context::OnChannelError() { 154 for (size_t i = 0; i < filters_.size(); ++i) 155 filters_[i]->OnChannelError(); 156 157 // See above comment about using listener_task_runner_ here. 158 listener_task_runner_->PostTask( 159 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 160 } 161 162 // Called on the IPC::Channel thread 163 void ChannelProxy::Context::OnAssociatedInterfaceRequest( 164 const std::string& interface_name, 165 mojo::ScopedInterfaceEndpointHandle handle) { 166 listener_task_runner_->PostTask( 167 FROM_HERE, base::Bind(&Context::OnDispatchAssociatedInterfaceRequest, 168 this, interface_name, base::Passed(&handle))); 169 } 170 171 // Called on the IPC::Channel thread 172 void ChannelProxy::Context::OnChannelOpened() { 173 DCHECK(channel_ != NULL); 174 175 // Assume a reference to ourselves on behalf of this thread. This reference 176 // will be released when we are closed. 177 AddRef(); 178 179 if (!channel_->Connect()) { 180 OnChannelError(); 181 return; 182 } 183 184 for (size_t i = 0; i < filters_.size(); ++i) 185 filters_[i]->OnFilterAdded(channel_.get()); 186 } 187 188 // Called on the IPC::Channel thread 189 void ChannelProxy::Context::OnChannelClosed() { 190 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 191 // would result in this branch being taken. 192 if (!channel_) 193 return; 194 195 for (auto& filter : pending_filters_) { 196 filter->OnChannelClosing(); 197 filter->OnFilterRemoved(); 198 } 199 for (auto& filter : filters_) { 200 filter->OnChannelClosing(); 201 filter->OnFilterRemoved(); 202 } 203 204 // We don't need the filters anymore. 205 message_filter_router_->Clear(); 206 filters_.clear(); 207 // We don't need the lock, because at this point, the listener thread can't 208 // access it any more. 209 pending_filters_.clear(); 210 211 ClearChannel(); 212 213 // Balance with the reference taken during startup. This may result in 214 // self-destruction. 215 Release(); 216 } 217 218 void ChannelProxy::Context::Clear() { 219 listener_ = NULL; 220 } 221 222 // Called on the IPC::Channel thread 223 void ChannelProxy::Context::OnSendMessage(std::unique_ptr<Message> message) { 224 if (!channel_) { 225 OnChannelClosed(); 226 return; 227 } 228 229 if (!channel_->Send(message.release())) 230 OnChannelError(); 231 } 232 233 // Called on the IPC::Channel thread 234 void ChannelProxy::Context::OnAddFilter() { 235 // Our OnChannelConnected method has not yet been called, so we can't be 236 // sure that channel_ is valid yet. When OnChannelConnected *is* called, 237 // it invokes OnAddFilter, so any pending filter(s) will be added at that 238 // time. 239 // No lock necessary for |peer_pid_| because it is only modified on this 240 // thread. 241 if (peer_pid_ == base::kNullProcessId) 242 return; 243 244 std::vector<scoped_refptr<MessageFilter> > new_filters; 245 { 246 base::AutoLock auto_lock(pending_filters_lock_); 247 new_filters.swap(pending_filters_); 248 } 249 250 for (size_t i = 0; i < new_filters.size(); ++i) { 251 filters_.push_back(new_filters[i]); 252 253 message_filter_router_->AddFilter(new_filters[i].get()); 254 255 // The channel has already been created and connected, so we need to 256 // inform the filters right now. 257 new_filters[i]->OnFilterAdded(channel_.get()); 258 new_filters[i]->OnChannelConnected(peer_pid_); 259 } 260 } 261 262 // Called on the IPC::Channel thread 263 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 264 // No lock necessary for |peer_pid_| because it is only modified on this 265 // thread. 266 if (peer_pid_ == base::kNullProcessId) { 267 // The channel is not yet connected, so any filters are still pending. 268 base::AutoLock auto_lock(pending_filters_lock_); 269 for (size_t i = 0; i < pending_filters_.size(); ++i) { 270 if (pending_filters_[i].get() == filter) { 271 filter->OnFilterRemoved(); 272 pending_filters_.erase(pending_filters_.begin() + i); 273 return; 274 } 275 } 276 return; 277 } 278 if (!channel_) 279 return; // The filters have already been deleted. 280 281 message_filter_router_->RemoveFilter(filter); 282 283 for (size_t i = 0; i < filters_.size(); ++i) { 284 if (filters_[i].get() == filter) { 285 filter->OnFilterRemoved(); 286 filters_.erase(filters_.begin() + i); 287 return; 288 } 289 } 290 291 NOTREACHED() << "filter to be removed not found"; 292 } 293 294 // Called on the listener's thread 295 void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 296 base::AutoLock auto_lock(pending_filters_lock_); 297 pending_filters_.push_back(base::WrapRefCounted(filter)); 298 ipc_task_runner_->PostTask( 299 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 300 } 301 302 // Called on the listener's thread 303 void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 304 if (!listener_) 305 return; 306 307 OnDispatchConnected(); 308 309 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) 310 Logging* logger = Logging::GetInstance(); 311 if (message.type() == IPC_LOGGING_ID) { 312 logger->OnReceivedLoggingMessage(message); 313 return; 314 } 315 316 if (logger->Enabled()) 317 logger->OnPreDispatchMessage(message); 318 #endif 319 320 listener_->OnMessageReceived(message); 321 if (message.dispatch_error()) 322 listener_->OnBadMessageReceived(message); 323 324 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) 325 if (logger->Enabled()) 326 logger->OnPostDispatchMessage(message); 327 #endif 328 } 329 330 // Called on the listener's thread 331 void ChannelProxy::Context::OnDispatchConnected() { 332 if (channel_connected_called_) 333 return; 334 335 base::ProcessId peer_pid; 336 { 337 base::AutoLock l(peer_pid_lock_); 338 peer_pid = peer_pid_; 339 } 340 channel_connected_called_ = true; 341 if (listener_) 342 listener_->OnChannelConnected(peer_pid); 343 } 344 345 // Called on the listener's thread 346 void ChannelProxy::Context::OnDispatchError() { 347 if (listener_) 348 listener_->OnChannelError(); 349 } 350 351 // Called on the listener's thread 352 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) { 353 if (listener_) 354 listener_->OnBadMessageReceived(message); 355 } 356 357 // Called on the listener's thread 358 void ChannelProxy::Context::OnDispatchAssociatedInterfaceRequest( 359 const std::string& interface_name, 360 mojo::ScopedInterfaceEndpointHandle handle) { 361 if (listener_) 362 listener_->OnAssociatedInterfaceRequest(interface_name, std::move(handle)); 363 } 364 365 void ChannelProxy::Context::ClearChannel() { 366 base::AutoLock l(channel_lifetime_lock_); 367 channel_.reset(); 368 } 369 370 void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread( 371 const std::string& name, 372 const GenericAssociatedInterfaceFactory& factory) { 373 base::AutoLock l(channel_lifetime_lock_); 374 if (!channel_) { 375 base::AutoLock l(pending_filters_lock_); 376 pending_io_thread_interfaces_.emplace_back(name, factory); 377 return; 378 } 379 Channel::AssociatedInterfaceSupport* support = 380 channel_->GetAssociatedInterfaceSupport(); 381 if (support) 382 support->AddGenericAssociatedInterface(name, factory); 383 } 384 385 void ChannelProxy::Context::Send(Message* message) { 386 ipc_task_runner()->PostTask( 387 FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this, 388 base::Passed(base::WrapUnique(message)))); 389 } 390 391 //----------------------------------------------------------------------------- 392 393 // static 394 std::unique_ptr<ChannelProxy> ChannelProxy::Create( 395 const IPC::ChannelHandle& channel_handle, 396 Channel::Mode mode, 397 Listener* listener, 398 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 399 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) { 400 std::unique_ptr<ChannelProxy> channel( 401 new ChannelProxy(listener, ipc_task_runner, listener_task_runner)); 402 channel->Init(channel_handle, mode, true); 403 return channel; 404 } 405 406 // static 407 std::unique_ptr<ChannelProxy> ChannelProxy::Create( 408 std::unique_ptr<ChannelFactory> factory, 409 Listener* listener, 410 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 411 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) { 412 std::unique_ptr<ChannelProxy> channel( 413 new ChannelProxy(listener, ipc_task_runner, listener_task_runner)); 414 channel->Init(std::move(factory), true); 415 return channel; 416 } 417 418 ChannelProxy::ChannelProxy(Context* context) 419 : context_(context), did_init_(false) { 420 #if defined(ENABLE_IPC_FUZZER) 421 outgoing_message_filter_ = NULL; 422 #endif 423 } 424 425 ChannelProxy::ChannelProxy( 426 Listener* listener, 427 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 428 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) 429 : context_(new Context(listener, ipc_task_runner, listener_task_runner)), 430 did_init_(false) { 431 #if defined(ENABLE_IPC_FUZZER) 432 outgoing_message_filter_ = NULL; 433 #endif 434 } 435 436 ChannelProxy::~ChannelProxy() { 437 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 438 439 Close(); 440 } 441 442 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 443 Channel::Mode mode, 444 bool create_pipe_now) { 445 #if defined(OS_POSIX) || defined(OS_FUCHSIA) 446 // When we are creating a server on POSIX, we need its file descriptor 447 // to be created immediately so that it can be accessed and passed 448 // to other processes. Forcing it to be created immediately avoids 449 // race conditions that may otherwise arise. 450 if (mode & Channel::MODE_SERVER_FLAG) { 451 create_pipe_now = true; 452 } 453 #endif // defined(OS_POSIX) || defined(OS_FUCHSIA) 454 Init( 455 ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()), 456 create_pipe_now); 457 } 458 459 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory, 460 bool create_pipe_now) { 461 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 462 DCHECK(!did_init_); 463 464 if (create_pipe_now) { 465 // Create the channel immediately. This effectively sets up the 466 // low-level pipe so that the client can connect. Without creating 467 // the pipe immediately, it is possible for a listener to attempt 468 // to connect and get an error since the pipe doesn't exist yet. 469 context_->CreateChannel(std::move(factory)); 470 } else { 471 context_->ipc_task_runner()->PostTask( 472 FROM_HERE, base::Bind(&Context::CreateChannel, context_, 473 base::Passed(&factory))); 474 } 475 476 // complete initialization on the background thread 477 context_->ipc_task_runner()->PostTask( 478 FROM_HERE, 479 base::Bind(&Context::OnChannelOpened, context_)); 480 481 did_init_ = true; 482 OnChannelInit(); 483 } 484 485 void ChannelProxy::Pause() { 486 context_->ipc_task_runner()->PostTask( 487 FROM_HERE, base::Bind(&Context::PauseChannel, context_)); 488 } 489 490 void ChannelProxy::Unpause(bool flush) { 491 context_->ipc_task_runner()->PostTask( 492 FROM_HERE, base::Bind(&Context::UnpauseChannel, context_, flush)); 493 } 494 495 void ChannelProxy::Flush() { 496 context_->ipc_task_runner()->PostTask( 497 FROM_HERE, base::Bind(&Context::FlushChannel, context_)); 498 } 499 500 void ChannelProxy::Close() { 501 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 502 503 // Clear the backpointer to the listener so that any pending calls to 504 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 505 // possible that the channel could be closed while it is receiving messages! 506 context_->Clear(); 507 508 if (context_->ipc_task_runner()) { 509 context_->ipc_task_runner()->PostTask( 510 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_)); 511 } 512 } 513 514 bool ChannelProxy::Send(Message* message) { 515 DCHECK(!message->is_sync()) << "Need to use IPC::SyncChannel"; 516 SendInternal(message); 517 return true; 518 } 519 520 void ChannelProxy::SendInternal(Message* message) { 521 DCHECK(did_init_); 522 523 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 524 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 525 526 #ifdef ENABLE_IPC_FUZZER 527 // In IPC fuzzing builds, it is possible to define a filter to apply to 528 // outgoing messages. It will either rewrite the message and return a new 529 // one, freeing the original, or return the message unchanged. 530 if (outgoing_message_filter()) 531 message = outgoing_message_filter()->Rewrite(message); 532 #endif 533 534 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) 535 Logging::GetInstance()->OnSendMessage(message); 536 #endif 537 538 // See https://crbug.com/766032. This is to ensure that senders of oversized 539 // messages can be caught more easily in the wild. 540 CHECK_LE(message->size(), Channel::kMaximumMessageSize); 541 542 context_->Send(message); 543 } 544 545 void ChannelProxy::AddFilter(MessageFilter* filter) { 546 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 547 548 context_->AddFilter(filter); 549 } 550 551 void ChannelProxy::RemoveFilter(MessageFilter* filter) { 552 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 553 554 context_->ipc_task_runner()->PostTask( 555 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_, 556 base::RetainedRef(filter))); 557 } 558 559 void ChannelProxy::AddGenericAssociatedInterfaceForIOThread( 560 const std::string& name, 561 const GenericAssociatedInterfaceFactory& factory) { 562 context()->AddGenericAssociatedInterfaceForIOThread(name, factory); 563 } 564 565 void ChannelProxy::GetGenericRemoteAssociatedInterface( 566 const std::string& name, 567 mojo::ScopedInterfaceEndpointHandle handle) { 568 DCHECK(did_init_); 569 context()->thread_safe_channel().GetAssociatedInterface( 570 name, mojom::GenericInterfaceAssociatedRequest(std::move(handle))); 571 } 572 573 void ChannelProxy::ClearIPCTaskRunner() { 574 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 575 context()->ClearIPCTaskRunner(); 576 } 577 578 void ChannelProxy::OnChannelInit() { 579 } 580 581 //----------------------------------------------------------------------------- 582 583 } // namespace IPC 584