1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/core/data_pipe_producer_dispatcher.h" 6 7 #include <stddef.h> 8 #include <stdint.h> 9 10 #include <utility> 11 12 #include "base/bind.h" 13 #include "base/logging.h" 14 #include "base/memory/ref_counted.h" 15 #include "mojo/core/configuration.h" 16 #include "mojo/core/core.h" 17 #include "mojo/core/data_pipe_control_message.h" 18 #include "mojo/core/node_controller.h" 19 #include "mojo/core/platform_handle_utils.h" 20 #include "mojo/core/request_context.h" 21 #include "mojo/core/user_message_impl.h" 22 #include "mojo/public/c/system/data_pipe.h" 23 24 namespace mojo { 25 namespace core { 26 27 namespace { 28 29 const uint8_t kFlagPeerClosed = 0x01; 30 31 #pragma pack(push, 1) 32 33 struct SerializedState { 34 MojoCreateDataPipeOptions options; 35 uint64_t pipe_id; 36 uint32_t write_offset; 37 uint32_t available_capacity; 38 uint8_t flags; 39 uint64_t buffer_guid_high; 40 uint64_t buffer_guid_low; 41 char padding[7]; 42 }; 43 44 static_assert(sizeof(SerializedState) % 8 == 0, 45 "Invalid SerializedState size."); 46 47 #pragma pack(pop) 48 49 } // namespace 50 51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a 52 // reference to the dispatcher to ensure it lives as long as the observed port. 53 class DataPipeProducerDispatcher::PortObserverThunk 54 : public NodeController::PortObserver { 55 public: 56 explicit PortObserverThunk( 57 scoped_refptr<DataPipeProducerDispatcher> dispatcher) 58 : dispatcher_(dispatcher) {} 59 60 private: 61 ~PortObserverThunk() override {} 62 63 // NodeController::PortObserver: 64 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } 65 66 scoped_refptr<DataPipeProducerDispatcher> dispatcher_; 67 68 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); 69 }; 70 71 // static 72 scoped_refptr<DataPipeProducerDispatcher> DataPipeProducerDispatcher::Create( 73 NodeController* node_controller, 74 const ports::PortRef& control_port, 75 base::UnsafeSharedMemoryRegion shared_ring_buffer, 76 const MojoCreateDataPipeOptions& options, 77 uint64_t pipe_id) { 78 scoped_refptr<DataPipeProducerDispatcher> producer = 79 new DataPipeProducerDispatcher(node_controller, control_port, 80 std::move(shared_ring_buffer), options, 81 pipe_id); 82 base::AutoLock lock(producer->lock_); 83 if (!producer->InitializeNoLock()) 84 return nullptr; 85 return producer; 86 } 87 88 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 89 return Type::DATA_PIPE_PRODUCER; 90 } 91 92 MojoResult DataPipeProducerDispatcher::Close() { 93 base::AutoLock lock(lock_); 94 DVLOG(1) << "Closing data pipe producer " << pipe_id_; 95 return CloseNoLock(); 96 } 97 98 MojoResult DataPipeProducerDispatcher::WriteData( 99 const void* elements, 100 uint32_t* num_bytes, 101 const MojoWriteDataOptions& options) { 102 base::AutoLock lock(lock_); 103 if (!shared_ring_buffer_.IsValid() || in_transit_) 104 return MOJO_RESULT_INVALID_ARGUMENT; 105 106 if (in_two_phase_write_) 107 return MOJO_RESULT_BUSY; 108 109 if (peer_closed_) 110 return MOJO_RESULT_FAILED_PRECONDITION; 111 112 if (*num_bytes % options_.element_num_bytes != 0) 113 return MOJO_RESULT_INVALID_ARGUMENT; 114 if (*num_bytes == 0) 115 return MOJO_RESULT_OK; // Nothing to do. 116 117 if ((options.flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) && 118 (*num_bytes > available_capacity_)) { 119 // Don't return "should wait" since you can't wait for a specified amount of 120 // data. 121 return MOJO_RESULT_OUT_OF_RANGE; 122 } 123 124 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); 125 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); 126 if (num_bytes_to_write == 0) 127 return MOJO_RESULT_SHOULD_WAIT; 128 129 *num_bytes = num_bytes_to_write; 130 131 CHECK(ring_buffer_mapping_.IsValid()); 132 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory()); 133 CHECK(data); 134 135 const uint8_t* source = static_cast<const uint8_t*>(elements); 136 CHECK(source); 137 138 DCHECK_LE(write_offset_, options_.capacity_num_bytes); 139 uint32_t tail_bytes_to_write = 140 std::min(options_.capacity_num_bytes - write_offset_, num_bytes_to_write); 141 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; 142 143 DCHECK_GT(tail_bytes_to_write, 0u); 144 memcpy(data + write_offset_, source, tail_bytes_to_write); 145 if (head_bytes_to_write > 0) 146 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 147 148 DCHECK_LE(num_bytes_to_write, available_capacity_); 149 available_capacity_ -= num_bytes_to_write; 150 write_offset_ = 151 (write_offset_ + num_bytes_to_write) % options_.capacity_num_bytes; 152 153 watchers_.NotifyState(GetHandleSignalsStateNoLock()); 154 155 base::AutoUnlock unlock(lock_); 156 NotifyWrite(num_bytes_to_write); 157 158 return MOJO_RESULT_OK; 159 } 160 161 MojoResult DataPipeProducerDispatcher::BeginWriteData( 162 void** buffer, 163 uint32_t* buffer_num_bytes) { 164 base::AutoLock lock(lock_); 165 if (!shared_ring_buffer_.IsValid() || in_transit_) 166 return MOJO_RESULT_INVALID_ARGUMENT; 167 168 if (in_two_phase_write_) 169 return MOJO_RESULT_BUSY; 170 if (peer_closed_) 171 return MOJO_RESULT_FAILED_PRECONDITION; 172 173 if (available_capacity_ == 0) { 174 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 175 : MOJO_RESULT_SHOULD_WAIT; 176 } 177 178 in_two_phase_write_ = true; 179 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_, 180 available_capacity_); 181 DCHECK_GT(*buffer_num_bytes, 0u); 182 183 CHECK(ring_buffer_mapping_.IsValid()); 184 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory()); 185 *buffer = data + write_offset_; 186 187 return MOJO_RESULT_OK; 188 } 189 190 MojoResult DataPipeProducerDispatcher::EndWriteData( 191 uint32_t num_bytes_written) { 192 base::AutoLock lock(lock_); 193 if (is_closed_ || in_transit_) 194 return MOJO_RESULT_INVALID_ARGUMENT; 195 196 if (!in_two_phase_write_) 197 return MOJO_RESULT_FAILED_PRECONDITION; 198 199 // Note: Allow successful completion of the two-phase write even if the other 200 // side has been closed. 201 MojoResult rv = MOJO_RESULT_OK; 202 if (num_bytes_written > available_capacity_ || 203 num_bytes_written % options_.element_num_bytes != 0 || 204 write_offset_ + num_bytes_written > options_.capacity_num_bytes) { 205 rv = MOJO_RESULT_INVALID_ARGUMENT; 206 } else { 207 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes); 208 available_capacity_ -= num_bytes_written; 209 write_offset_ = 210 (write_offset_ + num_bytes_written) % options_.capacity_num_bytes; 211 212 base::AutoUnlock unlock(lock_); 213 NotifyWrite(num_bytes_written); 214 } 215 216 in_two_phase_write_ = false; 217 218 // If we're now writable, we *became* writable (since we weren't writable 219 // during the two-phase write), so notify watchers. 220 watchers_.NotifyState(GetHandleSignalsStateNoLock()); 221 222 return rv; 223 } 224 225 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 226 base::AutoLock lock(lock_); 227 return GetHandleSignalsStateNoLock(); 228 } 229 230 MojoResult DataPipeProducerDispatcher::AddWatcherRef( 231 const scoped_refptr<WatcherDispatcher>& watcher, 232 uintptr_t context) { 233 base::AutoLock lock(lock_); 234 if (is_closed_ || in_transit_) 235 return MOJO_RESULT_INVALID_ARGUMENT; 236 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); 237 } 238 239 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( 240 WatcherDispatcher* watcher, 241 uintptr_t context) { 242 base::AutoLock lock(lock_); 243 if (is_closed_ || in_transit_) 244 return MOJO_RESULT_INVALID_ARGUMENT; 245 return watchers_.Remove(watcher, context); 246 } 247 248 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, 249 uint32_t* num_ports, 250 uint32_t* num_handles) { 251 base::AutoLock lock(lock_); 252 DCHECK(in_transit_); 253 *num_bytes = sizeof(SerializedState); 254 *num_ports = 1; 255 *num_handles = 1; 256 } 257 258 bool DataPipeProducerDispatcher::EndSerialize( 259 void* destination, 260 ports::PortName* ports, 261 PlatformHandle* platform_handles) { 262 SerializedState* state = static_cast<SerializedState*>(destination); 263 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); 264 memset(state->padding, 0, sizeof(state->padding)); 265 266 base::AutoLock lock(lock_); 267 DCHECK(in_transit_); 268 state->pipe_id = pipe_id_; 269 state->write_offset = write_offset_; 270 state->available_capacity = available_capacity_; 271 state->flags = peer_closed_ ? kFlagPeerClosed : 0; 272 273 auto region_handle = 274 base::UnsafeSharedMemoryRegion::TakeHandleForSerialization( 275 std::move(shared_ring_buffer_)); 276 const base::UnguessableToken& guid = region_handle.GetGUID(); 277 state->buffer_guid_high = guid.GetHighForSerialization(); 278 state->buffer_guid_low = guid.GetLowForSerialization(); 279 280 ports[0] = control_port_.name(); 281 282 PlatformHandle handle; 283 PlatformHandle ignored_handle; 284 ExtractPlatformHandlesFromSharedMemoryRegionHandle( 285 region_handle.PassPlatformHandle(), &handle, &ignored_handle); 286 if (!handle.is_valid() || ignored_handle.is_valid()) 287 return false; 288 289 platform_handles[0] = std::move(handle); 290 return true; 291 } 292 293 bool DataPipeProducerDispatcher::BeginTransit() { 294 base::AutoLock lock(lock_); 295 if (in_transit_) 296 return false; 297 in_transit_ = !in_two_phase_write_; 298 return in_transit_; 299 } 300 301 void DataPipeProducerDispatcher::CompleteTransitAndClose() { 302 node_controller_->SetPortObserver(control_port_, nullptr); 303 304 base::AutoLock lock(lock_); 305 DCHECK(in_transit_); 306 transferred_ = true; 307 in_transit_ = false; 308 CloseNoLock(); 309 } 310 311 void DataPipeProducerDispatcher::CancelTransit() { 312 base::AutoLock lock(lock_); 313 DCHECK(in_transit_); 314 in_transit_ = false; 315 316 HandleSignalsState state = GetHandleSignalsStateNoLock(); 317 watchers_.NotifyState(state); 318 } 319 320 // static 321 scoped_refptr<DataPipeProducerDispatcher> 322 DataPipeProducerDispatcher::Deserialize(const void* data, 323 size_t num_bytes, 324 const ports::PortName* ports, 325 size_t num_ports, 326 PlatformHandle* handles, 327 size_t num_handles) { 328 if (num_ports != 1 || num_handles != 1 || 329 num_bytes != sizeof(SerializedState)) { 330 return nullptr; 331 } 332 333 const SerializedState* state = static_cast<const SerializedState*>(data); 334 if (!state->options.capacity_num_bytes || !state->options.element_num_bytes || 335 state->options.capacity_num_bytes < state->options.element_num_bytes) { 336 return nullptr; 337 } 338 339 NodeController* node_controller = Core::Get()->GetNodeController(); 340 ports::PortRef port; 341 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) 342 return nullptr; 343 344 auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles( 345 std::move(handles[0]), PlatformHandle()); 346 auto region = base::subtle::PlatformSharedMemoryRegion::Take( 347 std::move(region_handle), 348 base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe, 349 state->options.capacity_num_bytes, 350 base::UnguessableToken::Deserialize(state->buffer_guid_high, 351 state->buffer_guid_low)); 352 auto ring_buffer = 353 base::UnsafeSharedMemoryRegion::Deserialize(std::move(region)); 354 if (!ring_buffer.IsValid()) { 355 DLOG(ERROR) << "Failed to deserialize shared buffer handle."; 356 return nullptr; 357 } 358 359 scoped_refptr<DataPipeProducerDispatcher> dispatcher = 360 new DataPipeProducerDispatcher(node_controller, port, 361 std::move(ring_buffer), state->options, 362 state->pipe_id); 363 364 { 365 base::AutoLock lock(dispatcher->lock_); 366 dispatcher->write_offset_ = state->write_offset; 367 dispatcher->available_capacity_ = state->available_capacity; 368 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; 369 if (!dispatcher->InitializeNoLock()) 370 return nullptr; 371 dispatcher->UpdateSignalsStateNoLock(); 372 } 373 374 return dispatcher; 375 } 376 377 DataPipeProducerDispatcher::DataPipeProducerDispatcher( 378 NodeController* node_controller, 379 const ports::PortRef& control_port, 380 base::UnsafeSharedMemoryRegion shared_ring_buffer, 381 const MojoCreateDataPipeOptions& options, 382 uint64_t pipe_id) 383 : options_(options), 384 node_controller_(node_controller), 385 control_port_(control_port), 386 pipe_id_(pipe_id), 387 watchers_(this), 388 shared_ring_buffer_(std::move(shared_ring_buffer)), 389 available_capacity_(options_.capacity_num_bytes) {} 390 391 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { 392 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_.IsValid() && 393 !ring_buffer_mapping_.IsValid()); 394 } 395 396 bool DataPipeProducerDispatcher::InitializeNoLock() { 397 lock_.AssertAcquired(); 398 if (!shared_ring_buffer_.IsValid()) 399 return false; 400 401 DCHECK(!ring_buffer_mapping_.IsValid()); 402 ring_buffer_mapping_ = shared_ring_buffer_.Map(); 403 if (!ring_buffer_mapping_.IsValid()) { 404 DLOG(ERROR) << "Failed to map shared buffer."; 405 shared_ring_buffer_ = base::UnsafeSharedMemoryRegion(); 406 return false; 407 } 408 409 base::AutoUnlock unlock(lock_); 410 node_controller_->SetPortObserver( 411 control_port_, base::MakeRefCounted<PortObserverThunk>(this)); 412 413 return true; 414 } 415 416 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 417 lock_.AssertAcquired(); 418 if (is_closed_ || in_transit_) 419 return MOJO_RESULT_INVALID_ARGUMENT; 420 is_closed_ = true; 421 ring_buffer_mapping_ = base::WritableSharedMemoryMapping(); 422 shared_ring_buffer_ = base::UnsafeSharedMemoryRegion(); 423 424 watchers_.NotifyClosed(); 425 if (!transferred_) { 426 base::AutoUnlock unlock(lock_); 427 node_controller_->ClosePort(control_port_); 428 } 429 430 return MOJO_RESULT_OK; 431 } 432 433 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 434 const { 435 lock_.AssertAcquired(); 436 HandleSignalsState rv; 437 if (!peer_closed_) { 438 if (!in_two_phase_write_ && shared_ring_buffer_.IsValid() && 439 available_capacity_ > 0) 440 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 441 if (peer_remote_) 442 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE; 443 rv.satisfiable_signals |= 444 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_REMOTE; 445 } else { 446 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 447 } 448 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 449 return rv; 450 } 451 452 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { 453 DVLOG(1) << "Data pipe producer " << pipe_id_ 454 << " notifying peer: " << num_bytes 455 << " bytes written. [control_port=" << control_port_.name() << "]"; 456 457 SendDataPipeControlMessage(node_controller_, control_port_, 458 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes); 459 } 460 461 void DataPipeProducerDispatcher::OnPortStatusChanged() { 462 DCHECK(RequestContext::current()); 463 464 base::AutoLock lock(lock_); 465 466 // We stop observing the control port as soon it's transferred, but this can 467 // race with events which are raised right before that happens. This is fine 468 // to ignore. 469 if (transferred_) 470 return; 471 472 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; 473 474 UpdateSignalsStateNoLock(); 475 } 476 477 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { 478 lock_.AssertAcquired(); 479 480 const bool was_peer_closed = peer_closed_; 481 const bool was_peer_remote = peer_remote_; 482 size_t previous_capacity = available_capacity_; 483 484 ports::PortStatus port_status; 485 int rv = node_controller_->node()->GetStatus(control_port_, &port_status); 486 peer_remote_ = rv == ports::OK && port_status.peer_remote; 487 if (rv != ports::OK || !port_status.receiving_messages) { 488 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" 489 << " [control_port=" << control_port_.name() << "]"; 490 peer_closed_ = true; 491 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { 492 std::unique_ptr<ports::UserMessageEvent> message_event; 493 do { 494 int rv = node_controller_->node()->GetMessage(control_port_, 495 &message_event, nullptr); 496 if (rv != ports::OK) 497 peer_closed_ = true; 498 if (message_event) { 499 auto* message = message_event->GetMessage<UserMessageImpl>(); 500 if (message->user_payload_size() < sizeof(DataPipeControlMessage)) { 501 peer_closed_ = true; 502 break; 503 } 504 505 const DataPipeControlMessage* m = 506 static_cast<const DataPipeControlMessage*>(message->user_payload()); 507 508 if (m->command != DataPipeCommand::DATA_WAS_READ) { 509 DLOG(ERROR) << "Unexpected message from consumer."; 510 peer_closed_ = true; 511 break; 512 } 513 514 if (static_cast<size_t>(available_capacity_) + m->num_bytes > 515 options_.capacity_num_bytes) { 516 DLOG(ERROR) << "Consumer claims to have read too many bytes."; 517 break; 518 } 519 520 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that " 521 << m->num_bytes 522 << " bytes were read. [control_port=" << control_port_.name() 523 << "]"; 524 525 available_capacity_ += m->num_bytes; 526 } 527 } while (message_event); 528 } 529 530 if (peer_closed_ != was_peer_closed || 531 available_capacity_ != previous_capacity || 532 was_peer_remote != peer_remote_) { 533 watchers_.NotifyState(GetHandleSignalsStateNoLock()); 534 } 535 } 536 537 } // namespace core 538 } // namespace mojo 539