Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <algorithm>
     11 #include <limits>
     12 #include <utility>
     13 
     14 #include "base/bind.h"
     15 #include "base/logging.h"
     16 #include "base/memory/ref_counted.h"
     17 #include "base/message_loop/message_loop.h"
     18 #include "mojo/edk/embedder/embedder_internal.h"
     19 #include "mojo/edk/embedder/platform_shared_buffer.h"
     20 #include "mojo/edk/system/core.h"
     21 #include "mojo/edk/system/data_pipe_control_message.h"
     22 #include "mojo/edk/system/node_controller.h"
     23 #include "mojo/edk/system/ports_message.h"
     24 #include "mojo/edk/system/request_context.h"
     25 #include "mojo/public/c/system/data_pipe.h"
     26 
     27 namespace mojo {
     28 namespace edk {
     29 
     30 namespace {
     31 
     32 const uint8_t kFlagPeerClosed = 0x01;
     33 
     34 #pragma pack(push, 1)
     35 
     36 struct SerializedState {
     37   MojoCreateDataPipeOptions options;
     38   uint64_t pipe_id;
     39   uint32_t read_offset;
     40   uint32_t bytes_available;
     41   uint8_t flags;
     42   char padding[7];
     43 };
     44 
     45 static_assert(sizeof(SerializedState) % 8 == 0,
     46               "Invalid SerializedState size.");
     47 
     48 #pragma pack(pop)
     49 
     50 }  // namespace
     51 
     52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
     53 // reference to the dispatcher to ensure it lives as long as the observed port.
     54 class DataPipeConsumerDispatcher::PortObserverThunk
     55     : public NodeController::PortObserver {
     56  public:
     57   explicit PortObserverThunk(
     58       scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
     59       : dispatcher_(dispatcher) {}
     60 
     61  private:
     62   ~PortObserverThunk() override {}
     63 
     64   // NodeController::PortObserver:
     65   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
     66 
     67   scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
     68 
     69   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
     70 };
     71 
     72 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
     73     NodeController* node_controller,
     74     const ports::PortRef& control_port,
     75     scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
     76     const MojoCreateDataPipeOptions& options,
     77     bool initialized,
     78     uint64_t pipe_id)
     79     : options_(options),
     80       node_controller_(node_controller),
     81       control_port_(control_port),
     82       pipe_id_(pipe_id),
     83       shared_ring_buffer_(shared_ring_buffer) {
     84   if (initialized) {
     85     base::AutoLock lock(lock_);
     86     InitializeNoLock();
     87   }
     88 }
     89 
     90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
     91   return Type::DATA_PIPE_CONSUMER;
     92 }
     93 
     94 MojoResult DataPipeConsumerDispatcher::Close() {
     95   base::AutoLock lock(lock_);
     96   DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
     97   return CloseNoLock();
     98 }
     99 
    100 
    101 MojoResult DataPipeConsumerDispatcher::Watch(
    102     MojoHandleSignals signals,
    103     const Watcher::WatchCallback& callback,
    104     uintptr_t context) {
    105   base::AutoLock lock(lock_);
    106 
    107   if (is_closed_ || in_transit_)
    108     return MOJO_RESULT_INVALID_ARGUMENT;
    109 
    110   return awakable_list_.AddWatcher(
    111       signals, callback, context, GetHandleSignalsStateNoLock());
    112 }
    113 
    114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
    115   base::AutoLock lock(lock_);
    116 
    117   if (is_closed_ || in_transit_)
    118     return MOJO_RESULT_INVALID_ARGUMENT;
    119 
    120   return awakable_list_.RemoveWatcher(context);
    121 }
    122 
    123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
    124                                                 uint32_t* num_bytes,
    125                                                 MojoReadDataFlags flags) {
    126   base::AutoLock lock(lock_);
    127   if (!shared_ring_buffer_ || in_transit_)
    128     return MOJO_RESULT_INVALID_ARGUMENT;
    129 
    130   if (in_two_phase_read_)
    131     return MOJO_RESULT_BUSY;
    132 
    133   if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
    134     if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
    135         (flags & MOJO_READ_DATA_FLAG_DISCARD))
    136       return MOJO_RESULT_INVALID_ARGUMENT;
    137     DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD));  // Handled above.
    138     DVLOG_IF(2, elements)
    139         << "Query mode: ignoring non-null |elements|";
    140     *num_bytes = static_cast<uint32_t>(bytes_available_);
    141     return MOJO_RESULT_OK;
    142   }
    143 
    144   bool discard = false;
    145   if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
    146     // These flags are mutally exclusive.
    147     if (flags & MOJO_READ_DATA_FLAG_PEEK)
    148       return MOJO_RESULT_INVALID_ARGUMENT;
    149     DVLOG_IF(2, elements)
    150         << "Discard mode: ignoring non-null |elements|";
    151     discard = true;
    152   }
    153 
    154   uint32_t max_num_bytes_to_read = *num_bytes;
    155   if (max_num_bytes_to_read % options_.element_num_bytes != 0)
    156     return MOJO_RESULT_INVALID_ARGUMENT;
    157 
    158   bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
    159   uint32_t min_num_bytes_to_read =
    160       all_or_none ? max_num_bytes_to_read : 0;
    161 
    162   if (min_num_bytes_to_read > bytes_available_) {
    163     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    164                         : MOJO_RESULT_OUT_OF_RANGE;
    165   }
    166 
    167   uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
    168   if (bytes_to_read == 0) {
    169     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    170                         : MOJO_RESULT_SHOULD_WAIT;
    171   }
    172 
    173   if (!discard) {
    174     uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
    175     CHECK(data);
    176 
    177     uint8_t* destination = static_cast<uint8_t*>(elements);
    178     CHECK(destination);
    179 
    180     DCHECK_LE(read_offset_, options_.capacity_num_bytes);
    181     uint32_t tail_bytes_to_copy =
    182         std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
    183     uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
    184     if (tail_bytes_to_copy > 0)
    185       memcpy(destination, data + read_offset_, tail_bytes_to_copy);
    186     if (head_bytes_to_copy > 0)
    187       memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
    188   }
    189   *num_bytes = bytes_to_read;
    190 
    191   bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
    192   if (discard || !peek) {
    193     read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
    194     bytes_available_ -= bytes_to_read;
    195 
    196     base::AutoUnlock unlock(lock_);
    197     NotifyRead(bytes_to_read);
    198   }
    199 
    200   return MOJO_RESULT_OK;
    201 }
    202 
    203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
    204                                                      uint32_t* buffer_num_bytes,
    205                                                      MojoReadDataFlags flags) {
    206   base::AutoLock lock(lock_);
    207   if (!shared_ring_buffer_ || in_transit_)
    208     return MOJO_RESULT_INVALID_ARGUMENT;
    209 
    210   if (in_two_phase_read_)
    211     return MOJO_RESULT_BUSY;
    212 
    213   // These flags may not be used in two-phase mode.
    214   if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
    215       (flags & MOJO_READ_DATA_FLAG_QUERY) ||
    216       (flags & MOJO_READ_DATA_FLAG_PEEK))
    217     return MOJO_RESULT_INVALID_ARGUMENT;
    218 
    219   if (bytes_available_ == 0) {
    220     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    221                         : MOJO_RESULT_SHOULD_WAIT;
    222   }
    223 
    224   DCHECK_LT(read_offset_, options_.capacity_num_bytes);
    225   uint32_t bytes_to_read = std::min(bytes_available_,
    226                                     options_.capacity_num_bytes - read_offset_);
    227 
    228   CHECK(ring_buffer_mapping_);
    229   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
    230   CHECK(data);
    231 
    232   in_two_phase_read_ = true;
    233   *buffer = data + read_offset_;
    234   *buffer_num_bytes = bytes_to_read;
    235   two_phase_max_bytes_read_ = bytes_to_read;
    236 
    237   return MOJO_RESULT_OK;
    238 }
    239 
    240 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
    241   base::AutoLock lock(lock_);
    242   if (!in_two_phase_read_)
    243     return MOJO_RESULT_FAILED_PRECONDITION;
    244 
    245   if (in_transit_)
    246     return MOJO_RESULT_INVALID_ARGUMENT;
    247 
    248   CHECK(shared_ring_buffer_);
    249 
    250   HandleSignalsState old_state = GetHandleSignalsStateNoLock();
    251   MojoResult rv;
    252   if (num_bytes_read > two_phase_max_bytes_read_ ||
    253       num_bytes_read % options_.element_num_bytes != 0) {
    254     rv = MOJO_RESULT_INVALID_ARGUMENT;
    255   } else {
    256     rv = MOJO_RESULT_OK;
    257     read_offset_ =
    258         (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
    259 
    260     DCHECK_GE(bytes_available_, num_bytes_read);
    261     bytes_available_ -= num_bytes_read;
    262 
    263     base::AutoUnlock unlock(lock_);
    264     NotifyRead(num_bytes_read);
    265   }
    266 
    267   in_two_phase_read_ = false;
    268   two_phase_max_bytes_read_ = 0;
    269 
    270   HandleSignalsState new_state = GetHandleSignalsStateNoLock();
    271   if (!new_state.equals(old_state))
    272     awakable_list_.AwakeForStateChange(new_state);
    273 
    274   return rv;
    275 }
    276 
    277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
    278   base::AutoLock lock(lock_);
    279   return GetHandleSignalsStateNoLock();
    280 }
    281 
    282 MojoResult DataPipeConsumerDispatcher::AddAwakable(
    283     Awakable* awakable,
    284     MojoHandleSignals signals,
    285     uintptr_t context,
    286     HandleSignalsState* signals_state) {
    287   base::AutoLock lock(lock_);
    288   if (!shared_ring_buffer_ || in_transit_) {
    289     if (signals_state)
    290       *signals_state = HandleSignalsState();
    291     return MOJO_RESULT_INVALID_ARGUMENT;
    292   }
    293   UpdateSignalsStateNoLock();
    294   HandleSignalsState state = GetHandleSignalsStateNoLock();
    295   if (state.satisfies(signals)) {
    296     if (signals_state)
    297       *signals_state = state;
    298     return MOJO_RESULT_ALREADY_EXISTS;
    299   }
    300   if (!state.can_satisfy(signals)) {
    301     if (signals_state)
    302       *signals_state = state;
    303     return MOJO_RESULT_FAILED_PRECONDITION;
    304   }
    305 
    306   awakable_list_.Add(awakable, signals, context);
    307   return MOJO_RESULT_OK;
    308 }
    309 
    310 void DataPipeConsumerDispatcher::RemoveAwakable(
    311     Awakable* awakable,
    312     HandleSignalsState* signals_state) {
    313   base::AutoLock lock(lock_);
    314   if ((!shared_ring_buffer_ || in_transit_) && signals_state)
    315     *signals_state = HandleSignalsState();
    316   else if (signals_state)
    317     *signals_state = GetHandleSignalsStateNoLock();
    318   awakable_list_.Remove(awakable);
    319 }
    320 
    321 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
    322                                                 uint32_t* num_ports,
    323                                                 uint32_t* num_handles) {
    324   base::AutoLock lock(lock_);
    325   DCHECK(in_transit_);
    326   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
    327   *num_ports = 1;
    328   *num_handles = 1;
    329 }
    330 
    331 bool DataPipeConsumerDispatcher::EndSerialize(
    332     void* destination,
    333     ports::PortName* ports,
    334     PlatformHandle* platform_handles) {
    335   SerializedState* state = static_cast<SerializedState*>(destination);
    336   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
    337   memset(state->padding, 0, sizeof(state->padding));
    338 
    339   base::AutoLock lock(lock_);
    340   DCHECK(in_transit_);
    341   state->pipe_id = pipe_id_;
    342   state->read_offset = read_offset_;
    343   state->bytes_available = bytes_available_;
    344   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
    345 
    346   ports[0] = control_port_.name();
    347 
    348   buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
    349   platform_handles[0] = buffer_handle_for_transit_.get();
    350 
    351   return true;
    352 }
    353 
    354 bool DataPipeConsumerDispatcher::BeginTransit() {
    355   base::AutoLock lock(lock_);
    356   if (in_transit_)
    357     return false;
    358   in_transit_ = !in_two_phase_read_;
    359   return in_transit_;
    360 }
    361 
    362 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
    363   node_controller_->SetPortObserver(control_port_, nullptr);
    364 
    365   base::AutoLock lock(lock_);
    366   DCHECK(in_transit_);
    367   in_transit_ = false;
    368   transferred_ = true;
    369   ignore_result(buffer_handle_for_transit_.release());
    370   CloseNoLock();
    371 }
    372 
    373 void DataPipeConsumerDispatcher::CancelTransit() {
    374   base::AutoLock lock(lock_);
    375   DCHECK(in_transit_);
    376   in_transit_ = false;
    377   buffer_handle_for_transit_.reset();
    378   UpdateSignalsStateNoLock();
    379 }
    380 
    381 // static
    382 scoped_refptr<DataPipeConsumerDispatcher>
    383 DataPipeConsumerDispatcher::Deserialize(const void* data,
    384                                         size_t num_bytes,
    385                                         const ports::PortName* ports,
    386                                         size_t num_ports,
    387                                         PlatformHandle* handles,
    388                                         size_t num_handles) {
    389   if (num_ports != 1 || num_handles != 1 ||
    390       num_bytes != sizeof(SerializedState)) {
    391     return nullptr;
    392   }
    393 
    394   const SerializedState* state = static_cast<const SerializedState*>(data);
    395 
    396   NodeController* node_controller = internal::g_core->GetNodeController();
    397   ports::PortRef port;
    398   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
    399     return nullptr;
    400 
    401   PlatformHandle buffer_handle;
    402   std::swap(buffer_handle, handles[0]);
    403   scoped_refptr<PlatformSharedBuffer> ring_buffer =
    404       PlatformSharedBuffer::CreateFromPlatformHandle(
    405           state->options.capacity_num_bytes,
    406           false /* read_only */,
    407           ScopedPlatformHandle(buffer_handle));
    408   if (!ring_buffer) {
    409     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
    410     return nullptr;
    411   }
    412 
    413   scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
    414       new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
    415                                      state->options, false /* initialized */,
    416                                      state->pipe_id);
    417 
    418   {
    419     base::AutoLock lock(dispatcher->lock_);
    420     dispatcher->read_offset_ = state->read_offset;
    421     dispatcher->bytes_available_ = state->bytes_available;
    422     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
    423     dispatcher->InitializeNoLock();
    424   }
    425 
    426   return dispatcher;
    427 }
    428 
    429 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
    430   DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
    431          !in_transit_);
    432 }
    433 
    434 void DataPipeConsumerDispatcher::InitializeNoLock() {
    435   lock_.AssertAcquired();
    436 
    437   if (shared_ring_buffer_) {
    438     DCHECK(!ring_buffer_mapping_);
    439     ring_buffer_mapping_ =
    440         shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
    441     if (!ring_buffer_mapping_) {
    442       DLOG(ERROR) << "Failed to map shared buffer.";
    443       shared_ring_buffer_ = nullptr;
    444     }
    445   }
    446 
    447   base::AutoUnlock unlock(lock_);
    448   node_controller_->SetPortObserver(
    449       control_port_,
    450       make_scoped_refptr(new PortObserverThunk(this)));
    451 }
    452 
    453 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
    454   lock_.AssertAcquired();
    455   if (is_closed_ || in_transit_)
    456     return MOJO_RESULT_INVALID_ARGUMENT;
    457   is_closed_ = true;
    458   ring_buffer_mapping_.reset();
    459   shared_ring_buffer_ = nullptr;
    460 
    461   awakable_list_.CancelAll();
    462   if (!transferred_) {
    463     base::AutoUnlock unlock(lock_);
    464     node_controller_->ClosePort(control_port_);
    465   }
    466 
    467   return MOJO_RESULT_OK;
    468 }
    469 
    470 HandleSignalsState
    471 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
    472   lock_.AssertAcquired();
    473 
    474   HandleSignalsState rv;
    475   if (shared_ring_buffer_ && bytes_available_) {
    476     if (!in_two_phase_read_)
    477       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    478     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    479   } else if (!peer_closed_ && shared_ring_buffer_) {
    480     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    481   }
    482 
    483   if (peer_closed_)
    484     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    485   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    486   return rv;
    487 }
    488 
    489 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
    490   DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
    491            << num_bytes << " bytes read. [control_port="
    492            << control_port_.name() << "]";
    493 
    494   SendDataPipeControlMessage(node_controller_, control_port_,
    495                              DataPipeCommand::DATA_WAS_READ, num_bytes);
    496 }
    497 
    498 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
    499   DCHECK(RequestContext::current());
    500 
    501   base::AutoLock lock(lock_);
    502 
    503   // We stop observing the control port as soon it's transferred, but this can
    504   // race with events which are raised right before that happens. This is fine
    505   // to ignore.
    506   if (transferred_)
    507     return;
    508 
    509   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
    510 
    511   UpdateSignalsStateNoLock();
    512 }
    513 
    514 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
    515   lock_.AssertAcquired();
    516 
    517   bool was_peer_closed = peer_closed_;
    518   size_t previous_bytes_available = bytes_available_;
    519 
    520   ports::PortStatus port_status;
    521   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
    522   if (rv != ports::OK || !port_status.receiving_messages) {
    523     DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
    524              << " [control_port=" << control_port_.name() << "]";
    525     peer_closed_ = true;
    526   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
    527     ports::ScopedMessage message;
    528     do {
    529       int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
    530                                                       &message);
    531       if (rv != ports::OK)
    532         peer_closed_ = true;
    533       if (message) {
    534         if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
    535           peer_closed_ = true;
    536           break;
    537         }
    538 
    539         const DataPipeControlMessage* m =
    540             static_cast<const DataPipeControlMessage*>(
    541                 message->payload_bytes());
    542 
    543         if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
    544           DLOG(ERROR) << "Unexpected control message from producer.";
    545           peer_closed_ = true;
    546           break;
    547         }
    548 
    549         if (static_cast<size_t>(bytes_available_) + m->num_bytes >
    550               options_.capacity_num_bytes) {
    551           DLOG(ERROR) << "Producer claims to have written too many bytes.";
    552           peer_closed_ = true;
    553           break;
    554         }
    555 
    556         DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
    557                  << m->num_bytes << " bytes were written. [control_port="
    558                  << control_port_.name() << "]";
    559 
    560         bytes_available_ += m->num_bytes;
    561       }
    562     } while (message);
    563   }
    564 
    565   if (peer_closed_ != was_peer_closed ||
    566       bytes_available_ != previous_bytes_available) {
    567     awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
    568   }
    569 }
    570 
    571 }  // namespace edk
    572 }  // namespace mojo
    573