Home | History | Annotate | Download | only in core
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/core/data_pipe_producer_dispatcher.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <utility>
     11 
     12 #include "base/bind.h"
     13 #include "base/logging.h"
     14 #include "base/memory/ref_counted.h"
     15 #include "mojo/core/configuration.h"
     16 #include "mojo/core/core.h"
     17 #include "mojo/core/data_pipe_control_message.h"
     18 #include "mojo/core/node_controller.h"
     19 #include "mojo/core/platform_handle_utils.h"
     20 #include "mojo/core/request_context.h"
     21 #include "mojo/core/user_message_impl.h"
     22 #include "mojo/public/c/system/data_pipe.h"
     23 
     24 namespace mojo {
     25 namespace core {
     26 
     27 namespace {
     28 
     29 const uint8_t kFlagPeerClosed = 0x01;
     30 
// Pack the struct to 1-byte alignment so that the compiler inserts no implicit
// padding between fields; the layout below is exactly what is written to and
// read from the serialized message.
#pragma pack(push, 1)

// Flat representation of a DataPipeProducerDispatcher used while the handle is
// in transit. It is filled in by EndSerialize() and consumed by Deserialize().
struct SerializedState {
  // Copy of the options the pipe was created with.
  MojoCreateDataPipeOptions options;
  // Identifier of the pipe (used in this file for logging).
  uint64_t pipe_id;
  // Offset within the ring buffer at which the next write starts.
  uint32_t write_offset;
  // Number of bytes that can currently be written.
  uint32_t available_capacity;
  // Bit field; see kFlagPeerClosed.
  uint8_t flags;
  // High and low halves of the shared buffer's GUID.
  uint64_t buffer_guid_high;
  uint64_t buffer_guid_low;
  // Explicit trailing padding (zeroed by EndSerialize()) which brings the
  // struct size to a multiple of 8 bytes.
  char padding[7];
};

// The explicit padding above must keep the packed size 8-byte aligned.
static_assert(sizeof(SerializedState) % 8 == 0,
              "Invalid SerializedState size.");

#pragma pack(pop)
     48 
     49 }  // namespace
     50 
     51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
     52 // reference to the dispatcher to ensure it lives as long as the observed port.
     53 class DataPipeProducerDispatcher::PortObserverThunk
     54     : public NodeController::PortObserver {
     55  public:
     56   explicit PortObserverThunk(
     57       scoped_refptr<DataPipeProducerDispatcher> dispatcher)
     58       : dispatcher_(dispatcher) {}
     59 
     60  private:
     61   ~PortObserverThunk() override {}
     62 
     63   // NodeController::PortObserver:
     64   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
     65 
     66   scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
     67 
     68   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
     69 };
     70 
     71 // static
     72 scoped_refptr<DataPipeProducerDispatcher> DataPipeProducerDispatcher::Create(
     73     NodeController* node_controller,
     74     const ports::PortRef& control_port,
     75     base::UnsafeSharedMemoryRegion shared_ring_buffer,
     76     const MojoCreateDataPipeOptions& options,
     77     uint64_t pipe_id) {
     78   scoped_refptr<DataPipeProducerDispatcher> producer =
     79       new DataPipeProducerDispatcher(node_controller, control_port,
     80                                      std::move(shared_ring_buffer), options,
     81                                      pipe_id);
     82   base::AutoLock lock(producer->lock_);
     83   if (!producer->InitializeNoLock())
     84     return nullptr;
     85   return producer;
     86 }
     87 
     88 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
     89   return Type::DATA_PIPE_PRODUCER;
     90 }
     91 
     92 MojoResult DataPipeProducerDispatcher::Close() {
     93   base::AutoLock lock(lock_);
     94   DVLOG(1) << "Closing data pipe producer " << pipe_id_;
     95   return CloseNoLock();
     96 }
     97 
     98 MojoResult DataPipeProducerDispatcher::WriteData(
     99     const void* elements,
    100     uint32_t* num_bytes,
    101     const MojoWriteDataOptions& options) {
    102   base::AutoLock lock(lock_);
    103   if (!shared_ring_buffer_.IsValid() || in_transit_)
    104     return MOJO_RESULT_INVALID_ARGUMENT;
    105 
    106   if (in_two_phase_write_)
    107     return MOJO_RESULT_BUSY;
    108 
    109   if (peer_closed_)
    110     return MOJO_RESULT_FAILED_PRECONDITION;
    111 
    112   if (*num_bytes % options_.element_num_bytes != 0)
    113     return MOJO_RESULT_INVALID_ARGUMENT;
    114   if (*num_bytes == 0)
    115     return MOJO_RESULT_OK;  // Nothing to do.
    116 
    117   if ((options.flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) &&
    118       (*num_bytes > available_capacity_)) {
    119     // Don't return "should wait" since you can't wait for a specified amount of
    120     // data.
    121     return MOJO_RESULT_OUT_OF_RANGE;
    122   }
    123 
    124   DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
    125   uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
    126   if (num_bytes_to_write == 0)
    127     return MOJO_RESULT_SHOULD_WAIT;
    128 
    129   *num_bytes = num_bytes_to_write;
    130 
    131   CHECK(ring_buffer_mapping_.IsValid());
    132   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
    133   CHECK(data);
    134 
    135   const uint8_t* source = static_cast<const uint8_t*>(elements);
    136   CHECK(source);
    137 
    138   DCHECK_LE(write_offset_, options_.capacity_num_bytes);
    139   uint32_t tail_bytes_to_write =
    140       std::min(options_.capacity_num_bytes - write_offset_, num_bytes_to_write);
    141   uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
    142 
    143   DCHECK_GT(tail_bytes_to_write, 0u);
    144   memcpy(data + write_offset_, source, tail_bytes_to_write);
    145   if (head_bytes_to_write > 0)
    146     memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
    147 
    148   DCHECK_LE(num_bytes_to_write, available_capacity_);
    149   available_capacity_ -= num_bytes_to_write;
    150   write_offset_ =
    151       (write_offset_ + num_bytes_to_write) % options_.capacity_num_bytes;
    152 
    153   watchers_.NotifyState(GetHandleSignalsStateNoLock());
    154 
    155   base::AutoUnlock unlock(lock_);
    156   NotifyWrite(num_bytes_to_write);
    157 
    158   return MOJO_RESULT_OK;
    159 }
    160 
    161 MojoResult DataPipeProducerDispatcher::BeginWriteData(
    162     void** buffer,
    163     uint32_t* buffer_num_bytes) {
    164   base::AutoLock lock(lock_);
    165   if (!shared_ring_buffer_.IsValid() || in_transit_)
    166     return MOJO_RESULT_INVALID_ARGUMENT;
    167 
    168   if (in_two_phase_write_)
    169     return MOJO_RESULT_BUSY;
    170   if (peer_closed_)
    171     return MOJO_RESULT_FAILED_PRECONDITION;
    172 
    173   if (available_capacity_ == 0) {
    174     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    175                         : MOJO_RESULT_SHOULD_WAIT;
    176   }
    177 
    178   in_two_phase_write_ = true;
    179   *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
    180                                available_capacity_);
    181   DCHECK_GT(*buffer_num_bytes, 0u);
    182 
    183   CHECK(ring_buffer_mapping_.IsValid());
    184   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
    185   *buffer = data + write_offset_;
    186 
    187   return MOJO_RESULT_OK;
    188 }
    189 
    190 MojoResult DataPipeProducerDispatcher::EndWriteData(
    191     uint32_t num_bytes_written) {
    192   base::AutoLock lock(lock_);
    193   if (is_closed_ || in_transit_)
    194     return MOJO_RESULT_INVALID_ARGUMENT;
    195 
    196   if (!in_two_phase_write_)
    197     return MOJO_RESULT_FAILED_PRECONDITION;
    198 
    199   // Note: Allow successful completion of the two-phase write even if the other
    200   // side has been closed.
    201   MojoResult rv = MOJO_RESULT_OK;
    202   if (num_bytes_written > available_capacity_ ||
    203       num_bytes_written % options_.element_num_bytes != 0 ||
    204       write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
    205     rv = MOJO_RESULT_INVALID_ARGUMENT;
    206   } else {
    207     DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
    208     available_capacity_ -= num_bytes_written;
    209     write_offset_ =
    210         (write_offset_ + num_bytes_written) % options_.capacity_num_bytes;
    211 
    212     base::AutoUnlock unlock(lock_);
    213     NotifyWrite(num_bytes_written);
    214   }
    215 
    216   in_two_phase_write_ = false;
    217 
    218   // If we're now writable, we *became* writable (since we weren't writable
    219   // during the two-phase write), so notify watchers.
    220   watchers_.NotifyState(GetHandleSignalsStateNoLock());
    221 
    222   return rv;
    223 }
    224 
    225 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
    226   base::AutoLock lock(lock_);
    227   return GetHandleSignalsStateNoLock();
    228 }
    229 
    230 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
    231     const scoped_refptr<WatcherDispatcher>& watcher,
    232     uintptr_t context) {
    233   base::AutoLock lock(lock_);
    234   if (is_closed_ || in_transit_)
    235     return MOJO_RESULT_INVALID_ARGUMENT;
    236   return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
    237 }
    238 
    239 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
    240     WatcherDispatcher* watcher,
    241     uintptr_t context) {
    242   base::AutoLock lock(lock_);
    243   if (is_closed_ || in_transit_)
    244     return MOJO_RESULT_INVALID_ARGUMENT;
    245   return watchers_.Remove(watcher, context);
    246 }
    247 
    248 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
    249                                                 uint32_t* num_ports,
    250                                                 uint32_t* num_handles) {
    251   base::AutoLock lock(lock_);
    252   DCHECK(in_transit_);
    253   *num_bytes = sizeof(SerializedState);
    254   *num_ports = 1;
    255   *num_handles = 1;
    256 }
    257 
    258 bool DataPipeProducerDispatcher::EndSerialize(
    259     void* destination,
    260     ports::PortName* ports,
    261     PlatformHandle* platform_handles) {
    262   SerializedState* state = static_cast<SerializedState*>(destination);
    263   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
    264   memset(state->padding, 0, sizeof(state->padding));
    265 
    266   base::AutoLock lock(lock_);
    267   DCHECK(in_transit_);
    268   state->pipe_id = pipe_id_;
    269   state->write_offset = write_offset_;
    270   state->available_capacity = available_capacity_;
    271   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
    272 
    273   auto region_handle =
    274       base::UnsafeSharedMemoryRegion::TakeHandleForSerialization(
    275           std::move(shared_ring_buffer_));
    276   const base::UnguessableToken& guid = region_handle.GetGUID();
    277   state->buffer_guid_high = guid.GetHighForSerialization();
    278   state->buffer_guid_low = guid.GetLowForSerialization();
    279 
    280   ports[0] = control_port_.name();
    281 
    282   PlatformHandle handle;
    283   PlatformHandle ignored_handle;
    284   ExtractPlatformHandlesFromSharedMemoryRegionHandle(
    285       region_handle.PassPlatformHandle(), &handle, &ignored_handle);
    286   if (!handle.is_valid() || ignored_handle.is_valid())
    287     return false;
    288 
    289   platform_handles[0] = std::move(handle);
    290   return true;
    291 }
    292 
    293 bool DataPipeProducerDispatcher::BeginTransit() {
    294   base::AutoLock lock(lock_);
    295   if (in_transit_)
    296     return false;
    297   in_transit_ = !in_two_phase_write_;
    298   return in_transit_;
    299 }
    300 
    301 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
    302   node_controller_->SetPortObserver(control_port_, nullptr);
    303 
    304   base::AutoLock lock(lock_);
    305   DCHECK(in_transit_);
    306   transferred_ = true;
    307   in_transit_ = false;
    308   CloseNoLock();
    309 }
    310 
    311 void DataPipeProducerDispatcher::CancelTransit() {
    312   base::AutoLock lock(lock_);
    313   DCHECK(in_transit_);
    314   in_transit_ = false;
    315 
    316   HandleSignalsState state = GetHandleSignalsStateNoLock();
    317   watchers_.NotifyState(state);
    318 }
    319 
    320 // static
    321 scoped_refptr<DataPipeProducerDispatcher>
    322 DataPipeProducerDispatcher::Deserialize(const void* data,
    323                                         size_t num_bytes,
    324                                         const ports::PortName* ports,
    325                                         size_t num_ports,
    326                                         PlatformHandle* handles,
    327                                         size_t num_handles) {
    328   if (num_ports != 1 || num_handles != 1 ||
    329       num_bytes != sizeof(SerializedState)) {
    330     return nullptr;
    331   }
    332 
    333   const SerializedState* state = static_cast<const SerializedState*>(data);
    334   if (!state->options.capacity_num_bytes || !state->options.element_num_bytes ||
    335       state->options.capacity_num_bytes < state->options.element_num_bytes) {
    336     return nullptr;
    337   }
    338 
    339   NodeController* node_controller = Core::Get()->GetNodeController();
    340   ports::PortRef port;
    341   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
    342     return nullptr;
    343 
    344   auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles(
    345       std::move(handles[0]), PlatformHandle());
    346   auto region = base::subtle::PlatformSharedMemoryRegion::Take(
    347       std::move(region_handle),
    348       base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe,
    349       state->options.capacity_num_bytes,
    350       base::UnguessableToken::Deserialize(state->buffer_guid_high,
    351                                           state->buffer_guid_low));
    352   auto ring_buffer =
    353       base::UnsafeSharedMemoryRegion::Deserialize(std::move(region));
    354   if (!ring_buffer.IsValid()) {
    355     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
    356     return nullptr;
    357   }
    358 
    359   scoped_refptr<DataPipeProducerDispatcher> dispatcher =
    360       new DataPipeProducerDispatcher(node_controller, port,
    361                                      std::move(ring_buffer), state->options,
    362                                      state->pipe_id);
    363 
    364   {
    365     base::AutoLock lock(dispatcher->lock_);
    366     dispatcher->write_offset_ = state->write_offset;
    367     dispatcher->available_capacity_ = state->available_capacity;
    368     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
    369     if (!dispatcher->InitializeNoLock())
    370       return nullptr;
    371     dispatcher->UpdateSignalsStateNoLock();
    372   }
    373 
    374   return dispatcher;
    375 }
    376 
    377 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
    378     NodeController* node_controller,
    379     const ports::PortRef& control_port,
    380     base::UnsafeSharedMemoryRegion shared_ring_buffer,
    381     const MojoCreateDataPipeOptions& options,
    382     uint64_t pipe_id)
    383     : options_(options),
    384       node_controller_(node_controller),
    385       control_port_(control_port),
    386       pipe_id_(pipe_id),
    387       watchers_(this),
    388       shared_ring_buffer_(std::move(shared_ring_buffer)),
    389       available_capacity_(options_.capacity_num_bytes) {}
    390 
// By the time the dispatcher is destroyed it is expected to have been closed,
// to no longer be in transit, and to have released both the shared memory
// region and its mapping. This is verified in DCHECK-enabled builds only.
DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
  DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_.IsValid() &&
         !ring_buffer_mapping_.IsValid());
}
    395 
    396 bool DataPipeProducerDispatcher::InitializeNoLock() {
    397   lock_.AssertAcquired();
    398   if (!shared_ring_buffer_.IsValid())
    399     return false;
    400 
    401   DCHECK(!ring_buffer_mapping_.IsValid());
    402   ring_buffer_mapping_ = shared_ring_buffer_.Map();
    403   if (!ring_buffer_mapping_.IsValid()) {
    404     DLOG(ERROR) << "Failed to map shared buffer.";
    405     shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
    406     return false;
    407   }
    408 
    409   base::AutoUnlock unlock(lock_);
    410   node_controller_->SetPortObserver(
    411       control_port_, base::MakeRefCounted<PortObserverThunk>(this));
    412 
    413   return true;
    414 }
    415 
    416 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
    417   lock_.AssertAcquired();
    418   if (is_closed_ || in_transit_)
    419     return MOJO_RESULT_INVALID_ARGUMENT;
    420   is_closed_ = true;
    421   ring_buffer_mapping_ = base::WritableSharedMemoryMapping();
    422   shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
    423 
    424   watchers_.NotifyClosed();
    425   if (!transferred_) {
    426     base::AutoUnlock unlock(lock_);
    427     node_controller_->ClosePort(control_port_);
    428   }
    429 
    430   return MOJO_RESULT_OK;
    431 }
    432 
    433 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
    434     const {
    435   lock_.AssertAcquired();
    436   HandleSignalsState rv;
    437   if (!peer_closed_) {
    438     if (!in_two_phase_write_ && shared_ring_buffer_.IsValid() &&
    439         available_capacity_ > 0)
    440       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
    441     if (peer_remote_)
    442       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
    443     rv.satisfiable_signals |=
    444         MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_REMOTE;
    445   } else {
    446     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    447   }
    448   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    449   return rv;
    450 }
    451 
    452 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
    453   DVLOG(1) << "Data pipe producer " << pipe_id_
    454            << " notifying peer: " << num_bytes
    455            << " bytes written. [control_port=" << control_port_.name() << "]";
    456 
    457   SendDataPipeControlMessage(node_controller_, control_port_,
    458                              DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
    459 }
    460 
    461 void DataPipeProducerDispatcher::OnPortStatusChanged() {
    462   DCHECK(RequestContext::current());
    463 
    464   base::AutoLock lock(lock_);
    465 
    466   // We stop observing the control port as soon it's transferred, but this can
    467   // race with events which are raised right before that happens. This is fine
    468   // to ignore.
    469   if (transferred_)
    470     return;
    471 
    472   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
    473 
    474   UpdateSignalsStateNoLock();
    475 }
    476 
    477 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
    478   lock_.AssertAcquired();
    479 
    480   const bool was_peer_closed = peer_closed_;
    481   const bool was_peer_remote = peer_remote_;
    482   size_t previous_capacity = available_capacity_;
    483 
    484   ports::PortStatus port_status;
    485   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
    486   peer_remote_ = rv == ports::OK && port_status.peer_remote;
    487   if (rv != ports::OK || !port_status.receiving_messages) {
    488     DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
    489              << " [control_port=" << control_port_.name() << "]";
    490     peer_closed_ = true;
    491   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
    492     std::unique_ptr<ports::UserMessageEvent> message_event;
    493     do {
    494       int rv = node_controller_->node()->GetMessage(control_port_,
    495                                                     &message_event, nullptr);
    496       if (rv != ports::OK)
    497         peer_closed_ = true;
    498       if (message_event) {
    499         auto* message = message_event->GetMessage<UserMessageImpl>();
    500         if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
    501           peer_closed_ = true;
    502           break;
    503         }
    504 
    505         const DataPipeControlMessage* m =
    506             static_cast<const DataPipeControlMessage*>(message->user_payload());
    507 
    508         if (m->command != DataPipeCommand::DATA_WAS_READ) {
    509           DLOG(ERROR) << "Unexpected message from consumer.";
    510           peer_closed_ = true;
    511           break;
    512         }
    513 
    514         if (static_cast<size_t>(available_capacity_) + m->num_bytes >
    515             options_.capacity_num_bytes) {
    516           DLOG(ERROR) << "Consumer claims to have read too many bytes.";
    517           break;
    518         }
    519 
    520         DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
    521                  << m->num_bytes
    522                  << " bytes were read. [control_port=" << control_port_.name()
    523                  << "]";
    524 
    525         available_capacity_ += m->num_bytes;
    526       }
    527     } while (message_event);
    528   }
    529 
    530   if (peer_closed_ != was_peer_closed ||
    531       available_capacity_ != previous_capacity ||
    532       was_peer_remote != peer_remote_) {
    533     watchers_.NotifyState(GetHandleSignalsStateNoLock());
    534   }
    535 }
    536 
    537 }  // namespace core
    538 }  // namespace mojo
    539