1 /* 2 * libjingle 3 * Copyright 2011, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #ifndef TALK_BASE_ATOMICOPS_H_ 29 #define TALK_BASE_ATOMICOPS_H_ 30 31 #include <string> 32 33 #include "talk/base/basictypes.h" 34 #include "talk/base/common.h" 35 #include "talk/base/logging.h" 36 #include "talk/base/scoped_ptr.h" 37 38 namespace talk_base { 39 40 // A single-producer, single-consumer, fixed-size queue. 41 // All methods not ending in Unsafe can be safely called without locking, 42 // provided that calls to consumer methods (Peek/Pop) or producer methods (Push) 43 // only happen on a single thread per method type. If multiple threads need to 44 // read simultaneously or write simultaneously, other synchronization is 45 // necessary. Synchronization is also required if a call into any Unsafe method 46 // could happen at the same time as a call to any other method. 47 template <typename T> 48 class FixedSizeLockFreeQueue { 49 private: 50 // Atomic primitives and memory barrier 51 #if defined(__arm__) 52 typedef uint32 Atomic32; 53 54 // Copied from google3/base/atomicops-internals-arm-v6plus.h 55 static inline void MemoryBarrier() { 56 asm volatile("dmb":::"memory"); 57 } 58 59 // Adapted from google3/base/atomicops-internals-arm-v6plus.h 60 static inline void AtomicIncrement(volatile Atomic32* ptr) { 61 Atomic32 str_success, value; 62 asm volatile ( 63 "1:\n" 64 "ldrex %1, [%2]\n" 65 "add %1, %1, #1\n" 66 "strex %0, %1, [%2]\n" 67 "teq %0, #0\n" 68 "bne 1b" 69 : "=&r"(str_success), "=&r"(value) 70 : "r" (ptr) 71 : "cc", "memory"); 72 } 73 #elif !defined(SKIP_ATOMIC_CHECK) 74 #error "No atomic operations defined for the given architecture." 75 #endif 76 77 public: 78 // Constructs an empty queue, with capacity 0. 79 FixedSizeLockFreeQueue() : pushed_count_(0), 80 popped_count_(0), 81 capacity_(0), 82 data_() {} 83 // Constructs an empty queue with the given capacity. 84 FixedSizeLockFreeQueue(size_t capacity) : pushed_count_(0), 85 popped_count_(0), 86 capacity_(capacity), 87 data_(new T[capacity]) {} 88 89 // Pushes a value onto the queue. Returns true if the value was successfully 90 // pushed (there was space in the queue). This method can be safely called at 91 // the same time as PeekFront/PopFront. 92 bool PushBack(T value) { 93 if (capacity_ == 0) { 94 LOG(LS_WARNING) << "Queue capacity is 0."; 95 return false; 96 } 97 if (IsFull()) { 98 return false; 99 } 100 101 data_[pushed_count_ % capacity_] = value; 102 // Make sure the data is written before the count is incremented, so other 103 // threads can't see the value exists before being able to read it. 104 MemoryBarrier(); 105 AtomicIncrement(&pushed_count_); 106 return true; 107 } 108 109 // Retrieves the oldest value pushed onto the queue. Returns true if there was 110 // an item to peek (the queue was non-empty). This method can be safely called 111 // at the same time as PushBack. 112 bool PeekFront(T* value_out) { 113 if (capacity_ == 0) { 114 LOG(LS_WARNING) << "Queue capacity is 0."; 115 return false; 116 } 117 if (IsEmpty()) { 118 return false; 119 } 120 121 *value_out = data_[popped_count_ % capacity_]; 122 return true; 123 } 124 125 // Retrieves the oldest value pushed onto the queue and removes it from the 126 // queue. Returns true if there was an item to pop (the queue was non-empty). 127 // This method can be safely called at the same time as PushBack. 128 bool PopFront(T* value_out) { 129 if (PeekFront(value_out)) { 130 AtomicIncrement(&popped_count_); 131 return true; 132 } 133 return false; 134 } 135 136 // Clears the current items in the queue and sets the new (fixed) size. This 137 // method cannot be called at the same time as any other method. 138 void ClearAndResizeUnsafe(int new_capacity) { 139 capacity_ = new_capacity; 140 data_.reset(new T[new_capacity]); 141 pushed_count_ = 0; 142 popped_count_ = 0; 143 } 144 145 // Returns true if there is no space left in the queue for new elements. 146 int IsFull() const { return pushed_count_ == popped_count_ + capacity_; } 147 // Returns true if there are no elements in the queue. 148 int IsEmpty() const { return pushed_count_ == popped_count_; } 149 // Returns the current number of elements in the queue. This is always in the 150 // range [0, capacity] 151 size_t Size() const { return pushed_count_ - popped_count_; } 152 153 // Returns the capacity of the queue (max size). 154 size_t capacity() const { return capacity_; } 155 156 private: 157 volatile Atomic32 pushed_count_; 158 volatile Atomic32 popped_count_; 159 size_t capacity_; 160 talk_base::scoped_ptr<T[]> data_; 161 DISALLOW_COPY_AND_ASSIGN(FixedSizeLockFreeQueue); 162 }; 163 164 } 165 166 #endif // TALK_BASE_ATOMICOPS_H_ 167