/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_

#include "perfetto/base/optional.h"
#include "perfetto/base/unix_socket.h"
#include "perfetto/base/utils.h"
#include "src/profiling/memory/scoped_spinlock.h"

#include <atomic>
#include <map>
#include <memory>

#include <stdint.h>

namespace perfetto {
namespace profiling {

// A concurrent, multi-writer, single-reader FIFO ring buffer, built on a
// circular buffer over shared memory. It has similar semantics to a
// SEQ_PACKET + O_NONBLOCK socket, specifically:
//
// - Writes are atomic; data is either fully written to the buffer or not at
//   all.
// - New writes are discarded if the buffer is full.
// - If a write succeeds, the reader is guaranteed to see the whole write.
// - Reads are atomic, no fragmentation.
// - The reader sees writes in write order (modulo discarded writes).
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// *IMPORTANT*: This code must be written under the assumption that the other
// end can modify arbitrary shared memory without holding the spinlock. This
// means we must take local copies of the read and write pointers before doing
// bounds checks and the subsequent reads / writes, as the shared values might
// change in the meantime.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// TODO:
// - Write a benchmark.
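//
// Setup sketch (illustrative only, not part of this header; assumes one
// process creates the buffer and passes its fd to the other process, e.g.
// over a unix socket. |SendFd| is a hypothetical fd-passing helper, and the
// size is an arbitrary power-of-two choice):
//
//   // Creating side:
//   base::Optional<SharedRingBuffer> ring =
//       SharedRingBuffer::Create(8 * 1024 * 1024);
//   PERFETTO_CHECK(ring && ring->is_valid());
//   SendFd(ring->fd());
//
//   // Attaching side, with |fd| received from the other process:
//   base::Optional<SharedRingBuffer> attached =
//       SharedRingBuffer::Attach(base::ScopedFile(fd));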
class SharedRingBuffer {
 public:
  class Buffer {
   public:
    Buffer() {}
    Buffer(uint8_t* d, size_t s) : data(d), size(s) {}

    Buffer(const Buffer&) = delete;
    Buffer& operator=(const Buffer&) = delete;

    Buffer(Buffer&&) = default;
    Buffer& operator=(Buffer&&) = default;

    operator bool() const { return data != nullptr; }

    uint8_t* data = nullptr;
    size_t size = 0;
  };

  struct Stats {
    uint64_t bytes_written;
    uint64_t num_writes_succeeded;
    uint64_t num_writes_corrupt;
    uint64_t num_writes_overflow;

    uint64_t num_reads_succeeded;
    uint64_t num_reads_corrupt;
    uint64_t num_reads_nodata;

    // The fields below are set by GetStats() as copies of the atomics in
    // MetadataPage.
    uint64_t failed_spinlocks;
  };

  static base::Optional<SharedRingBuffer> Create(size_t);
  static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);

  ~SharedRingBuffer();
  SharedRingBuffer() = default;

  SharedRingBuffer(SharedRingBuffer&&) noexcept;
  SharedRingBuffer& operator=(SharedRingBuffer&&);

  bool is_valid() const { return !!mem_; }
  size_t size() const { return size_; }
  int fd() const { return *mem_fd_; }

  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
  void EndWrite(Buffer buf);

  Buffer BeginRead();
  void EndRead(Buffer);
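
  // Minimal read-side sketch (assumes |ring| is a valid attached instance and
  // |Consume| is a hypothetical callback owned by the caller):
  //
  //   for (;;) {
  //     SharedRingBuffer::Buffer buf = ring.BeginRead();
  //     if (!buf)
  //       break;  // No complete record is available right now.
  //     Consume(buf.data, buf.size);
  //     ring.EndRead(std::move(buf));
  //   }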

  Stats GetStats(ScopedSpinlock& spinlock) {
    PERFETTO_DCHECK(spinlock.locked());
    Stats stats = meta_->stats;
    stats.failed_spinlocks =
        meta_->failed_spinlocks.load(std::memory_order_relaxed);
    return stats;
  }
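
  // Stats are read under the spinlock, e.g. (a sketch; assumes |ring| is
  // valid and that ScopedSpinlock::Mode has a Blocking mode):
  //
  //   ScopedSpinlock lock = ring.AcquireLock(ScopedSpinlock::Mode::Blocking);
  //   if (lock.locked()) {
  //     SharedRingBuffer::Stats stats = ring.GetStats(lock);
  //     PERFETTO_LOG("overflows: %" PRIu64, stats.num_writes_overflow);
  //   }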

  // Lets the caller acquire the spinlock and keep holding it after
  // BeginWrite() has returned, so that additional bookkeeping (e.g.
  // incrementing the sequence_number) can be done under the same lock.
  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
    auto lock = ScopedSpinlock(&meta_->spinlock, mode);
    if (PERFETTO_UNLIKELY(!lock.locked()))
      meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
    return lock;
  }
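
  // Write-side sketch of that pattern (assumptions: |ring| is valid;
  // |payload|, |payload_size| and |sequence_number| are the caller's own
  // hypothetical data and counter). The payload copy happens outside the
  // lock; only the reservation and the bookkeeping are done under it:
  //
  //   SharedRingBuffer::Buffer buf;
  //   {
  //     ScopedSpinlock lock = ring.AcquireLock(ScopedSpinlock::Mode::Try);
  //     if (!lock.locked())
  //       return;  // Contended; the failure was counted in the stats.
  //     buf = ring.BeginWrite(lock, payload_size);
  //     sequence_number++;  // Extra bookkeeping under the same lock.
  //   }
  //   if (buf) {
  //     memcpy(buf.data, payload, payload_size);
  //     ring.EndWrite(std::move(buf));
  //   }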

  // Exposed for fuzzers.
  struct MetadataPage {
    alignas(uint64_t) std::atomic<bool> spinlock;
    uint64_t read_pos;
    uint64_t write_pos;

    std::atomic<uint64_t> failed_spinlocks;
    Stats stats;
  };

 private:
  struct PointerPositions {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  struct CreateFlag {};
  struct AttachFlag {};
  SharedRingBuffer(const SharedRingBuffer&) = delete;
  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
  SharedRingBuffer(CreateFlag, size_t size);
  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
    Initialize(std::move(mem_fd));
  }

  void Initialize(base::ScopedFile mem_fd);
  bool IsCorrupt(const PointerPositions& pos);
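
  // Takes a snapshot of the shared read/write positions. Per the *IMPORTANT*
  // note at the top of this file, the other end may change the shared values
  // at any time, so all subsequent bounds checks must operate on this local
  // copy, which is validated once via IsCorrupt().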
  inline base::Optional<PointerPositions> GetPointerPositions(
      const ScopedSpinlock& lock) {
    PERFETTO_DCHECK(lock.locked());

    PointerPositions pos;
    pos.read_pos = meta_->read_pos;
    pos.write_pos = meta_->write_pos;

    if (IsCorrupt(pos))
      return base::nullopt;
    return pos;
  }

  inline size_t read_avail(const PointerPositions& pos) {
    PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
    auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
    PERFETTO_DCHECK(res <= size_);
    return res;
  }

  inline size_t write_avail(const PointerPositions& pos) {
    return size_ - read_avail(pos);
  }
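
  // Maps a monotonically increasing position onto the buffer; |size_| must be
  // a power of two for the mask to be a correct modulo. Positions past the
  // end of the first mapping land in the 2nd mmap mentioned below, which
  // (presumably, as the mapping is set up in the .cc file) maps the same
  // pages again so that a record wrapping around the end of the buffer stays
  // contiguous in virtual memory.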
  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); }

  base::ScopedFile mem_fd_;
  MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).

  // Size of the ring buffer contents, not including the metadata page or the
  // 2nd mmap.
  size_t size_ = 0;

  // Remember to update the move ctor when adding new fields.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_