// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"

#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <limits>
#include <utility>

#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/data_pipe_control_message.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/request_context.h"
#include "mojo/public/c/system/data_pipe.h"

namespace mojo {
namespace edk {

namespace {

// Bit set in SerializedState::flags when the dispatcher had already observed
// peer closure at the time it was serialized.
const uint8_t kFlagPeerClosed = 0x01;

// Pack with 1-byte alignment so the compiler inserts no implicit padding into
// the serialized layout; the explicit |padding| member below rounds the size
// up instead.
#pragma pack(push, 1)

// Fixed-size snapshot of a consumer dispatcher. EndSerialize() fills one of
// these in and Deserialize() reads it back.
struct SerializedState {
  // Copy of the pipe's creation options (element size, capacity, ...).
  MojoCreateDataPipeOptions options;
  // Identifier of the pipe (used in this file for logging).
  uint64_t pipe_id;
  // Offset into the ring buffer of the next unread byte.
  uint32_t read_offset;
  // Number of written-but-unread bytes in the ring buffer.
  uint32_t bytes_available;
  // Bitfield; currently only kFlagPeerClosed is defined.
  uint8_t flags;
  // Explicit padding; zeroed by EndSerialize().
  char padding[7];
};

// The serialized form must occupy a whole number of 8-byte units.
static_assert(sizeof(SerializedState) % 8 == 0,
              "Invalid SerializedState size.");

#pragma pack(pop)

}  // namespace

// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
// reference to the dispatcher to ensure it lives as long as the observed port.
54 class DataPipeConsumerDispatcher::PortObserverThunk 55 : public NodeController::PortObserver { 56 public: 57 explicit PortObserverThunk( 58 scoped_refptr<DataPipeConsumerDispatcher> dispatcher) 59 : dispatcher_(dispatcher) {} 60 61 private: 62 ~PortObserverThunk() override {} 63 64 // NodeController::PortObserver: 65 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } 66 67 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_; 68 69 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); 70 }; 71 72 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( 73 NodeController* node_controller, 74 const ports::PortRef& control_port, 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 76 const MojoCreateDataPipeOptions& options, 77 bool initialized, 78 uint64_t pipe_id) 79 : options_(options), 80 node_controller_(node_controller), 81 control_port_(control_port), 82 pipe_id_(pipe_id), 83 shared_ring_buffer_(shared_ring_buffer) { 84 if (initialized) { 85 base::AutoLock lock(lock_); 86 InitializeNoLock(); 87 } 88 } 89 90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 91 return Type::DATA_PIPE_CONSUMER; 92 } 93 94 MojoResult DataPipeConsumerDispatcher::Close() { 95 base::AutoLock lock(lock_); 96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; 97 return CloseNoLock(); 98 } 99 100 101 MojoResult DataPipeConsumerDispatcher::Watch( 102 MojoHandleSignals signals, 103 const Watcher::WatchCallback& callback, 104 uintptr_t context) { 105 base::AutoLock lock(lock_); 106 107 if (is_closed_ || in_transit_) 108 return MOJO_RESULT_INVALID_ARGUMENT; 109 110 return awakable_list_.AddWatcher( 111 signals, callback, context, GetHandleSignalsStateNoLock()); 112 } 113 114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { 115 base::AutoLock lock(lock_); 116 117 if (is_closed_ || in_transit_) 118 return MOJO_RESULT_INVALID_ARGUMENT; 119 120 return awakable_list_.RemoveWatcher(context); 121 } 122 123 MojoResult 
DataPipeConsumerDispatcher::ReadData(void* elements, 124 uint32_t* num_bytes, 125 MojoReadDataFlags flags) { 126 base::AutoLock lock(lock_); 127 if (!shared_ring_buffer_ || in_transit_) 128 return MOJO_RESULT_INVALID_ARGUMENT; 129 130 if (in_two_phase_read_) 131 return MOJO_RESULT_BUSY; 132 133 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 134 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 135 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 136 return MOJO_RESULT_INVALID_ARGUMENT; 137 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. 138 DVLOG_IF(2, elements) 139 << "Query mode: ignoring non-null |elements|"; 140 *num_bytes = static_cast<uint32_t>(bytes_available_); 141 return MOJO_RESULT_OK; 142 } 143 144 bool discard = false; 145 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 146 // These flags are mutally exclusive. 147 if (flags & MOJO_READ_DATA_FLAG_PEEK) 148 return MOJO_RESULT_INVALID_ARGUMENT; 149 DVLOG_IF(2, elements) 150 << "Discard mode: ignoring non-null |elements|"; 151 discard = true; 152 } 153 154 uint32_t max_num_bytes_to_read = *num_bytes; 155 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 156 return MOJO_RESULT_INVALID_ARGUMENT; 157 158 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 159 uint32_t min_num_bytes_to_read = 160 all_or_none ? max_num_bytes_to_read : 0; 161 162 if (min_num_bytes_to_read > bytes_available_) { 163 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 164 : MOJO_RESULT_OUT_OF_RANGE; 165 } 166 167 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); 168 if (bytes_to_read == 0) { 169 return peer_closed_ ? 
MOJO_RESULT_FAILED_PRECONDITION 170 : MOJO_RESULT_SHOULD_WAIT; 171 } 172 173 if (!discard) { 174 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 175 CHECK(data); 176 177 uint8_t* destination = static_cast<uint8_t*>(elements); 178 CHECK(destination); 179 180 DCHECK_LE(read_offset_, options_.capacity_num_bytes); 181 uint32_t tail_bytes_to_copy = 182 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); 183 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; 184 if (tail_bytes_to_copy > 0) 185 memcpy(destination, data + read_offset_, tail_bytes_to_copy); 186 if (head_bytes_to_copy > 0) 187 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); 188 } 189 *num_bytes = bytes_to_read; 190 191 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 192 if (discard || !peek) { 193 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; 194 bytes_available_ -= bytes_to_read; 195 196 base::AutoUnlock unlock(lock_); 197 NotifyRead(bytes_to_read); 198 } 199 200 return MOJO_RESULT_OK; 201 } 202 203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, 204 uint32_t* buffer_num_bytes, 205 MojoReadDataFlags flags) { 206 base::AutoLock lock(lock_); 207 if (!shared_ring_buffer_ || in_transit_) 208 return MOJO_RESULT_INVALID_ARGUMENT; 209 210 if (in_two_phase_read_) 211 return MOJO_RESULT_BUSY; 212 213 // These flags may not be used in two-phase mode. 214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 215 (flags & MOJO_READ_DATA_FLAG_QUERY) || 216 (flags & MOJO_READ_DATA_FLAG_PEEK)) 217 return MOJO_RESULT_INVALID_ARGUMENT; 218 219 if (bytes_available_ == 0) { 220 return peer_closed_ ? 
MOJO_RESULT_FAILED_PRECONDITION 221 : MOJO_RESULT_SHOULD_WAIT; 222 } 223 224 DCHECK_LT(read_offset_, options_.capacity_num_bytes); 225 uint32_t bytes_to_read = std::min(bytes_available_, 226 options_.capacity_num_bytes - read_offset_); 227 228 CHECK(ring_buffer_mapping_); 229 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 230 CHECK(data); 231 232 in_two_phase_read_ = true; 233 *buffer = data + read_offset_; 234 *buffer_num_bytes = bytes_to_read; 235 two_phase_max_bytes_read_ = bytes_to_read; 236 237 return MOJO_RESULT_OK; 238 } 239 240 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { 241 base::AutoLock lock(lock_); 242 if (!in_two_phase_read_) 243 return MOJO_RESULT_FAILED_PRECONDITION; 244 245 if (in_transit_) 246 return MOJO_RESULT_INVALID_ARGUMENT; 247 248 CHECK(shared_ring_buffer_); 249 250 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); 251 MojoResult rv; 252 if (num_bytes_read > two_phase_max_bytes_read_ || 253 num_bytes_read % options_.element_num_bytes != 0) { 254 rv = MOJO_RESULT_INVALID_ARGUMENT; 255 } else { 256 rv = MOJO_RESULT_OK; 257 read_offset_ = 258 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; 259 260 DCHECK_GE(bytes_available_, num_bytes_read); 261 bytes_available_ -= num_bytes_read; 262 263 base::AutoUnlock unlock(lock_); 264 NotifyRead(num_bytes_read); 265 } 266 267 in_two_phase_read_ = false; 268 two_phase_max_bytes_read_ = 0; 269 270 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 271 if (!new_state.equals(old_state)) 272 awakable_list_.AwakeForStateChange(new_state); 273 274 return rv; 275 } 276 277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { 278 base::AutoLock lock(lock_); 279 return GetHandleSignalsStateNoLock(); 280 } 281 282 MojoResult DataPipeConsumerDispatcher::AddAwakable( 283 Awakable* awakable, 284 MojoHandleSignals signals, 285 uintptr_t context, 286 HandleSignalsState* signals_state) { 287 
base::AutoLock lock(lock_); 288 if (!shared_ring_buffer_ || in_transit_) { 289 if (signals_state) 290 *signals_state = HandleSignalsState(); 291 return MOJO_RESULT_INVALID_ARGUMENT; 292 } 293 UpdateSignalsStateNoLock(); 294 HandleSignalsState state = GetHandleSignalsStateNoLock(); 295 if (state.satisfies(signals)) { 296 if (signals_state) 297 *signals_state = state; 298 return MOJO_RESULT_ALREADY_EXISTS; 299 } 300 if (!state.can_satisfy(signals)) { 301 if (signals_state) 302 *signals_state = state; 303 return MOJO_RESULT_FAILED_PRECONDITION; 304 } 305 306 awakable_list_.Add(awakable, signals, context); 307 return MOJO_RESULT_OK; 308 } 309 310 void DataPipeConsumerDispatcher::RemoveAwakable( 311 Awakable* awakable, 312 HandleSignalsState* signals_state) { 313 base::AutoLock lock(lock_); 314 if ((!shared_ring_buffer_ || in_transit_) && signals_state) 315 *signals_state = HandleSignalsState(); 316 else if (signals_state) 317 *signals_state = GetHandleSignalsStateNoLock(); 318 awakable_list_.Remove(awakable); 319 } 320 321 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, 322 uint32_t* num_ports, 323 uint32_t* num_handles) { 324 base::AutoLock lock(lock_); 325 DCHECK(in_transit_); 326 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); 327 *num_ports = 1; 328 *num_handles = 1; 329 } 330 331 bool DataPipeConsumerDispatcher::EndSerialize( 332 void* destination, 333 ports::PortName* ports, 334 PlatformHandle* platform_handles) { 335 SerializedState* state = static_cast<SerializedState*>(destination); 336 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); 337 memset(state->padding, 0, sizeof(state->padding)); 338 339 base::AutoLock lock(lock_); 340 DCHECK(in_transit_); 341 state->pipe_id = pipe_id_; 342 state->read_offset = read_offset_; 343 state->bytes_available = bytes_available_; 344 state->flags = peer_closed_ ? 
kFlagPeerClosed : 0; 345 346 ports[0] = control_port_.name(); 347 348 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); 349 platform_handles[0] = buffer_handle_for_transit_.get(); 350 351 return true; 352 } 353 354 bool DataPipeConsumerDispatcher::BeginTransit() { 355 base::AutoLock lock(lock_); 356 if (in_transit_) 357 return false; 358 in_transit_ = !in_two_phase_read_; 359 return in_transit_; 360 } 361 362 void DataPipeConsumerDispatcher::CompleteTransitAndClose() { 363 node_controller_->SetPortObserver(control_port_, nullptr); 364 365 base::AutoLock lock(lock_); 366 DCHECK(in_transit_); 367 in_transit_ = false; 368 transferred_ = true; 369 ignore_result(buffer_handle_for_transit_.release()); 370 CloseNoLock(); 371 } 372 373 void DataPipeConsumerDispatcher::CancelTransit() { 374 base::AutoLock lock(lock_); 375 DCHECK(in_transit_); 376 in_transit_ = false; 377 buffer_handle_for_transit_.reset(); 378 UpdateSignalsStateNoLock(); 379 } 380 381 // static 382 scoped_refptr<DataPipeConsumerDispatcher> 383 DataPipeConsumerDispatcher::Deserialize(const void* data, 384 size_t num_bytes, 385 const ports::PortName* ports, 386 size_t num_ports, 387 PlatformHandle* handles, 388 size_t num_handles) { 389 if (num_ports != 1 || num_handles != 1 || 390 num_bytes != sizeof(SerializedState)) { 391 return nullptr; 392 } 393 394 const SerializedState* state = static_cast<const SerializedState*>(data); 395 396 NodeController* node_controller = internal::g_core->GetNodeController(); 397 ports::PortRef port; 398 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) 399 return nullptr; 400 401 PlatformHandle buffer_handle; 402 std::swap(buffer_handle, handles[0]); 403 scoped_refptr<PlatformSharedBuffer> ring_buffer = 404 PlatformSharedBuffer::CreateFromPlatformHandle( 405 state->options.capacity_num_bytes, 406 false /* read_only */, 407 ScopedPlatformHandle(buffer_handle)); 408 if (!ring_buffer) { 409 DLOG(ERROR) << "Failed to deserialize shared 
buffer handle."; 410 return nullptr; 411 } 412 413 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = 414 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, 415 state->options, false /* initialized */, 416 state->pipe_id); 417 418 { 419 base::AutoLock lock(dispatcher->lock_); 420 dispatcher->read_offset_ = state->read_offset; 421 dispatcher->bytes_available_ = state->bytes_available; 422 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; 423 dispatcher->InitializeNoLock(); 424 } 425 426 return dispatcher; 427 } 428 429 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { 430 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && 431 !in_transit_); 432 } 433 434 void DataPipeConsumerDispatcher::InitializeNoLock() { 435 lock_.AssertAcquired(); 436 437 if (shared_ring_buffer_) { 438 DCHECK(!ring_buffer_mapping_); 439 ring_buffer_mapping_ = 440 shared_ring_buffer_->Map(0, options_.capacity_num_bytes); 441 if (!ring_buffer_mapping_) { 442 DLOG(ERROR) << "Failed to map shared buffer."; 443 shared_ring_buffer_ = nullptr; 444 } 445 } 446 447 base::AutoUnlock unlock(lock_); 448 node_controller_->SetPortObserver( 449 control_port_, 450 make_scoped_refptr(new PortObserverThunk(this))); 451 } 452 453 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { 454 lock_.AssertAcquired(); 455 if (is_closed_ || in_transit_) 456 return MOJO_RESULT_INVALID_ARGUMENT; 457 is_closed_ = true; 458 ring_buffer_mapping_.reset(); 459 shared_ring_buffer_ = nullptr; 460 461 awakable_list_.CancelAll(); 462 if (!transferred_) { 463 base::AutoUnlock unlock(lock_); 464 node_controller_->ClosePort(control_port_); 465 } 466 467 return MOJO_RESULT_OK; 468 } 469 470 HandleSignalsState 471 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { 472 lock_.AssertAcquired(); 473 474 HandleSignalsState rv; 475 if (shared_ring_buffer_ && bytes_available_) { 476 if (!in_two_phase_read_) 477 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 478 
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 479 } else if (!peer_closed_ && shared_ring_buffer_) { 480 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 481 } 482 483 if (peer_closed_) 484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 485 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 486 return rv; 487 } 488 489 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { 490 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " 491 << num_bytes << " bytes read. [control_port=" 492 << control_port_.name() << "]"; 493 494 SendDataPipeControlMessage(node_controller_, control_port_, 495 DataPipeCommand::DATA_WAS_READ, num_bytes); 496 } 497 498 void DataPipeConsumerDispatcher::OnPortStatusChanged() { 499 DCHECK(RequestContext::current()); 500 501 base::AutoLock lock(lock_); 502 503 // We stop observing the control port as soon it's transferred, but this can 504 // race with events which are raised right before that happens. This is fine 505 // to ignore. 
506 if (transferred_) 507 return; 508 509 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; 510 511 UpdateSignalsStateNoLock(); 512 } 513 514 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { 515 lock_.AssertAcquired(); 516 517 bool was_peer_closed = peer_closed_; 518 size_t previous_bytes_available = bytes_available_; 519 520 ports::PortStatus port_status; 521 int rv = node_controller_->node()->GetStatus(control_port_, &port_status); 522 if (rv != ports::OK || !port_status.receiving_messages) { 523 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" 524 << " [control_port=" << control_port_.name() << "]"; 525 peer_closed_ = true; 526 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { 527 ports::ScopedMessage message; 528 do { 529 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, 530 &message); 531 if (rv != ports::OK) 532 peer_closed_ = true; 533 if (message) { 534 if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) { 535 peer_closed_ = true; 536 break; 537 } 538 539 const DataPipeControlMessage* m = 540 static_cast<const DataPipeControlMessage*>( 541 message->payload_bytes()); 542 543 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { 544 DLOG(ERROR) << "Unexpected control message from producer."; 545 peer_closed_ = true; 546 break; 547 } 548 549 if (static_cast<size_t>(bytes_available_) + m->num_bytes > 550 options_.capacity_num_bytes) { 551 DLOG(ERROR) << "Producer claims to have written too many bytes."; 552 peer_closed_ = true; 553 break; 554 } 555 556 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " 557 << m->num_bytes << " bytes were written. 
[control_port=" 558 << control_port_.name() << "]"; 559 560 bytes_available_ += m->num_bytes; 561 } 562 } while (message); 563 } 564 565 if (peer_closed_ != was_peer_closed || 566 bytes_available_ != previous_bytes_available) { 567 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 568 } 569 } 570 571 } // namespace edk 572 } // namespace mojo 573