Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <utility>
     11 
     12 #include "base/bind.h"
     13 #include "base/logging.h"
     14 #include "base/memory/ref_counted.h"
     15 #include "base/message_loop/message_loop.h"
     16 #include "mojo/edk/embedder/embedder_internal.h"
     17 #include "mojo/edk/embedder/platform_shared_buffer.h"
     18 #include "mojo/edk/system/configuration.h"
     19 #include "mojo/edk/system/core.h"
     20 #include "mojo/edk/system/data_pipe_control_message.h"
     21 #include "mojo/edk/system/node_controller.h"
     22 #include "mojo/edk/system/ports_message.h"
     23 #include "mojo/edk/system/request_context.h"
     24 #include "mojo/public/c/system/data_pipe.h"
     25 
     26 namespace mojo {
     27 namespace edk {
     28 
     29 namespace {
     30 
     31 const uint8_t kFlagPeerClosed = 0x01;
     32 
     33 #pragma pack(push, 1)
     34 
     35 struct SerializedState {
     36   MojoCreateDataPipeOptions options;
     37   uint64_t pipe_id;
     38   uint32_t write_offset;
     39   uint32_t available_capacity;
     40   uint8_t flags;
     41   char padding[7];
     42 };
     43 
     44 static_assert(sizeof(SerializedState) % 8 == 0,
     45               "Invalid SerializedState size.");
     46 
     47 #pragma pack(pop)
     48 
     49 }  // namespace
     50 
     51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
     52 // reference to the dispatcher to ensure it lives as long as the observed port.
     53 class DataPipeProducerDispatcher::PortObserverThunk
     54     : public NodeController::PortObserver {
     55  public:
     56   explicit PortObserverThunk(
     57       scoped_refptr<DataPipeProducerDispatcher> dispatcher)
     58       : dispatcher_(dispatcher) {}
     59 
     60  private:
     61   ~PortObserverThunk() override {}
     62 
     63   // NodeController::PortObserver:
     64   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
     65 
     66   scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
     67 
     68   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
     69 };
     70 
     71 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
     72     NodeController* node_controller,
     73     const ports::PortRef& control_port,
     74     scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
     75     const MojoCreateDataPipeOptions& options,
     76     bool initialized,
     77     uint64_t pipe_id)
     78     : options_(options),
     79       node_controller_(node_controller),
     80       control_port_(control_port),
     81       pipe_id_(pipe_id),
     82       shared_ring_buffer_(shared_ring_buffer),
     83       available_capacity_(options_.capacity_num_bytes) {
     84   if (initialized) {
     85     base::AutoLock lock(lock_);
     86     InitializeNoLock();
     87   }
     88 }
     89 
     90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
     91   return Type::DATA_PIPE_PRODUCER;
     92 }
     93 
     94 MojoResult DataPipeProducerDispatcher::Close() {
     95   base::AutoLock lock(lock_);
     96   DVLOG(1) << "Closing data pipe producer " << pipe_id_;
     97   return CloseNoLock();
     98 }
     99 
    100 MojoResult DataPipeProducerDispatcher::Watch(
    101     MojoHandleSignals signals,
    102     const Watcher::WatchCallback& callback,
    103     uintptr_t context) {
    104   base::AutoLock lock(lock_);
    105 
    106   if (is_closed_ || in_transit_)
    107     return MOJO_RESULT_INVALID_ARGUMENT;
    108 
    109   return awakable_list_.AddWatcher(
    110       signals, callback, context, GetHandleSignalsStateNoLock());
    111 }
    112 
    113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
    114   base::AutoLock lock(lock_);
    115 
    116   if (is_closed_ || in_transit_)
    117     return MOJO_RESULT_INVALID_ARGUMENT;
    118 
    119   return awakable_list_.RemoveWatcher(context);
    120 }
    121 
    122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
    123                                                  uint32_t* num_bytes,
    124                                                  MojoWriteDataFlags flags) {
    125   base::AutoLock lock(lock_);
    126   if (!shared_ring_buffer_ || in_transit_)
    127     return MOJO_RESULT_INVALID_ARGUMENT;
    128 
    129   if (in_two_phase_write_)
    130     return MOJO_RESULT_BUSY;
    131 
    132   if (peer_closed_)
    133     return MOJO_RESULT_FAILED_PRECONDITION;
    134 
    135   if (*num_bytes % options_.element_num_bytes != 0)
    136     return MOJO_RESULT_INVALID_ARGUMENT;
    137   if (*num_bytes == 0)
    138     return MOJO_RESULT_OK;  // Nothing to do.
    139 
    140   bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
    141   uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
    142   if (min_num_bytes_to_write > options_.capacity_num_bytes) {
    143     // Don't return "should wait" since you can't wait for a specified amount of
    144     // data.
    145     return MOJO_RESULT_OUT_OF_RANGE;
    146   }
    147 
    148   DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
    149   uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
    150   if (num_bytes_to_write == 0)
    151     return MOJO_RESULT_SHOULD_WAIT;
    152 
    153   HandleSignalsState old_state = GetHandleSignalsStateNoLock();
    154 
    155   *num_bytes = num_bytes_to_write;
    156 
    157   CHECK(ring_buffer_mapping_);
    158   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
    159   CHECK(data);
    160 
    161   const uint8_t* source = static_cast<const uint8_t*>(elements);
    162   CHECK(source);
    163 
    164   DCHECK_LE(write_offset_, options_.capacity_num_bytes);
    165   uint32_t tail_bytes_to_write =
    166       std::min(options_.capacity_num_bytes - write_offset_,
    167                num_bytes_to_write);
    168   uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
    169 
    170   DCHECK_GT(tail_bytes_to_write, 0u);
    171   memcpy(data + write_offset_, source, tail_bytes_to_write);
    172   if (head_bytes_to_write > 0)
    173     memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
    174 
    175   DCHECK_LE(num_bytes_to_write, available_capacity_);
    176   available_capacity_ -= num_bytes_to_write;
    177   write_offset_ = (write_offset_ + num_bytes_to_write) %
    178       options_.capacity_num_bytes;
    179 
    180   HandleSignalsState new_state = GetHandleSignalsStateNoLock();
    181   if (!new_state.equals(old_state))
    182     awakable_list_.AwakeForStateChange(new_state);
    183 
    184   base::AutoUnlock unlock(lock_);
    185   NotifyWrite(num_bytes_to_write);
    186 
    187   return MOJO_RESULT_OK;
    188 }
    189 
    190 MojoResult DataPipeProducerDispatcher::BeginWriteData(
    191     void** buffer,
    192     uint32_t* buffer_num_bytes,
    193     MojoWriteDataFlags flags) {
    194   base::AutoLock lock(lock_);
    195   if (!shared_ring_buffer_ || in_transit_)
    196     return MOJO_RESULT_INVALID_ARGUMENT;
    197   if (in_two_phase_write_)
    198     return MOJO_RESULT_BUSY;
    199   if (peer_closed_)
    200     return MOJO_RESULT_FAILED_PRECONDITION;
    201 
    202   if (available_capacity_ == 0) {
    203     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    204                         : MOJO_RESULT_SHOULD_WAIT;
    205   }
    206 
    207   in_two_phase_write_ = true;
    208   *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
    209                                available_capacity_);
    210   DCHECK_GT(*buffer_num_bytes, 0u);
    211 
    212   CHECK(ring_buffer_mapping_);
    213   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
    214   *buffer = data + write_offset_;
    215 
    216   return MOJO_RESULT_OK;
    217 }
    218 
    219 MojoResult DataPipeProducerDispatcher::EndWriteData(
    220     uint32_t num_bytes_written) {
    221   base::AutoLock lock(lock_);
    222   if (is_closed_ || in_transit_)
    223     return MOJO_RESULT_INVALID_ARGUMENT;
    224 
    225   if (!in_two_phase_write_)
    226     return MOJO_RESULT_FAILED_PRECONDITION;
    227 
    228   DCHECK(shared_ring_buffer_);
    229   DCHECK(ring_buffer_mapping_);
    230 
    231   // Note: Allow successful completion of the two-phase write even if the other
    232   // side has been closed.
    233   MojoResult rv = MOJO_RESULT_OK;
    234   if (num_bytes_written > available_capacity_ ||
    235       num_bytes_written % options_.element_num_bytes != 0 ||
    236       write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
    237     rv = MOJO_RESULT_INVALID_ARGUMENT;
    238   } else {
    239     DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
    240     available_capacity_ -= num_bytes_written;
    241     write_offset_ = (write_offset_ + num_bytes_written) %
    242         options_.capacity_num_bytes;
    243 
    244     base::AutoUnlock unlock(lock_);
    245     NotifyWrite(num_bytes_written);
    246   }
    247 
    248   in_two_phase_write_ = false;
    249 
    250   // If we're now writable, we *became* writable (since we weren't writable
    251   // during the two-phase write), so awake producer awakables.
    252   HandleSignalsState new_state = GetHandleSignalsStateNoLock();
    253   if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
    254     awakable_list_.AwakeForStateChange(new_state);
    255 
    256   return rv;
    257 }
    258 
    259 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
    260   base::AutoLock lock(lock_);
    261   return GetHandleSignalsStateNoLock();
    262 }
    263 
    264 MojoResult DataPipeProducerDispatcher::AddAwakable(
    265     Awakable* awakable,
    266     MojoHandleSignals signals,
    267     uintptr_t context,
    268     HandleSignalsState* signals_state) {
    269   base::AutoLock lock(lock_);
    270   if (!shared_ring_buffer_ || in_transit_) {
    271     if (signals_state)
    272       *signals_state = HandleSignalsState();
    273     return MOJO_RESULT_INVALID_ARGUMENT;
    274   }
    275   UpdateSignalsStateNoLock();
    276   HandleSignalsState state = GetHandleSignalsStateNoLock();
    277   if (state.satisfies(signals)) {
    278     if (signals_state)
    279       *signals_state = state;
    280     return MOJO_RESULT_ALREADY_EXISTS;
    281   }
    282   if (!state.can_satisfy(signals)) {
    283     if (signals_state)
    284       *signals_state = state;
    285     return MOJO_RESULT_FAILED_PRECONDITION;
    286   }
    287 
    288   awakable_list_.Add(awakable, signals, context);
    289   return MOJO_RESULT_OK;
    290 }
    291 
    292 void DataPipeProducerDispatcher::RemoveAwakable(
    293     Awakable* awakable,
    294     HandleSignalsState* signals_state) {
    295   base::AutoLock lock(lock_);
    296   if ((!shared_ring_buffer_ || in_transit_) && signals_state)
    297     *signals_state = HandleSignalsState();
    298   else if (signals_state)
    299     *signals_state = GetHandleSignalsStateNoLock();
    300   awakable_list_.Remove(awakable);
    301 }
    302 
    303 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
    304                                                 uint32_t* num_ports,
    305                                                 uint32_t* num_handles) {
    306   base::AutoLock lock(lock_);
    307   DCHECK(in_transit_);
    308   *num_bytes = sizeof(SerializedState);
    309   *num_ports = 1;
    310   *num_handles = 1;
    311 }
    312 
    313 bool DataPipeProducerDispatcher::EndSerialize(
    314     void* destination,
    315     ports::PortName* ports,
    316     PlatformHandle* platform_handles) {
    317   SerializedState* state = static_cast<SerializedState*>(destination);
    318   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
    319   memset(state->padding, 0, sizeof(state->padding));
    320 
    321   base::AutoLock lock(lock_);
    322   DCHECK(in_transit_);
    323   state->pipe_id = pipe_id_;
    324   state->write_offset = write_offset_;
    325   state->available_capacity = available_capacity_;
    326   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
    327 
    328   ports[0] = control_port_.name();
    329 
    330   buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
    331   platform_handles[0] = buffer_handle_for_transit_.get();
    332 
    333   return true;
    334 }
    335 
    336 bool DataPipeProducerDispatcher::BeginTransit() {
    337   base::AutoLock lock(lock_);
    338   if (in_transit_)
    339     return false;
    340   in_transit_ = !in_two_phase_write_;
    341   return in_transit_;
    342 }
    343 
    344 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
    345   node_controller_->SetPortObserver(control_port_, nullptr);
    346 
    347   base::AutoLock lock(lock_);
    348   DCHECK(in_transit_);
    349   transferred_ = true;
    350   in_transit_ = false;
    351   ignore_result(buffer_handle_for_transit_.release());
    352   CloseNoLock();
    353 }
    354 
    355 void DataPipeProducerDispatcher::CancelTransit() {
    356   base::AutoLock lock(lock_);
    357   DCHECK(in_transit_);
    358   in_transit_ = false;
    359   buffer_handle_for_transit_.reset();
    360   awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
    361 }
    362 
    363 // static
    364 scoped_refptr<DataPipeProducerDispatcher>
    365 DataPipeProducerDispatcher::Deserialize(const void* data,
    366                                         size_t num_bytes,
    367                                         const ports::PortName* ports,
    368                                         size_t num_ports,
    369                                         PlatformHandle* handles,
    370                                         size_t num_handles) {
    371   if (num_ports != 1 || num_handles != 1 ||
    372       num_bytes != sizeof(SerializedState)) {
    373     return nullptr;
    374   }
    375 
    376   const SerializedState* state = static_cast<const SerializedState*>(data);
    377 
    378   NodeController* node_controller = internal::g_core->GetNodeController();
    379   ports::PortRef port;
    380   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
    381     return nullptr;
    382 
    383   PlatformHandle buffer_handle;
    384   std::swap(buffer_handle, handles[0]);
    385   scoped_refptr<PlatformSharedBuffer> ring_buffer =
    386       PlatformSharedBuffer::CreateFromPlatformHandle(
    387           state->options.capacity_num_bytes,
    388           false /* read_only */,
    389           ScopedPlatformHandle(buffer_handle));
    390   if (!ring_buffer) {
    391     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
    392     return nullptr;
    393   }
    394 
    395   scoped_refptr<DataPipeProducerDispatcher> dispatcher =
    396       new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
    397                                      state->options, false /* initialized */,
    398                                      state->pipe_id);
    399 
    400   {
    401     base::AutoLock lock(dispatcher->lock_);
    402     dispatcher->write_offset_ = state->write_offset;
    403     dispatcher->available_capacity_ = state->available_capacity;
    404     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
    405     dispatcher->InitializeNoLock();
    406   }
    407 
    408   return dispatcher;
    409 }
    410 
    411 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
    412   DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
    413          !ring_buffer_mapping_);
    414 }
    415 
    416 void DataPipeProducerDispatcher::InitializeNoLock() {
    417   lock_.AssertAcquired();
    418 
    419   if (shared_ring_buffer_) {
    420     ring_buffer_mapping_ =
    421         shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
    422     if (!ring_buffer_mapping_) {
    423       DLOG(ERROR) << "Failed to map shared buffer.";
    424       shared_ring_buffer_ = nullptr;
    425     }
    426   }
    427 
    428   base::AutoUnlock unlock(lock_);
    429   node_controller_->SetPortObserver(
    430       control_port_,
    431       make_scoped_refptr(new PortObserverThunk(this)));
    432 }
    433 
    434 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
    435   lock_.AssertAcquired();
    436   if (is_closed_ || in_transit_)
    437     return MOJO_RESULT_INVALID_ARGUMENT;
    438   is_closed_ = true;
    439   ring_buffer_mapping_.reset();
    440   shared_ring_buffer_ = nullptr;
    441 
    442   awakable_list_.CancelAll();
    443   if (!transferred_) {
    444     base::AutoUnlock unlock(lock_);
    445     node_controller_->ClosePort(control_port_);
    446   }
    447 
    448   return MOJO_RESULT_OK;
    449 }
    450 
    451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
    452     const {
    453   lock_.AssertAcquired();
    454   HandleSignalsState rv;
    455   if (!peer_closed_) {
    456     if (!in_two_phase_write_ && shared_ring_buffer_ &&
    457         available_capacity_ > 0)
    458       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
    459     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
    460   } else {
    461     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    462   }
    463   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    464   return rv;
    465 }
    466 
    467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
    468   DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
    469            << num_bytes << " bytes written. [control_port="
    470            << control_port_.name() << "]";
    471 
    472   SendDataPipeControlMessage(node_controller_, control_port_,
    473                              DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
    474 }
    475 
    476 void DataPipeProducerDispatcher::OnPortStatusChanged() {
    477   DCHECK(RequestContext::current());
    478 
    479   base::AutoLock lock(lock_);
    480 
    481   // We stop observing the control port as soon it's transferred, but this can
    482   // race with events which are raised right before that happens. This is fine
    483   // to ignore.
    484   if (transferred_)
    485     return;
    486 
    487   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
    488 
    489   UpdateSignalsStateNoLock();
    490 }
    491 
    492 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
    493   lock_.AssertAcquired();
    494 
    495   bool was_peer_closed = peer_closed_;
    496   size_t previous_capacity = available_capacity_;
    497 
    498   ports::PortStatus port_status;
    499   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
    500   if (rv != ports::OK || !port_status.receiving_messages) {
    501     DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
    502              << " [control_port=" << control_port_.name() << "]";
    503     peer_closed_ = true;
    504   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
    505     ports::ScopedMessage message;
    506     do {
    507       int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
    508                                                       &message);
    509       if (rv != ports::OK)
    510         peer_closed_ = true;
    511       if (message) {
    512         if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
    513           peer_closed_ = true;
    514           break;
    515         }
    516 
    517         const DataPipeControlMessage* m =
    518             static_cast<const DataPipeControlMessage*>(
    519                 message->payload_bytes());
    520 
    521         if (m->command != DataPipeCommand::DATA_WAS_READ) {
    522           DLOG(ERROR) << "Unexpected message from consumer.";
    523           peer_closed_ = true;
    524           break;
    525         }
    526 
    527         if (static_cast<size_t>(available_capacity_) + m->num_bytes >
    528               options_.capacity_num_bytes) {
    529           DLOG(ERROR) << "Consumer claims to have read too many bytes.";
    530           break;
    531         }
    532 
    533         DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
    534                  << m->num_bytes << " bytes were read. [control_port="
    535                  << control_port_.name() << "]";
    536 
    537         available_capacity_ += m->num_bytes;
    538       }
    539     } while (message);
    540   }
    541 
    542   if (peer_closed_ != was_peer_closed ||
    543       available_capacity_ != previous_capacity) {
    544     awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
    545   }
    546 }
    547 
    548 }  // namespace edk
    549 }  // namespace mojo
    550