Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/data_pipe.h"
      6 
      7 #include <string.h>
      8 
      9 #include <algorithm>
     10 #include <limits>
     11 
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "mojo/system/constants.h"
     15 #include "mojo/system/memory.h"
     16 #include "mojo/system/options_validation.h"
     17 #include "mojo/system/waiter_list.h"
     18 
     19 namespace mojo {
     20 namespace system {
     21 
     22 // static
     23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
     24   static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
     25   MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
     26   1u,
     27   static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)
     28 };
     29 
     30 // static
     31 MojoResult DataPipe::ValidateCreateOptions(
     32     const MojoCreateDataPipeOptions* in_options,
     33     MojoCreateDataPipeOptions* out_options) {
     34   const MojoCreateDataPipeOptionsFlags kKnownFlags =
     35       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
     36 
     37   *out_options = kDefaultCreateOptions;
     38   if (!in_options)
     39     return MOJO_RESULT_OK;
     40 
     41   MojoResult result =
     42       ValidateOptionsStructPointerSizeAndFlags<MojoCreateDataPipeOptions>(
     43           in_options, kKnownFlags, out_options);
     44   if (result != MOJO_RESULT_OK)
     45     return result;
     46 
     47   // Checks for fields beyond |flags|:
     48 
     49   if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, element_num_bytes,
     50                                  in_options))
     51     return MOJO_RESULT_OK;
     52   if (in_options->element_num_bytes == 0)
     53     return MOJO_RESULT_INVALID_ARGUMENT;
     54   out_options->element_num_bytes = in_options->element_num_bytes;
     55 
     56   if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes,
     57                                  in_options) ||
     58       in_options->capacity_num_bytes == 0) {
     59     // Round the default capacity down to a multiple of the element size (but at
     60     // least one element).
     61     out_options->capacity_num_bytes = std::max(
     62         static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
     63             (kDefaultDataPipeCapacityBytes % out_options->element_num_bytes)),
     64         out_options->element_num_bytes);
     65     return MOJO_RESULT_OK;
     66   }
     67   if (in_options->capacity_num_bytes % out_options->element_num_bytes != 0)
     68     return MOJO_RESULT_INVALID_ARGUMENT;
     69   if (in_options->capacity_num_bytes > kMaxDataPipeCapacityBytes)
     70     return MOJO_RESULT_RESOURCE_EXHAUSTED;
     71   out_options->capacity_num_bytes = in_options->capacity_num_bytes;
     72 
     73   return MOJO_RESULT_OK;
     74 }
     75 
     76 void DataPipe::ProducerCancelAllWaiters() {
     77   base::AutoLock locker(lock_);
     78   DCHECK(has_local_producer_no_lock());
     79   producer_waiter_list_->CancelAllWaiters();
     80 }
     81 
     82 void DataPipe::ProducerClose() {
     83   base::AutoLock locker(lock_);
     84   DCHECK(producer_open_);
     85   producer_open_ = false;
     86   DCHECK(has_local_producer_no_lock());
     87   producer_waiter_list_.reset();
     88   // Not a bug, except possibly in "user" code.
     89   DVLOG_IF(2, producer_in_two_phase_write_no_lock())
     90       << "Producer closed with active two-phase write";
     91   producer_two_phase_max_num_bytes_written_ = 0;
     92   ProducerCloseImplNoLock();
     93   AwakeConsumerWaitersForStateChangeNoLock(
     94       ConsumerGetHandleSignalsStateNoLock());
     95 }
     96 
     97 MojoResult DataPipe::ProducerWriteData(const void* elements,
     98                                        uint32_t* num_bytes,
     99                                        bool all_or_none) {
    100   base::AutoLock locker(lock_);
    101   DCHECK(has_local_producer_no_lock());
    102 
    103   if (producer_in_two_phase_write_no_lock())
    104     return MOJO_RESULT_BUSY;
    105   if (!consumer_open_no_lock())
    106     return MOJO_RESULT_FAILED_PRECONDITION;
    107 
    108   // Returning "busy" takes priority over "invalid argument".
    109   if (*num_bytes % element_num_bytes_ != 0)
    110     return MOJO_RESULT_INVALID_ARGUMENT;
    111 
    112   if (*num_bytes == 0)
    113     return MOJO_RESULT_OK;  // Nothing to do.
    114 
    115   HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock();
    116   MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
    117   HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
    118   if (!new_consumer_state.equals(old_consumer_state))
    119     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
    120   return rv;
    121 }
    122 
    123 MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
    124                                             uint32_t* buffer_num_bytes,
    125                                             bool all_or_none) {
    126   base::AutoLock locker(lock_);
    127   DCHECK(has_local_producer_no_lock());
    128 
    129   if (producer_in_two_phase_write_no_lock())
    130     return MOJO_RESULT_BUSY;
    131   if (!consumer_open_no_lock())
    132     return MOJO_RESULT_FAILED_PRECONDITION;
    133 
    134   if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0)
    135     return MOJO_RESULT_INVALID_ARGUMENT;
    136 
    137   MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes,
    138                                                    all_or_none);
    139   if (rv != MOJO_RESULT_OK)
    140     return rv;
    141   // Note: No need to awake producer waiters, even though we're going from
    142   // writable to non-writable (since you can't wait on non-writability).
    143   // Similarly, though this may have discarded data (in "may discard" mode),
    144   // making it non-readable, there's still no need to awake consumer waiters.
    145   DCHECK(producer_in_two_phase_write_no_lock());
    146   return MOJO_RESULT_OK;
    147 }
    148 
    149 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
    150   base::AutoLock locker(lock_);
    151   DCHECK(has_local_producer_no_lock());
    152 
    153   if (!producer_in_two_phase_write_no_lock())
    154     return MOJO_RESULT_FAILED_PRECONDITION;
    155   // Note: Allow successful completion of the two-phase write even if the
    156   // consumer has been closed.
    157 
    158   HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock();
    159   MojoResult rv;
    160   if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
    161       num_bytes_written % element_num_bytes_ != 0) {
    162     rv = MOJO_RESULT_INVALID_ARGUMENT;
    163     producer_two_phase_max_num_bytes_written_ = 0;
    164   } else {
    165     rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
    166   }
    167   // Two-phase write ended even on failure.
    168   DCHECK(!producer_in_two_phase_write_no_lock());
    169   // If we're now writable, we *became* writable (since we weren't writable
    170   // during the two-phase write), so awake producer waiters.
    171   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
    172   if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
    173     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    174   HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
    175   if (!new_consumer_state.equals(old_consumer_state))
    176     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
    177   return rv;
    178 }
    179 
    180 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
    181                                        MojoHandleSignals signals,
    182                                        uint32_t context) {
    183   base::AutoLock locker(lock_);
    184   DCHECK(has_local_producer_no_lock());
    185 
    186   HandleSignalsState producer_state = ProducerGetHandleSignalsStateNoLock();
    187   if (producer_state.satisfies(signals))
    188     return MOJO_RESULT_ALREADY_EXISTS;
    189   if (!producer_state.can_satisfy(signals))
    190     return MOJO_RESULT_FAILED_PRECONDITION;
    191 
    192   producer_waiter_list_->AddWaiter(waiter, signals, context);
    193   return MOJO_RESULT_OK;
    194 }
    195 
    196 void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
    197   base::AutoLock locker(lock_);
    198   DCHECK(has_local_producer_no_lock());
    199   producer_waiter_list_->RemoveWaiter(waiter);
    200 }
    201 
    202 bool DataPipe::ProducerIsBusy() const {
    203   base::AutoLock locker(lock_);
    204   return producer_in_two_phase_write_no_lock();
    205 }
    206 
    207 void DataPipe::ConsumerCancelAllWaiters() {
    208   base::AutoLock locker(lock_);
    209   DCHECK(has_local_consumer_no_lock());
    210   consumer_waiter_list_->CancelAllWaiters();
    211 }
    212 
    213 void DataPipe::ConsumerClose() {
    214   base::AutoLock locker(lock_);
    215   DCHECK(consumer_open_);
    216   consumer_open_ = false;
    217   DCHECK(has_local_consumer_no_lock());
    218   consumer_waiter_list_.reset();
    219   // Not a bug, except possibly in "user" code.
    220   DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
    221       << "Consumer closed with active two-phase read";
    222   consumer_two_phase_max_num_bytes_read_ = 0;
    223   ConsumerCloseImplNoLock();
    224   AwakeProducerWaitersForStateChangeNoLock(
    225       ProducerGetHandleSignalsStateNoLock());
    226 }
    227 
    228 MojoResult DataPipe::ConsumerReadData(void* elements,
    229                                       uint32_t* num_bytes,
    230                                       bool all_or_none) {
    231   base::AutoLock locker(lock_);
    232   DCHECK(has_local_consumer_no_lock());
    233 
    234   if (consumer_in_two_phase_read_no_lock())
    235     return MOJO_RESULT_BUSY;
    236 
    237   if (*num_bytes % element_num_bytes_ != 0)
    238     return MOJO_RESULT_INVALID_ARGUMENT;
    239 
    240   if (*num_bytes == 0)
    241     return MOJO_RESULT_OK;  // Nothing to do.
    242 
    243   HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
    244   MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
    245   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
    246   if (!new_producer_state.equals(old_producer_state))
    247     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    248   return rv;
    249 }
    250 
    251 MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
    252                                          bool all_or_none) {
    253   base::AutoLock locker(lock_);
    254   DCHECK(has_local_consumer_no_lock());
    255 
    256   if (consumer_in_two_phase_read_no_lock())
    257     return MOJO_RESULT_BUSY;
    258 
    259   if (*num_bytes % element_num_bytes_ != 0)
    260     return MOJO_RESULT_INVALID_ARGUMENT;
    261 
    262   if (*num_bytes == 0)
    263     return MOJO_RESULT_OK;  // Nothing to do.
    264 
    265   HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
    266   MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
    267   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
    268   if (!new_producer_state.equals(old_producer_state))
    269     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    270   return rv;
    271 }
    272 
    273 MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) {
    274   base::AutoLock locker(lock_);
    275   DCHECK(has_local_consumer_no_lock());
    276 
    277   if (consumer_in_two_phase_read_no_lock())
    278     return MOJO_RESULT_BUSY;
    279 
    280   // Note: Don't need to validate |*num_bytes| for query.
    281   return ConsumerQueryDataImplNoLock(num_bytes);
    282 }
    283 
    284 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
    285                                            uint32_t* buffer_num_bytes,
    286                                            bool all_or_none) {
    287   base::AutoLock locker(lock_);
    288   DCHECK(has_local_consumer_no_lock());
    289 
    290   if (consumer_in_two_phase_read_no_lock())
    291     return MOJO_RESULT_BUSY;
    292 
    293   if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0)
    294     return MOJO_RESULT_INVALID_ARGUMENT;
    295 
    296   MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes,
    297                                                   all_or_none);
    298   if (rv != MOJO_RESULT_OK)
    299     return rv;
    300   DCHECK(consumer_in_two_phase_read_no_lock());
    301   return MOJO_RESULT_OK;
    302 }
    303 
    304 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
    305   base::AutoLock locker(lock_);
    306   DCHECK(has_local_consumer_no_lock());
    307 
    308   if (!consumer_in_two_phase_read_no_lock())
    309     return MOJO_RESULT_FAILED_PRECONDITION;
    310 
    311   HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock();
    312   MojoResult rv;
    313   if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
    314       num_bytes_read % element_num_bytes_ != 0) {
    315     rv = MOJO_RESULT_INVALID_ARGUMENT;
    316     consumer_two_phase_max_num_bytes_read_ = 0;
    317   } else {
    318     rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
    319   }
    320   // Two-phase read ended even on failure.
    321   DCHECK(!consumer_in_two_phase_read_no_lock());
    322   // If we're now readable, we *became* readable (since we weren't readable
    323   // during the two-phase read), so awake consumer waiters.
    324   HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock();
    325   if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
    326     AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
    327   HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock();
    328   if (!new_producer_state.equals(old_producer_state))
    329     AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
    330   return rv;
    331 }
    332 
    333 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
    334                                        MojoHandleSignals signals,
    335                                        uint32_t context) {
    336   base::AutoLock locker(lock_);
    337   DCHECK(has_local_consumer_no_lock());
    338 
    339   HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateNoLock();
    340   if (consumer_state.satisfies(signals))
    341     return MOJO_RESULT_ALREADY_EXISTS;
    342   if (!consumer_state.can_satisfy(signals))
    343     return MOJO_RESULT_FAILED_PRECONDITION;
    344 
    345   consumer_waiter_list_->AddWaiter(waiter, signals, context);
    346   return MOJO_RESULT_OK;
    347 }
    348 
    349 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
    350   base::AutoLock locker(lock_);
    351   DCHECK(has_local_consumer_no_lock());
    352   consumer_waiter_list_->RemoveWaiter(waiter);
    353 }
    354 
    355 bool DataPipe::ConsumerIsBusy() const {
    356   base::AutoLock locker(lock_);
    357   return consumer_in_two_phase_read_no_lock();
    358 }
    359 
    360 
    361 DataPipe::DataPipe(bool has_local_producer,
    362                    bool has_local_consumer,
    363                    const MojoCreateDataPipeOptions& validated_options)
    364     : may_discard_((validated_options.flags &
    365                        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
    366       element_num_bytes_(validated_options.element_num_bytes),
    367       capacity_num_bytes_(validated_options.capacity_num_bytes),
    368       producer_open_(true),
    369       consumer_open_(true),
    370       producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
    371       consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
    372       producer_two_phase_max_num_bytes_written_(0),
    373       consumer_two_phase_max_num_bytes_read_(0) {
    374   // Check that the passed in options actually are validated.
    375   MojoCreateDataPipeOptions unused ALLOW_UNUSED = { 0 };
    376   DCHECK_EQ(ValidateCreateOptions(&validated_options, &unused), MOJO_RESULT_OK);
    377 }
    378 
    379 DataPipe::~DataPipe() {
    380   DCHECK(!producer_open_);
    381   DCHECK(!consumer_open_);
    382   DCHECK(!producer_waiter_list_);
    383   DCHECK(!consumer_waiter_list_);
    384 }
    385 
    386 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
    387     const HandleSignalsState& new_producer_state) {
    388   lock_.AssertAcquired();
    389   if (!has_local_producer_no_lock())
    390     return;
    391   producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
    392 }
    393 
    394 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
    395     const HandleSignalsState& new_consumer_state) {
    396   lock_.AssertAcquired();
    397   if (!has_local_consumer_no_lock())
    398     return;
    399   consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
    400 }
    401 
    402 }  // namespace system
    403 }  // namespace mojo
    404