1 /*------------------------------------------------------------------------- 2 * drawElements C++ Base Library 3 * ----------------------------- 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Block-based thread-safe queue. 22 *//*--------------------------------------------------------------------*/ 23 24 #include "deBlockBuffer.hpp" 25 #include "deRandom.hpp" 26 #include "deThread.hpp" 27 #include "deInt32.h" 28 29 #include <vector> 30 31 namespace de 32 { 33 34 using std::vector; 35 36 namespace BlockBufferBasicTest 37 { 38 39 struct Message 40 { 41 deUint32 data; 42 43 Message (deUint16 threadId, deUint16 payload) 44 : data((threadId << 16) | payload) 45 { 46 } 47 48 Message (void) 49 : data(0) 50 { 51 } 52 53 deUint16 getThreadId (void) const { return data >> 16; } 54 deUint16 getPayload (void) const { return data & 0xffff; } 55 }; 56 57 typedef BlockBuffer<Message> MessageBuffer; 58 59 class Consumer : public Thread 60 { 61 public: 62 Consumer (MessageBuffer& buffer, int numProducers) 63 : m_buffer (buffer) 64 { 65 m_lastPayload.resize(numProducers, 0); 66 m_payloadSum.resize(numProducers, 0); 67 } 68 69 void run (void) 70 { 71 Random rnd ((deUint32)m_lastPayload.size()); 72 Message tmpBuf [64]; 73 bool consume = true; 74 75 while (consume) 76 { 77 int numToRead = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf)); 78 int numRead = m_buffer.tryRead(numToRead, &tmpBuf[0]); 79 80 for (int ndx = 0; ndx < numRead; ndx++) 81 { 82 const Message& msg = tmpBuf[ndx]; 83 84 deUint16 threadId = msg.getThreadId(); 85 86 if (threadId == 0xffff) 87 { 88 /* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */ 89 if (ndx+1 < numRead) 90 { 91 m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]); 92 m_buffer.flush(); 93 } 94 95 consume = false; 96 break; 97 } 98 else 99 { 100 /* Verify message. */ 101 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size())); 102 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload()); 103 104 m_lastPayload[threadId] = msg.getPayload(); 105 m_payloadSum[threadId] += (deUint32)msg.getPayload(); 106 } 107 } 108 } 109 } 110 111 deUint32 getPayloadSum (deUint16 threadId) const 112 { 113 return m_payloadSum[threadId]; 114 } 115 116 private: 117 MessageBuffer& m_buffer; 118 vector<deUint16> m_lastPayload; 119 vector<deUint32> m_payloadSum; 120 }; 121 122 class Producer : public Thread 123 { 124 public: 125 Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages) 126 : m_buffer (buffer) 127 , m_threadId (threadId) 128 , m_numMessages (numMessages) 129 { 130 } 131 132 void run (void) 133 { 134 // Yield to give main thread chance to start other producers. 135 deSleep(1); 136 137 Random rnd (m_threadId); 138 int msgNdx = 0; 139 Message tmpBuf[64]; 140 141 while (msgNdx < m_numMessages) 142 { 143 int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf))); 144 for (int ndx = 0; ndx < writeSize; ndx++) 145 tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++); 146 147 m_buffer.write(writeSize, &tmpBuf[0]); 148 if (rnd.getBool()) 149 m_buffer.flush(); 150 } 151 } 152 153 private: 154 MessageBuffer& m_buffer; 155 deUint16 m_threadId; 156 int m_numMessages; 157 }; 158 159 void runTest (void) 160 { 161 const int numIterations = 8; 162 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++) 163 { 164 Random rnd (iterNdx); 165 int numBlocks = rnd.getInt(2, 128); 166 int blockSize = rnd.getInt(1, 16); 167 int numProducers = rnd.getInt(1, 16); 168 int numConsumers = rnd.getInt(1, 16); 169 int dataSize = rnd.getInt(50, 200); 170 MessageBuffer buffer (blockSize, numBlocks); 171 vector<Producer*> producers; 172 vector<Consumer*> consumers; 173 174 for (int i = 0; i < numProducers; i++) 175 producers.push_back(new Producer(buffer, (deUint16)i, dataSize)); 176 177 for (int i = 0; i < numConsumers; i++) 178 consumers.push_back(new Consumer(buffer, numProducers)); 179 180 // Start consumers. 181 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++) 182 (*i)->start(); 183 184 // Start producers. 185 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++) 186 (*i)->start(); 187 188 // Wait for producers. 189 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++) 190 (*i)->join(); 191 192 // Write end messages for consumers. 193 const Message endMsg(0xffff, 0); 194 for (int i = 0; i < numConsumers; i++) 195 buffer.write(1, &endMsg); 196 buffer.flush(); 197 198 // Wait for consumers. 199 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++) 200 (*i)->join(); 201 202 // Verify payload sums. 203 deUint32 refSum = 0; 204 for (int i = 0; i < dataSize; i++) 205 refSum += (deUint32)(deUint16)i; 206 207 for (int i = 0; i < numProducers; i++) 208 { 209 deUint32 cmpSum = 0; 210 for (int j = 0; j < numConsumers; j++) 211 cmpSum += consumers[j]->getPayloadSum(i); 212 DE_TEST_ASSERT(refSum == cmpSum); 213 } 214 215 // Free resources. 216 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++) 217 delete *i; 218 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++) 219 delete *i; 220 } 221 } 222 223 } // BlockBufferBasicTest 224 225 namespace BlockBufferCancelTest 226 { 227 228 class Producer : public Thread 229 { 230 public: 231 Producer (BlockBuffer<deUint8>* buffer, deUint32 seed) 232 : m_buffer (buffer) 233 , m_seed (seed) 234 { 235 } 236 237 void run (void) 238 { 239 deUint8 tmp[1024]; 240 Random rnd(m_seed); 241 242 for (;;) 243 { 244 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp)); 245 246 try 247 { 248 m_buffer->write(blockSize, &tmp[0]); 249 250 if (rnd.getBool()) 251 m_buffer->flush(); 252 } 253 catch (const BlockBuffer<deUint8>::CanceledException&) 254 { 255 break; 256 } 257 } 258 } 259 260 private: 261 BlockBuffer<deUint8>* m_buffer; 262 deUint32 m_seed; 263 }; 264 265 class Consumer : public Thread 266 { 267 public: 268 Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed) 269 : m_buffer (buffer) 270 , m_seed (seed) 271 { 272 } 273 274 void run (void) 275 { 276 deUint8 tmp[1024]; 277 Random rnd(m_seed); 278 279 for (;;) 280 { 281 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp)); 282 283 try 284 { 285 m_buffer->read(blockSize, &tmp[0]); 286 } 287 catch (const BlockBuffer<deUint8>::CanceledException&) 288 { 289 break; 290 } 291 } 292 } 293 294 private: 295 BlockBuffer<deUint8>* m_buffer; 296 deUint32 m_seed; 297 }; 298 299 void runTest (void) 300 { 301 BlockBuffer<deUint8> buffer (64, 16); 302 const int numIterations = 8; 303 304 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++) 305 { 306 Random rnd (deInt32Hash(iterNdx)); 307 int numThreads = rnd.getInt(1, 16); 308 int sleepMs = rnd.getInt(1, 200); 309 vector<Thread*> threads; 310 311 for (int i = 0; i < numThreads; i++) 312 { 313 if (rnd.getBool()) 314 threads.push_back(new Consumer(&buffer, rnd.getUint32())); 315 else 316 threads.push_back(new Producer(&buffer, rnd.getUint32())); 317 } 318 319 // Start threads. 320 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++) 321 (*i)->start(); 322 323 // Sleep for a while. 324 deSleep(sleepMs); 325 326 // Cancel buffer. 327 buffer.cancel(); 328 329 // Wait for threads to finish. 330 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++) 331 (*i)->join(); 332 333 // Reset buffer. 334 buffer.clear(); 335 336 // Delete threads 337 for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread) 338 delete *thread; 339 } 340 } 341 342 } // BlockBufferCancelTest 343 344 void BlockBuffer_selfTest (void) 345 { 346 BlockBufferBasicTest::runTest(); 347 BlockBufferCancelTest::runTest(); 348 } 349 350 } // de 351