Home | History | Annotate | Download | only in audio
      1 /*
      2  * Copyright (C) 2016 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 #pragma once
     17 
     18 #include <stdint.h>
     19 #include <unistd.h>
     20 #include <time.h>
     21 
     22 #include "common/libs/time/monotonic_time.h"
     23 
     24 /**
     25  * This abstract class simulates a buffer that either fills or empties at
     26  * a specified rate.
     27  *
     28  * The simulated buffer automatically fills or empties at a specific rate.
     29  *
     30  * An item is the thing contained in the simulated buffer. Items are moved
     31  * in and out of the buffer without subdivision.
     32  *
     33  * An integral number of items must arrive / depart in each second.
     34  * This number is stored in items_per_second_
     35  *
     36  * items_per_second * 2000000000 must fit within an int64_t. This
     37  * works if items_per_second is represented by an int32.
     38  *
     39  * The base class does have the concept of capacity, but doesn't use it.
     40  * It is included here to simplify unit testing.
     41  *
     42  * For actual use, see SimulatedInputBuffer and SimulatedOutputBuffer below.
     43  */
     44 class SimulatedBufferBase {
     45  public:
     46   static inline int64_t divide_and_round_up(int64_t q, int64_t d) {
     47     return q / d + ((q % d) != 0);
     48   }
     49 
     50   SimulatedBufferBase(
     51       int32_t items_per_second,
     52       int64_t simulated_item_capacity,
     53       cvd::time::MonotonicTimePointFactory* clock =
     54         cvd::time::MonotonicTimePointFactory::GetInstance()) :
     55     clock_(clock),
     56     current_item_num_(0),
     57     base_item_num_(0),
     58     simulated_item_capacity_(simulated_item_capacity),
     59     items_per_second_(items_per_second),
     60     initialize_(true),
     61     paused_(false) { }
     62 
     63   virtual ~SimulatedBufferBase() { }
     64 
     65   int64_t GetCurrentItemNum() {
     66     Update();
     67     return current_item_num_;
     68   }
     69 
     70   const cvd::time::MonotonicTimePoint GetLastUpdatedTime() const {
     71     return current_time_;
     72   }
     73 
     74   // Sleep for the given amount of time. Subclasses may override this to use
     75   // different sleep calls.
     76   // Sleep is best-effort. The code assumes that the acutal sleep time may be
     77   // greater or less than the time requested.
     78   virtual void SleepUntilTime(const cvd::time::MonotonicTimePoint& in) {
     79     struct timespec ts;
     80     in.ToTimespec(&ts);
     81     clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL);
     82   }
     83 
     84   // The time counter may not start at 0. Concrete classes should call this
     85   // to allow the buffer simulation to read the current time number and
     86   // initialize its internal state.
     87   virtual void Init() {
     88     if (initialize_) {
     89       clock_->FetchCurrentTime(&base_time_);
     90       current_time_ = base_time_;
     91       initialize_ = false;
     92     }
     93   }
     94 
     95   virtual void Update() {
     96     if (initialize_) {
     97       Init();
     98     }
     99     cvd::time::MonotonicTimePoint now;
    100     clock_->FetchCurrentTime(&now);
    101     // We can't call FetchCurrentTime() in the constuctor because a subclass may
    102     // want to override it, so we initialze the times to 0. If we detect this
    103     // case go ahead and initialize to a current timestamp.
    104     if (paused_) {
    105       base_time_ += now - current_time_;
    106       current_time_ = now;
    107       return;
    108     }
    109     // Avoid potential overflow by limiting the scaling to one time second.
    110     // There is no round-off error here because the bases are adjusted for full
    111     // seconds.
    112     // There is no issue with int64 overflow because 2's compliment subtraction
    113     // is immune to overflow.
    114     // However, this does assume that kNanosecondsPerSecond * items_per_second_
    115     // fits in an int64.
    116     cvd::time::Seconds seconds(now - base_time_);
    117     base_time_ += seconds;
    118     base_item_num_ += seconds.count() * items_per_second_;
    119     current_time_ = now;
    120     current_item_num_ =
    121         cvd::time::Nanoseconds(now - base_time_).count() *
    122         items_per_second_ / cvd::time::kNanosecondsPerSecond +
    123         base_item_num_;
    124   }
    125 
    126   // If set to true new items will not be created.
    127   bool SetPaused(bool new_state) {
    128     bool rval = paused_;
    129     Update();
    130     paused_ = new_state;
    131     return rval;
    132   }
    133 
    134   // Calculate the TimePoint that corresponds to an item.
    135   // Caution: This may not return a correct time for items in the past.
    136   cvd::time::MonotonicTimePoint CalculateItemTime(int64_t item) {
    137     int64_t seconds = (item - base_item_num_) / items_per_second_;
    138     int64_t new_base_item_num = base_item_num_ + seconds * items_per_second_;
    139     return base_time_ + cvd::time::Seconds(seconds) +
    140       cvd::time::Nanoseconds(divide_and_round_up(
    141           (item - new_base_item_num) *
    142           cvd::time::kNanosecondsPerSecond,
    143           items_per_second_));
    144   }
    145 
    146   // Sleep until the given item number is generated. If the generator is
    147   // paused unpause it to make the sleep finite.
    148   void SleepUntilItem(int64_t item) {
    149     if (paused_) {
    150       SetPaused(false);
    151     }
    152     cvd::time::MonotonicTimePoint desired_time =
    153         CalculateItemTime(item);
    154     while (1) {
    155       Update();
    156       if (current_item_num_ - item >= 0) {
    157         return;
    158       }
    159       SleepUntilTime(desired_time);
    160     }
    161   }
    162 
    163  protected:
    164   // Source of the timepoints.
    165   cvd::time::MonotonicTimePointFactory* clock_;
    166   // Time when the other values in the structure were updated.
    167   cvd::time::MonotonicTimePoint current_time_;
    168   // Most recent time when there was no round-off error between the clock and
    169   // items.
    170   cvd::time::MonotonicTimePoint base_time_;
    171   // Number of the current item.
    172   int64_t current_item_num_;
    173   // Most recent item number where there was no round-off error between the
    174   // clock and items.
    175   int64_t base_item_num_;
    176   // Simulated_Item_Capacity of the buffer in items.
    177   int64_t simulated_item_capacity_;
    178   // Number of items that are created in 1s. A typical number would be 48000.
    179   int32_t items_per_second_;
    180   bool initialize_;
    181   // If true then don't generate new items.
    182   bool paused_;
    183 };
    184 
    185 /**
    186  * This is a simulation of an output buffer that drains at a constant rate.
    187  */
    188 class SimulatedOutputBuffer : public SimulatedBufferBase {
    189  public:
    190   SimulatedOutputBuffer(
    191       int64_t item_rate,
    192       int64_t simulated_item_capacity,
    193       cvd::time::MonotonicTimePointFactory* clock =
    194         cvd::time::MonotonicTimePointFactory::GetInstance()) :
    195       SimulatedBufferBase(item_rate, simulated_item_capacity, clock) {
    196     output_buffer_item_num_ = current_item_num_;
    197   }
    198 
    199   void Update() override {
    200     SimulatedBufferBase::Update();
    201     if ((output_buffer_item_num_ - current_item_num_) < 0) {
    202       // We ran out of items at some point in the past. However, the
    203       // output capactiy can't be negative.
    204       output_buffer_item_num_ = current_item_num_;
    205     }
    206   }
    207 
    208   int64_t AddToOutputBuffer(int64_t num_new_items, bool block) {
    209     Update();
    210     // The easy case: num_new_items fit in the bucket.
    211     if ((output_buffer_item_num_ + num_new_items - current_item_num_) <=
    212         simulated_item_capacity_) {
    213       output_buffer_item_num_ += num_new_items;
    214       return num_new_items;
    215     }
    216     // If we're non-blocking accept enough items to fill the output.
    217     if (!block) {
    218       int64_t used = current_item_num_ + simulated_item_capacity_ -
    219           output_buffer_item_num_;
    220       output_buffer_item_num_ = current_item_num_ + simulated_item_capacity_;
    221       return used;
    222     }
    223     int64_t new_output_buffer_item_num = output_buffer_item_num_ + num_new_items;
    224     SleepUntilItem(new_output_buffer_item_num - simulated_item_capacity_);
    225     output_buffer_item_num_ = new_output_buffer_item_num;
    226     return num_new_items;
    227   }
    228 
    229   int64_t GetNextOutputBufferItemNum() {
    230     Update();
    231     return output_buffer_item_num_;
    232   }
    233 
    234   cvd::time::MonotonicTimePoint GetNextOutputBufferItemTime() {
    235     Update();
    236     return CalculateItemTime(output_buffer_item_num_);
    237   }
    238 
    239   int64_t GetOutputBufferSize() {
    240     Update();
    241     return output_buffer_item_num_ - current_item_num_;
    242   }
    243 
    244   void Drain() {
    245     SleepUntilItem(output_buffer_item_num_);
    246   }
    247 
    248  protected:
    249   int64_t output_buffer_item_num_;
    250 };
    251 
    252 /**
    253  * Simulates an input buffer that fills at a constant rate.
    254  */
    255 class SimulatedInputBuffer : public SimulatedBufferBase {
    256  public:
    257   SimulatedInputBuffer(
    258       int64_t item_rate,
    259       int64_t simulated_item_capacity,
    260       cvd::time::MonotonicTimePointFactory* clock =
    261         cvd::time::MonotonicTimePointFactory::GetInstance()) :
    262       SimulatedBufferBase(item_rate, simulated_item_capacity, clock) {
    263     input_buffer_item_num_ = current_item_num_;
    264     lost_input_items_ = 0;
    265   }
    266 
    267   void Update() override {
    268     SimulatedBufferBase::Update();
    269     if ((current_item_num_ - input_buffer_item_num_) >
    270         simulated_item_capacity_) {
    271       // The buffer overflowed at some point in the past. Account for the lost
    272       // times.
    273       int64_t new_input_buffer_item_num =
    274           current_item_num_ - simulated_item_capacity_;
    275       lost_input_items_ +=
    276           new_input_buffer_item_num - input_buffer_item_num_;
    277       input_buffer_item_num_ = new_input_buffer_item_num;
    278     }
    279   }
    280 
    281   int64_t RemoveFromInputBuffer(int64_t num_items_wanted, bool block) {
    282     Update();
    283     if (!block) {
    284       int64_t num_items_available = current_item_num_ - input_buffer_item_num_;
    285       if (num_items_available < num_items_wanted) {
    286         input_buffer_item_num_ += num_items_available;
    287         return num_items_available;
    288       } else {
    289         input_buffer_item_num_ += num_items_wanted;
    290         return num_items_wanted;
    291       }
    292     }
    293     // Calculate the item number that is being claimed. Sleep until it appears.
    294     // Advancing input_buffer_item_num_ causes a negative value to be compared
    295     // to the capacity, effectively disabling the overflow detection code
    296     // in Update().
    297     input_buffer_item_num_ += num_items_wanted;
    298     while (input_buffer_item_num_ - current_item_num_ > 0) {
    299       SleepUntilItem(input_buffer_item_num_);
    300     }
    301     return num_items_wanted;
    302   }
    303 
    304   int64_t GetLostInputItems() {
    305     Update();
    306     int64_t rval = lost_input_items_;
    307     lost_input_items_ = 0;
    308     return rval;
    309   }
    310 
    311  protected:
    312   int64_t input_buffer_item_num_;
    313   int64_t lost_input_items_;
    314 };
    315