1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 6 7 #include <stddef.h> 8 #include <stdint.h> 9 10 #include <algorithm> 11 #include <limits> 12 #include <utility> 13 14 #include "base/bind.h" 15 #include "base/logging.h" 16 #include "base/memory/ref_counted.h" 17 #include "base/message_loop/message_loop.h" 18 #include "mojo/edk/embedder/embedder_internal.h" 19 #include "mojo/edk/embedder/platform_shared_buffer.h" 20 #include "mojo/edk/system/core.h" 21 #include "mojo/edk/system/data_pipe_control_message.h" 22 #include "mojo/edk/system/node_controller.h" 23 #include "mojo/edk/system/ports_message.h" 24 #include "mojo/edk/system/request_context.h" 25 #include "mojo/public/c/system/data_pipe.h" 26 27 namespace mojo { 28 namespace edk { 29 30 namespace { 31 32 const uint8_t kFlagPeerClosed = 0x01; 33 34 #pragma pack(push, 1) 35 36 struct SerializedState { 37 MojoCreateDataPipeOptions options; 38 uint64_t pipe_id; 39 uint32_t read_offset; 40 uint32_t bytes_available; 41 uint8_t flags; 42 char padding[7]; 43 }; 44 45 static_assert(sizeof(SerializedState) % 8 == 0, 46 "Invalid SerializedState size."); 47 48 #pragma pack(pop) 49 50 } // namespace 51 52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a 53 // reference to the dispatcher to ensure it lives as long as the observed port. 54 class DataPipeConsumerDispatcher::PortObserverThunk 55 : public NodeController::PortObserver { 56 public: 57 explicit PortObserverThunk( 58 scoped_refptr<DataPipeConsumerDispatcher> dispatcher) 59 : dispatcher_(dispatcher) {} 60 61 private: 62 ~PortObserverThunk() override {} 63 64 // NodeController::PortObserver: 65 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } 66 67 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_; 68 69 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); 70 }; 71 72 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( 73 NodeController* node_controller, 74 const ports::PortRef& control_port, 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 76 const MojoCreateDataPipeOptions& options, 77 bool initialized, 78 uint64_t pipe_id) 79 : options_(options), 80 node_controller_(node_controller), 81 control_port_(control_port), 82 pipe_id_(pipe_id), 83 shared_ring_buffer_(shared_ring_buffer) { 84 if (initialized) { 85 base::AutoLock lock(lock_); 86 InitializeNoLock(); 87 } 88 } 89 90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 91 return Type::DATA_PIPE_CONSUMER; 92 } 93 94 MojoResult DataPipeConsumerDispatcher::Close() { 95 base::AutoLock lock(lock_); 96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; 97 return CloseNoLock(); 98 } 99 100 101 MojoResult DataPipeConsumerDispatcher::Watch( 102 MojoHandleSignals signals, 103 const Watcher::WatchCallback& callback, 104 uintptr_t context) { 105 base::AutoLock lock(lock_); 106 107 if (is_closed_ || in_transit_) 108 return MOJO_RESULT_INVALID_ARGUMENT; 109 110 return awakable_list_.AddWatcher( 111 signals, callback, context, GetHandleSignalsStateNoLock()); 112 } 113 114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { 115 base::AutoLock lock(lock_); 116 117 if (is_closed_ || in_transit_) 118 return MOJO_RESULT_INVALID_ARGUMENT; 119 120 return awakable_list_.RemoveWatcher(context); 121 } 122 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, 124 uint32_t* num_bytes, 125 MojoReadDataFlags flags) { 126 base::AutoLock lock(lock_); 127 new_data_available_ = false; 128 129 if (!shared_ring_buffer_ || in_transit_) 130 return MOJO_RESULT_INVALID_ARGUMENT; 131 132 if (in_two_phase_read_) 133 return MOJO_RESULT_BUSY; 134 135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 137 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 138 return MOJO_RESULT_INVALID_ARGUMENT; 139 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. 140 DVLOG_IF(2, elements) 141 << "Query mode: ignoring non-null |elements|"; 142 *num_bytes = static_cast<uint32_t>(bytes_available_); 143 return MOJO_RESULT_OK; 144 } 145 146 bool discard = false; 147 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 148 // These flags are mutally exclusive. 149 if (flags & MOJO_READ_DATA_FLAG_PEEK) 150 return MOJO_RESULT_INVALID_ARGUMENT; 151 DVLOG_IF(2, elements) 152 << "Discard mode: ignoring non-null |elements|"; 153 discard = true; 154 } 155 156 uint32_t max_num_bytes_to_read = *num_bytes; 157 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 158 return MOJO_RESULT_INVALID_ARGUMENT; 159 160 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 161 uint32_t min_num_bytes_to_read = 162 all_or_none ? max_num_bytes_to_read : 0; 163 164 if (min_num_bytes_to_read > bytes_available_) { 165 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 166 : MOJO_RESULT_OUT_OF_RANGE; 167 } 168 169 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); 170 if (bytes_to_read == 0) { 171 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 172 : MOJO_RESULT_SHOULD_WAIT; 173 } 174 175 if (!discard) { 176 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 177 CHECK(data); 178 179 uint8_t* destination = static_cast<uint8_t*>(elements); 180 CHECK(destination); 181 182 DCHECK_LE(read_offset_, options_.capacity_num_bytes); 183 uint32_t tail_bytes_to_copy = 184 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); 185 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; 186 if (tail_bytes_to_copy > 0) 187 memcpy(destination, data + read_offset_, tail_bytes_to_copy); 188 if (head_bytes_to_copy > 0) 189 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); 190 } 191 *num_bytes = bytes_to_read; 192 193 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 194 if (discard || !peek) { 195 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; 196 bytes_available_ -= bytes_to_read; 197 198 base::AutoUnlock unlock(lock_); 199 NotifyRead(bytes_to_read); 200 } 201 202 return MOJO_RESULT_OK; 203 } 204 205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, 206 uint32_t* buffer_num_bytes, 207 MojoReadDataFlags flags) { 208 base::AutoLock lock(lock_); 209 new_data_available_ = false; 210 if (!shared_ring_buffer_ || in_transit_) 211 return MOJO_RESULT_INVALID_ARGUMENT; 212 213 if (in_two_phase_read_) 214 return MOJO_RESULT_BUSY; 215 216 // These flags may not be used in two-phase mode. 217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 218 (flags & MOJO_READ_DATA_FLAG_QUERY) || 219 (flags & MOJO_READ_DATA_FLAG_PEEK)) 220 return MOJO_RESULT_INVALID_ARGUMENT; 221 222 if (bytes_available_ == 0) { 223 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 224 : MOJO_RESULT_SHOULD_WAIT; 225 } 226 227 DCHECK_LT(read_offset_, options_.capacity_num_bytes); 228 uint32_t bytes_to_read = std::min(bytes_available_, 229 options_.capacity_num_bytes - read_offset_); 230 231 CHECK(ring_buffer_mapping_); 232 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 233 CHECK(data); 234 235 in_two_phase_read_ = true; 236 *buffer = data + read_offset_; 237 *buffer_num_bytes = bytes_to_read; 238 two_phase_max_bytes_read_ = bytes_to_read; 239 240 return MOJO_RESULT_OK; 241 } 242 243 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { 244 base::AutoLock lock(lock_); 245 if (!in_two_phase_read_) 246 return MOJO_RESULT_FAILED_PRECONDITION; 247 248 if (in_transit_) 249 return MOJO_RESULT_INVALID_ARGUMENT; 250 251 CHECK(shared_ring_buffer_); 252 253 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); 254 MojoResult rv; 255 if (num_bytes_read > two_phase_max_bytes_read_ || 256 num_bytes_read % options_.element_num_bytes != 0) { 257 rv = MOJO_RESULT_INVALID_ARGUMENT; 258 } else { 259 rv = MOJO_RESULT_OK; 260 read_offset_ = 261 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; 262 263 DCHECK_GE(bytes_available_, num_bytes_read); 264 bytes_available_ -= num_bytes_read; 265 266 base::AutoUnlock unlock(lock_); 267 NotifyRead(num_bytes_read); 268 } 269 270 in_two_phase_read_ = false; 271 two_phase_max_bytes_read_ = 0; 272 273 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 274 if (!new_state.equals(old_state)) 275 awakable_list_.AwakeForStateChange(new_state); 276 277 return rv; 278 } 279 280 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { 281 base::AutoLock lock(lock_); 282 return GetHandleSignalsStateNoLock(); 283 } 284 285 MojoResult DataPipeConsumerDispatcher::AddAwakable( 286 Awakable* awakable, 287 MojoHandleSignals signals, 288 uintptr_t context, 289 HandleSignalsState* signals_state) { 290 base::AutoLock lock(lock_); 291 if (!shared_ring_buffer_ || in_transit_) { 292 if (signals_state) 293 *signals_state = HandleSignalsState(); 294 return MOJO_RESULT_INVALID_ARGUMENT; 295 } 296 UpdateSignalsStateNoLock(); 297 HandleSignalsState state = GetHandleSignalsStateNoLock(); 298 if (state.satisfies(signals)) { 299 if (signals_state) 300 *signals_state = state; 301 return MOJO_RESULT_ALREADY_EXISTS; 302 } 303 if (!state.can_satisfy(signals)) { 304 if (signals_state) 305 *signals_state = state; 306 return MOJO_RESULT_FAILED_PRECONDITION; 307 } 308 309 awakable_list_.Add(awakable, signals, context); 310 return MOJO_RESULT_OK; 311 } 312 313 void DataPipeConsumerDispatcher::RemoveAwakable( 314 Awakable* awakable, 315 HandleSignalsState* signals_state) { 316 base::AutoLock lock(lock_); 317 if ((!shared_ring_buffer_ || in_transit_) && signals_state) 318 *signals_state = HandleSignalsState(); 319 else if (signals_state) 320 *signals_state = GetHandleSignalsStateNoLock(); 321 awakable_list_.Remove(awakable); 322 } 323 324 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, 325 uint32_t* num_ports, 326 uint32_t* num_handles) { 327 base::AutoLock lock(lock_); 328 DCHECK(in_transit_); 329 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); 330 *num_ports = 1; 331 *num_handles = 1; 332 } 333 334 bool DataPipeConsumerDispatcher::EndSerialize( 335 void* destination, 336 ports::PortName* ports, 337 PlatformHandle* platform_handles) { 338 SerializedState* state = static_cast<SerializedState*>(destination); 339 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); 340 memset(state->padding, 0, sizeof(state->padding)); 341 342 base::AutoLock lock(lock_); 343 DCHECK(in_transit_); 344 state->pipe_id = pipe_id_; 345 state->read_offset = read_offset_; 346 state->bytes_available = bytes_available_; 347 state->flags = peer_closed_ ? kFlagPeerClosed : 0; 348 349 ports[0] = control_port_.name(); 350 351 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); 352 platform_handles[0] = buffer_handle_for_transit_.get(); 353 354 return true; 355 } 356 357 bool DataPipeConsumerDispatcher::BeginTransit() { 358 base::AutoLock lock(lock_); 359 if (in_transit_) 360 return false; 361 in_transit_ = !in_two_phase_read_; 362 return in_transit_; 363 } 364 365 void DataPipeConsumerDispatcher::CompleteTransitAndClose() { 366 node_controller_->SetPortObserver(control_port_, nullptr); 367 368 base::AutoLock lock(lock_); 369 DCHECK(in_transit_); 370 in_transit_ = false; 371 transferred_ = true; 372 ignore_result(buffer_handle_for_transit_.release()); 373 CloseNoLock(); 374 } 375 376 void DataPipeConsumerDispatcher::CancelTransit() { 377 base::AutoLock lock(lock_); 378 DCHECK(in_transit_); 379 in_transit_ = false; 380 buffer_handle_for_transit_.reset(); 381 UpdateSignalsStateNoLock(); 382 } 383 384 // static 385 scoped_refptr<DataPipeConsumerDispatcher> 386 DataPipeConsumerDispatcher::Deserialize(const void* data, 387 size_t num_bytes, 388 const ports::PortName* ports, 389 size_t num_ports, 390 PlatformHandle* handles, 391 size_t num_handles) { 392 if (num_ports != 1 || num_handles != 1 || 393 num_bytes != sizeof(SerializedState)) { 394 return nullptr; 395 } 396 397 const SerializedState* state = static_cast<const SerializedState*>(data); 398 399 NodeController* node_controller = internal::g_core->GetNodeController(); 400 ports::PortRef port; 401 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) 402 return nullptr; 403 404 PlatformHandle buffer_handle; 405 std::swap(buffer_handle, handles[0]); 406 scoped_refptr<PlatformSharedBuffer> ring_buffer = 407 PlatformSharedBuffer::CreateFromPlatformHandle( 408 state->options.capacity_num_bytes, 409 false /* read_only */, 410 ScopedPlatformHandle(buffer_handle)); 411 if (!ring_buffer) { 412 DLOG(ERROR) << "Failed to deserialize shared buffer handle."; 413 return nullptr; 414 } 415 416 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = 417 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, 418 state->options, false /* initialized */, 419 state->pipe_id); 420 421 { 422 base::AutoLock lock(dispatcher->lock_); 423 dispatcher->read_offset_ = state->read_offset; 424 dispatcher->bytes_available_ = state->bytes_available; 425 dispatcher->new_data_available_ = state->bytes_available > 0; 426 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; 427 dispatcher->InitializeNoLock(); 428 dispatcher->UpdateSignalsStateNoLock(); 429 } 430 431 return dispatcher; 432 } 433 434 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { 435 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && 436 !in_transit_); 437 } 438 439 void DataPipeConsumerDispatcher::InitializeNoLock() { 440 lock_.AssertAcquired(); 441 442 if (shared_ring_buffer_) { 443 DCHECK(!ring_buffer_mapping_); 444 ring_buffer_mapping_ = 445 shared_ring_buffer_->Map(0, options_.capacity_num_bytes); 446 if (!ring_buffer_mapping_) { 447 DLOG(ERROR) << "Failed to map shared buffer."; 448 shared_ring_buffer_ = nullptr; 449 } 450 } 451 452 base::AutoUnlock unlock(lock_); 453 node_controller_->SetPortObserver( 454 control_port_, 455 make_scoped_refptr(new PortObserverThunk(this))); 456 } 457 458 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { 459 lock_.AssertAcquired(); 460 if (is_closed_ || in_transit_) 461 return MOJO_RESULT_INVALID_ARGUMENT; 462 is_closed_ = true; 463 ring_buffer_mapping_.reset(); 464 shared_ring_buffer_ = nullptr; 465 466 awakable_list_.CancelAll(); 467 if (!transferred_) { 468 base::AutoUnlock unlock(lock_); 469 node_controller_->ClosePort(control_port_); 470 } 471 472 return MOJO_RESULT_OK; 473 } 474 475 HandleSignalsState 476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { 477 lock_.AssertAcquired(); 478 479 HandleSignalsState rv; 480 if (shared_ring_buffer_ && bytes_available_) { 481 if (!in_two_phase_read_) { 482 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 483 if (new_data_available_) 484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; 485 } 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 487 } else if (!peer_closed_ && shared_ring_buffer_) { 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 489 } 490 491 if (shared_ring_buffer_) { 492 if (new_data_available_ || !peer_closed_) 493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; 494 } 495 496 if (peer_closed_) 497 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 498 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 499 500 return rv; 501 } 502 503 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { 504 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " 505 << num_bytes << " bytes read. [control_port=" 506 << control_port_.name() << "]"; 507 508 SendDataPipeControlMessage(node_controller_, control_port_, 509 DataPipeCommand::DATA_WAS_READ, num_bytes); 510 } 511 512 void DataPipeConsumerDispatcher::OnPortStatusChanged() { 513 DCHECK(RequestContext::current()); 514 515 base::AutoLock lock(lock_); 516 517 // We stop observing the control port as soon it's transferred, but this can 518 // race with events which are raised right before that happens. This is fine 519 // to ignore. 520 if (transferred_) 521 return; 522 523 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; 524 525 UpdateSignalsStateNoLock(); 526 } 527 528 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { 529 lock_.AssertAcquired(); 530 531 bool was_peer_closed = peer_closed_; 532 size_t previous_bytes_available = bytes_available_; 533 534 ports::PortStatus port_status; 535 int rv = node_controller_->node()->GetStatus(control_port_, &port_status); 536 if (rv != ports::OK || !port_status.receiving_messages) { 537 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" 538 << " [control_port=" << control_port_.name() << "]"; 539 peer_closed_ = true; 540 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { 541 ports::ScopedMessage message; 542 do { 543 int rv = node_controller_->node()->GetMessage( 544 control_port_, &message, nullptr); 545 if (rv != ports::OK) 546 peer_closed_ = true; 547 if (message) { 548 if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) { 549 peer_closed_ = true; 550 break; 551 } 552 553 const DataPipeControlMessage* m = 554 static_cast<const DataPipeControlMessage*>( 555 message->payload_bytes()); 556 557 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { 558 DLOG(ERROR) << "Unexpected control message from producer."; 559 peer_closed_ = true; 560 break; 561 } 562 563 if (static_cast<size_t>(bytes_available_) + m->num_bytes > 564 options_.capacity_num_bytes) { 565 DLOG(ERROR) << "Producer claims to have written too many bytes."; 566 peer_closed_ = true; 567 break; 568 } 569 570 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " 571 << m->num_bytes << " bytes were written. [control_port=" 572 << control_port_.name() << "]"; 573 574 bytes_available_ += m->num_bytes; 575 } 576 } while (message); 577 } 578 579 bool has_new_data = bytes_available_ != previous_bytes_available; 580 if (has_new_data) 581 new_data_available_ = true; 582 583 if (peer_closed_ != was_peer_closed || has_new_data) { 584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 585 } 586 } 587 588 } // namespace edk 589 } // namespace mojo 590