Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/data_pipe.h"
      6 
      7 #include <string.h>
      8 
      9 #include <algorithm>
     10 #include <limits>
     11 
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "mojo/system/constants.h"
     15 #include "mojo/system/memory.h"
     16 #include "mojo/system/options_validation.h"
     17 #include "mojo/system/waiter_list.h"
     18 
     19 namespace mojo {
     20 namespace system {
     21 
     22 // static
     23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
     24     static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
     25     MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u,
     26     static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)};
     27 
     28 // static
     29 MojoResult DataPipe::ValidateCreateOptions(
     30     UserPointer<const MojoCreateDataPipeOptions> in_options,
     31     MojoCreateDataPipeOptions* out_options) {
     32   const MojoCreateDataPipeOptionsFlags kKnownFlags =
     33       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
     34 
     35   *out_options = kDefaultCreateOptions;
     36   if (in_options.IsNull())
     37     return MOJO_RESULT_OK;
     38 
     39   UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options);
     40   if (!reader.is_valid())
     41     return MOJO_RESULT_INVALID_ARGUMENT;
     42 
     43   if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader))
     44     return MOJO_RESULT_OK;
     45   if ((reader.options().flags & ~kKnownFlags))
     46     return MOJO_RESULT_UNIMPLEMENTED;
     47   out_options->flags = reader.options().flags;
     48 
     49   // Checks for fields beyond |flags|:
     50 
     51   if (!OPTIONS_STRUCT_HAS_MEMBER(
     52           MojoCreateDataPipeOptions, element_num_bytes, reader))
     53     return MOJO_RESULT_OK;
     54   if (reader.options().element_num_bytes == 0)
     55     return MOJO_RESULT_INVALID_ARGUMENT;
     56   out_options->element_num_bytes = reader.options().element_num_bytes;
     57 
     58   if (!OPTIONS_STRUCT_HAS_MEMBER(
     59           MojoCreateDataPipeOptions, capacity_num_bytes, reader) ||
     60       reader.options().capacity_num_bytes == 0) {
     61     // Round the default capacity down to a multiple of the element size (but at
     62     // least one element).
     63     out_options->capacity_num_bytes =
     64         std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
     65                                        (kDefaultDataPipeCapacityBytes %
     66                                         out_options->element_num_bytes)),
     67                  out_options->element_num_bytes);
     68     return MOJO_RESULT_OK;
     69   }
     70   if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
     71     return MOJO_RESULT_INVALID_ARGUMENT;
     72   if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes)
     73     return MOJO_RESULT_RESOURCE_EXHAUSTED;
     74   out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
     75 
     76   return MOJO_RESULT_OK;
     77 }
     78 
     79 void DataPipe::ProducerCancelAllWaiters() {
     80   base::AutoLock locker(lock_);
     81   DCHECK(has_local_producer_no_lock());
     82   producer_waiter_list_->CancelAllWaiters();
     83 }
     84 
     85 void DataPipe::ProducerClose() {
     86   base::AutoLock locker(lock_);
     87   DCHECK(producer_open_);
     88   producer_open_ = false;
     89   DCHECK(has_local_producer_no_lock());
     90   producer_waiter_list_.reset();
     91   // Not a bug, except possibly in "user" code.
     92   DVLOG_IF(2, producer_in_two_phase_write_no_lock())
     93       << "Producer closed with active two-phase write";
     94   producer_two_phase_max_num_bytes_written_ = 0;
     95   ProducerCloseImplNoLock();
     96   AwakeConsumerWaitersForStateChangeNoLock(
     97       ConsumerGetHandleSignalsStateImplNoLock());
     98 }
     99 
    100 MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
    101                                        UserPointer<uint32_t> num_bytes,
    102                                        bool all_or_none) {
    103   base::AutoLock locker(lock_);
    104   DCHECK(has_local_producer_no_lock());
    105 
    106   if (producer_in_two_phase_write_no_lock())
    107     return MOJO_RESULT_BUSY;
    108   if (!consumer_open_no_lock())
    109     return MOJO_RESULT_FAILED_PRECONDITION;
    110 
    111   // Returning "busy" takes priority over "invalid argument".
    112   uint32_t max_num_bytes_to_write = num_bytes.Get();
    113   if (max_num_bytes_to_write % element_num_bytes_ != 0)
    114     return MOJO_RESULT_INVALID_ARGUMENT;
    115 
    116   if (max_num_bytes_to_write == 0)
    117     return MOJO_RESULT_OK;  // Nothing to do.
    118 
    119   uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
    120 
    121   HandleSignalsState old_consumer_state =
    122       ConsumerGetHandleSignalsStateImplNoLock();
    123   MojoResult rv = ProducerWriteDataImplNoLock(
    124       elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
    125   HandleSignalsState new_consumer_state =
    126       ConsumerGetHandleSignalsStateImplNoLock();
    127   if (!new_consumer_state.equals(old_consumer_state))
    128     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
    129   return rv;
    130 }
    131 
    132 MojoResult DataPipe::ProducerBeginWriteData(
    133     UserPointer<void*> buffer,
    134     UserPointer<uint32_t> buffer_num_bytes,
    135     bool all_or_none) {
    136   base::AutoLock locker(lock_);
    137   DCHECK(has_local_producer_no_lock());
    138 
    139   if (producer_in_two_phase_write_no_lock())
    140     return MOJO_RESULT_BUSY;
    141   if (!consumer_open_no_lock())
    142     return MOJO_RESULT_FAILED_PRECONDITION;
    143 
    144   uint32_t min_num_bytes_to_write = 0;
    145   if (all_or_none) {
    146     min_num_bytes_to_write = buffer_num_bytes.Get();
    147     if (min_num_bytes_to_write % element_num_bytes_ != 0)
    148       return MOJO_RESULT_INVALID_ARGUMENT;
    149   }
    150 
    151   MojoResult rv = ProducerBeginWriteDataImplNoLock(
    152       buffer, buffer_num_bytes, min_num_bytes_to_write);
    153   if (rv != MOJO_RESULT_OK)
    154     return rv;
    155   // Note: No need to awake producer waiters, even though we're going from
    156   // writable to non-writable (since you can't wait on non-writability).
    157   // Similarly, though this may have discarded data (in "may discard" mode),
    158   // making it non-readable, there's still no need to awake consumer waiters.
    159   DCHECK(producer_in_two_phase_write_no_lock());
    160   return MOJO_RESULT_OK;
    161 }
    162 
    163 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
    164   base::AutoLock locker(lock_);
    165   DCHECK(has_local_producer_no_lock());
    166 
    167   if (!producer_in_two_phase_write_no_lock())
    168     return MOJO_RESULT_FAILED_PRECONDITION;
    169   // Note: Allow successful completion of the two-phase write even if the
    170   // consumer has been closed.
    171 
    172   HandleSignalsState old_consumer_state =
    173       ConsumerGetHandleSignalsStateImplNoLock();
    174   MojoResult rv;
    175   if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
    176       num_bytes_written % element_num_bytes_ != 0) {
    177     rv = MOJO_RESULT_INVALID_ARGUMENT;
    178     producer_two_phase_max_num_bytes_written_ = 0;
    179   } else {
    180     rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
    181   }
    182   // Two-phase write ended even on failure.
    183   DCHECK(!producer_in_two_phase_write_no_lock());
    184   // If we're now writable, we *became* writable (since we weren't writable
    185   // during the two-phase write), so awake producer waiters.
    186   HandleSignalsState new_producer_state =
    187       ProducerGetHandleSignalsStateImplNoLock();
    188   if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
    189     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    190   HandleSignalsState new_consumer_state =
    191       ConsumerGetHandleSignalsStateImplNoLock();
    192   if (!new_consumer_state.equals(old_consumer_state))
    193     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
    194   return rv;
    195 }
    196 
    197 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
    198   base::AutoLock locker(lock_);
    199   DCHECK(has_local_producer_no_lock());
    200   return ProducerGetHandleSignalsStateImplNoLock();
    201 }
    202 
    203 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
    204                                        MojoHandleSignals signals,
    205                                        uint32_t context,
    206                                        HandleSignalsState* signals_state) {
    207   base::AutoLock locker(lock_);
    208   DCHECK(has_local_producer_no_lock());
    209 
    210   HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
    211   if (producer_state.satisfies(signals)) {
    212     if (signals_state)
    213       *signals_state = producer_state;
    214     return MOJO_RESULT_ALREADY_EXISTS;
    215   }
    216   if (!producer_state.can_satisfy(signals)) {
    217     if (signals_state)
    218       *signals_state = producer_state;
    219     return MOJO_RESULT_FAILED_PRECONDITION;
    220   }
    221 
    222   producer_waiter_list_->AddWaiter(waiter, signals, context);
    223   return MOJO_RESULT_OK;
    224 }
    225 
    226 void DataPipe::ProducerRemoveWaiter(Waiter* waiter,
    227                                     HandleSignalsState* signals_state) {
    228   base::AutoLock locker(lock_);
    229   DCHECK(has_local_producer_no_lock());
    230   producer_waiter_list_->RemoveWaiter(waiter);
    231   if (signals_state)
    232     *signals_state = ProducerGetHandleSignalsStateImplNoLock();
    233 }
    234 
    235 bool DataPipe::ProducerIsBusy() const {
    236   base::AutoLock locker(lock_);
    237   return producer_in_two_phase_write_no_lock();
    238 }
    239 
    240 void DataPipe::ConsumerCancelAllWaiters() {
    241   base::AutoLock locker(lock_);
    242   DCHECK(has_local_consumer_no_lock());
    243   consumer_waiter_list_->CancelAllWaiters();
    244 }
    245 
    246 void DataPipe::ConsumerClose() {
    247   base::AutoLock locker(lock_);
    248   DCHECK(consumer_open_);
    249   consumer_open_ = false;
    250   DCHECK(has_local_consumer_no_lock());
    251   consumer_waiter_list_.reset();
    252   // Not a bug, except possibly in "user" code.
    253   DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
    254       << "Consumer closed with active two-phase read";
    255   consumer_two_phase_max_num_bytes_read_ = 0;
    256   ConsumerCloseImplNoLock();
    257   AwakeProducerWaitersForStateChangeNoLock(
    258       ProducerGetHandleSignalsStateImplNoLock());
    259 }
    260 
    261 MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
    262                                       UserPointer<uint32_t> num_bytes,
    263                                       bool all_or_none) {
    264   base::AutoLock locker(lock_);
    265   DCHECK(has_local_consumer_no_lock());
    266 
    267   if (consumer_in_two_phase_read_no_lock())
    268     return MOJO_RESULT_BUSY;
    269 
    270   uint32_t max_num_bytes_to_read = num_bytes.Get();
    271   if (max_num_bytes_to_read % element_num_bytes_ != 0)
    272     return MOJO_RESULT_INVALID_ARGUMENT;
    273 
    274   if (max_num_bytes_to_read == 0)
    275     return MOJO_RESULT_OK;  // Nothing to do.
    276 
    277   uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
    278 
    279   HandleSignalsState old_producer_state =
    280       ProducerGetHandleSignalsStateImplNoLock();
    281   MojoResult rv = ConsumerReadDataImplNoLock(
    282       elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
    283   HandleSignalsState new_producer_state =
    284       ProducerGetHandleSignalsStateImplNoLock();
    285   if (!new_producer_state.equals(old_producer_state))
    286     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    287   return rv;
    288 }
    289 
    290 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
    291                                          bool all_or_none) {
    292   base::AutoLock locker(lock_);
    293   DCHECK(has_local_consumer_no_lock());
    294 
    295   if (consumer_in_two_phase_read_no_lock())
    296     return MOJO_RESULT_BUSY;
    297 
    298   uint32_t max_num_bytes_to_discard = num_bytes.Get();
    299   if (max_num_bytes_to_discard % element_num_bytes_ != 0)
    300     return MOJO_RESULT_INVALID_ARGUMENT;
    301 
    302   if (max_num_bytes_to_discard == 0)
    303     return MOJO_RESULT_OK;  // Nothing to do.
    304 
    305   uint32_t min_num_bytes_to_discard =
    306       all_or_none ? max_num_bytes_to_discard : 0;
    307 
    308   HandleSignalsState old_producer_state =
    309       ProducerGetHandleSignalsStateImplNoLock();
    310   MojoResult rv = ConsumerDiscardDataImplNoLock(
    311       num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
    312   HandleSignalsState new_producer_state =
    313       ProducerGetHandleSignalsStateImplNoLock();
    314   if (!new_producer_state.equals(old_producer_state))
    315     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    316   return rv;
    317 }
    318 
    319 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
    320   base::AutoLock locker(lock_);
    321   DCHECK(has_local_consumer_no_lock());
    322 
    323   if (consumer_in_two_phase_read_no_lock())
    324     return MOJO_RESULT_BUSY;
    325 
    326   // Note: Don't need to validate |*num_bytes| for query.
    327   return ConsumerQueryDataImplNoLock(num_bytes);
    328 }
    329 
    330 MojoResult DataPipe::ConsumerBeginReadData(
    331     UserPointer<const void*> buffer,
    332     UserPointer<uint32_t> buffer_num_bytes,
    333     bool all_or_none) {
    334   base::AutoLock locker(lock_);
    335   DCHECK(has_local_consumer_no_lock());
    336 
    337   if (consumer_in_two_phase_read_no_lock())
    338     return MOJO_RESULT_BUSY;
    339 
    340   uint32_t min_num_bytes_to_read = 0;
    341   if (all_or_none) {
    342     min_num_bytes_to_read = buffer_num_bytes.Get();
    343     if (min_num_bytes_to_read % element_num_bytes_ != 0)
    344       return MOJO_RESULT_INVALID_ARGUMENT;
    345   }
    346 
    347   MojoResult rv = ConsumerBeginReadDataImplNoLock(
    348       buffer, buffer_num_bytes, min_num_bytes_to_read);
    349   if (rv != MOJO_RESULT_OK)
    350     return rv;
    351   DCHECK(consumer_in_two_phase_read_no_lock());
    352   return MOJO_RESULT_OK;
    353 }
    354 
    355 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
    356   base::AutoLock locker(lock_);
    357   DCHECK(has_local_consumer_no_lock());
    358 
    359   if (!consumer_in_two_phase_read_no_lock())
    360     return MOJO_RESULT_FAILED_PRECONDITION;
    361 
    362   HandleSignalsState old_producer_state =
    363       ProducerGetHandleSignalsStateImplNoLock();
    364   MojoResult rv;
    365   if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
    366       num_bytes_read % element_num_bytes_ != 0) {
    367     rv = MOJO_RESULT_INVALID_ARGUMENT;
    368     consumer_two_phase_max_num_bytes_read_ = 0;
    369   } else {
    370     rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
    371   }
    372   // Two-phase read ended even on failure.
    373   DCHECK(!consumer_in_two_phase_read_no_lock());
    374   // If we're now readable, we *became* readable (since we weren't readable
    375   // during the two-phase read), so awake consumer waiters.
    376   HandleSignalsState new_consumer_state =
    377       ConsumerGetHandleSignalsStateImplNoLock();
    378   if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
    379     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
    380   HandleSignalsState new_producer_state =
    381       ProducerGetHandleSignalsStateImplNoLock();
    382   if (!new_producer_state.equals(old_producer_state))
    383     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    384   return rv;
    385 }
    386 
    387 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
    388   base::AutoLock locker(lock_);
    389   DCHECK(has_local_consumer_no_lock());
    390   return ConsumerGetHandleSignalsStateImplNoLock();
    391 }
    392 
    393 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
    394                                        MojoHandleSignals signals,
    395                                        uint32_t context,
    396                                        HandleSignalsState* signals_state) {
    397   base::AutoLock locker(lock_);
    398   DCHECK(has_local_consumer_no_lock());
    399 
    400   HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
    401   if (consumer_state.satisfies(signals)) {
    402     if (signals_state)
    403       *signals_state = consumer_state;
    404     return MOJO_RESULT_ALREADY_EXISTS;
    405   }
    406   if (!consumer_state.can_satisfy(signals)) {
    407     if (signals_state)
    408       *signals_state = consumer_state;
    409     return MOJO_RESULT_FAILED_PRECONDITION;
    410   }
    411 
    412   consumer_waiter_list_->AddWaiter(waiter, signals, context);
    413   return MOJO_RESULT_OK;
    414 }
    415 
    416 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter,
    417                                     HandleSignalsState* signals_state) {
    418   base::AutoLock locker(lock_);
    419   DCHECK(has_local_consumer_no_lock());
    420   consumer_waiter_list_->RemoveWaiter(waiter);
    421   if (signals_state)
    422     *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
    423 }
    424 
    425 bool DataPipe::ConsumerIsBusy() const {
    426   base::AutoLock locker(lock_);
    427   return consumer_in_two_phase_read_no_lock();
    428 }
    429 
    430 DataPipe::DataPipe(bool has_local_producer,
    431                    bool has_local_consumer,
    432                    const MojoCreateDataPipeOptions& validated_options)
    433     : may_discard_((validated_options.flags &
    434                     MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
    435       element_num_bytes_(validated_options.element_num_bytes),
    436       capacity_num_bytes_(validated_options.capacity_num_bytes),
    437       producer_open_(true),
    438       consumer_open_(true),
    439       producer_waiter_list_(has_local_producer ? new WaiterList() : nullptr),
    440       consumer_waiter_list_(has_local_consumer ? new WaiterList() : nullptr),
    441       producer_two_phase_max_num_bytes_written_(0),
    442       consumer_two_phase_max_num_bytes_read_(0) {
    443   // Check that the passed in options actually are validated.
    444   MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0};
    445   DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
    446             MOJO_RESULT_OK);
    447 }
    448 
    449 DataPipe::~DataPipe() {
    450   DCHECK(!producer_open_);
    451   DCHECK(!consumer_open_);
    452   DCHECK(!producer_waiter_list_);
    453   DCHECK(!consumer_waiter_list_);
    454 }
    455 
    456 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
    457     const HandleSignalsState& new_producer_state) {
    458   lock_.AssertAcquired();
    459   if (!has_local_producer_no_lock())
    460     return;
    461   producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
    462 }
    463 
    464 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
    465     const HandleSignalsState& new_consumer_state) {
    466   lock_.AssertAcquired();
    467   if (!has_local_consumer_no_lock())
    468     return;
    469   consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
    470 }
    471 
    472 }  // namespace system
    473 }  // namespace mojo
    474