1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/channel.h" 6 7 #include <algorithm> 8 9 #include "base/bind.h" 10 #include "base/compiler_specific.h" 11 #include "base/logging.h" 12 #include "base/macros.h" 13 #include "base/strings/stringprintf.h" 14 #include "mojo/embedder/platform_handle_vector.h" 15 #include "mojo/system/message_pipe_endpoint.h" 16 #include "mojo/system/transport_data.h" 17 18 namespace mojo { 19 namespace system { 20 21 static_assert(Channel::kBootstrapEndpointId != 22 MessageInTransit::kInvalidEndpointId, 23 "kBootstrapEndpointId is invalid"); 24 25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId 26 Channel::kBootstrapEndpointId; 27 28 Channel::Channel(embedder::PlatformSupport* platform_support) 29 : platform_support_(platform_support), 30 is_running_(false), 31 is_shutting_down_(false), 32 next_local_id_(kBootstrapEndpointId) { 33 } 34 35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { 36 DCHECK(creation_thread_checker_.CalledOnValidThread()); 37 DCHECK(raw_channel); 38 39 // No need to take |lock_|, since this must be called before this object 40 // becomes thread-safe. 41 DCHECK(!is_running_); 42 raw_channel_ = raw_channel.Pass(); 43 44 if (!raw_channel_->Init(this)) { 45 raw_channel_.reset(); 46 return false; 47 } 48 49 is_running_ = true; 50 return true; 51 } 52 53 void Channel::Shutdown() { 54 DCHECK(creation_thread_checker_.CalledOnValidThread()); 55 56 IdToEndpointMap to_destroy; 57 { 58 base::AutoLock locker(lock_); 59 if (!is_running_) 60 return; 61 62 // Note: Don't reset |raw_channel_|, in case we're being called from within 63 // |OnReadMessage()| or |OnError()|. 64 raw_channel_->Shutdown(); 65 is_running_ = false; 66 67 // We need to deal with it outside the lock. 68 std::swap(to_destroy, local_id_to_endpoint_map_); 69 } 70 71 size_t num_live = 0; 72 size_t num_zombies = 0; 73 for (IdToEndpointMap::iterator it = to_destroy.begin(); 74 it != to_destroy.end(); 75 ++it) { 76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) { 77 it->second->message_pipe_->OnRemove(it->second->port_); 78 num_live++; 79 } else { 80 DCHECK(!it->second->message_pipe_.get()); 81 num_zombies++; 82 } 83 it->second->DetachFromChannel(); 84 } 85 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live 86 << " live endpoints and " << num_zombies 87 << " zombies"; 88 } 89 90 void Channel::WillShutdownSoon() { 91 base::AutoLock locker(lock_); 92 is_shutting_down_ = true; 93 } 94 95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it 96 // keeps the endpoint alive even after the lock is released. Otherwise, there's 97 // the temptation to simply pass the result of |new ChannelEndpoint(...)| 98 // directly to this function, which wouldn't be sufficient for safety. 99 MessageInTransit::EndpointId Channel::AttachEndpoint( 100 scoped_refptr<ChannelEndpoint> endpoint) { 101 DCHECK(endpoint.get()); 102 103 MessageInTransit::EndpointId local_id; 104 { 105 base::AutoLock locker(lock_); 106 107 DLOG_IF(WARNING, is_shutting_down_) 108 << "AttachEndpoint() while shutting down"; 109 110 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || 111 local_id_to_endpoint_map_.find(next_local_id_) != 112 local_id_to_endpoint_map_.end()) 113 next_local_id_++; 114 115 local_id = next_local_id_; 116 next_local_id_++; 117 local_id_to_endpoint_map_[local_id] = endpoint; 118 } 119 120 endpoint->AttachToChannel(this, local_id); 121 return local_id; 122 } 123 124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, 125 MessageInTransit::EndpointId remote_id) { 126 scoped_refptr<ChannelEndpoint> endpoint; 127 ChannelEndpoint::State state; 128 { 129 base::AutoLock locker(lock_); 130 131 DLOG_IF(WARNING, is_shutting_down_) 132 << "RunMessagePipeEndpoint() while shutting down"; 133 134 IdToEndpointMap::const_iterator it = 135 local_id_to_endpoint_map_.find(local_id); 136 if (it == local_id_to_endpoint_map_.end()) 137 return false; 138 endpoint = it->second; 139 state = it->second->state_; 140 } 141 142 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| 143 // and ignore it. 144 if (state != ChannelEndpoint::STATE_NORMAL) { 145 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " 146 "(local ID " << local_id << ", remote ID " << remote_id << ")"; 147 return true; 148 } 149 150 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already 151 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). 152 endpoint->Run(remote_id); 153 return true; 154 } 155 156 void Channel::RunRemoteMessagePipeEndpoint( 157 MessageInTransit::EndpointId local_id, 158 MessageInTransit::EndpointId remote_id) { 159 #if DCHECK_IS_ON 160 { 161 base::AutoLock locker(lock_); 162 DCHECK(local_id_to_endpoint_map_.find(local_id) != 163 local_id_to_endpoint_map_.end()); 164 } 165 #endif 166 167 if (!SendControlMessage( 168 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, 169 local_id, 170 remote_id)) { 171 HandleLocalError(base::StringPrintf( 172 "Failed to send message to run remote message pipe endpoint (local ID " 173 "%u, remote ID %u)", 174 static_cast<unsigned>(local_id), 175 static_cast<unsigned>(remote_id))); 176 } 177 } 178 179 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { 180 base::AutoLock locker(lock_); 181 if (!is_running_) { 182 // TODO(vtl): I think this is probably not an error condition, but I should 183 // think about it (and the shutdown sequence) more carefully. 184 LOG(WARNING) << "WriteMessage() after shutdown"; 185 return false; 186 } 187 188 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down"; 189 return raw_channel_->WriteMessage(message.Pass()); 190 } 191 192 bool Channel::IsWriteBufferEmpty() { 193 base::AutoLock locker(lock_); 194 if (!is_running_) 195 return true; 196 return raw_channel_->IsWriteBufferEmpty(); 197 } 198 199 void Channel::DetachMessagePipeEndpoint( 200 MessageInTransit::EndpointId local_id, 201 MessageInTransit::EndpointId remote_id) { 202 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 203 204 // If this is non-null after the locked block, the endpoint should be detached 205 // (and no remove message sent). 206 scoped_refptr<ChannelEndpoint> endpoint_to_detach; 207 { 208 base::AutoLock locker_(lock_); 209 if (!is_running_) 210 return; 211 212 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); 213 DCHECK(it != local_id_to_endpoint_map_.end()); 214 215 switch (it->second->state_) { 216 case ChannelEndpoint::STATE_NORMAL: 217 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK; 218 it->second->message_pipe_ = nullptr; 219 if (remote_id == MessageInTransit::kInvalidEndpointId) 220 return; 221 // We have to send a remove message (outside the lock). 222 break; 223 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH: 224 endpoint_to_detach = it->second; 225 local_id_to_endpoint_map_.erase(it); 226 // We have to detach (outside the lock). 227 break; 228 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK: 229 NOTREACHED(); 230 return; 231 } 232 } 233 if (endpoint_to_detach.get()) { 234 endpoint_to_detach->DetachFromChannel(); 235 return; 236 } 237 238 if (!SendControlMessage( 239 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, 240 local_id, 241 remote_id)) { 242 HandleLocalError(base::StringPrintf( 243 "Failed to send message to remove remote message pipe endpoint (local " 244 "ID %u, remote ID %u)", 245 static_cast<unsigned>(local_id), 246 static_cast<unsigned>(remote_id))); 247 } 248 } 249 250 size_t Channel::GetSerializedPlatformHandleSize() const { 251 return raw_channel_->GetSerializedPlatformHandleSize(); 252 } 253 254 Channel::~Channel() { 255 // The channel should have been shut down first. 256 DCHECK(!is_running_); 257 } 258 259 void Channel::OnReadMessage( 260 const MessageInTransit::View& message_view, 261 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 262 DCHECK(creation_thread_checker_.CalledOnValidThread()); 263 264 switch (message_view.type()) { 265 case MessageInTransit::kTypeMessagePipeEndpoint: 266 case MessageInTransit::kTypeMessagePipe: 267 OnReadMessageForDownstream(message_view, platform_handles.Pass()); 268 break; 269 case MessageInTransit::kTypeChannel: 270 OnReadMessageForChannel(message_view, platform_handles.Pass()); 271 break; 272 default: 273 HandleRemoteError( 274 base::StringPrintf("Received message of invalid type %u", 275 static_cast<unsigned>(message_view.type()))); 276 break; 277 } 278 } 279 280 void Channel::OnError(Error error) { 281 DCHECK(creation_thread_checker_.CalledOnValidThread()); 282 283 switch (error) { 284 case ERROR_READ_SHUTDOWN: 285 // The other side was cleanly closed, so this isn't actually an error. 286 DVLOG(1) << "RawChannel read error (shutdown)"; 287 break; 288 case ERROR_READ_BROKEN: { 289 base::AutoLock locker(lock_); 290 LOG_IF(ERROR, !is_shutting_down_) 291 << "RawChannel read error (connection broken)"; 292 break; 293 } 294 case ERROR_READ_BAD_MESSAGE: 295 // Receiving a bad message means either a bug, data corruption, or 296 // malicious attack (probably due to some other bug). 297 LOG(ERROR) << "RawChannel read error (received bad message)"; 298 break; 299 case ERROR_READ_UNKNOWN: 300 LOG(ERROR) << "RawChannel read error (unknown)"; 301 break; 302 case ERROR_WRITE: 303 // Write errors are slightly notable: they probably shouldn't happen under 304 // normal operation (but maybe the other side crashed). 305 LOG(WARNING) << "RawChannel write error"; 306 break; 307 } 308 Shutdown(); 309 } 310 311 void Channel::OnReadMessageForDownstream( 312 const MessageInTransit::View& message_view, 313 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 314 DCHECK(creation_thread_checker_.CalledOnValidThread()); 315 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || 316 message_view.type() == MessageInTransit::kTypeMessagePipe); 317 318 MessageInTransit::EndpointId local_id = message_view.destination_id(); 319 if (local_id == MessageInTransit::kInvalidEndpointId) { 320 HandleRemoteError("Received message with no destination ID"); 321 return; 322 } 323 324 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL; 325 scoped_refptr<MessagePipe> message_pipe; 326 unsigned port = ~0u; 327 bool nonexistent_local_id_error = false; 328 { 329 base::AutoLock locker(lock_); 330 331 // Since we own |raw_channel_|, and this method and |Shutdown()| should only 332 // be called from the creation thread, |raw_channel_| should never be null 333 // here. 334 DCHECK(is_running_); 335 336 IdToEndpointMap::const_iterator it = 337 local_id_to_endpoint_map_.find(local_id); 338 if (it == local_id_to_endpoint_map_.end()) { 339 nonexistent_local_id_error = true; 340 } else { 341 state = it->second->state_; 342 message_pipe = it->second->message_pipe_; 343 port = it->second->port_; 344 } 345 } 346 if (nonexistent_local_id_error) { 347 HandleRemoteError(base::StringPrintf( 348 "Received a message for nonexistent local destination ID %u", 349 static_cast<unsigned>(local_id))); 350 // This is strongly indicative of some problem. However, it's not a fatal 351 // error, since it may indicate a buggy (or hostile) remote process. Don't 352 // die even for Debug builds, since handling this properly needs to be 353 // tested (TODO(vtl)). 354 DLOG(ERROR) << "This should not happen under normal operation."; 355 return; 356 } 357 358 // Ignore messages for zombie endpoints (not an error). 359 if (state != ChannelEndpoint::STATE_NORMAL) { 360 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " 361 << local_id << ", remote ID = " << message_view.source_id() << ")"; 362 return; 363 } 364 365 // We need to duplicate the message (data), because |EnqueueMessage()| will 366 // take ownership of it. 367 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 368 if (message_view.transport_data_buffer_size() > 0) { 369 DCHECK(message_view.transport_data_buffer()); 370 message->SetDispatchers(TransportData::DeserializeDispatchers( 371 message_view.transport_data_buffer(), 372 message_view.transport_data_buffer_size(), 373 platform_handles.Pass(), 374 this)); 375 } 376 MojoResult result = message_pipe->EnqueueMessage( 377 MessagePipe::GetPeerPort(port), message.Pass()); 378 if (result != MOJO_RESULT_OK) { 379 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint 380 // has been closed (in an unavoidable race). This might also be a "remote" 381 // error, e.g., if the remote side is sending invalid control messages (to 382 // the message pipe). 383 HandleLocalError(base::StringPrintf( 384 "Failed to enqueue message to local ID %u (result %d)", 385 static_cast<unsigned>(local_id), 386 static_cast<int>(result))); 387 return; 388 } 389 } 390 391 void Channel::OnReadMessageForChannel( 392 const MessageInTransit::View& message_view, 393 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 394 DCHECK(creation_thread_checker_.CalledOnValidThread()); 395 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel); 396 397 // Currently, no channel messages take platform handles. 398 if (platform_handles) { 399 HandleRemoteError( 400 "Received invalid channel message (has platform handles)"); 401 NOTREACHED(); 402 return; 403 } 404 405 switch (message_view.subtype()) { 406 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: 407 DVLOG(2) << "Handling channel message to run message pipe (local ID " 408 << message_view.destination_id() << ", remote ID " 409 << message_view.source_id() << ")"; 410 if (!RunMessagePipeEndpoint(message_view.destination_id(), 411 message_view.source_id())) { 412 HandleRemoteError( 413 "Received invalid channel message to run message pipe"); 414 } 415 break; 416 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: 417 DVLOG(2) << "Handling channel message to remove message pipe (local ID " 418 << message_view.destination_id() << ", remote ID " 419 << message_view.source_id() << ")"; 420 if (!RemoveMessagePipeEndpoint(message_view.destination_id(), 421 message_view.source_id())) { 422 HandleRemoteError( 423 "Received invalid channel message to remove message pipe"); 424 } 425 break; 426 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: 427 DVLOG(2) << "Handling channel message to ack remove message pipe (local " 428 "ID " << message_view.destination_id() << ", remote ID " 429 << message_view.source_id() << ")"; 430 if (!RemoveMessagePipeEndpoint(message_view.destination_id(), 431 message_view.source_id())) { 432 HandleRemoteError( 433 "Received invalid channel message to ack remove message pipe"); 434 } 435 break; 436 default: 437 HandleRemoteError("Received invalid channel message"); 438 NOTREACHED(); 439 break; 440 } 441 } 442 443 bool Channel::RemoveMessagePipeEndpoint( 444 MessageInTransit::EndpointId local_id, 445 MessageInTransit::EndpointId remote_id) { 446 DCHECK(creation_thread_checker_.CalledOnValidThread()); 447 448 // If this is non-null after the locked block, the endpoint should be detached 449 // (and no remove ack message sent). 450 scoped_refptr<ChannelEndpoint> endpoint_to_detach; 451 scoped_refptr<MessagePipe> message_pipe; 452 unsigned port = ~0u; 453 { 454 base::AutoLock locker(lock_); 455 456 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); 457 if (it == local_id_to_endpoint_map_.end()) { 458 DVLOG(2) << "Remove message pipe error: not found"; 459 return false; 460 } 461 462 switch (it->second->state_) { 463 case ChannelEndpoint::STATE_NORMAL: 464 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH; 465 message_pipe = it->second->message_pipe_; 466 port = it->second->port_; 467 it->second->message_pipe_ = nullptr; 468 // We have to send a remove ack message (outside the lock). 469 break; 470 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH: 471 DVLOG(2) << "Remove message pipe error: wrong state"; 472 return false; 473 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK: 474 endpoint_to_detach = it->second; 475 local_id_to_endpoint_map_.erase(it); 476 // We have to detach (outside the lock). 477 break; 478 } 479 } 480 if (endpoint_to_detach.get()) { 481 endpoint_to_detach->DetachFromChannel(); 482 return true; 483 } 484 485 if (!SendControlMessage( 486 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, 487 local_id, 488 remote_id)) { 489 HandleLocalError(base::StringPrintf( 490 "Failed to send message to remove remote message pipe endpoint ack " 491 "(local ID %u, remote ID %u)", 492 static_cast<unsigned>(local_id), 493 static_cast<unsigned>(remote_id))); 494 } 495 496 message_pipe->OnRemove(port); 497 498 return true; 499 } 500 501 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, 502 MessageInTransit::EndpointId local_id, 503 MessageInTransit::EndpointId remote_id) { 504 DVLOG(2) << "Sending channel control message: subtype " << subtype 505 << ", local ID " << local_id << ", remote ID " << remote_id; 506 scoped_ptr<MessageInTransit> message(new MessageInTransit( 507 MessageInTransit::kTypeChannel, subtype, 0, nullptr)); 508 message->set_source_id(local_id); 509 message->set_destination_id(remote_id); 510 return WriteMessage(message.Pass()); 511 } 512 513 void Channel::HandleRemoteError(const base::StringPiece& error_message) { 514 // TODO(vtl): Is this how we really want to handle this? Probably we want to 515 // terminate the connection, since it's spewing invalid stuff. 516 LOG(WARNING) << error_message; 517 } 518 519 void Channel::HandleLocalError(const base::StringPiece& error_message) { 520 // TODO(vtl): Is this how we really want to handle this? 521 // Sometimes we'll want to propagate the error back to the message pipe 522 // (endpoint), and notify it that the remote is (effectively) closed. 523 // Sometimes we'll want to kill the channel (and notify all the endpoints that 524 // their remotes are dead. 525 LOG(WARNING) << error_message; 526 } 527 528 } // namespace system 529 } // namespace mojo 530