1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_channel_proxy.h" 6 7 #include "base/bind.h" 8 #include "base/compiler_specific.h" 9 #include "base/location.h" 10 #include "base/memory/ref_counted.h" 11 #include "base/memory/scoped_ptr.h" 12 #include "base/single_thread_task_runner.h" 13 #include "base/thread_task_runner_handle.h" 14 #include "ipc/ipc_channel_factory.h" 15 #include "ipc/ipc_listener.h" 16 #include "ipc/ipc_logging.h" 17 #include "ipc/ipc_message_macros.h" 18 #include "ipc/message_filter.h" 19 #include "ipc/message_filter_router.h" 20 21 namespace IPC { 22 23 //------------------------------------------------------------------------------ 24 25 ChannelProxy::Context::Context( 26 Listener* listener, 27 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) 28 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 29 listener_(listener), 30 ipc_task_runner_(ipc_task_runner), 31 channel_connected_called_(false), 32 message_filter_router_(new MessageFilterRouter()), 33 peer_pid_(base::kNullProcessId) { 34 DCHECK(ipc_task_runner_.get()); 35 // The Listener thread where Messages are handled must be a separate thread 36 // to avoid oversubscribing the IO thread. If you trigger this error, you 37 // need to either: 38 // 1) Create the ChannelProxy on a different thread, or 39 // 2) Just use Channel 40 // Note, we currently make an exception for a NULL listener. That usage 41 // basically works, but is outside the intent of ChannelProxy. This support 42 // will disappear, so please don't rely on it. See crbug.com/364241 43 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get())); 44 } 45 46 ChannelProxy::Context::~Context() { 47 } 48 49 void ChannelProxy::Context::ClearIPCTaskRunner() { 50 ipc_task_runner_ = NULL; 51 } 52 53 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) { 54 DCHECK(!channel_); 55 channel_id_ = factory->GetName(); 56 channel_ = factory->BuildChannel(this); 57 } 58 59 bool ChannelProxy::Context::TryFilters(const Message& message) { 60 DCHECK(message_filter_router_); 61 #ifdef IPC_MESSAGE_LOG_ENABLED 62 Logging* logger = Logging::GetInstance(); 63 if (logger->Enabled()) 64 logger->OnPreDispatchMessage(message); 65 #endif 66 67 if (message_filter_router_->TryFilters(message)) { 68 if (message.dispatch_error()) { 69 listener_task_runner_->PostTask( 70 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message)); 71 } 72 #ifdef IPC_MESSAGE_LOG_ENABLED 73 if (logger->Enabled()) 74 logger->OnPostDispatchMessage(message, channel_id_); 75 #endif 76 return true; 77 } 78 return false; 79 } 80 81 // Called on the IPC::Channel thread 82 bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 83 // First give a chance to the filters to process this message. 84 if (!TryFilters(message)) 85 OnMessageReceivedNoFilter(message); 86 return true; 87 } 88 89 // Called on the IPC::Channel thread 90 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 91 listener_task_runner_->PostTask( 92 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 93 return true; 94 } 95 96 // Called on the IPC::Channel thread 97 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 98 // We cache off the peer_pid so it can be safely accessed from both threads. 99 peer_pid_ = channel_->GetPeerPID(); 100 101 // Add any pending filters. This avoids a race condition where someone 102 // creates a ChannelProxy, calls AddFilter, and then right after starts the 103 // peer process. The IO thread could receive a message before the task to add 104 // the filter is run on the IO thread. 105 OnAddFilter(); 106 107 // See above comment about using listener_task_runner_ here. 108 listener_task_runner_->PostTask( 109 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 110 } 111 112 // Called on the IPC::Channel thread 113 void ChannelProxy::Context::OnChannelError() { 114 for (size_t i = 0; i < filters_.size(); ++i) 115 filters_[i]->OnChannelError(); 116 117 // See above comment about using listener_task_runner_ here. 118 listener_task_runner_->PostTask( 119 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 120 } 121 122 // Called on the IPC::Channel thread 123 void ChannelProxy::Context::OnChannelOpened() { 124 DCHECK(channel_ != NULL); 125 126 // Assume a reference to ourselves on behalf of this thread. This reference 127 // will be released when we are closed. 128 AddRef(); 129 130 if (!channel_->Connect()) { 131 OnChannelError(); 132 return; 133 } 134 135 for (size_t i = 0; i < filters_.size(); ++i) 136 filters_[i]->OnFilterAdded(channel_.get()); 137 } 138 139 // Called on the IPC::Channel thread 140 void ChannelProxy::Context::OnChannelClosed() { 141 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 142 // would result in this branch being taken. 143 if (!channel_) 144 return; 145 146 for (size_t i = 0; i < filters_.size(); ++i) { 147 filters_[i]->OnChannelClosing(); 148 filters_[i]->OnFilterRemoved(); 149 } 150 151 // We don't need the filters anymore. 152 message_filter_router_->Clear(); 153 filters_.clear(); 154 // We don't need the lock, because at this point, the listener thread can't 155 // access it any more. 156 pending_filters_.clear(); 157 158 channel_.reset(); 159 160 // Balance with the reference taken during startup. This may result in 161 // self-destruction. 162 Release(); 163 } 164 165 void ChannelProxy::Context::Clear() { 166 listener_ = NULL; 167 } 168 169 // Called on the IPC::Channel thread 170 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { 171 if (!channel_) { 172 OnChannelClosed(); 173 return; 174 } 175 176 if (!channel_->Send(message.release())) 177 OnChannelError(); 178 } 179 180 // Called on the IPC::Channel thread 181 void ChannelProxy::Context::OnAddFilter() { 182 // Our OnChannelConnected method has not yet been called, so we can't be 183 // sure that channel_ is valid yet. When OnChannelConnected *is* called, 184 // it invokes OnAddFilter, so any pending filter(s) will be added at that 185 // time. 186 if (peer_pid_ == base::kNullProcessId) 187 return; 188 189 std::vector<scoped_refptr<MessageFilter> > new_filters; 190 { 191 base::AutoLock auto_lock(pending_filters_lock_); 192 new_filters.swap(pending_filters_); 193 } 194 195 for (size_t i = 0; i < new_filters.size(); ++i) { 196 filters_.push_back(new_filters[i]); 197 198 message_filter_router_->AddFilter(new_filters[i].get()); 199 200 // The channel has already been created and connected, so we need to 201 // inform the filters right now. 202 new_filters[i]->OnFilterAdded(channel_.get()); 203 new_filters[i]->OnChannelConnected(peer_pid_); 204 } 205 } 206 207 // Called on the IPC::Channel thread 208 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 209 if (peer_pid_ == base::kNullProcessId) { 210 // The channel is not yet connected, so any filters are still pending. 211 base::AutoLock auto_lock(pending_filters_lock_); 212 for (size_t i = 0; i < pending_filters_.size(); ++i) { 213 if (pending_filters_[i].get() == filter) { 214 filter->OnFilterRemoved(); 215 pending_filters_.erase(pending_filters_.begin() + i); 216 return; 217 } 218 } 219 return; 220 } 221 if (!channel_) 222 return; // The filters have already been deleted. 223 224 message_filter_router_->RemoveFilter(filter); 225 226 for (size_t i = 0; i < filters_.size(); ++i) { 227 if (filters_[i].get() == filter) { 228 filter->OnFilterRemoved(); 229 filters_.erase(filters_.begin() + i); 230 return; 231 } 232 } 233 234 NOTREACHED() << "filter to be removed not found"; 235 } 236 237 // Called on the listener's thread 238 void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 239 base::AutoLock auto_lock(pending_filters_lock_); 240 pending_filters_.push_back(make_scoped_refptr(filter)); 241 ipc_task_runner_->PostTask( 242 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 243 } 244 245 // Called on the listener's thread 246 void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 247 #ifdef IPC_MESSAGE_LOG_ENABLED 248 Logging* logger = Logging::GetInstance(); 249 std::string name; 250 logger->GetMessageText(message.type(), &name, &message, NULL); 251 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage", 252 "name", name); 253 #else 254 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage", 255 "class", IPC_MESSAGE_ID_CLASS(message.type()), 256 "line", IPC_MESSAGE_ID_LINE(message.type())); 257 #endif 258 259 if (!listener_) 260 return; 261 262 OnDispatchConnected(); 263 264 #ifdef IPC_MESSAGE_LOG_ENABLED 265 if (message.type() == IPC_LOGGING_ID) { 266 logger->OnReceivedLoggingMessage(message); 267 return; 268 } 269 270 if (logger->Enabled()) 271 logger->OnPreDispatchMessage(message); 272 #endif 273 274 listener_->OnMessageReceived(message); 275 if (message.dispatch_error()) 276 listener_->OnBadMessageReceived(message); 277 278 #ifdef IPC_MESSAGE_LOG_ENABLED 279 if (logger->Enabled()) 280 logger->OnPostDispatchMessage(message, channel_id_); 281 #endif 282 } 283 284 // Called on the listener's thread 285 void ChannelProxy::Context::OnDispatchConnected() { 286 if (channel_connected_called_) 287 return; 288 289 channel_connected_called_ = true; 290 if (listener_) 291 listener_->OnChannelConnected(peer_pid_); 292 } 293 294 // Called on the listener's thread 295 void ChannelProxy::Context::OnDispatchError() { 296 if (listener_) 297 listener_->OnChannelError(); 298 } 299 300 // Called on the listener's thread 301 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) { 302 if (listener_) 303 listener_->OnBadMessageReceived(message); 304 } 305 306 //----------------------------------------------------------------------------- 307 308 // static 309 scoped_ptr<ChannelProxy> ChannelProxy::Create( 310 const IPC::ChannelHandle& channel_handle, 311 Channel::Mode mode, 312 Listener* listener, 313 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 314 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner)); 315 channel->Init(channel_handle, mode, true); 316 return channel.Pass(); 317 } 318 319 // static 320 scoped_ptr<ChannelProxy> ChannelProxy::Create( 321 scoped_ptr<ChannelFactory> factory, 322 Listener* listener, 323 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 324 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner)); 325 channel->Init(factory.Pass(), true); 326 return channel.Pass(); 327 } 328 329 ChannelProxy::ChannelProxy(Context* context) 330 : context_(context), 331 did_init_(false) { 332 } 333 334 ChannelProxy::ChannelProxy( 335 Listener* listener, 336 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) 337 : context_(new Context(listener, ipc_task_runner)), did_init_(false) { 338 } 339 340 ChannelProxy::~ChannelProxy() { 341 DCHECK(CalledOnValidThread()); 342 343 Close(); 344 } 345 346 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 347 Channel::Mode mode, 348 bool create_pipe_now) { 349 #if defined(OS_POSIX) 350 // When we are creating a server on POSIX, we need its file descriptor 351 // to be created immediately so that it can be accessed and passed 352 // to other processes. Forcing it to be created immediately avoids 353 // race conditions that may otherwise arise. 354 if (mode & Channel::MODE_SERVER_FLAG) { 355 create_pipe_now = true; 356 } 357 #endif // defined(OS_POSIX) 358 Init(ChannelFactory::Create(channel_handle, mode), 359 create_pipe_now); 360 } 361 362 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory, 363 bool create_pipe_now) { 364 DCHECK(CalledOnValidThread()); 365 DCHECK(!did_init_); 366 367 if (create_pipe_now) { 368 // Create the channel immediately. This effectively sets up the 369 // low-level pipe so that the client can connect. Without creating 370 // the pipe immediately, it is possible for a listener to attempt 371 // to connect and get an error since the pipe doesn't exist yet. 372 context_->CreateChannel(factory.Pass()); 373 } else { 374 context_->ipc_task_runner()->PostTask( 375 FROM_HERE, base::Bind(&Context::CreateChannel, 376 context_.get(), Passed(factory.Pass()))); 377 } 378 379 // complete initialization on the background thread 380 context_->ipc_task_runner()->PostTask( 381 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 382 383 did_init_ = true; 384 } 385 386 void ChannelProxy::Close() { 387 DCHECK(CalledOnValidThread()); 388 389 // Clear the backpointer to the listener so that any pending calls to 390 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 391 // possible that the channel could be closed while it is receiving messages! 392 context_->Clear(); 393 394 if (context_->ipc_task_runner()) { 395 context_->ipc_task_runner()->PostTask( 396 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 397 } 398 } 399 400 bool ChannelProxy::Send(Message* message) { 401 DCHECK(did_init_); 402 403 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 404 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 405 406 #ifdef IPC_MESSAGE_LOG_ENABLED 407 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 408 #endif 409 410 context_->ipc_task_runner()->PostTask( 411 FROM_HERE, 412 base::Bind(&ChannelProxy::Context::OnSendMessage, 413 context_, base::Passed(scoped_ptr<Message>(message)))); 414 return true; 415 } 416 417 void ChannelProxy::AddFilter(MessageFilter* filter) { 418 DCHECK(CalledOnValidThread()); 419 420 context_->AddFilter(filter); 421 } 422 423 void ChannelProxy::RemoveFilter(MessageFilter* filter) { 424 DCHECK(CalledOnValidThread()); 425 426 context_->ipc_task_runner()->PostTask( 427 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 428 make_scoped_refptr(filter))); 429 } 430 431 void ChannelProxy::ClearIPCTaskRunner() { 432 DCHECK(CalledOnValidThread()); 433 434 context()->ClearIPCTaskRunner(); 435 } 436 437 #if defined(OS_POSIX) && !defined(OS_NACL) 438 // See the TODO regarding lazy initialization of the channel in 439 // ChannelProxy::Init(). 440 int ChannelProxy::GetClientFileDescriptor() { 441 DCHECK(CalledOnValidThread()); 442 443 Channel* channel = context_.get()->channel_.get(); 444 // Channel must have been created first. 445 DCHECK(channel) << context_.get()->channel_id_; 446 return channel->GetClientFileDescriptor(); 447 } 448 449 int ChannelProxy::TakeClientFileDescriptor() { 450 DCHECK(CalledOnValidThread()); 451 452 Channel* channel = context_.get()->channel_.get(); 453 // Channel must have been created first. 454 DCHECK(channel) << context_.get()->channel_id_; 455 return channel->TakeClientFileDescriptor(); 456 } 457 #endif 458 459 //----------------------------------------------------------------------------- 460 461 } // namespace IPC 462