1 /*------------------------------------------------------------------------- 2 * drawElements C++ Base Library 3 * ----------------------------- 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Thread-safe ring buffer template. 22 *//*--------------------------------------------------------------------*/ 23 24 #include "deThreadSafeRingBuffer.hpp" 25 #include "deRandom.hpp" 26 #include "deThread.hpp" 27 28 #include <vector> 29 30 using std::vector; 31 32 namespace de 33 { 34 35 namespace 36 { 37 38 struct Message 39 { 40 deUint32 data; 41 42 Message (deUint16 threadId, deUint16 payload) 43 : data((threadId << 16) | payload) 44 { 45 } 46 47 Message (void) 48 : data(0) 49 { 50 } 51 52 deUint16 getThreadId (void) const { return (deUint16)(data >> 16); } 53 deUint16 getPayload (void) const { return (deUint16)(data & 0xffff); } 54 }; 55 56 class Consumer : public Thread 57 { 58 public: 59 Consumer (ThreadSafeRingBuffer<Message>& buffer, int numProducers) 60 : m_buffer (buffer) 61 { 62 m_lastPayload.resize(numProducers, 0); 63 m_payloadSum.resize(numProducers, 0); 64 } 65 66 void run (void) 67 { 68 for (;;) 69 { 70 Message msg = m_buffer.popBack(); 71 72 deUint16 threadId = msg.getThreadId(); 73 74 if (threadId == 0xffff) 75 break; 76 77 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size())); 78 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload()); 79 80 m_lastPayload[threadId] = msg.getPayload(); 81 m_payloadSum[threadId] += (deUint32)msg.getPayload(); 82 } 83 } 84 85 deUint32 getPayloadSum (deUint16 threadId) const 86 { 87 return m_payloadSum[threadId]; 88 } 89 90 private: 91 ThreadSafeRingBuffer<Message>& m_buffer; 92 vector<deUint16> m_lastPayload; 93 vector<deUint32> m_payloadSum; 94 }; 95 96 class Producer : public Thread 97 { 98 public: 99 Producer (ThreadSafeRingBuffer<Message>& buffer, deUint16 threadId, int dataSize) 100 : m_buffer (buffer) 101 , m_threadId (threadId) 102 , m_dataSize (dataSize) 103 { 104 } 105 106 void run (void) 107 { 108 // Yield to give main thread chance to start other producers. 109 deSleep(1); 110 111 for (int ndx = 0; ndx < m_dataSize; ndx++) 112 m_buffer.pushFront(Message(m_threadId, (deUint16)ndx)); 113 } 114 115 private: 116 ThreadSafeRingBuffer<Message>& m_buffer; 117 deUint16 m_threadId; 118 int m_dataSize; 119 }; 120 121 } // anonymous 122 123 void ThreadSafeRingBuffer_selfTest (void) 124 { 125 const int numIterations = 16; 126 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++) 127 { 128 Random rnd (iterNdx); 129 int bufSize = rnd.getInt(1, 2048); 130 int numProducers = rnd.getInt(1, 16); 131 int numConsumers = rnd.getInt(1, 16); 132 int dataSize = rnd.getInt(1000, 10000); 133 ThreadSafeRingBuffer<Message> buffer (bufSize); 134 vector<Producer*> producers; 135 vector<Consumer*> consumers; 136 137 for (int i = 0; i < numProducers; i++) 138 producers.push_back(new Producer(buffer, (deUint16)i, dataSize)); 139 140 for (int i = 0; i < numConsumers; i++) 141 consumers.push_back(new Consumer(buffer, numProducers)); 142 143 // Start consumers. 144 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++) 145 (*i)->start(); 146 147 // Start producers. 148 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++) 149 (*i)->start(); 150 151 // Wait for producers. 152 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++) 153 (*i)->join(); 154 155 // Write end messages for consumers. 156 for (int i = 0; i < numConsumers; i++) 157 buffer.pushFront(Message(0xffff, 0)); 158 159 // Wait for consumers. 160 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++) 161 (*i)->join(); 162 163 // Verify payload sums. 164 deUint32 refSum = 0; 165 for (int i = 0; i < dataSize; i++) 166 refSum += (deUint32)(deUint16)i; 167 168 for (int i = 0; i < numProducers; i++) 169 { 170 deUint32 cmpSum = 0; 171 for (int j = 0; j < numConsumers; j++) 172 cmpSum += consumers[j]->getPayloadSum((deUint16)i); 173 DE_TEST_ASSERT(refSum == cmpSum); 174 } 175 176 // Free resources. 177 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++) 178 delete *i; 179 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++) 180 delete *i; 181 } 182 } 183 184 } // de 185