1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 6 7 #include <stddef.h> 8 #include <stdint.h> 9 10 #include <utility> 11 12 #include "base/bind.h" 13 #include "base/logging.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/message_loop/message_loop.h" 16 #include "mojo/edk/embedder/embedder_internal.h" 17 #include "mojo/edk/embedder/platform_shared_buffer.h" 18 #include "mojo/edk/system/configuration.h" 19 #include "mojo/edk/system/core.h" 20 #include "mojo/edk/system/data_pipe_control_message.h" 21 #include "mojo/edk/system/node_controller.h" 22 #include "mojo/edk/system/ports_message.h" 23 #include "mojo/edk/system/request_context.h" 24 #include "mojo/public/c/system/data_pipe.h" 25 26 namespace mojo { 27 namespace edk { 28 29 namespace { 30 31 const uint8_t kFlagPeerClosed = 0x01; 32 33 #pragma pack(push, 1) 34 35 struct SerializedState { 36 MojoCreateDataPipeOptions options; 37 uint64_t pipe_id; 38 uint32_t write_offset; 39 uint32_t available_capacity; 40 uint8_t flags; 41 char padding[7]; 42 }; 43 44 static_assert(sizeof(SerializedState) % 8 == 0, 45 "Invalid SerializedState size."); 46 47 #pragma pack(pop) 48 49 } // namespace 50 51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a 52 // reference to the dispatcher to ensure it lives as long as the observed port. 53 class DataPipeProducerDispatcher::PortObserverThunk 54 : public NodeController::PortObserver { 55 public: 56 explicit PortObserverThunk( 57 scoped_refptr<DataPipeProducerDispatcher> dispatcher) 58 : dispatcher_(dispatcher) {} 59 60 private: 61 ~PortObserverThunk() override {} 62 63 // NodeController::PortObserver: 64 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } 65 66 scoped_refptr<DataPipeProducerDispatcher> dispatcher_; 67 68 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); 69 }; 70 71 DataPipeProducerDispatcher::DataPipeProducerDispatcher( 72 NodeController* node_controller, 73 const ports::PortRef& control_port, 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 75 const MojoCreateDataPipeOptions& options, 76 bool initialized, 77 uint64_t pipe_id) 78 : options_(options), 79 node_controller_(node_controller), 80 control_port_(control_port), 81 pipe_id_(pipe_id), 82 shared_ring_buffer_(shared_ring_buffer), 83 available_capacity_(options_.capacity_num_bytes) { 84 if (initialized) { 85 base::AutoLock lock(lock_); 86 InitializeNoLock(); 87 } 88 } 89 90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 91 return Type::DATA_PIPE_PRODUCER; 92 } 93 94 MojoResult DataPipeProducerDispatcher::Close() { 95 base::AutoLock lock(lock_); 96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; 97 return CloseNoLock(); 98 } 99 100 MojoResult DataPipeProducerDispatcher::Watch( 101 MojoHandleSignals signals, 102 const Watcher::WatchCallback& callback, 103 uintptr_t context) { 104 base::AutoLock lock(lock_); 105 106 if (is_closed_ || in_transit_) 107 return MOJO_RESULT_INVALID_ARGUMENT; 108 109 return awakable_list_.AddWatcher( 110 signals, callback, context, GetHandleSignalsStateNoLock()); 111 } 112 113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { 114 base::AutoLock lock(lock_); 115 116 if (is_closed_ || in_transit_) 117 return MOJO_RESULT_INVALID_ARGUMENT; 118 119 return awakable_list_.RemoveWatcher(context); 120 } 121 122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, 123 uint32_t* num_bytes, 124 MojoWriteDataFlags flags) { 125 base::AutoLock lock(lock_); 126 if (!shared_ring_buffer_ || in_transit_) 127 return MOJO_RESULT_INVALID_ARGUMENT; 128 129 if (in_two_phase_write_) 130 return MOJO_RESULT_BUSY; 131 132 if (peer_closed_) 133 return MOJO_RESULT_FAILED_PRECONDITION; 134 135 if (*num_bytes % options_.element_num_bytes != 0) 136 return MOJO_RESULT_INVALID_ARGUMENT; 137 if (*num_bytes == 0) 138 return MOJO_RESULT_OK; // Nothing to do. 139 140 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; 141 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; 142 if (min_num_bytes_to_write > options_.capacity_num_bytes) { 143 // Don't return "should wait" since you can't wait for a specified amount of 144 // data. 145 return MOJO_RESULT_OUT_OF_RANGE; 146 } 147 148 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); 149 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); 150 if (num_bytes_to_write == 0) 151 return MOJO_RESULT_SHOULD_WAIT; 152 153 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); 154 155 *num_bytes = num_bytes_to_write; 156 157 CHECK(ring_buffer_mapping_); 158 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 159 CHECK(data); 160 161 const uint8_t* source = static_cast<const uint8_t*>(elements); 162 CHECK(source); 163 164 DCHECK_LE(write_offset_, options_.capacity_num_bytes); 165 uint32_t tail_bytes_to_write = 166 std::min(options_.capacity_num_bytes - write_offset_, 167 num_bytes_to_write); 168 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; 169 170 DCHECK_GT(tail_bytes_to_write, 0u); 171 memcpy(data + write_offset_, source, tail_bytes_to_write); 172 if (head_bytes_to_write > 0) 173 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 174 175 DCHECK_LE(num_bytes_to_write, available_capacity_); 176 available_capacity_ -= num_bytes_to_write; 177 write_offset_ = (write_offset_ + num_bytes_to_write) % 178 options_.capacity_num_bytes; 179 180 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 181 if (!new_state.equals(old_state)) 182 awakable_list_.AwakeForStateChange(new_state); 183 184 base::AutoUnlock unlock(lock_); 185 NotifyWrite(num_bytes_to_write); 186 187 return MOJO_RESULT_OK; 188 } 189 190 MojoResult DataPipeProducerDispatcher::BeginWriteData( 191 void** buffer, 192 uint32_t* buffer_num_bytes, 193 MojoWriteDataFlags flags) { 194 base::AutoLock lock(lock_); 195 if (!shared_ring_buffer_ || in_transit_) 196 return MOJO_RESULT_INVALID_ARGUMENT; 197 if (in_two_phase_write_) 198 return MOJO_RESULT_BUSY; 199 if (peer_closed_) 200 return MOJO_RESULT_FAILED_PRECONDITION; 201 202 if (available_capacity_ == 0) { 203 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 204 : MOJO_RESULT_SHOULD_WAIT; 205 } 206 207 in_two_phase_write_ = true; 208 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_, 209 available_capacity_); 210 DCHECK_GT(*buffer_num_bytes, 0u); 211 212 CHECK(ring_buffer_mapping_); 213 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 214 *buffer = data + write_offset_; 215 216 return MOJO_RESULT_OK; 217 } 218 219 MojoResult DataPipeProducerDispatcher::EndWriteData( 220 uint32_t num_bytes_written) { 221 base::AutoLock lock(lock_); 222 if (is_closed_ || in_transit_) 223 return MOJO_RESULT_INVALID_ARGUMENT; 224 225 if (!in_two_phase_write_) 226 return MOJO_RESULT_FAILED_PRECONDITION; 227 228 DCHECK(shared_ring_buffer_); 229 DCHECK(ring_buffer_mapping_); 230 231 // Note: Allow successful completion of the two-phase write even if the other 232 // side has been closed. 233 MojoResult rv = MOJO_RESULT_OK; 234 if (num_bytes_written > available_capacity_ || 235 num_bytes_written % options_.element_num_bytes != 0 || 236 write_offset_ + num_bytes_written > options_.capacity_num_bytes) { 237 rv = MOJO_RESULT_INVALID_ARGUMENT; 238 } else { 239 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes); 240 available_capacity_ -= num_bytes_written; 241 write_offset_ = (write_offset_ + num_bytes_written) % 242 options_.capacity_num_bytes; 243 244 base::AutoUnlock unlock(lock_); 245 NotifyWrite(num_bytes_written); 246 } 247 248 in_two_phase_write_ = false; 249 250 // If we're now writable, we *became* writable (since we weren't writable 251 // during the two-phase write), so awake producer awakables. 252 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 253 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 254 awakable_list_.AwakeForStateChange(new_state); 255 256 return rv; 257 } 258 259 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 260 base::AutoLock lock(lock_); 261 return GetHandleSignalsStateNoLock(); 262 } 263 264 MojoResult DataPipeProducerDispatcher::AddAwakable( 265 Awakable* awakable, 266 MojoHandleSignals signals, 267 uintptr_t context, 268 HandleSignalsState* signals_state) { 269 base::AutoLock lock(lock_); 270 if (!shared_ring_buffer_ || in_transit_) { 271 if (signals_state) 272 *signals_state = HandleSignalsState(); 273 return MOJO_RESULT_INVALID_ARGUMENT; 274 } 275 UpdateSignalsStateNoLock(); 276 HandleSignalsState state = GetHandleSignalsStateNoLock(); 277 if (state.satisfies(signals)) { 278 if (signals_state) 279 *signals_state = state; 280 return MOJO_RESULT_ALREADY_EXISTS; 281 } 282 if (!state.can_satisfy(signals)) { 283 if (signals_state) 284 *signals_state = state; 285 return MOJO_RESULT_FAILED_PRECONDITION; 286 } 287 288 awakable_list_.Add(awakable, signals, context); 289 return MOJO_RESULT_OK; 290 } 291 292 void DataPipeProducerDispatcher::RemoveAwakable( 293 Awakable* awakable, 294 HandleSignalsState* signals_state) { 295 base::AutoLock lock(lock_); 296 if ((!shared_ring_buffer_ || in_transit_) && signals_state) 297 *signals_state = HandleSignalsState(); 298 else if (signals_state) 299 *signals_state = GetHandleSignalsStateNoLock(); 300 awakable_list_.Remove(awakable); 301 } 302 303 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, 304 uint32_t* num_ports, 305 uint32_t* num_handles) { 306 base::AutoLock lock(lock_); 307 DCHECK(in_transit_); 308 *num_bytes = sizeof(SerializedState); 309 *num_ports = 1; 310 *num_handles = 1; 311 } 312 313 bool DataPipeProducerDispatcher::EndSerialize( 314 void* destination, 315 ports::PortName* ports, 316 PlatformHandle* platform_handles) { 317 SerializedState* state = static_cast<SerializedState*>(destination); 318 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); 319 memset(state->padding, 0, sizeof(state->padding)); 320 321 base::AutoLock lock(lock_); 322 DCHECK(in_transit_); 323 state->pipe_id = pipe_id_; 324 state->write_offset = write_offset_; 325 state->available_capacity = available_capacity_; 326 state->flags = peer_closed_ ? kFlagPeerClosed : 0; 327 328 ports[0] = control_port_.name(); 329 330 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); 331 platform_handles[0] = buffer_handle_for_transit_.get(); 332 333 return true; 334 } 335 336 bool DataPipeProducerDispatcher::BeginTransit() { 337 base::AutoLock lock(lock_); 338 if (in_transit_) 339 return false; 340 in_transit_ = !in_two_phase_write_; 341 return in_transit_; 342 } 343 344 void DataPipeProducerDispatcher::CompleteTransitAndClose() { 345 node_controller_->SetPortObserver(control_port_, nullptr); 346 347 base::AutoLock lock(lock_); 348 DCHECK(in_transit_); 349 transferred_ = true; 350 in_transit_ = false; 351 ignore_result(buffer_handle_for_transit_.release()); 352 CloseNoLock(); 353 } 354 355 void DataPipeProducerDispatcher::CancelTransit() { 356 base::AutoLock lock(lock_); 357 DCHECK(in_transit_); 358 in_transit_ = false; 359 buffer_handle_for_transit_.reset(); 360 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 361 } 362 363 // static 364 scoped_refptr<DataPipeProducerDispatcher> 365 DataPipeProducerDispatcher::Deserialize(const void* data, 366 size_t num_bytes, 367 const ports::PortName* ports, 368 size_t num_ports, 369 PlatformHandle* handles, 370 size_t num_handles) { 371 if (num_ports != 1 || num_handles != 1 || 372 num_bytes != sizeof(SerializedState)) { 373 return nullptr; 374 } 375 376 const SerializedState* state = static_cast<const SerializedState*>(data); 377 378 NodeController* node_controller = internal::g_core->GetNodeController(); 379 ports::PortRef port; 380 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) 381 return nullptr; 382 383 PlatformHandle buffer_handle; 384 std::swap(buffer_handle, handles[0]); 385 scoped_refptr<PlatformSharedBuffer> ring_buffer = 386 PlatformSharedBuffer::CreateFromPlatformHandle( 387 state->options.capacity_num_bytes, 388 false /* read_only */, 389 ScopedPlatformHandle(buffer_handle)); 390 if (!ring_buffer) { 391 DLOG(ERROR) << "Failed to deserialize shared buffer handle."; 392 return nullptr; 393 } 394 395 scoped_refptr<DataPipeProducerDispatcher> dispatcher = 396 new DataPipeProducerDispatcher(node_controller, port, ring_buffer, 397 state->options, false /* initialized */, 398 state->pipe_id); 399 400 { 401 base::AutoLock lock(dispatcher->lock_); 402 dispatcher->write_offset_ = state->write_offset; 403 dispatcher->available_capacity_ = state->available_capacity; 404 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; 405 dispatcher->InitializeNoLock(); 406 } 407 408 return dispatcher; 409 } 410 411 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { 412 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ && 413 !ring_buffer_mapping_); 414 } 415 416 void DataPipeProducerDispatcher::InitializeNoLock() { 417 lock_.AssertAcquired(); 418 419 if (shared_ring_buffer_) { 420 ring_buffer_mapping_ = 421 shared_ring_buffer_->Map(0, options_.capacity_num_bytes); 422 if (!ring_buffer_mapping_) { 423 DLOG(ERROR) << "Failed to map shared buffer."; 424 shared_ring_buffer_ = nullptr; 425 } 426 } 427 428 base::AutoUnlock unlock(lock_); 429 node_controller_->SetPortObserver( 430 control_port_, 431 make_scoped_refptr(new PortObserverThunk(this))); 432 } 433 434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 435 lock_.AssertAcquired(); 436 if (is_closed_ || in_transit_) 437 return MOJO_RESULT_INVALID_ARGUMENT; 438 is_closed_ = true; 439 ring_buffer_mapping_.reset(); 440 shared_ring_buffer_ = nullptr; 441 442 awakable_list_.CancelAll(); 443 if (!transferred_) { 444 base::AutoUnlock unlock(lock_); 445 node_controller_->ClosePort(control_port_); 446 } 447 448 return MOJO_RESULT_OK; 449 } 450 451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 452 const { 453 lock_.AssertAcquired(); 454 HandleSignalsState rv; 455 if (!peer_closed_) { 456 if (!in_two_phase_write_ && shared_ring_buffer_ && 457 available_capacity_ > 0) 458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 460 } else { 461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 462 } 463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 464 return rv; 465 } 466 467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { 468 DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: " 469 << num_bytes << " bytes written. [control_port=" 470 << control_port_.name() << "]"; 471 472 SendDataPipeControlMessage(node_controller_, control_port_, 473 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes); 474 } 475 476 void DataPipeProducerDispatcher::OnPortStatusChanged() { 477 DCHECK(RequestContext::current()); 478 479 base::AutoLock lock(lock_); 480 481 // We stop observing the control port as soon it's transferred, but this can 482 // race with events which are raised right before that happens. This is fine 483 // to ignore. 484 if (transferred_) 485 return; 486 487 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; 488 489 UpdateSignalsStateNoLock(); 490 } 491 492 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { 493 lock_.AssertAcquired(); 494 495 bool was_peer_closed = peer_closed_; 496 size_t previous_capacity = available_capacity_; 497 498 ports::PortStatus port_status; 499 int rv = node_controller_->node()->GetStatus(control_port_, &port_status); 500 if (rv != ports::OK || !port_status.receiving_messages) { 501 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" 502 << " [control_port=" << control_port_.name() << "]"; 503 peer_closed_ = true; 504 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { 505 ports::ScopedMessage message; 506 do { 507 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, 508 &message); 509 if (rv != ports::OK) 510 peer_closed_ = true; 511 if (message) { 512 if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) { 513 peer_closed_ = true; 514 break; 515 } 516 517 const DataPipeControlMessage* m = 518 static_cast<const DataPipeControlMessage*>( 519 message->payload_bytes()); 520 521 if (m->command != DataPipeCommand::DATA_WAS_READ) { 522 DLOG(ERROR) << "Unexpected message from consumer."; 523 peer_closed_ = true; 524 break; 525 } 526 527 if (static_cast<size_t>(available_capacity_) + m->num_bytes > 528 options_.capacity_num_bytes) { 529 DLOG(ERROR) << "Consumer claims to have read too many bytes."; 530 break; 531 } 532 533 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that " 534 << m->num_bytes << " bytes were read. [control_port=" 535 << control_port_.name() << "]"; 536 537 available_capacity_ += m->num_bytes; 538 } 539 } while (message); 540 } 541 542 if (peer_closed_ != was_peer_closed || 543 available_capacity_ != previous_capacity) { 544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 545 } 546 } 547 548 } // namespace edk 549 } // namespace mojo 550