Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <algorithm>
     11 #include <limits>
     12 #include <utility>
     13 
     14 #include "base/bind.h"
     15 #include "base/logging.h"
     16 #include "base/memory/ref_counted.h"
     17 #include "base/message_loop/message_loop.h"
     18 #include "mojo/edk/embedder/embedder_internal.h"
     19 #include "mojo/edk/embedder/platform_shared_buffer.h"
     20 #include "mojo/edk/system/core.h"
     21 #include "mojo/edk/system/data_pipe_control_message.h"
     22 #include "mojo/edk/system/node_controller.h"
     23 #include "mojo/edk/system/ports_message.h"
     24 #include "mojo/edk/system/request_context.h"
     25 #include "mojo/public/c/system/data_pipe.h"
     26 
     27 namespace mojo {
     28 namespace edk {
     29 
     30 namespace {
     31 
     32 const uint8_t kFlagPeerClosed = 0x01;
     33 
     34 #pragma pack(push, 1)
     35 
     36 struct SerializedState {
     37   MojoCreateDataPipeOptions options;
     38   uint64_t pipe_id;
     39   uint32_t read_offset;
     40   uint32_t bytes_available;
     41   uint8_t flags;
     42   char padding[7];
     43 };
     44 
     45 static_assert(sizeof(SerializedState) % 8 == 0,
     46               "Invalid SerializedState size.");
     47 
     48 #pragma pack(pop)
     49 
     50 }  // namespace
     51 
     52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
     53 // reference to the dispatcher to ensure it lives as long as the observed port.
     54 class DataPipeConsumerDispatcher::PortObserverThunk
     55     : public NodeController::PortObserver {
     56  public:
     57   explicit PortObserverThunk(
     58       scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
     59       : dispatcher_(dispatcher) {}
     60 
     61  private:
     62   ~PortObserverThunk() override {}
     63 
     64   // NodeController::PortObserver:
     65   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
     66 
     67   scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
     68 
     69   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
     70 };
     71 
     72 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
     73     NodeController* node_controller,
     74     const ports::PortRef& control_port,
     75     scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
     76     const MojoCreateDataPipeOptions& options,
     77     bool initialized,
     78     uint64_t pipe_id)
     79     : options_(options),
     80       node_controller_(node_controller),
     81       control_port_(control_port),
     82       pipe_id_(pipe_id),
     83       shared_ring_buffer_(shared_ring_buffer) {
     84   if (initialized) {
     85     base::AutoLock lock(lock_);
     86     InitializeNoLock();
     87   }
     88 }
     89 
     90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
     91   return Type::DATA_PIPE_CONSUMER;
     92 }
     93 
     94 MojoResult DataPipeConsumerDispatcher::Close() {
     95   base::AutoLock lock(lock_);
     96   DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
     97   return CloseNoLock();
     98 }
     99 
    100 
    101 MojoResult DataPipeConsumerDispatcher::Watch(
    102     MojoHandleSignals signals,
    103     const Watcher::WatchCallback& callback,
    104     uintptr_t context) {
    105   base::AutoLock lock(lock_);
    106 
    107   if (is_closed_ || in_transit_)
    108     return MOJO_RESULT_INVALID_ARGUMENT;
    109 
    110   return awakable_list_.AddWatcher(
    111       signals, callback, context, GetHandleSignalsStateNoLock());
    112 }
    113 
    114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
    115   base::AutoLock lock(lock_);
    116 
    117   if (is_closed_ || in_transit_)
    118     return MOJO_RESULT_INVALID_ARGUMENT;
    119 
    120   return awakable_list_.RemoveWatcher(context);
    121 }
    122 
    123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
    124                                                 uint32_t* num_bytes,
    125                                                 MojoReadDataFlags flags) {
    126   base::AutoLock lock(lock_);
    127   new_data_available_ = false;
    128 
    129   if (!shared_ring_buffer_ || in_transit_)
    130     return MOJO_RESULT_INVALID_ARGUMENT;
    131 
    132   if (in_two_phase_read_)
    133     return MOJO_RESULT_BUSY;
    134 
    135   if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
    136     if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
    137         (flags & MOJO_READ_DATA_FLAG_DISCARD))
    138       return MOJO_RESULT_INVALID_ARGUMENT;
    139     DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD));  // Handled above.
    140     DVLOG_IF(2, elements)
    141         << "Query mode: ignoring non-null |elements|";
    142     *num_bytes = static_cast<uint32_t>(bytes_available_);
    143     return MOJO_RESULT_OK;
    144   }
    145 
    146   bool discard = false;
    147   if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
    148     // These flags are mutally exclusive.
    149     if (flags & MOJO_READ_DATA_FLAG_PEEK)
    150       return MOJO_RESULT_INVALID_ARGUMENT;
    151     DVLOG_IF(2, elements)
    152         << "Discard mode: ignoring non-null |elements|";
    153     discard = true;
    154   }
    155 
    156   uint32_t max_num_bytes_to_read = *num_bytes;
    157   if (max_num_bytes_to_read % options_.element_num_bytes != 0)
    158     return MOJO_RESULT_INVALID_ARGUMENT;
    159 
    160   bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
    161   uint32_t min_num_bytes_to_read =
    162       all_or_none ? max_num_bytes_to_read : 0;
    163 
    164   if (min_num_bytes_to_read > bytes_available_) {
    165     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    166                         : MOJO_RESULT_OUT_OF_RANGE;
    167   }
    168 
    169   uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
    170   if (bytes_to_read == 0) {
    171     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    172                         : MOJO_RESULT_SHOULD_WAIT;
    173   }
    174 
    175   if (!discard) {
    176     uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
    177     CHECK(data);
    178 
    179     uint8_t* destination = static_cast<uint8_t*>(elements);
    180     CHECK(destination);
    181 
    182     DCHECK_LE(read_offset_, options_.capacity_num_bytes);
    183     uint32_t tail_bytes_to_copy =
    184         std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
    185     uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
    186     if (tail_bytes_to_copy > 0)
    187       memcpy(destination, data + read_offset_, tail_bytes_to_copy);
    188     if (head_bytes_to_copy > 0)
    189       memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
    190   }
    191   *num_bytes = bytes_to_read;
    192 
    193   bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
    194   if (discard || !peek) {
    195     read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
    196     bytes_available_ -= bytes_to_read;
    197 
    198     base::AutoUnlock unlock(lock_);
    199     NotifyRead(bytes_to_read);
    200   }
    201 
    202   return MOJO_RESULT_OK;
    203 }
    204 
    205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
    206                                                      uint32_t* buffer_num_bytes,
    207                                                      MojoReadDataFlags flags) {
    208   base::AutoLock lock(lock_);
    209   new_data_available_ = false;
    210   if (!shared_ring_buffer_ || in_transit_)
    211     return MOJO_RESULT_INVALID_ARGUMENT;
    212 
    213   if (in_two_phase_read_)
    214     return MOJO_RESULT_BUSY;
    215 
    216   // These flags may not be used in two-phase mode.
    217   if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
    218       (flags & MOJO_READ_DATA_FLAG_QUERY) ||
    219       (flags & MOJO_READ_DATA_FLAG_PEEK))
    220     return MOJO_RESULT_INVALID_ARGUMENT;
    221 
    222   if (bytes_available_ == 0) {
    223     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
    224                         : MOJO_RESULT_SHOULD_WAIT;
    225   }
    226 
    227   DCHECK_LT(read_offset_, options_.capacity_num_bytes);
    228   uint32_t bytes_to_read = std::min(bytes_available_,
    229                                     options_.capacity_num_bytes - read_offset_);
    230 
    231   CHECK(ring_buffer_mapping_);
    232   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
    233   CHECK(data);
    234 
    235   in_two_phase_read_ = true;
    236   *buffer = data + read_offset_;
    237   *buffer_num_bytes = bytes_to_read;
    238   two_phase_max_bytes_read_ = bytes_to_read;
    239 
    240   return MOJO_RESULT_OK;
    241 }
    242 
    243 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
    244   base::AutoLock lock(lock_);
    245   if (!in_two_phase_read_)
    246     return MOJO_RESULT_FAILED_PRECONDITION;
    247 
    248   if (in_transit_)
    249     return MOJO_RESULT_INVALID_ARGUMENT;
    250 
    251   CHECK(shared_ring_buffer_);
    252 
    253   HandleSignalsState old_state = GetHandleSignalsStateNoLock();
    254   MojoResult rv;
    255   if (num_bytes_read > two_phase_max_bytes_read_ ||
    256       num_bytes_read % options_.element_num_bytes != 0) {
    257     rv = MOJO_RESULT_INVALID_ARGUMENT;
    258   } else {
    259     rv = MOJO_RESULT_OK;
    260     read_offset_ =
    261         (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
    262 
    263     DCHECK_GE(bytes_available_, num_bytes_read);
    264     bytes_available_ -= num_bytes_read;
    265 
    266     base::AutoUnlock unlock(lock_);
    267     NotifyRead(num_bytes_read);
    268   }
    269 
    270   in_two_phase_read_ = false;
    271   two_phase_max_bytes_read_ = 0;
    272 
    273   HandleSignalsState new_state = GetHandleSignalsStateNoLock();
    274   if (!new_state.equals(old_state))
    275     awakable_list_.AwakeForStateChange(new_state);
    276 
    277   return rv;
    278 }
    279 
    280 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
    281   base::AutoLock lock(lock_);
    282   return GetHandleSignalsStateNoLock();
    283 }
    284 
    285 MojoResult DataPipeConsumerDispatcher::AddAwakable(
    286     Awakable* awakable,
    287     MojoHandleSignals signals,
    288     uintptr_t context,
    289     HandleSignalsState* signals_state) {
    290   base::AutoLock lock(lock_);
    291   if (!shared_ring_buffer_ || in_transit_) {
    292     if (signals_state)
    293       *signals_state = HandleSignalsState();
    294     return MOJO_RESULT_INVALID_ARGUMENT;
    295   }
    296   UpdateSignalsStateNoLock();
    297   HandleSignalsState state = GetHandleSignalsStateNoLock();
    298   if (state.satisfies(signals)) {
    299     if (signals_state)
    300       *signals_state = state;
    301     return MOJO_RESULT_ALREADY_EXISTS;
    302   }
    303   if (!state.can_satisfy(signals)) {
    304     if (signals_state)
    305       *signals_state = state;
    306     return MOJO_RESULT_FAILED_PRECONDITION;
    307   }
    308 
    309   awakable_list_.Add(awakable, signals, context);
    310   return MOJO_RESULT_OK;
    311 }
    312 
    313 void DataPipeConsumerDispatcher::RemoveAwakable(
    314     Awakable* awakable,
    315     HandleSignalsState* signals_state) {
    316   base::AutoLock lock(lock_);
    317   if ((!shared_ring_buffer_ || in_transit_) && signals_state)
    318     *signals_state = HandleSignalsState();
    319   else if (signals_state)
    320     *signals_state = GetHandleSignalsStateNoLock();
    321   awakable_list_.Remove(awakable);
    322 }
    323 
    324 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
    325                                                 uint32_t* num_ports,
    326                                                 uint32_t* num_handles) {
    327   base::AutoLock lock(lock_);
    328   DCHECK(in_transit_);
    329   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
    330   *num_ports = 1;
    331   *num_handles = 1;
    332 }
    333 
    334 bool DataPipeConsumerDispatcher::EndSerialize(
    335     void* destination,
    336     ports::PortName* ports,
    337     PlatformHandle* platform_handles) {
    338   SerializedState* state = static_cast<SerializedState*>(destination);
    339   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
    340   memset(state->padding, 0, sizeof(state->padding));
    341 
    342   base::AutoLock lock(lock_);
    343   DCHECK(in_transit_);
    344   state->pipe_id = pipe_id_;
    345   state->read_offset = read_offset_;
    346   state->bytes_available = bytes_available_;
    347   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
    348 
    349   ports[0] = control_port_.name();
    350 
    351   buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
    352   platform_handles[0] = buffer_handle_for_transit_.get();
    353 
    354   return true;
    355 }
    356 
    357 bool DataPipeConsumerDispatcher::BeginTransit() {
    358   base::AutoLock lock(lock_);
    359   if (in_transit_)
    360     return false;
    361   in_transit_ = !in_two_phase_read_;
    362   return in_transit_;
    363 }
    364 
    365 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
    366   node_controller_->SetPortObserver(control_port_, nullptr);
    367 
    368   base::AutoLock lock(lock_);
    369   DCHECK(in_transit_);
    370   in_transit_ = false;
    371   transferred_ = true;
    372   ignore_result(buffer_handle_for_transit_.release());
    373   CloseNoLock();
    374 }
    375 
    376 void DataPipeConsumerDispatcher::CancelTransit() {
    377   base::AutoLock lock(lock_);
    378   DCHECK(in_transit_);
    379   in_transit_ = false;
    380   buffer_handle_for_transit_.reset();
    381   UpdateSignalsStateNoLock();
    382 }
    383 
    384 // static
    385 scoped_refptr<DataPipeConsumerDispatcher>
    386 DataPipeConsumerDispatcher::Deserialize(const void* data,
    387                                         size_t num_bytes,
    388                                         const ports::PortName* ports,
    389                                         size_t num_ports,
    390                                         PlatformHandle* handles,
    391                                         size_t num_handles) {
    392   if (num_ports != 1 || num_handles != 1 ||
    393       num_bytes != sizeof(SerializedState)) {
    394     return nullptr;
    395   }
    396 
    397   const SerializedState* state = static_cast<const SerializedState*>(data);
    398 
    399   NodeController* node_controller = internal::g_core->GetNodeController();
    400   ports::PortRef port;
    401   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
    402     return nullptr;
    403 
    404   PlatformHandle buffer_handle;
    405   std::swap(buffer_handle, handles[0]);
    406   scoped_refptr<PlatformSharedBuffer> ring_buffer =
    407       PlatformSharedBuffer::CreateFromPlatformHandle(
    408           state->options.capacity_num_bytes,
    409           false /* read_only */,
    410           ScopedPlatformHandle(buffer_handle));
    411   if (!ring_buffer) {
    412     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
    413     return nullptr;
    414   }
    415 
    416   scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
    417       new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
    418                                      state->options, false /* initialized */,
    419                                      state->pipe_id);
    420 
    421   {
    422     base::AutoLock lock(dispatcher->lock_);
    423     dispatcher->read_offset_ = state->read_offset;
    424     dispatcher->bytes_available_ = state->bytes_available;
    425     dispatcher->new_data_available_ = state->bytes_available > 0;
    426     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
    427     dispatcher->InitializeNoLock();
    428     dispatcher->UpdateSignalsStateNoLock();
    429   }
    430 
    431   return dispatcher;
    432 }
    433 
    434 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
    435   DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
    436          !in_transit_);
    437 }
    438 
    439 void DataPipeConsumerDispatcher::InitializeNoLock() {
    440   lock_.AssertAcquired();
    441 
    442   if (shared_ring_buffer_) {
    443     DCHECK(!ring_buffer_mapping_);
    444     ring_buffer_mapping_ =
    445         shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
    446     if (!ring_buffer_mapping_) {
    447       DLOG(ERROR) << "Failed to map shared buffer.";
    448       shared_ring_buffer_ = nullptr;
    449     }
    450   }
    451 
    452   base::AutoUnlock unlock(lock_);
    453   node_controller_->SetPortObserver(
    454       control_port_,
    455       make_scoped_refptr(new PortObserverThunk(this)));
    456 }
    457 
    458 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
    459   lock_.AssertAcquired();
    460   if (is_closed_ || in_transit_)
    461     return MOJO_RESULT_INVALID_ARGUMENT;
    462   is_closed_ = true;
    463   ring_buffer_mapping_.reset();
    464   shared_ring_buffer_ = nullptr;
    465 
    466   awakable_list_.CancelAll();
    467   if (!transferred_) {
    468     base::AutoUnlock unlock(lock_);
    469     node_controller_->ClosePort(control_port_);
    470   }
    471 
    472   return MOJO_RESULT_OK;
    473 }
    474 
    475 HandleSignalsState
    476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
    477   lock_.AssertAcquired();
    478 
    479   HandleSignalsState rv;
    480   if (shared_ring_buffer_ && bytes_available_) {
    481     if (!in_two_phase_read_) {
    482       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    483       if (new_data_available_)
    484         rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
    485     }
    486     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    487   } else if (!peer_closed_ && shared_ring_buffer_) {
    488     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    489   }
    490 
    491   if (shared_ring_buffer_) {
    492     if (new_data_available_ || !peer_closed_)
    493       rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
    494   }
    495 
    496   if (peer_closed_)
    497     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    498   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
    499 
    500   return rv;
    501 }
    502 
    503 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
    504   DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
    505            << num_bytes << " bytes read. [control_port="
    506            << control_port_.name() << "]";
    507 
    508   SendDataPipeControlMessage(node_controller_, control_port_,
    509                              DataPipeCommand::DATA_WAS_READ, num_bytes);
    510 }
    511 
    512 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
    513   DCHECK(RequestContext::current());
    514 
    515   base::AutoLock lock(lock_);
    516 
    517   // We stop observing the control port as soon it's transferred, but this can
    518   // race with events which are raised right before that happens. This is fine
    519   // to ignore.
    520   if (transferred_)
    521     return;
    522 
    523   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
    524 
    525   UpdateSignalsStateNoLock();
    526 }
    527 
    528 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
    529   lock_.AssertAcquired();
    530 
    531   bool was_peer_closed = peer_closed_;
    532   size_t previous_bytes_available = bytes_available_;
    533 
    534   ports::PortStatus port_status;
    535   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
    536   if (rv != ports::OK || !port_status.receiving_messages) {
    537     DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
    538              << " [control_port=" << control_port_.name() << "]";
    539     peer_closed_ = true;
    540   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
    541     ports::ScopedMessage message;
    542     do {
    543       int rv = node_controller_->node()->GetMessage(
    544           control_port_, &message, nullptr);
    545       if (rv != ports::OK)
    546         peer_closed_ = true;
    547       if (message) {
    548         if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
    549           peer_closed_ = true;
    550           break;
    551         }
    552 
    553         const DataPipeControlMessage* m =
    554             static_cast<const DataPipeControlMessage*>(
    555                 message->payload_bytes());
    556 
    557         if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
    558           DLOG(ERROR) << "Unexpected control message from producer.";
    559           peer_closed_ = true;
    560           break;
    561         }
    562 
    563         if (static_cast<size_t>(bytes_available_) + m->num_bytes >
    564               options_.capacity_num_bytes) {
    565           DLOG(ERROR) << "Producer claims to have written too many bytes.";
    566           peer_closed_ = true;
    567           break;
    568         }
    569 
    570         DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
    571                  << m->num_bytes << " bytes were written. [control_port="
    572                  << control_port_.name() << "]";
    573 
    574         bytes_available_ += m->num_bytes;
    575       }
    576     } while (message);
    577   }
    578 
    579   bool has_new_data = bytes_available_ != previous_bytes_available;
    580   if (has_new_data)
    581     new_data_available_ = true;
    582 
    583   if (peer_closed_ != was_peer_closed || has_new_data) {
    584     awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
    585   }
    586 }
    587 
    588 }  // namespace edk
    589 }  // namespace mojo
    590