1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/channel.h" 6 7 #include <algorithm> 8 9 #include "base/basictypes.h" 10 #include "base/bind.h" 11 #include "base/compiler_specific.h" 12 #include "base/logging.h" 13 #include "base/strings/stringprintf.h" 14 #include "mojo/embedder/platform_handle_vector.h" 15 #include "mojo/system/message_pipe_endpoint.h" 16 #include "mojo/system/transport_data.h" 17 18 namespace mojo { 19 namespace system { 20 21 COMPILE_ASSERT(Channel::kBootstrapEndpointId != 22 MessageInTransit::kInvalidEndpointId, 23 kBootstrapEndpointId_is_invalid); 24 25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId 26 Channel::kBootstrapEndpointId; 27 28 Channel::EndpointInfo::EndpointInfo() 29 : state(STATE_NORMAL), 30 port() { 31 } 32 33 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, 34 unsigned port) 35 : state(STATE_NORMAL), 36 message_pipe(message_pipe), 37 port(port) { 38 } 39 40 Channel::EndpointInfo::~EndpointInfo() { 41 } 42 43 Channel::Channel() 44 : is_running_(false), 45 next_local_id_(kBootstrapEndpointId) { 46 } 47 48 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { 49 DCHECK(creation_thread_checker_.CalledOnValidThread()); 50 DCHECK(raw_channel); 51 52 // No need to take |lock_|, since this must be called before this object 53 // becomes thread-safe. 54 DCHECK(!is_running_no_lock()); 55 raw_channel_ = raw_channel.Pass(); 56 57 if (!raw_channel_->Init(this)) { 58 raw_channel_.reset(); 59 return false; 60 } 61 62 is_running_ = true; 63 return true; 64 } 65 66 void Channel::Shutdown() { 67 DCHECK(creation_thread_checker_.CalledOnValidThread()); 68 69 IdToEndpointInfoMap to_destroy; 70 { 71 base::AutoLock locker(lock_); 72 if (!is_running_no_lock()) 73 return; 74 75 // Note: Don't reset |raw_channel_|, in case we're being called from within 76 // |OnReadMessage()| or |OnFatalError()|. 77 raw_channel_->Shutdown(); 78 is_running_ = false; 79 80 // We need to deal with it outside the lock. 81 std::swap(to_destroy, local_id_to_endpoint_info_map_); 82 } 83 84 size_t num_live = 0; 85 size_t num_zombies = 0; 86 for (IdToEndpointInfoMap::iterator it = to_destroy.begin(); 87 it != to_destroy.end(); 88 ++it) { 89 if (it->second.state == EndpointInfo::STATE_NORMAL) { 90 it->second.message_pipe->OnRemove(it->second.port); 91 num_live++; 92 } else { 93 DCHECK(!it->second.message_pipe); 94 num_zombies++; 95 } 96 } 97 DVLOG_IF(2, num_live || num_zombies) 98 << "Shut down Channel with " << num_live << " live endpoints and " 99 << num_zombies << " zombies"; 100 } 101 102 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( 103 scoped_refptr<MessagePipe> message_pipe, 104 unsigned port) { 105 DCHECK(message_pipe); 106 DCHECK(port == 0 || port == 1); 107 108 MessageInTransit::EndpointId local_id; 109 { 110 base::AutoLock locker(lock_); 111 112 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || 113 local_id_to_endpoint_info_map_.find(next_local_id_) != 114 local_id_to_endpoint_info_map_.end()) 115 next_local_id_++; 116 117 local_id = next_local_id_; 118 next_local_id_++; 119 120 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid 121 // some expensive reference count increment/decrements.) Once this is done, 122 // we should be able to delete |EndpointInfo|'s default constructor. 123 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); 124 } 125 126 // This might fail if that port got an |OnPeerClose()| before attaching. 127 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) 128 return local_id; 129 130 // Note: If it failed, quite possibly the endpoint info was removed from that 131 // map (there's a race between us adding it to the map above and calling 132 // |Attach()|). And even if an entry exists for |local_id|, we need to check 133 // that it's the one we added (and not some other one that was added since). 134 { 135 base::AutoLock locker(lock_); 136 IdToEndpointInfoMap::iterator it = 137 local_id_to_endpoint_info_map_.find(local_id); 138 if (it != local_id_to_endpoint_info_map_.end() && 139 it->second.message_pipe.get() == message_pipe.get() && 140 it->second.port == port) { 141 DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL); 142 // TODO(vtl): FIXME -- This is wrong. We need to specify (to 143 // |AttachMessagePipeEndpoint()| who's going to be responsible for calling 144 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a 145 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to 146 // run, then we'll get messages to an "invalid" local ID (for running, for 147 // removal). 148 local_id_to_endpoint_info_map_.erase(it); 149 } 150 } 151 return MessageInTransit::kInvalidEndpointId; 152 } 153 154 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, 155 MessageInTransit::EndpointId remote_id) { 156 EndpointInfo endpoint_info; 157 { 158 base::AutoLock locker(lock_); 159 160 IdToEndpointInfoMap::const_iterator it = 161 local_id_to_endpoint_info_map_.find(local_id); 162 if (it == local_id_to_endpoint_info_map_.end()) 163 return false; 164 endpoint_info = it->second; 165 } 166 167 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| 168 // and ignore it. 169 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { 170 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " 171 "(local ID " << local_id << ", remote ID " << remote_id << ")"; 172 return true; 173 } 174 175 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already 176 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). 177 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); 178 return true; 179 } 180 181 void Channel::RunRemoteMessagePipeEndpoint( 182 MessageInTransit::EndpointId local_id, 183 MessageInTransit::EndpointId remote_id) { 184 #if DCHECK_IS_ON 185 { 186 base::AutoLock locker(lock_); 187 DCHECK(local_id_to_endpoint_info_map_.find(local_id) != 188 local_id_to_endpoint_info_map_.end()); 189 } 190 #endif 191 192 if (!SendControlMessage( 193 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, 194 local_id, remote_id)) { 195 HandleLocalError(base::StringPrintf( 196 "Failed to send message to run remote message pipe endpoint (local ID " 197 "%u, remote ID %u)", 198 static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); 199 } 200 } 201 202 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { 203 base::AutoLock locker(lock_); 204 if (!is_running_no_lock()) { 205 // TODO(vtl): I think this is probably not an error condition, but I should 206 // think about it (and the shutdown sequence) more carefully. 207 LOG(WARNING) << "WriteMessage() after shutdown"; 208 return false; 209 } 210 211 return raw_channel_->WriteMessage(message.Pass()); 212 } 213 214 bool Channel::IsWriteBufferEmpty() { 215 base::AutoLock locker(lock_); 216 if (!is_running_no_lock()) 217 return true; 218 return raw_channel_->IsWriteBufferEmpty(); 219 } 220 221 void Channel::DetachMessagePipeEndpoint( 222 MessageInTransit::EndpointId local_id, 223 MessageInTransit::EndpointId remote_id) { 224 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 225 226 bool should_send_remove_message = false; 227 { 228 base::AutoLock locker_(lock_); 229 if (!is_running_no_lock()) 230 return; 231 232 IdToEndpointInfoMap::iterator it = 233 local_id_to_endpoint_info_map_.find(local_id); 234 DCHECK(it != local_id_to_endpoint_info_map_.end()); 235 236 switch (it->second.state) { 237 case EndpointInfo::STATE_NORMAL: 238 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; 239 it->second.message_pipe = NULL; 240 should_send_remove_message = 241 (remote_id != MessageInTransit::kInvalidEndpointId); 242 break; 243 case EndpointInfo::STATE_WAIT_LOCAL_DETACH: 244 local_id_to_endpoint_info_map_.erase(it); 245 break; 246 case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK: 247 NOTREACHED(); 248 break; 249 case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK: 250 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; 251 break; 252 } 253 } 254 if (!should_send_remove_message) 255 return; 256 257 if (!SendControlMessage( 258 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, 259 local_id, remote_id)) { 260 HandleLocalError(base::StringPrintf( 261 "Failed to send message to remove remote message pipe endpoint (local " 262 "ID %u, remote ID %u)", 263 static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); 264 } 265 } 266 267 size_t Channel::GetSerializedPlatformHandleSize() const { 268 return raw_channel_->GetSerializedPlatformHandleSize(); 269 } 270 271 Channel::~Channel() { 272 // The channel should have been shut down first. 273 DCHECK(!is_running_no_lock()); 274 } 275 276 void Channel::OnReadMessage( 277 const MessageInTransit::View& message_view, 278 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 279 switch (message_view.type()) { 280 case MessageInTransit::kTypeMessagePipeEndpoint: 281 case MessageInTransit::kTypeMessagePipe: 282 OnReadMessageForDownstream(message_view, platform_handles.Pass()); 283 break; 284 case MessageInTransit::kTypeChannel: 285 OnReadMessageForChannel(message_view, platform_handles.Pass()); 286 break; 287 default: 288 HandleRemoteError(base::StringPrintf( 289 "Received message of invalid type %u", 290 static_cast<unsigned>(message_view.type()))); 291 break; 292 } 293 } 294 295 void Channel::OnFatalError(FatalError fatal_error) { 296 switch (fatal_error) { 297 case FATAL_ERROR_READ: 298 // Most read errors aren't notable: they just reflect that the other side 299 // tore down the channel. 300 DVLOG(1) << "RawChannel fatal error (read)"; 301 break; 302 case FATAL_ERROR_WRITE: 303 // Write errors are slightly notable: they probably shouldn't happen under 304 // normal operation (but maybe the other side crashed). 305 LOG(WARNING) << "RawChannel fatal error (write)"; 306 break; 307 } 308 Shutdown(); 309 } 310 311 void Channel::OnReadMessageForDownstream( 312 const MessageInTransit::View& message_view, 313 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 314 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || 315 message_view.type() == MessageInTransit::kTypeMessagePipe); 316 317 MessageInTransit::EndpointId local_id = message_view.destination_id(); 318 if (local_id == MessageInTransit::kInvalidEndpointId) { 319 HandleRemoteError("Received message with no destination ID"); 320 return; 321 } 322 323 EndpointInfo endpoint_info; 324 { 325 base::AutoLock locker(lock_); 326 327 // Since we own |raw_channel_|, and this method and |Shutdown()| should only 328 // be called from the creation thread, |raw_channel_| should never be null 329 // here. 330 DCHECK(is_running_no_lock()); 331 332 IdToEndpointInfoMap::const_iterator it = 333 local_id_to_endpoint_info_map_.find(local_id); 334 if (it == local_id_to_endpoint_info_map_.end()) { 335 HandleRemoteError(base::StringPrintf( 336 "Received a message for nonexistent local destination ID %u", 337 static_cast<unsigned>(local_id))); 338 // This is strongly indicative of some problem. However, it's not a fatal 339 // error, since it may indicate a bug (or hostile) remote process. Don't 340 // die even for Debug builds, since handling this properly needs to be 341 // tested (TODO(vtl)). 342 DLOG(ERROR) << "This should not happen under normal operation."; 343 return; 344 } 345 endpoint_info = it->second; 346 } 347 348 // Ignore messages for zombie endpoints (not an error). 349 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { 350 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " 351 << local_id << ", remote ID = " << message_view.source_id() << ")"; 352 return; 353 } 354 355 // We need to duplicate the message (data), because |EnqueueMessage()| will 356 // take ownership of it. 357 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 358 if (message_view.transport_data_buffer_size() > 0) { 359 DCHECK(message_view.transport_data_buffer()); 360 message->SetDispatchers( 361 TransportData::DeserializeDispatchers( 362 message_view.transport_data_buffer(), 363 message_view.transport_data_buffer_size(), 364 platform_handles.Pass(), 365 this)); 366 } 367 MojoResult result = endpoint_info.message_pipe->EnqueueMessage( 368 MessagePipe::GetPeerPort(endpoint_info.port), message.Pass()); 369 if (result != MOJO_RESULT_OK) { 370 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint 371 // has been closed (in an unavoidable race). This might also be a "remote" 372 // error, e.g., if the remote side is sending invalid control messages (to 373 // the message pipe). 374 HandleLocalError(base::StringPrintf( 375 "Failed to enqueue message to local ID %u (result %d)", 376 static_cast<unsigned>(local_id), static_cast<int>(result))); 377 return; 378 } 379 } 380 381 void Channel::OnReadMessageForChannel( 382 const MessageInTransit::View& message_view, 383 embedder::ScopedPlatformHandleVectorPtr platform_handles) { 384 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel); 385 386 // Currently, no channel messages take platform handles. 387 if (platform_handles) { 388 HandleRemoteError( 389 "Received invalid channel message (has platform handles)"); 390 NOTREACHED(); 391 return; 392 } 393 394 switch (message_view.subtype()) { 395 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: 396 DVLOG(2) << "Handling channel message to run message pipe (local ID " 397 << message_view.destination_id() << ", remote ID " 398 << message_view.source_id() << ")"; 399 if (!RunMessagePipeEndpoint(message_view.destination_id(), 400 message_view.source_id())) { 401 HandleRemoteError( 402 "Received invalid channel message to run message pipe"); 403 } 404 break; 405 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: 406 DVLOG(2) << "Handling channel message to remove message pipe (local ID " 407 << message_view.destination_id() << ", remote ID " 408 << message_view.source_id() << ")"; 409 if (!RemoveMessagePipeEndpoint(message_view.destination_id(), 410 message_view.source_id())) { 411 HandleRemoteError( 412 "Received invalid channel message to remove message pipe"); 413 } 414 break; 415 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: 416 DVLOG(2) << "Handling channel message to ack remove message pipe (local " 417 "ID " 418 << message_view.destination_id() << ", remote ID " 419 << message_view.source_id() << ")"; 420 if (!RemoveMessagePipeEndpoint(message_view.destination_id(), 421 message_view.source_id())) { 422 HandleRemoteError( 423 "Received invalid channel message to ack remove message pipe"); 424 } 425 break; 426 default: 427 HandleRemoteError("Received invalid channel message"); 428 NOTREACHED(); 429 break; 430 } 431 } 432 433 bool Channel::RemoveMessagePipeEndpoint( 434 MessageInTransit::EndpointId local_id, 435 MessageInTransit::EndpointId remote_id) { 436 EndpointInfo endpoint_info; 437 { 438 base::AutoLock locker(lock_); 439 440 IdToEndpointInfoMap::iterator it = 441 local_id_to_endpoint_info_map_.find(local_id); 442 if (it == local_id_to_endpoint_info_map_.end()) { 443 DVLOG(2) << "Remove message pipe error: not found"; 444 return false; 445 } 446 447 // If it's waiting for the remove ack, just do it and return. 448 if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) { 449 local_id_to_endpoint_info_map_.erase(it); 450 return true; 451 } 452 453 if (it->second.state != EndpointInfo::STATE_NORMAL) { 454 DVLOG(2) << "Remove message pipe error: wrong state"; 455 return false; 456 } 457 458 it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH; 459 endpoint_info = it->second; 460 it->second.message_pipe = NULL; 461 } 462 463 if (!SendControlMessage( 464 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, 465 local_id, remote_id)) { 466 HandleLocalError(base::StringPrintf( 467 "Failed to send message to remove remote message pipe endpoint ack " 468 "(local ID %u, remote ID %u)", 469 static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); 470 } 471 472 endpoint_info.message_pipe->OnRemove(endpoint_info.port); 473 474 return true; 475 } 476 477 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, 478 MessageInTransit::EndpointId local_id, 479 MessageInTransit::EndpointId remote_id) { 480 DVLOG(2) << "Sending channel control message: subtype " << subtype 481 << ", local ID " << local_id << ", remote ID " << remote_id; 482 scoped_ptr<MessageInTransit> message(new MessageInTransit( 483 MessageInTransit::kTypeChannel, subtype, 0, NULL)); 484 message->set_source_id(local_id); 485 message->set_destination_id(remote_id); 486 return WriteMessage(message.Pass()); 487 } 488 489 void Channel::HandleRemoteError(const base::StringPiece& error_message) { 490 // TODO(vtl): Is this how we really want to handle this? Probably we want to 491 // terminate the connection, since it's spewing invalid stuff. 492 LOG(WARNING) << error_message; 493 } 494 495 void Channel::HandleLocalError(const base::StringPiece& error_message) { 496 // TODO(vtl): Is this how we really want to handle this? 497 // Sometimes we'll want to propagate the error back to the message pipe 498 // (endpoint), and notify it that the remote is (effectively) closed. 499 // Sometimes we'll want to kill the channel (and notify all the endpoints that 500 // their remotes are dead. 501 LOG(WARNING) << error_message; 502 } 503 504 } // namespace system 505 } // namespace mojo 506