1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/edk/system/message_pipe_dispatcher.h" 6 7 #include <limits> 8 #include <memory> 9 10 #include "base/logging.h" 11 #include "base/macros.h" 12 #include "base/memory/ref_counted.h" 13 #include "mojo/edk/embedder/embedder_internal.h" 14 #include "mojo/edk/system/core.h" 15 #include "mojo/edk/system/message_for_transit.h" 16 #include "mojo/edk/system/node_controller.h" 17 #include "mojo/edk/system/ports/message_filter.h" 18 #include "mojo/edk/system/ports_message.h" 19 #include "mojo/edk/system/request_context.h" 20 21 namespace mojo { 22 namespace edk { 23 24 namespace { 25 26 using DispatcherHeader = MessageForTransit::DispatcherHeader; 27 using MessageHeader = MessageForTransit::MessageHeader; 28 29 #pragma pack(push, 1) 30 31 struct SerializedState { 32 uint64_t pipe_id; 33 int8_t endpoint; 34 char padding[7]; 35 }; 36 37 static_assert(sizeof(SerializedState) % 8 == 0, 38 "Invalid SerializedState size."); 39 40 #pragma pack(pop) 41 42 } // namespace 43 44 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a 45 // reference to the MPD to ensure it lives as long as the observed port. 46 class MessagePipeDispatcher::PortObserverThunk 47 : public NodeController::PortObserver { 48 public: 49 explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher) 50 : dispatcher_(dispatcher) {} 51 52 private: 53 ~PortObserverThunk() override {} 54 55 // NodeController::PortObserver: 56 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } 57 58 scoped_refptr<MessagePipeDispatcher> dispatcher_; 59 60 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); 61 }; 62 63 // A MessageFilter used by ReadMessage to determine whether a message should 64 // actually be consumed yet. 65 class ReadMessageFilter : public ports::MessageFilter { 66 public: 67 // Creates a new ReadMessageFilter which captures and potentially modifies 68 // various (unowned) local state within MessagePipeDispatcher::ReadMessage. 69 ReadMessageFilter(bool read_any_size, 70 bool may_discard, 71 uint32_t* num_bytes, 72 uint32_t* num_handles, 73 bool* no_space, 74 bool* invalid_message) 75 : read_any_size_(read_any_size), 76 may_discard_(may_discard), 77 num_bytes_(num_bytes), 78 num_handles_(num_handles), 79 no_space_(no_space), 80 invalid_message_(invalid_message) {} 81 82 ~ReadMessageFilter() override {} 83 84 // ports::MessageFilter: 85 bool Match(const ports::Message& m) override { 86 const PortsMessage& message = static_cast<const PortsMessage&>(m); 87 if (message.num_payload_bytes() < sizeof(MessageHeader)) { 88 *invalid_message_ = true; 89 return true; 90 } 91 92 const MessageHeader* header = 93 static_cast<const MessageHeader*>(message.payload_bytes()); 94 if (header->header_size > message.num_payload_bytes()) { 95 *invalid_message_ = true; 96 return true; 97 } 98 99 uint32_t bytes_to_read = 0; 100 uint32_t bytes_available = 101 static_cast<uint32_t>(message.num_payload_bytes()) - 102 header->header_size; 103 if (num_bytes_) { 104 bytes_to_read = std::min(*num_bytes_, bytes_available); 105 *num_bytes_ = bytes_available; 106 } 107 108 uint32_t handles_to_read = 0; 109 uint32_t handles_available = header->num_dispatchers; 110 if (num_handles_) { 111 handles_to_read = std::min(*num_handles_, handles_available); 112 *num_handles_ = handles_available; 113 } 114 115 if (handles_to_read < handles_available || 116 (!read_any_size_ && bytes_to_read < bytes_available)) { 117 *no_space_ = true; 118 return may_discard_; 119 } 120 121 return true; 122 } 123 124 private: 125 const bool read_any_size_; 126 const bool may_discard_; 127 uint32_t* const num_bytes_; 128 uint32_t* const num_handles_; 129 bool* const no_space_; 130 bool* const invalid_message_; 131 132 DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter); 133 }; 134 135 #if DCHECK_IS_ON() 136 137 // A MessageFilter which never matches a message. Used to peek at the size of 138 // the next available message on a port, for debug logging only. 139 class PeekSizeMessageFilter : public ports::MessageFilter { 140 public: 141 PeekSizeMessageFilter() {} 142 ~PeekSizeMessageFilter() override {} 143 144 // ports::MessageFilter: 145 bool Match(const ports::Message& message) override { 146 message_size_ = message.num_payload_bytes(); 147 return false; 148 } 149 150 size_t message_size() const { return message_size_; } 151 152 private: 153 size_t message_size_ = 0; 154 155 DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter); 156 }; 157 158 #endif // DCHECK_IS_ON() 159 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, 161 const ports::PortRef& port, 162 uint64_t pipe_id, 163 int endpoint) 164 : node_controller_(node_controller), 165 port_(port), 166 pipe_id_(pipe_id), 167 endpoint_(endpoint) { 168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() 169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; 170 171 node_controller_->SetPortObserver( 172 port_, 173 make_scoped_refptr(new PortObserverThunk(this))); 174 } 175 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { 177 node_controller_->SetPortObserver(port_, nullptr); 178 node_controller_->SetPortObserver(other->port_, nullptr); 179 180 ports::PortRef port0; 181 { 182 base::AutoLock lock(signal_lock_); 183 port0 = port_; 184 port_closed_.Set(true); 185 awakables_.CancelAll(); 186 } 187 188 ports::PortRef port1; 189 { 190 base::AutoLock lock(other->signal_lock_); 191 port1 = other->port_; 192 other->port_closed_.Set(true); 193 other->awakables_.CancelAll(); 194 } 195 196 // Both ports are always closed by this call. 197 int rv = node_controller_->MergeLocalPorts(port0, port1); 198 return rv == ports::OK; 199 } 200 201 Dispatcher::Type MessagePipeDispatcher::GetType() const { 202 return Type::MESSAGE_PIPE; 203 } 204 205 MojoResult MessagePipeDispatcher::Close() { 206 base::AutoLock lock(signal_lock_); 207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ 208 << " [port=" << port_.name() << "]"; 209 return CloseNoLock(); 210 } 211 212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, 213 const Watcher::WatchCallback& callback, 214 uintptr_t context) { 215 base::AutoLock lock(signal_lock_); 216 217 if (port_closed_ || in_transit_) 218 return MOJO_RESULT_INVALID_ARGUMENT; 219 220 return awakables_.AddWatcher( 221 signals, callback, context, GetHandleSignalsStateNoLock()); 222 } 223 224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { 225 base::AutoLock lock(signal_lock_); 226 227 if (port_closed_ || in_transit_) 228 return MOJO_RESULT_INVALID_ARGUMENT; 229 230 return awakables_.RemoveWatcher(context); 231 } 232 233 MojoResult MessagePipeDispatcher::WriteMessage( 234 std::unique_ptr<MessageForTransit> message, 235 MojoWriteMessageFlags flags) { 236 if (port_closed_ || in_transit_) 237 return MOJO_RESULT_INVALID_ARGUMENT; 238 239 size_t num_bytes = message->num_bytes(); 240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); 241 242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ 243 << " [port=" << port_.name() << "; rv=" << rv 244 << "; num_bytes=" << num_bytes << "]"; 245 246 if (rv != ports::OK) { 247 if (rv == ports::ERROR_PORT_UNKNOWN || 248 rv == ports::ERROR_PORT_STATE_UNEXPECTED || 249 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { 250 return MOJO_RESULT_INVALID_ARGUMENT; 251 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { 252 return MOJO_RESULT_FAILED_PRECONDITION; 253 } 254 255 NOTREACHED(); 256 return MOJO_RESULT_UNKNOWN; 257 } 258 259 return MOJO_RESULT_OK; 260 } 261 262 MojoResult MessagePipeDispatcher::ReadMessage( 263 std::unique_ptr<MessageForTransit>* message, 264 uint32_t* num_bytes, 265 MojoHandle* handles, 266 uint32_t* num_handles, 267 MojoReadMessageFlags flags, 268 bool read_any_size) { 269 // We can't read from a port that's closed or in transit! 270 if (port_closed_ || in_transit_) 271 return MOJO_RESULT_INVALID_ARGUMENT; 272 273 bool no_space = false; 274 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; 275 bool invalid_message = false; 276 277 // Grab a message if the provided handles buffer is large enough. If the input 278 // |num_bytes| is provided and |read_any_size| is false, we also ensure 279 // that it specifies a size at least as large as the next available payload. 280 // 281 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. 282 // This flag exists to support both new and old API behavior. 283 284 ports::ScopedMessage ports_message; 285 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, 286 &no_space, &invalid_message); 287 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); 288 289 if (invalid_message) 290 return MOJO_RESULT_UNKNOWN; 291 292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { 293 if (rv == ports::ERROR_PORT_UNKNOWN || 294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) 295 return MOJO_RESULT_INVALID_ARGUMENT; 296 297 NOTREACHED(); 298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? 299 } 300 301 if (no_space) { 302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't 303 // sufficient to hold this message's data. The message will still be in 304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. 305 return MOJO_RESULT_RESOURCE_EXHAUSTED; 306 } 307 308 if (!ports_message) { 309 // No message was available in queue. 310 311 if (rv == ports::OK) 312 return MOJO_RESULT_SHOULD_WAIT; 313 314 // Peer is closed and there are no more messages to read. 315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); 316 return MOJO_RESULT_FAILED_PRECONDITION; 317 } 318 319 // Alright! We have a message and the caller has provided sufficient storage 320 // in which to receive it. 321 322 std::unique_ptr<PortsMessage> msg( 323 static_cast<PortsMessage*>(ports_message.release())); 324 325 const MessageHeader* header = 326 static_cast<const MessageHeader*>(msg->payload_bytes()); 327 const DispatcherHeader* dispatcher_headers = 328 reinterpret_cast<const DispatcherHeader*>(header + 1); 329 330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) 331 return MOJO_RESULT_UNKNOWN; 332 333 // Deserialize dispatchers. 334 if (header->num_dispatchers > 0) { 335 CHECK(handles); 336 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); 337 size_t data_payload_index = sizeof(MessageHeader) + 338 header->num_dispatchers * sizeof(DispatcherHeader); 339 if (data_payload_index > header->header_size) 340 return MOJO_RESULT_UNKNOWN; 341 const char* dispatcher_data = reinterpret_cast<const char*>( 342 dispatcher_headers + header->num_dispatchers); 343 size_t port_index = 0; 344 size_t platform_handle_index = 0; 345 ScopedPlatformHandleVectorPtr msg_handles = msg->TakeHandles(); 346 const size_t num_msg_handles = msg_handles ? msg_handles->size() : 0; 347 for (size_t i = 0; i < header->num_dispatchers; ++i) { 348 const DispatcherHeader& dh = dispatcher_headers[i]; 349 Type type = static_cast<Type>(dh.type); 350 351 size_t next_payload_index = data_payload_index + dh.num_bytes; 352 if (msg->num_payload_bytes() < next_payload_index || 353 next_payload_index < data_payload_index) { 354 return MOJO_RESULT_UNKNOWN; 355 } 356 357 size_t next_port_index = port_index + dh.num_ports; 358 if (msg->num_ports() < next_port_index || next_port_index < port_index) 359 return MOJO_RESULT_UNKNOWN; 360 361 size_t next_platform_handle_index = 362 platform_handle_index + dh.num_platform_handles; 363 if (num_msg_handles < next_platform_handle_index || 364 next_platform_handle_index < platform_handle_index) { 365 return MOJO_RESULT_UNKNOWN; 366 } 367 368 PlatformHandle* out_handles = 369 num_msg_handles ? msg_handles->data() + platform_handle_index 370 : nullptr; 371 dispatchers[i].dispatcher = Dispatcher::Deserialize( 372 type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, 373 dh.num_ports, out_handles, dh.num_platform_handles); 374 if (!dispatchers[i].dispatcher) 375 return MOJO_RESULT_UNKNOWN; 376 377 dispatcher_data += dh.num_bytes; 378 data_payload_index = next_payload_index; 379 port_index = next_port_index; 380 platform_handle_index = next_platform_handle_index; 381 } 382 383 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, 384 handles)) 385 return MOJO_RESULT_UNKNOWN; 386 } 387 388 CHECK(msg); 389 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); 390 return MOJO_RESULT_OK; 391 } 392 393 HandleSignalsState 394 MessagePipeDispatcher::GetHandleSignalsState() const { 395 base::AutoLock lock(signal_lock_); 396 return GetHandleSignalsStateNoLock(); 397 } 398 399 MojoResult MessagePipeDispatcher::AddAwakable( 400 Awakable* awakable, 401 MojoHandleSignals signals, 402 uintptr_t context, 403 HandleSignalsState* signals_state) { 404 base::AutoLock lock(signal_lock_); 405 406 if (port_closed_ || in_transit_) { 407 if (signals_state) 408 *signals_state = HandleSignalsState(); 409 return MOJO_RESULT_INVALID_ARGUMENT; 410 } 411 412 HandleSignalsState state = GetHandleSignalsStateNoLock(); 413 414 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint " 415 << endpoint_ << " [awakable=" << awakable << "; port=" 416 << port_.name() << "; signals=" << signals << "; satisfied=" 417 << state.satisfied_signals << "; satisfiable=" 418 << state.satisfiable_signals << "]"; 419 420 if (state.satisfies(signals)) { 421 if (signals_state) 422 *signals_state = state; 423 DVLOG(2) << "Signals already set for " << port_.name(); 424 return MOJO_RESULT_ALREADY_EXISTS; 425 } 426 if (!state.can_satisfy(signals)) { 427 if (signals_state) 428 *signals_state = state; 429 DVLOG(2) << "Signals impossible to satisfy for " << port_.name(); 430 return MOJO_RESULT_FAILED_PRECONDITION; 431 } 432 433 DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint " 434 << endpoint_ << " [awakable=" << awakable << "; port=" 435 << port_.name() << "; signals=" << signals << "]"; 436 437 awakables_.Add(awakable, signals, context); 438 return MOJO_RESULT_OK; 439 } 440 441 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable, 442 HandleSignalsState* signals_state) { 443 base::AutoLock lock(signal_lock_); 444 if (port_closed_ || in_transit_) { 445 if (signals_state) 446 *signals_state = HandleSignalsState(); 447 } else if (signals_state) { 448 *signals_state = GetHandleSignalsStateNoLock(); 449 } 450 451 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint " 452 << endpoint_ << " [awakable=" << awakable << "; port=" 453 << port_.name() << "]"; 454 455 awakables_.Remove(awakable); 456 } 457 458 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes, 459 uint32_t* num_ports, 460 uint32_t* num_handles) { 461 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); 462 *num_ports = 1; 463 *num_handles = 0; 464 } 465 466 bool MessagePipeDispatcher::EndSerialize(void* destination, 467 ports::PortName* ports, 468 PlatformHandle* handles) { 469 SerializedState* state = static_cast<SerializedState*>(destination); 470 state->pipe_id = pipe_id_; 471 state->endpoint = static_cast<int8_t>(endpoint_); 472 memset(state->padding, 0, sizeof(state->padding)); 473 ports[0] = port_.name(); 474 return true; 475 } 476 477 bool MessagePipeDispatcher::BeginTransit() { 478 base::AutoLock lock(signal_lock_); 479 if (in_transit_ || port_closed_) 480 return false; 481 in_transit_.Set(true); 482 return in_transit_; 483 } 484 485 void MessagePipeDispatcher::CompleteTransitAndClose() { 486 node_controller_->SetPortObserver(port_, nullptr); 487 488 base::AutoLock lock(signal_lock_); 489 port_transferred_ = true; 490 in_transit_.Set(false); 491 CloseNoLock(); 492 } 493 494 void MessagePipeDispatcher::CancelTransit() { 495 base::AutoLock lock(signal_lock_); 496 in_transit_.Set(false); 497 498 // Something may have happened while we were waiting for potential transit. 499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 500 } 501 502 // static 503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( 504 const void* data, 505 size_t num_bytes, 506 const ports::PortName* ports, 507 size_t num_ports, 508 PlatformHandle* handles, 509 size_t num_handles) { 510 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) 511 return nullptr; 512 513 const SerializedState* state = static_cast<const SerializedState*>(data); 514 515 ports::PortRef port; 516 CHECK_EQ( 517 ports::OK, 518 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); 519 520 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, 521 state->pipe_id, state->endpoint); 522 } 523 524 MessagePipeDispatcher::~MessagePipeDispatcher() { 525 DCHECK(port_closed_ && !in_transit_); 526 } 527 528 MojoResult MessagePipeDispatcher::CloseNoLock() { 529 signal_lock_.AssertAcquired(); 530 if (port_closed_ || in_transit_) 531 return MOJO_RESULT_INVALID_ARGUMENT; 532 533 port_closed_.Set(true); 534 awakables_.CancelAll(); 535 536 if (!port_transferred_) { 537 base::AutoUnlock unlock(signal_lock_); 538 node_controller_->ClosePort(port_); 539 } 540 541 return MOJO_RESULT_OK; 542 } 543 544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { 545 HandleSignalsState rv; 546 547 ports::PortStatus port_status; 548 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) { 549 CHECK(in_transit_ || port_transferred_ || port_closed_); 550 return HandleSignalsState(); 551 } 552 553 if (port_status.has_messages) { 554 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 555 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 556 } 557 if (port_status.receiving_messages) 558 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 559 if (!port_status.peer_closed) { 560 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 561 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 562 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 563 } else { 564 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 565 } 566 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 567 return rv; 568 } 569 570 void MessagePipeDispatcher::OnPortStatusChanged() { 571 DCHECK(RequestContext::current()); 572 573 base::AutoLock lock(signal_lock_); 574 575 // We stop observing our port as soon as it's transferred, but this can race 576 // with events which are raised right before that happens. This is fine to 577 // ignore. 578 if (port_transferred_) 579 return; 580 581 #if DCHECK_IS_ON() 582 ports::PortStatus port_status; 583 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { 584 if (port_status.has_messages) { 585 ports::ScopedMessage unused; 586 PeekSizeMessageFilter filter; 587 node_controller_->node()->GetMessage(port_, &unused, &filter); 588 DVLOG(4) << "New message detected on message pipe " << pipe_id_ 589 << " endpoint " << endpoint_ << " [port=" << port_.name() 590 << "; size=" << filter.message_size() << "]"; 591 } 592 if (port_status.peer_closed) { 593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ 594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; 595 } 596 } 597 #endif 598 599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 600 } 601 602 } // namespace edk 603 } // namespace mojo 604