Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/data_pipe.h"
      6 
      7 #include <string.h>
      8 
      9 #include <algorithm>
     10 #include <limits>
     11 
     12 #include "base/logging.h"
     13 #include "mojo/system/constants.h"
     14 #include "mojo/system/memory.h"
     15 #include "mojo/system/waiter_list.h"
     16 
     17 namespace mojo {
     18 namespace system {
     19 
     20 void DataPipe::ProducerCancelAllWaiters() {
     21   base::AutoLock locker(lock_);
     22   DCHECK(has_local_producer_no_lock());
     23   producer_waiter_list_->CancelAllWaiters();
     24 }
     25 
     26 void DataPipe::ProducerClose() {
     27   base::AutoLock locker(lock_);
     28   DCHECK(has_local_producer_no_lock());
     29   producer_waiter_list_.reset();
     30   ProducerCloseImplNoLock();
     31 }
     32 
     33 MojoResult DataPipe::ProducerWriteData(const void* elements,
     34                                        uint32_t* num_elements,
     35                                        MojoWriteDataFlags flags) {
     36   base::AutoLock locker(lock_);
     37   DCHECK(has_local_producer_no_lock());
     38 
     39   if (producer_in_two_phase_write_)
     40     return MOJO_RESULT_BUSY;
     41 
     42   // TODO(vtl): This implementation may write less than requested, even if room
     43   // is available. Fix this. (Probably make a subclass-specific impl.)
     44   void* buffer = NULL;
     45   uint32_t buffer_num_elements = *num_elements;
     46   MojoResult rv = ProducerBeginWriteDataImplNoLock(&buffer,
     47                                                    &buffer_num_elements,
     48                                                    flags);
     49   if (rv != MOJO_RESULT_OK)
     50     return rv;
     51 
     52   uint32_t num_elements_to_write = std::min(*num_elements, buffer_num_elements);
     53   memcpy(buffer, elements, num_elements_to_write * element_size_);
     54 
     55   rv = ProducerEndWriteDataImplNoLock(num_elements_to_write);
     56   if (rv != MOJO_RESULT_OK)
     57     return rv;
     58 
     59   *num_elements = num_elements_to_write;
     60   return MOJO_RESULT_OK;
     61 }
     62 
     63 MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
     64                                             uint32_t* buffer_num_elements,
     65                                             MojoWriteDataFlags flags) {
     66   base::AutoLock locker(lock_);
     67   DCHECK(has_local_producer_no_lock());
     68 
     69   if (producer_in_two_phase_write_)
     70     return MOJO_RESULT_BUSY;
     71 
     72   MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer,
     73                                                    buffer_num_elements,
     74                                                    flags);
     75   if (rv != MOJO_RESULT_OK)
     76     return rv;
     77 
     78   producer_in_two_phase_write_ = true;
     79   return MOJO_RESULT_OK;
     80 }
     81 
     82 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) {
     83   base::AutoLock locker(lock_);
     84   DCHECK(has_local_producer_no_lock());
     85 
     86   if (!producer_in_two_phase_write_)
     87     return MOJO_RESULT_FAILED_PRECONDITION;
     88 
     89   MojoResult rv = ProducerEndWriteDataImplNoLock(num_elements_written);
     90   producer_in_two_phase_write_ = false;  // End two-phase write even on failure.
     91   return rv;
     92 }
     93 
     94 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
     95                                        MojoWaitFlags flags,
     96                                        MojoResult wake_result) {
     97   base::AutoLock locker(lock_);
     98   DCHECK(has_local_producer_no_lock());
     99 
    100   if ((flags & ProducerSatisfiedFlagsNoLock()))
    101     return MOJO_RESULT_ALREADY_EXISTS;
    102   if (!(flags & ProducerSatisfiableFlagsNoLock()))
    103     return MOJO_RESULT_FAILED_PRECONDITION;
    104 
    105   producer_waiter_list_->AddWaiter(waiter, flags, wake_result);
    106   return MOJO_RESULT_OK;
    107 }
    108 
    109 void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
    110   base::AutoLock locker(lock_);
    111   DCHECK(has_local_producer_no_lock());
    112   producer_waiter_list_->RemoveWaiter(waiter);
    113 }
    114 
    115 void DataPipe::ConsumerCancelAllWaiters() {
    116   base::AutoLock locker(lock_);
    117   DCHECK(has_local_consumer_no_lock());
    118   consumer_waiter_list_->CancelAllWaiters();
    119 }
    120 
    121 void DataPipe::ConsumerClose() {
    122   base::AutoLock locker(lock_);
    123   DCHECK(has_local_consumer_no_lock());
    124   consumer_waiter_list_.reset();
    125   ConsumerCloseImplNoLock();
    126 }
    127 
    128 MojoResult DataPipe::ConsumerReadData(void* elements,
    129                                       uint32_t* num_elements,
    130                                       MojoReadDataFlags flags) {
    131   base::AutoLock locker(lock_);
    132   DCHECK(has_local_consumer_no_lock());
    133 
    134   if (consumer_in_two_phase_read_)
    135     return MOJO_RESULT_BUSY;
    136 
    137   if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
    138     return ConsumerDiscardDataNoLock(num_elements,
    139                                      (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE));
    140   }
    141   if ((flags & MOJO_READ_DATA_FLAG_QUERY))
    142     return ConsumerQueryDataNoLock(num_elements);
    143 
    144   // TODO(vtl): This implementation may write less than requested, even if room
    145   // is available. Fix this. (Probably make a subclass-specific impl.)
    146   const void* buffer = NULL;
    147   uint32_t buffer_num_elements = 0;
    148   MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer,
    149                                                   &buffer_num_elements,
    150                                                   flags);
    151   if (rv != MOJO_RESULT_OK)
    152     return rv;
    153 
    154   uint32_t num_elements_to_read = std::min(*num_elements, buffer_num_elements);
    155   memcpy(elements, buffer, num_elements_to_read * element_size_);
    156 
    157   rv = ConsumerEndReadDataImplNoLock(num_elements_to_read);
    158   if (rv != MOJO_RESULT_OK)
    159     return rv;
    160 
    161   *num_elements = num_elements_to_read;
    162   return MOJO_RESULT_OK;
    163 }
    164 
    165 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
    166                                            uint32_t* buffer_num_elements,
    167                                            MojoReadDataFlags flags) {
    168   base::AutoLock locker(lock_);
    169   DCHECK(has_local_consumer_no_lock());
    170 
    171   if (consumer_in_two_phase_read_)
    172     return MOJO_RESULT_BUSY;
    173 
    174   MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer,
    175                                                   buffer_num_elements,
    176                                                   flags);
    177   if (rv != MOJO_RESULT_OK)
    178     return rv;
    179 
    180   consumer_in_two_phase_read_ = true;
    181   return MOJO_RESULT_OK;
    182 }
    183 
    184 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) {
    185   base::AutoLock locker(lock_);
    186   DCHECK(has_local_consumer_no_lock());
    187 
    188   if (!consumer_in_two_phase_read_)
    189     return MOJO_RESULT_FAILED_PRECONDITION;
    190 
    191   MojoResult rv = ConsumerEndReadDataImplNoLock(num_elements_read);
    192   consumer_in_two_phase_read_ = false;  // End two-phase read even on failure.
    193   return rv;
    194 }
    195 
    196 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
    197                                        MojoWaitFlags flags,
    198                                        MojoResult wake_result) {
    199   base::AutoLock locker(lock_);
    200   DCHECK(has_local_consumer_no_lock());
    201 
    202   if ((flags & ConsumerSatisfiedFlagsNoLock()))
    203     return MOJO_RESULT_ALREADY_EXISTS;
    204   if (!(flags & ConsumerSatisfiableFlagsNoLock()))
    205     return MOJO_RESULT_FAILED_PRECONDITION;
    206 
    207   consumer_waiter_list_->AddWaiter(waiter, flags, wake_result);
    208   return MOJO_RESULT_OK;
    209 }
    210 
    211 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
    212   base::AutoLock locker(lock_);
    213   DCHECK(has_local_consumer_no_lock());
    214   consumer_waiter_list_->RemoveWaiter(waiter);
    215 }
    216 
    217 DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer)
    218     : element_size_(0),
    219       producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
    220       consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
    221       producer_in_two_phase_write_(false),
    222       consumer_in_two_phase_read_(false) {
    223   DCHECK(has_local_producer || has_local_consumer);
    224 }
    225 
    226 DataPipe::~DataPipe() {
    227   DCHECK(!has_local_producer_no_lock());
    228   DCHECK(!has_local_consumer_no_lock());
    229 }
    230 
    231 MojoResult DataPipe::Init(bool may_discard,
    232                           size_t element_size,
    233                           size_t capacity_num_elements) {
    234   // No need to lock: This method is not thread-safe.
    235 
    236   if (element_size == 0)
    237     return MOJO_RESULT_INVALID_ARGUMENT;
    238   if (!capacity_num_elements) {
    239     // Set the capacity to the default (rounded down by element size, but always
    240     // at least one element).
    241     capacity_num_elements =
    242         std::max(static_cast<size_t>(1),
    243                  kDefaultDataPipeCapacityBytes / element_size);
    244   }
    245   if (capacity_num_elements >
    246           std::numeric_limits<uint32_t>::max() / element_size)
    247     return MOJO_RESULT_INVALID_ARGUMENT;
    248   if (capacity_num_elements * element_size > kMaxDataPipeCapacityBytes)
    249     return MOJO_RESULT_RESOURCE_EXHAUSTED;
    250 
    251   may_discard_ = may_discard;
    252   element_size_ = element_size;
    253   capacity_num_elements_ = capacity_num_elements;
    254   return MOJO_RESULT_OK;
    255 }
    256 
    257 void DataPipe::AwakeProducerWaitersForStateChangeNoLock() {
    258   lock_.AssertAcquired();
    259   if (!has_local_producer_no_lock())
    260     return;
    261   producer_waiter_list_->AwakeWaitersForStateChange(
    262       ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock());
    263 }
    264 
    265 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() {
    266   lock_.AssertAcquired();
    267   if (!has_local_consumer_no_lock())
    268     return;
    269   consumer_waiter_list_->AwakeWaitersForStateChange(
    270       ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock());
    271 }
    272 
    273 }  // namespace system
    274 }  // namespace mojo
    275