1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SINGLE_STATE_QUEUE_H 18 #define SINGLE_STATE_QUEUE_H 19 20 // Non-blocking single element state queue, or 21 // Non-blocking single-reader / single-writer multi-word atomic load / store 22 23 #include <stdint.h> 24 #include <cutils/atomic.h> 25 26 namespace android { 27 28 template<typename T> class SingleStateQueue { 29 30 public: 31 32 class Mutator; 33 class Observer; 34 35 enum SSQ_STATUS { 36 SSQ_PENDING, /* = 0 */ 37 SSQ_READ, 38 SSQ_DONE, 39 }; 40 41 struct Shared { 42 // needs to be part of a union so don't define constructor or destructor 43 44 friend class Mutator; 45 friend class Observer; 46 47 private: 48 void init() { mAck = 0; mSequence = 0; } 49 50 volatile int32_t mAck; 51 volatile int32_t mSequence; 52 T mValue; 53 }; 54 55 class Mutator { 56 public: 57 Mutator(Shared *shared) 58 : mSequence(0), mShared(shared) 59 { 60 // exactly one of Mutator and Observer must initialize, currently it is Observer 61 // shared->init(); 62 } 63 64 // push new value onto state queue, overwriting previous value; 65 // returns a sequence number which can be used with ack() 66 int32_t push(const T& value) 67 { 68 Shared *shared = mShared; 69 int32_t sequence = mSequence; 70 sequence++; 71 android_atomic_acquire_store(sequence, &shared->mSequence); 72 shared->mValue = value; 73 sequence++; 74 android_atomic_release_store(sequence, &shared->mSequence); 75 mSequence = sequence; 76 // consider signalling a futex here, if we know that observer is waiting 77 return sequence; 78 } 79 80 // returns the status of the last state push. This may be a stale value. 81 // 82 // SSQ_PENDING, or 0, means it has not been observed 83 // SSQ_READ means it has been read 84 // SSQ_DONE means it has been acted upon, after Observer::done() is called 85 enum SSQ_STATUS ack() const 86 { 87 // in the case of SSQ_DONE, prevent any subtle data-races of subsequent reads 88 // being performed (out-of-order) before the ack read, should the caller be 89 // depending on sequentiality of reads. 90 const int32_t ack = android_atomic_acquire_load(&mShared->mAck); 91 return ack - mSequence & ~1 ? SSQ_PENDING /* seq differ */ : 92 ack & 1 ? SSQ_DONE : SSQ_READ; 93 } 94 95 // return true if a push with specified sequence number or later has been observed 96 bool ack(int32_t sequence) const 97 { 98 // this relies on 2's complement rollover to detect an ancient sequence number 99 return mShared->mAck - sequence >= 0; 100 } 101 102 private: 103 int32_t mSequence; 104 Shared * const mShared; 105 }; 106 107 class Observer { 108 public: 109 Observer(Shared *shared) 110 : mSequence(0), mSeed(1), mShared(shared) 111 { 112 // exactly one of Mutator and Observer must initialize, currently it is Observer 113 shared->init(); 114 } 115 116 // return true if value has changed 117 bool poll(T& value) 118 { 119 Shared *shared = mShared; 120 int32_t before = shared->mSequence; 121 if (before == mSequence) { 122 return false; 123 } 124 for (int tries = 0; ; ) { 125 const int MAX_TRIES = 5; 126 if (before & 1) { 127 if (++tries >= MAX_TRIES) { 128 return false; 129 } 130 before = shared->mSequence; 131 } else { 132 android_memory_barrier(); 133 T temp = shared->mValue; 134 int32_t after = android_atomic_release_load(&shared->mSequence); 135 if (after == before) { 136 value = temp; 137 shared->mAck = before; 138 mSequence = before; // mSequence is even after poll success 139 return true; 140 } 141 if (++tries >= MAX_TRIES) { 142 return false; 143 } 144 before = after; 145 } 146 } 147 } 148 149 // (optional) used to indicate to the Mutator that the state that has been polled 150 // has also been acted upon. 151 void done() 152 { 153 const int32_t ack = mShared->mAck + 1; 154 // ensure all previous writes have been performed. 155 android_atomic_release_store(ack, &mShared->mAck); // mSequence is odd after "done" 156 } 157 158 private: 159 int32_t mSequence; 160 int mSeed; // for PRNG 161 Shared * const mShared; 162 }; 163 164 #if 0 165 SingleStateQueue(void /*Shared*/ *shared); 166 /*virtual*/ ~SingleStateQueue() { } 167 168 static size_t size() { return sizeof(Shared); } 169 #endif 170 171 }; 172 173 } // namespace android 174 175 #endif // SINGLE_STATE_QUEUE_H 176