Home | History | Annotate | Download | only in libbroadcastring
      1 #ifndef ANDROID_DVR_BROADCAST_RING_H_
      2 #define ANDROID_DVR_BROADCAST_RING_H_
      3 
      4 #include <inttypes.h>
      5 #include <stddef.h>
      6 #include <stdio.h>
      7 #include <atomic>
      8 #include <limits>
      9 #include <tuple>
     10 #include <type_traits>
     11 #include <utility>
     12 
     13 #include "android-base/logging.h"
     14 
     15 #if ATOMIC_LONG_LOCK_FREE != 2 || ATOMIC_INT_LOCK_FREE != 2
     16 #error "This file requires lock free atomic uint32_t and long"
     17 #endif
     18 
     19 namespace android {
     20 namespace dvr {
     21 
     22 struct DefaultRingTraits {
     23   // Set this to false to allow compatibly expanding the record size.
     24   static constexpr bool kUseStaticRecordSize = false;
     25 
     26   // Set this to a nonzero value to fix the number of records in the ring.
     27   static constexpr uint32_t kStaticRecordCount = 0;
     28 
     29   // Set this to the max number of records that can be written simultaneously.
     30   static constexpr uint32_t kMaxReservedRecords = 1;
     31 
     32   // Set this to the min number of records that must be readable.
     33   static constexpr uint32_t kMinAvailableRecords = 1;
     34 };
     35 
     36 // Nonblocking ring suitable for concurrent single-writer, multi-reader access.
     37 //
     38 // Readers never block the writer and thus this is a nondeterministically lossy
     39 // transport in the absence of external synchronization. Don't use this as a
     40 // transport when deterministic behavior is required.
     41 //
     42 // Readers may have a read-only mapping; each reader's state is a single local
     43 // sequence number.
     44 //
     45 // The implementation takes care to avoid data races on record access.
     46 // Inconsistent data can only be returned if at least 2^32 records are written
     47 // during the read-side critical section.
     48 //
     49 // In addition, both readers and the writer are careful to avoid accesses
     50 // outside the bounds of the mmap area passed in during initialization even if
     51 // there is a misbehaving or malicious task with write access to the mmap area.
     52 //
     53 // When dynamic record size is enabled, readers use the record size in the ring
     54 // header when indexing the ring, so that it is possible to extend the record
     55 // type without breaking the read-side ABI.
     56 //
     57 // Avoid calling Put() in a tight loop; there should be significantly more time
     58 // between successive puts than it takes to read one record from memory to
     59 // ensure Get() completes quickly. This requirement should not be difficult to
     60 // achieve for most practical uses; 4kB puts at 10,000Hz is well below the
     61 // scaling limit on current mobile chips.
     62 //
     63 // Example Writer Usage:
     64 //
     65 //   using Record = MyRecordType;
     66 //   using Ring = BroadcastRing<Record>;
     67 //
     68 //   uint32_t record_count = kMyDesiredCount;
     69 //   uint32_t ring_size = Ring::MemorySize(record_count);
     70 //
     71 //   size_t page_size = sysconf(_SC_PAGESIZE);
     72 //   uint32_t mmap_size = (ring_size + (page_size - 1)) & ~(page_size - 1);
     73 //
     74 //   // Allocate & map via your preferred mechanism, e.g.
     75 //   int fd = open("/dev/shm/ring_test", O_CREAT|O_RDWR|O_CLOEXEC, 0600);
     76 //   CHECK(fd >= 0);
     77 //   CHECK(!ftruncate(fd, ring_size));
     78 //   void *mmap_base = mmap(nullptr, mmap_size, PROT_READ|PROT_WRITE,
     79 //                          MAP_SHARED, fd, 0);
     80 //   CHECK(mmap_base != MAP_FAILED);
     81 //   close(fd);
     82 //
     83 //   Ring ring = Ring::Create(mmap_base, mmap_size, record_count);
     84 //
     85 //   while (!done)
     86 //     ring.Put(BuildNextRecordBlocking());
     87 //
     88 //   CHECK(!munmap(mmap_base, mmap_size));
     89 //
     90 // Example Reader Usage:
     91 //
     92 //   using Record = MyRecordType;
     93 //   using Ring = BroadcastRing<Record>;
     94 //
     95 //   // Map via your preferred mechanism, e.g.
     96 //   int fd = open("/dev/shm/ring_test", O_RDONLY|O_CLOEXEC);
     97 //   CHECK(fd >= 0);
     98 //   struct stat st;
     99 //   CHECK(!fstat(fd, &st));
    100 //   size_t mmap_size = st.st_size;
    101 //   void *mmap_base = mmap(nullptr, mmap_size, PROT_READ,
    102 //                          MAP_SHARED, fd, 0);
    103 //   CHECK(mmap_base != MAP_FAILED);
    104 //   close(fd);
    105 //
    106 //   Ring ring;
    107 //   bool import_ok;
    108 //   std::tie(ring, import_ok) = Ring::Import(mmap_base, mmap_size);
    109 //   CHECK(import_ok);
    110 //
    111 //   uint32_t sequence;
    112 //
    113 //   // Choose starting point (using "0" is unpredictable but not dangerous)
    114 //   sequence = ring.GetOldestSequence();  // The oldest available
    115 //   sequence = ring.GetNewestSequence();  // The newest available
    116 //   sequence = ring.GetNextSequence();    // The next one produced
    117 //
    118 //   while (!done) {
    119 //     Record record;
    120 //
    121 //     if (you_want_to_process_all_available_records) {
    122 //       while (ring.Get(&sequence, &record)) {
    123 //         ProcessRecord(sequence, record);
    124 //         sequence++;
    125 //       }
    126 //     } else if (you_want_to_skip_to_the_newest_record) {
    127 //       if (ring.GetNewest(&sequence, &record)) {
    128 //         ProcessRecord(sequence, record);
    129 //         sequence++;
    130 //       }
    131 //     }
    132 //
    133 //     DoSomethingExpensiveOrBlocking();
    134 //   }
    135 //
    136 //   CHECK(!munmap(mmap_base, mmap_size));
    137 //
    138 template <typename RecordType, typename BaseTraits = DefaultRingTraits>
    139 class BroadcastRing {
    140  public:
    141   using Record = RecordType;
    142   struct Traits : public BaseTraits {
    143     // Must have enough space for writers, plus enough space for readers.
    144     static constexpr int kMinRecordCount =
    145         BaseTraits::kMaxReservedRecords + BaseTraits::kMinAvailableRecords;
    146 
    147     // Count of zero means dynamic, non-zero means static.
    148     static constexpr bool kUseStaticRecordCount =
    149         (BaseTraits::kStaticRecordCount != 0);
    150 
    151     // If both record size and count are static then the overall size is too.
    152     static constexpr bool kIsStaticSize =
    153         BaseTraits::kUseStaticRecordSize && kUseStaticRecordCount;
    154   };
    155 
    156   static constexpr bool IsPowerOfTwo(uint32_t size) {
    157     return (size & (size - 1)) == 0;
    158   }
    159 
    160   // Sanity check the options provided in Traits.
    161   static_assert(Traits::kMinRecordCount >= 1, "Min record count too small");
    162   static_assert(!Traits::kUseStaticRecordCount ||
    163                     Traits::kStaticRecordCount >= Traits::kMinRecordCount,
    164                 "Static record count is too small");
    165   static_assert(!Traits::kStaticRecordCount ||
    166                     IsPowerOfTwo(Traits::kStaticRecordCount),
    167                 "Static record count is not a power of two");
    168   static_assert(std::is_standard_layout<Record>::value,
    169                 "Record type must be standard layout");
    170 
    171   BroadcastRing() {}
    172 
    173   // Creates a new ring at |mmap| with |record_count| records.
    174   //
    175   // There must be at least |MemorySize(record_count)| bytes of space already
    176   // allocated at |mmap|. The ring does not take ownership.
    177   //
    178   // Use this function for dynamically sized rings.
    179   static BroadcastRing Create(void* mmap, size_t mmap_size,
    180                               uint32_t record_count) {
    181     BroadcastRing ring(mmap);
    182     CHECK(ring.ValidateGeometry(mmap_size, sizeof(Record), record_count));
    183     ring.InitializeHeader(sizeof(Record), record_count);
    184     return ring;
    185   }
    186 
    187   // Creates a new ring at |mmap|.
    188   //
    189   // There must be at least |MemorySize()| bytes of space already allocated at
    190   // |mmap|. The ring does not take ownership.
    191   //
    192   // Use this function for statically sized rings.
    193   static BroadcastRing Create(void* mmap, size_t mmap_size) {
    194     static_assert(Traits::kUseStaticRecordCount,
    195                   "Wrong Create() function called for dynamic record count");
    196     return Create(mmap, mmap_size, Traits::kStaticRecordCount);
    197   }
    198 
    199   // Imports an existing ring at |mmap|.
    200   //
    201   // Import may fail if the ring parameters in the mmap header are not sensible.
    202   // In this case the returned boolean is false; make sure to check this value.
    203   static std::tuple<BroadcastRing, bool> Import(void* mmap, size_t mmap_size) {
    204     BroadcastRing ring(mmap);
    205     uint32_t record_size = 0;
    206     uint32_t record_count = 0;
    207     if (mmap_size >= sizeof(Header)) {
    208       record_size = std::atomic_load_explicit(&ring.header_mmap()->record_size,
    209                                               std::memory_order_relaxed);
    210       record_count = std::atomic_load_explicit(
    211           &ring.header_mmap()->record_count, std::memory_order_relaxed);
    212     }
    213     bool ok = ring.ValidateGeometry(mmap_size, record_size, record_count);
    214     return std::make_tuple(ring, ok);
    215   }
    216 
    217   ~BroadcastRing() {}
    218 
    219   // Calculates the space necessary for a ring of size |record_count|.
    220   //
    221   // Use this function for dynamically sized rings.
    222   static constexpr size_t MemorySize(uint32_t record_count) {
    223     return sizeof(Header) + sizeof(Record) * record_count;
    224   }
    225 
    226   // Calculates the space necessary for a statically sized ring.
    227   //
    228   // Use this function for statically sized rings.
    229   static constexpr size_t MemorySize() {
    230     static_assert(
    231         Traits::kUseStaticRecordCount,
    232         "Wrong MemorySize() function called for dynamic record count");
    233     return MemorySize(Traits::kStaticRecordCount);
    234   }
    235 
    236   // Writes a record to the ring.
    237   //
    238   // The oldest record is overwritten unless the ring is not already full.
    239   void Put(const Record& record) {
    240     const int kRecordCount = 1;
    241     Reserve(kRecordCount);
    242     Geometry geometry = GetGeometry();
    243     PutRecordInternal(&record, record_mmap_writer(geometry.tail_index));
    244     Publish(kRecordCount);
    245   }
    246 
    247   // Gets sequence number of the oldest currently available record.
    248   uint32_t GetOldestSequence() const {
    249     return std::atomic_load_explicit(&header_mmap()->head,
    250                                      std::memory_order_relaxed);
    251   }
    252 
    253   // Gets sequence number of the first future record.
    254   //
    255   // If the returned value is passed to Get() and there is no concurrent Put(),
    256   // Get() will return false.
    257   uint32_t GetNextSequence() const {
    258     return std::atomic_load_explicit(&header_mmap()->tail,
    259                                      std::memory_order_relaxed);
    260   }
    261 
    262   // Gets sequence number of the newest currently available record.
    263   uint32_t GetNewestSequence() const { return GetNextSequence() - 1; }
    264 
    265   // Copies the oldest available record with sequence at least |*sequence| to
    266   // |record|.
    267   //
    268   // Returns false if there is no recent enough record available.
    269   //
    270   // Updates |*sequence| with the sequence number of the record returned. To get
    271   // the following record, increment this number by one.
    272   //
    273   // This function synchronizes with two other operations:
    274   //
    275   //    (1) Load-Acquire of |tail|
    276   //
    277   //        Together with the store-release in Publish(), this load-acquire
    278   //        ensures each store to a record in PutRecordInternal() happens-before
    279   //        any corresponding load in GetRecordInternal().
    280   //
    281   //        i.e. the stores for the records with sequence numbers < |tail| have
    282   //        completed from our perspective
    283   //
    284   //    (2) Acquire Fence between record access & final load of |head|
    285   //
    286   //        Together with the release fence in Reserve(), this ensures that if
    287   //        GetRecordInternal() loads a value stored in some execution of
    288   //        PutRecordInternal(), then the store of |head| in the Reserve() that
    289   //        preceeded it happens-before our final load of |head|.
    290   //
    291   //        i.e. if we read a record with sequence number >= |final_head| then
    292   //        no later store to that record has completed from our perspective
    293   bool Get(uint32_t* sequence /*inout*/, Record* record /*out*/) const {
    294     for (;;) {
    295       uint32_t tail = std::atomic_load_explicit(&header_mmap()->tail,
    296                                                 std::memory_order_acquire);
    297       uint32_t head = std::atomic_load_explicit(&header_mmap()->head,
    298                                                 std::memory_order_relaxed);
    299 
    300       if (tail - head > record_count())
    301         continue;  // Concurrent modification; re-try.
    302 
    303       if (*sequence - head > tail - head)
    304         *sequence = head;  // Out of window, skip forward to first available.
    305 
    306       if (*sequence == tail) return false;  // No new records available.
    307 
    308       Geometry geometry =
    309           CalculateGeometry(record_count(), record_size(), *sequence, tail);
    310 
    311       // Compute address explicitly in case record_size > sizeof(Record).
    312       RecordStorage* record_storage = record_mmap_reader(geometry.head_index);
    313 
    314       GetRecordInternal(record_storage, record);
    315 
    316       // NB: It is not sufficient to change this to a load-acquire of |head|.
    317       std::atomic_thread_fence(std::memory_order_acquire);
    318 
    319       uint32_t final_head = std::atomic_load_explicit(
    320           &header_mmap()->head, std::memory_order_relaxed);
    321 
    322       if (final_head - head > *sequence - head)
    323         continue;  // Concurrent modification; re-try.
    324 
    325       // Note: Combining the above 4 comparisons gives:
    326       // 0 <= final_head - head <= sequence - head < tail - head <= record_count
    327       //
    328       // We can also write this as:
    329       // head <=* final_head <=* sequence <* tail <=* head + record_count
    330       //
    331       // where <* orders by difference from head: x <* y if x - head < y - head.
    332       // This agrees with the order of sequence updates during "put" operations.
    333       return true;
    334     }
    335   }
    336 
    337   // Copies the newest available record with sequence at least |*sequence| to
    338   // |record|.
    339   //
    340   // Returns false if there is no recent enough record available.
    341   //
    342   // Updates |*sequence| with the sequence number of the record returned. To get
    343   // the following record, increment this number by one.
    344   bool GetNewest(uint32_t* sequence, Record* record) const {
    345     uint32_t newest_sequence = GetNewestSequence();
    346     if (*sequence == newest_sequence + 1) return false;
    347     *sequence = newest_sequence;
    348     return Get(sequence, record);
    349   }
    350 
    351   uint32_t record_count() const { return record_count_internal(); }
    352   uint32_t record_size() const { return record_size_internal(); }
    353   static constexpr uint32_t mmap_alignment() { return alignof(Mmap); }
    354 
    355  private:
    356   struct Header {
    357     // Record size for reading out of the ring. Writers always write the full
    358     // length; readers may need to read a prefix of each record.
    359     std::atomic<uint32_t> record_size;
    360 
    361     // Number of records in the ring.
    362     std::atomic<uint32_t> record_count;
    363 
    364     // Readable region is [head % record_count, tail % record_count).
    365     //
    366     // The region in [tail % record_count, head % record_count) was either never
    367     // populated or is being updated.
    368     //
    369     // These are sequences numbers, not indexes - indexes should be computed
    370     // with a modulus.
    371     //
    372     // To ensure consistency:
    373     //
    374     // (1) Writes advance |head| past any updated records before writing to
    375     //     them, and advance |tail| after they are written.
    376     // (2) Readers check |tail| before reading data and |head| after,
    377     //     making sure to discard any data that was written to concurrently.
    378     std::atomic<uint32_t> head;
    379     std::atomic<uint32_t> tail;
    380   };
    381 
    382   // Store using the standard word size.
    383   using StorageType = long;  // NOLINT
    384 
    385   // Always require 8 byte alignment so that the same record sizes are legal on
    386   // 32 and 64 bit builds.
    387   static constexpr size_t kRecordAlignment = 8;
    388   static_assert(kRecordAlignment % sizeof(StorageType) == 0,
    389                 "Bad record alignment");
    390 
    391   struct RecordStorage {
    392     // This is accessed with relaxed atomics to prevent data races on the
    393     // contained data, which would be undefined behavior.
    394     std::atomic<StorageType> data[sizeof(Record) / sizeof(StorageType)];
    395   };
    396 
    397   static_assert(sizeof(StorageType) *
    398                         std::extent<decltype(RecordStorage::data)>() ==
    399                     sizeof(Record),
    400                 "Record length must be a multiple of sizeof(StorageType)");
    401 
    402   struct Geometry {
    403     // Static geometry.
    404     uint32_t record_count;
    405     uint32_t record_size;
    406 
    407     // Copy of atomic sequence counts.
    408     uint32_t head;
    409     uint32_t tail;
    410 
    411     // First index of readable region.
    412     uint32_t head_index;
    413 
    414     // First index of writable region.
    415     uint32_t tail_index;
    416 
    417     // Number of records in readable region.
    418     uint32_t count;
    419 
    420     // Number of records in writable region.
    421     uint32_t space;
    422   };
    423 
    424   // Mmap area layout.
    425   //
    426   // Readers should not index directly into |records| as this is not valid when
    427   // dynamic record sizes are used; use record_mmap_reader() instead.
    428   struct Mmap {
    429     Header header;
    430     RecordStorage records[];
    431   };
    432 
    433   static_assert(std::is_standard_layout<Mmap>::value,
    434                 "Mmap must be standard layout");
    435   static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t),
    436                 "Lockless atomics contain extra state");
    437   static_assert(sizeof(std::atomic<StorageType>) == sizeof(StorageType),
    438                 "Lockless atomics contain extra state");
    439 
    440   explicit BroadcastRing(void* mmap) {
    441     CHECK_EQ(0U, reinterpret_cast<uintptr_t>(mmap) % alignof(Mmap));
    442     data_.mmap = reinterpret_cast<Mmap*>(mmap);
    443   }
    444 
    445   // Initializes the mmap area header for a new ring.
    446   void InitializeHeader(uint32_t record_size, uint32_t record_count) {
    447     constexpr uint32_t kInitialSequence = -256;  // Force an early wrap.
    448     std::atomic_store_explicit(&header_mmap()->record_size, record_size,
    449                                std::memory_order_relaxed);
    450     std::atomic_store_explicit(&header_mmap()->record_count, record_count,
    451                                std::memory_order_relaxed);
    452     std::atomic_store_explicit(&header_mmap()->head, kInitialSequence,
    453                                std::memory_order_relaxed);
    454     std::atomic_store_explicit(&header_mmap()->tail, kInitialSequence,
    455                                std::memory_order_relaxed);
    456   }
    457 
    458   // Validates ring geometry.
    459   //
    460   // Ring geometry is validated carefully on import and then cached. This allows
    461   // us to avoid out-of-range accesses even if the parameters in the header are
    462   // later changed.
    463   bool ValidateGeometry(size_t mmap_size, uint32_t header_record_size,
    464                         uint32_t header_record_count) {
    465     set_record_size(header_record_size);
    466     set_record_count(header_record_count);
    467 
    468     if (record_size() != header_record_size) return false;
    469     if (record_count() != header_record_count) return false;
    470     if (record_count() < Traits::kMinRecordCount) return false;
    471     if (record_size() < sizeof(Record)) return false;
    472     if (record_size() % kRecordAlignment != 0) return false;
    473     if (!IsPowerOfTwo(record_count())) return false;
    474 
    475     size_t memory_size = record_count() * record_size();
    476     if (memory_size / record_size() != record_count()) return false;
    477     if (memory_size + sizeof(Header) < memory_size) return false;
    478     if (memory_size + sizeof(Header) > mmap_size) return false;
    479 
    480     return true;
    481   }
    482 
    483   // Copies a record into the ring.
    484   //
    485   // This is done with relaxed atomics because otherwise it is racy according to
    486   // the C++ memory model. This is very low overhead once optimized.
    487   static inline void PutRecordInternal(const Record* in, RecordStorage* out) {
    488     StorageType data[sizeof(Record) / sizeof(StorageType)];
    489     memcpy(data, in, sizeof(*in));
    490     for (size_t i = 0; i < std::extent<decltype(data)>(); ++i) {
    491       std::atomic_store_explicit(&out->data[i], data[i],
    492                                  std::memory_order_relaxed);
    493     }
    494   }
    495 
    496   // Copies a record out of the ring.
    497   //
    498   // This is done with relaxed atomics because otherwise it is racy according to
    499   // the C++ memory model. This is very low overhead once optimized.
    500   static inline void GetRecordInternal(RecordStorage* in, Record* out) {
    501     StorageType data[sizeof(Record) / sizeof(StorageType)];
    502     for (size_t i = 0; i < std::extent<decltype(data)>(); ++i) {
    503       data[i] =
    504           std::atomic_load_explicit(&in->data[i], std::memory_order_relaxed);
    505     }
    506     memcpy(out, &data, sizeof(*out));
    507   }
    508 
    509   // Converts a record's sequence number into a storage index.
    510   static uint32_t SequenceToIndex(uint32_t sequence, uint32_t record_count) {
    511     return sequence & (record_count - 1);
    512   }
    513 
    514   // Computes readable & writable ranges from ring parameters.
    515   static Geometry CalculateGeometry(uint32_t record_count, uint32_t record_size,
    516                                     uint32_t head, uint32_t tail) {
    517     Geometry geometry;
    518     geometry.record_count = record_count;
    519     geometry.record_size = record_size;
    520     DCHECK_EQ(0U, geometry.record_size % kRecordAlignment);
    521     geometry.head = head;
    522     geometry.tail = tail;
    523     geometry.head_index = SequenceToIndex(head, record_count);
    524     geometry.tail_index = SequenceToIndex(tail, record_count);
    525     geometry.count = geometry.tail - geometry.head;
    526     DCHECK_LE(geometry.count, record_count);
    527     geometry.space = geometry.record_count - geometry.count;
    528     return geometry;
    529   }
    530 
    531   // Gets the current ring readable & writable regions.
    532   //
    533   // This this is always safe from the writing thread since it is the only
    534   // thread allowed to update the header.
    535   Geometry GetGeometry() const {
    536     return CalculateGeometry(
    537         record_count(), record_size(),
    538         std::atomic_load_explicit(&header_mmap()->head,
    539                                   std::memory_order_relaxed),
    540         std::atomic_load_explicit(&header_mmap()->tail,
    541                                   std::memory_order_relaxed));
    542   }
    543 
    544   // Makes space for at least |reserve_count| records.
    545   //
    546   // There is nothing to prevent overwriting records that have concurrent
    547   // readers. We do however ensure that this situation can be detected: the
    548   // fence ensures the |head| update will be the first update seen by readers,
    549   // and readers check this value after reading and discard data that may have
    550   // been concurrently modified.
    551   void Reserve(uint32_t reserve_count) {
    552     Geometry geometry = GetGeometry();
    553     DCHECK_LE(reserve_count, Traits::kMaxReservedRecords);
    554     uint32_t needed =
    555         (geometry.space >= reserve_count ? 0 : reserve_count - geometry.space);
    556 
    557     std::atomic_store_explicit(&header_mmap()->head, geometry.head + needed,
    558                                std::memory_order_relaxed);
    559 
    560     // NB: It is not sufficient to change this to a store-release of |head|.
    561     std::atomic_thread_fence(std::memory_order_release);
    562   }
    563 
    564   // Makes |publish_count| records visible to readers.
    565   //
    566   // Space must have been reserved by a previous call to Reserve().
    567   void Publish(uint32_t publish_count) {
    568     Geometry geometry = GetGeometry();
    569     DCHECK_LE(publish_count, geometry.space);
    570     std::atomic_store_explicit(&header_mmap()->tail,
    571                                geometry.tail + publish_count,
    572                                std::memory_order_release);
    573   }
    574 
    575   // Helpers to compute addresses in mmap area.
    576   Mmap* mmap() const { return data_.mmap; }
    577   Header* header_mmap() const { return &data_.mmap->header; }
    578   RecordStorage* record_mmap_writer(uint32_t index) const {
    579     DCHECK_EQ(sizeof(Record), record_size());
    580     return &data_.mmap->records[index];
    581   }
    582   RecordStorage* record_mmap_reader(uint32_t index) const {
    583     if (Traits::kUseStaticRecordSize) {
    584       return &data_.mmap->records[index];
    585     } else {
    586       // Calculate the location of a record in the ring without assuming that
    587       // sizeof(Record) == record_size.
    588       return reinterpret_cast<RecordStorage*>(
    589           reinterpret_cast<char*>(data_.mmap->records) + index * record_size());
    590     }
    591   }
    592 
    593   // The following horrifying template gunk enables us to store just the mmap
    594   // base pointer for compile-time statically sized rings. Dynamically sized
    595   // rings also store the validated copy of the record size & count.
    596   //
    597   // This boils down to: use a compile time constant if available, and otherwise
    598   // load the value that was validated on import from a member variable.
    599   template <typename T = Traits>
    600   typename std::enable_if<T::kUseStaticRecordSize, uint32_t>::type
    601   record_size_internal() const {
    602     return sizeof(Record);
    603   }
    604 
    605   template <typename T = Traits>
    606   typename std::enable_if<!T::kUseStaticRecordSize, uint32_t>::type
    607   record_size_internal() const {
    608     return data_.record_size;
    609   }
    610 
    611   template <typename T = Traits>
    612   typename std::enable_if<T::kUseStaticRecordSize, void>::type set_record_size(
    613       uint32_t /*record_size*/) {}
    614 
    615   template <typename T = Traits>
    616   typename std::enable_if<!T::kUseStaticRecordSize, void>::type set_record_size(
    617       uint32_t record_size) {
    618     data_.record_size = record_size;
    619   }
    620 
    621   template <typename T = Traits>
    622   typename std::enable_if<T::kUseStaticRecordCount, uint32_t>::type
    623   record_count_internal() const {
    624     return Traits::kStaticRecordCount;
    625   }
    626 
    627   template <typename T = Traits>
    628   typename std::enable_if<!T::kUseStaticRecordCount, uint32_t>::type
    629   record_count_internal() const {
    630     return data_.record_count;
    631   }
    632 
    633   template <typename T = Traits>
    634   typename std::enable_if<T::kUseStaticRecordCount, void>::type
    635   set_record_count(uint32_t /*record_count*/) const {}
    636 
    637   template <typename T = Traits>
    638   typename std::enable_if<!T::kUseStaticRecordCount, void>::type
    639   set_record_count(uint32_t record_count) {
    640     data_.record_count = record_count;
    641   }
    642 
    643   // Data we need to store for statically sized rings.
    644   struct DataStaticSize {
    645     Mmap* mmap = nullptr;
    646   };
    647 
    648   // Data we need to store for dynamically sized rings.
    649   struct DataDynamicSize {
    650     Mmap* mmap = nullptr;
    651 
    652     // These are cached to make sure misbehaving writers cannot cause
    653     // out-of-bounds memory accesses by updating the values in the mmap header.
    654     uint32_t record_size = 0;
    655     uint32_t record_count = 0;
    656   };
    657 
    658   using DataStaticOrDynamic =
    659       typename std::conditional<Traits::kIsStaticSize, DataStaticSize,
    660                                 DataDynamicSize>::type;
    661 
    662   DataStaticOrDynamic data_;
    663 };
    664 
    665 }  // namespace dvr
    666 }  // namespace android
    667 
    668 #endif  // ANDROID_DVR_BROADCAST_RING_H_
    669