1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/bind.h" 6 #include "base/compiler_specific.h" 7 #include "base/debug/trace_event.h" 8 #include "base/location.h" 9 #include "base/memory/ref_counted.h" 10 #include "base/memory/scoped_ptr.h" 11 #include "base/single_thread_task_runner.h" 12 #include "base/thread_task_runner_handle.h" 13 #include "ipc/ipc_channel_proxy.h" 14 #include "ipc/ipc_listener.h" 15 #include "ipc/ipc_logging.h" 16 #include "ipc/ipc_message_macros.h" 17 #include "ipc/ipc_message_utils.h" 18 19 namespace IPC { 20 21 //------------------------------------------------------------------------------ 22 23 ChannelProxy::MessageFilter::MessageFilter() {} 24 25 void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {} 26 27 void ChannelProxy::MessageFilter::OnFilterRemoved() {} 28 29 void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {} 30 31 void ChannelProxy::MessageFilter::OnChannelError() {} 32 33 void ChannelProxy::MessageFilter::OnChannelClosing() {} 34 35 bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) { 36 return false; 37 } 38 39 ChannelProxy::MessageFilter::~MessageFilter() {} 40 41 //------------------------------------------------------------------------------ 42 43 ChannelProxy::Context::Context(Listener* listener, 44 base::SingleThreadTaskRunner* ipc_task_runner) 45 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 46 listener_(listener), 47 ipc_task_runner_(ipc_task_runner), 48 channel_connected_called_(false), 49 peer_pid_(base::kNullProcessId) { 50 DCHECK(ipc_task_runner_.get()); 51 } 52 53 ChannelProxy::Context::~Context() { 54 } 55 56 void ChannelProxy::Context::ClearIPCTaskRunner() { 57 ipc_task_runner_ = NULL; 58 } 59 60 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, 61 const Channel::Mode& mode) { 62 DCHECK(channel_.get() == NULL); 63 channel_id_ = handle.name; 64 channel_.reset(new Channel(handle, mode, this)); 65 } 66 67 bool ChannelProxy::Context::TryFilters(const Message& message) { 68 #ifdef IPC_MESSAGE_LOG_ENABLED 69 Logging* logger = Logging::GetInstance(); 70 if (logger->Enabled()) 71 logger->OnPreDispatchMessage(message); 72 #endif 73 74 for (size_t i = 0; i < filters_.size(); ++i) { 75 if (filters_[i]->OnMessageReceived(message)) { 76 #ifdef IPC_MESSAGE_LOG_ENABLED 77 if (logger->Enabled()) 78 logger->OnPostDispatchMessage(message, channel_id_); 79 #endif 80 return true; 81 } 82 } 83 return false; 84 } 85 86 // Called on the IPC::Channel thread 87 bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 88 // First give a chance to the filters to process this message. 89 if (!TryFilters(message)) 90 OnMessageReceivedNoFilter(message); 91 return true; 92 } 93 94 // Called on the IPC::Channel thread 95 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 96 // NOTE: This code relies on the listener's message loop not going away while 97 // this thread is active. That should be a reasonable assumption, but it 98 // feels risky. We may want to invent some more indirect way of referring to 99 // a MessageLoop if this becomes a problem. 100 listener_task_runner_->PostTask( 101 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 102 return true; 103 } 104 105 // Called on the IPC::Channel thread 106 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 107 // Add any pending filters. This avoids a race condition where someone 108 // creates a ChannelProxy, calls AddFilter, and then right after starts the 109 // peer process. The IO thread could receive a message before the task to add 110 // the filter is run on the IO thread. 111 OnAddFilter(); 112 113 // We cache off the peer_pid so it can be safely accessed from both threads. 114 peer_pid_ = channel_->peer_pid(); 115 for (size_t i = 0; i < filters_.size(); ++i) 116 filters_[i]->OnChannelConnected(peer_pid); 117 118 // See above comment about using listener_task_runner_ here. 119 listener_task_runner_->PostTask( 120 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 121 } 122 123 // Called on the IPC::Channel thread 124 void ChannelProxy::Context::OnChannelError() { 125 for (size_t i = 0; i < filters_.size(); ++i) 126 filters_[i]->OnChannelError(); 127 128 // See above comment about using listener_task_runner_ here. 129 listener_task_runner_->PostTask( 130 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 131 } 132 133 // Called on the IPC::Channel thread 134 void ChannelProxy::Context::OnChannelOpened() { 135 DCHECK(channel_ != NULL); 136 137 // Assume a reference to ourselves on behalf of this thread. This reference 138 // will be released when we are closed. 139 AddRef(); 140 141 if (!channel_->Connect()) { 142 OnChannelError(); 143 return; 144 } 145 146 for (size_t i = 0; i < filters_.size(); ++i) 147 filters_[i]->OnFilterAdded(channel_.get()); 148 } 149 150 // Called on the IPC::Channel thread 151 void ChannelProxy::Context::OnChannelClosed() { 152 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 153 // would result in this branch being taken. 154 if (!channel_.get()) 155 return; 156 157 for (size_t i = 0; i < filters_.size(); ++i) { 158 filters_[i]->OnChannelClosing(); 159 filters_[i]->OnFilterRemoved(); 160 } 161 162 // We don't need the filters anymore. 163 filters_.clear(); 164 165 channel_.reset(); 166 167 // Balance with the reference taken during startup. This may result in 168 // self-destruction. 169 Release(); 170 } 171 172 void ChannelProxy::Context::Clear() { 173 listener_ = NULL; 174 } 175 176 // Called on the IPC::Channel thread 177 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { 178 if (!channel_.get()) { 179 OnChannelClosed(); 180 return; 181 } 182 if (!channel_->Send(message.release())) 183 OnChannelError(); 184 } 185 186 // Called on the IPC::Channel thread 187 void ChannelProxy::Context::OnAddFilter() { 188 std::vector<scoped_refptr<MessageFilter> > new_filters; 189 { 190 base::AutoLock auto_lock(pending_filters_lock_); 191 new_filters.swap(pending_filters_); 192 } 193 194 for (size_t i = 0; i < new_filters.size(); ++i) { 195 filters_.push_back(new_filters[i]); 196 197 // If the channel has already been created, then we need to send this 198 // message so that the filter gets access to the Channel. 199 if (channel_.get()) 200 new_filters[i]->OnFilterAdded(channel_.get()); 201 // Ditto for if the channel has been connected. 202 if (peer_pid_) 203 new_filters[i]->OnChannelConnected(peer_pid_); 204 } 205 } 206 207 // Called on the IPC::Channel thread 208 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 209 if (!channel_.get()) 210 return; // The filters have already been deleted. 211 212 for (size_t i = 0; i < filters_.size(); ++i) { 213 if (filters_[i].get() == filter) { 214 filter->OnFilterRemoved(); 215 filters_.erase(filters_.begin() + i); 216 return; 217 } 218 } 219 220 NOTREACHED() << "filter to be removed not found"; 221 } 222 223 // Called on the listener's thread 224 void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 225 base::AutoLock auto_lock(pending_filters_lock_); 226 pending_filters_.push_back(make_scoped_refptr(filter)); 227 ipc_task_runner_->PostTask( 228 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 229 } 230 231 // Called on the listener's thread 232 void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 233 #ifdef IPC_MESSAGE_LOG_ENABLED 234 Logging* logger = Logging::GetInstance(); 235 std::string name; 236 logger->GetMessageText(message.type(), &name, &message, NULL); 237 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage", 238 "name", name); 239 #else 240 TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage", 241 "class", IPC_MESSAGE_ID_CLASS(message.type()), 242 "line", IPC_MESSAGE_ID_LINE(message.type())); 243 #endif 244 245 if (!listener_) 246 return; 247 248 OnDispatchConnected(); 249 250 #ifdef IPC_MESSAGE_LOG_ENABLED 251 if (message.type() == IPC_LOGGING_ID) { 252 logger->OnReceivedLoggingMessage(message); 253 return; 254 } 255 256 if (logger->Enabled()) 257 logger->OnPreDispatchMessage(message); 258 #endif 259 260 listener_->OnMessageReceived(message); 261 262 #ifdef IPC_MESSAGE_LOG_ENABLED 263 if (logger->Enabled()) 264 logger->OnPostDispatchMessage(message, channel_id_); 265 #endif 266 } 267 268 // Called on the listener's thread 269 void ChannelProxy::Context::OnDispatchConnected() { 270 if (channel_connected_called_) 271 return; 272 273 channel_connected_called_ = true; 274 if (listener_) 275 listener_->OnChannelConnected(peer_pid_); 276 } 277 278 // Called on the listener's thread 279 void ChannelProxy::Context::OnDispatchError() { 280 if (listener_) 281 listener_->OnChannelError(); 282 } 283 284 //----------------------------------------------------------------------------- 285 286 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, 287 Channel::Mode mode, 288 Listener* listener, 289 base::SingleThreadTaskRunner* ipc_task_runner) 290 : context_(new Context(listener, ipc_task_runner)), 291 did_init_(false) { 292 Init(channel_handle, mode, true); 293 } 294 295 ChannelProxy::ChannelProxy(Context* context) 296 : context_(context), 297 did_init_(false) { 298 } 299 300 ChannelProxy::~ChannelProxy() { 301 DCHECK(CalledOnValidThread()); 302 303 Close(); 304 } 305 306 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 307 Channel::Mode mode, 308 bool create_pipe_now) { 309 DCHECK(CalledOnValidThread()); 310 DCHECK(!did_init_); 311 #if defined(OS_POSIX) 312 // When we are creating a server on POSIX, we need its file descriptor 313 // to be created immediately so that it can be accessed and passed 314 // to other processes. Forcing it to be created immediately avoids 315 // race conditions that may otherwise arise. 316 if (mode & Channel::MODE_SERVER_FLAG) { 317 create_pipe_now = true; 318 } 319 #endif // defined(OS_POSIX) 320 321 if (create_pipe_now) { 322 // Create the channel immediately. This effectively sets up the 323 // low-level pipe so that the client can connect. Without creating 324 // the pipe immediately, it is possible for a listener to attempt 325 // to connect and get an error since the pipe doesn't exist yet. 326 context_->CreateChannel(channel_handle, mode); 327 } else { 328 context_->ipc_task_runner()->PostTask( 329 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), 330 channel_handle, mode)); 331 } 332 333 // complete initialization on the background thread 334 context_->ipc_task_runner()->PostTask( 335 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 336 337 did_init_ = true; 338 } 339 340 void ChannelProxy::Close() { 341 DCHECK(CalledOnValidThread()); 342 343 // Clear the backpointer to the listener so that any pending calls to 344 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 345 // possible that the channel could be closed while it is receiving messages! 346 context_->Clear(); 347 348 if (context_->ipc_task_runner()) { 349 context_->ipc_task_runner()->PostTask( 350 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 351 } 352 } 353 354 bool ChannelProxy::Send(Message* message) { 355 DCHECK(did_init_); 356 357 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 358 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 359 360 #ifdef IPC_MESSAGE_LOG_ENABLED 361 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 362 #endif 363 364 context_->ipc_task_runner()->PostTask( 365 FROM_HERE, 366 base::Bind(&ChannelProxy::Context::OnSendMessage, 367 context_, base::Passed(scoped_ptr<Message>(message)))); 368 return true; 369 } 370 371 void ChannelProxy::AddFilter(MessageFilter* filter) { 372 DCHECK(CalledOnValidThread()); 373 374 context_->AddFilter(filter); 375 } 376 377 void ChannelProxy::RemoveFilter(MessageFilter* filter) { 378 DCHECK(CalledOnValidThread()); 379 380 context_->ipc_task_runner()->PostTask( 381 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 382 make_scoped_refptr(filter))); 383 } 384 385 void ChannelProxy::ClearIPCTaskRunner() { 386 DCHECK(CalledOnValidThread()); 387 388 context()->ClearIPCTaskRunner(); 389 } 390 391 #if defined(OS_POSIX) && !defined(OS_NACL) 392 // See the TODO regarding lazy initialization of the channel in 393 // ChannelProxy::Init(). 394 int ChannelProxy::GetClientFileDescriptor() { 395 DCHECK(CalledOnValidThread()); 396 397 Channel* channel = context_.get()->channel_.get(); 398 // Channel must have been created first. 399 DCHECK(channel) << context_.get()->channel_id_; 400 return channel->GetClientFileDescriptor(); 401 } 402 403 int ChannelProxy::TakeClientFileDescriptor() { 404 DCHECK(CalledOnValidThread()); 405 406 Channel* channel = context_.get()->channel_.get(); 407 // Channel must have been created first. 408 DCHECK(channel) << context_.get()->channel_id_; 409 return channel->TakeClientFileDescriptor(); 410 } 411 412 bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const { 413 DCHECK(CalledOnValidThread()); 414 415 Channel* channel = context_.get()->channel_.get(); 416 // Channel must have been created first. 417 DCHECK(channel) << context_.get()->channel_id_; 418 return channel->GetPeerEuid(peer_euid); 419 } 420 #endif 421 422 //----------------------------------------------------------------------------- 423 424 } // namespace IPC 425