Home | History | Annotate | Download | only in decpp
      1 /*-------------------------------------------------------------------------
      2  * drawElements C++ Base Library
      3  * -----------------------------
      4  *
      5  * Copyright 2014 The Android Open Source Project
      6  *
      7  * Licensed under the Apache License, Version 2.0 (the "License");
      8  * you may not use this file except in compliance with the License.
      9  * You may obtain a copy of the License at
     10  *
     11  *      http://www.apache.org/licenses/LICENSE-2.0
     12  *
     13  * Unless required by applicable law or agreed to in writing, software
     14  * distributed under the License is distributed on an "AS IS" BASIS,
     15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     16  * See the License for the specific language governing permissions and
     17  * limitations under the License.
     18  *
     19  *//*!
     20  * \file
     21  * \brief Block-based thread-safe queue.
     22  *//*--------------------------------------------------------------------*/
     23 
     24 #include "deBlockBuffer.hpp"
     25 #include "deRandom.hpp"
     26 #include "deThread.hpp"
     27 #include "deInt32.h"
     28 
     29 #include <vector>
     30 
     31 namespace de
     32 {
     33 
     34 using std::vector;
     35 
     36 namespace BlockBufferBasicTest
     37 {
     38 
     39 struct Message
     40 {
     41 	deUint32 data;
     42 
     43 	Message (deUint16 threadId, deUint16 payload)
     44 		: data((threadId << 16) | payload)
     45 	{
     46 	}
     47 
     48 	Message (void)
     49 		: data(0)
     50 	{
     51 	}
     52 
     53 	deUint16 getThreadId	(void) const { return (deUint16)(data >> 16);		}
     54 	deUint16 getPayload		(void) const { return (deUint16)(data & 0xffff);	}
     55 };
     56 
//! Buffer of Message objects shared by the Producer and Consumer threads below.
typedef BlockBuffer<Message> MessageBuffer;
     58 
     59 class Consumer : public Thread
     60 {
     61 public:
     62 	Consumer (MessageBuffer& buffer, int numProducers)
     63 		: m_buffer		(buffer)
     64 	{
     65 		m_lastPayload.resize(numProducers, 0);
     66 		m_payloadSum.resize(numProducers, 0);
     67 	}
     68 
     69 	void run (void)
     70 	{
     71 		Random	rnd		((deUint32)m_lastPayload.size());
     72 		Message	tmpBuf	[64];
     73 		bool	consume	= true;
     74 
     75 		while (consume)
     76 		{
     77 			int numToRead	= rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
     78 			int numRead		= m_buffer.tryRead(numToRead, &tmpBuf[0]);
     79 
     80 			for (int ndx = 0; ndx < numRead; ndx++)
     81 			{
     82 				const Message& msg = tmpBuf[ndx];
     83 
     84 				deUint16 threadId = msg.getThreadId();
     85 
     86 				if (threadId == 0xffff)
     87 				{
     88 					/* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
     89 					if (ndx+1 < numRead)
     90 					{
     91 						m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
     92 						m_buffer.flush();
     93 					}
     94 
     95 					consume = false;
     96 					break;
     97 				}
     98 				else
     99 				{
    100 					/* Verify message. */
    101 					DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
    102 					DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
    103 
    104 					m_lastPayload[threadId]	 = msg.getPayload();
    105 					m_payloadSum[threadId]	+= (deUint32)msg.getPayload();
    106 				}
    107 			}
    108 		}
    109 	}
    110 
    111 	deUint32 getPayloadSum (deUint16 threadId) const
    112 	{
    113 		return m_payloadSum[threadId];
    114 	}
    115 
    116 private:
    117 	MessageBuffer&			m_buffer;
    118 	vector<deUint16>		m_lastPayload;
    119 	vector<deUint32>		m_payloadSum;
    120 };
    121 
    122 class Producer : public Thread
    123 {
    124 public:
    125 	Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
    126 		: m_buffer		(buffer)
    127 		, m_threadId	(threadId)
    128 		, m_numMessages	(numMessages)
    129 	{
    130 	}
    131 
    132 	void run (void)
    133 	{
    134 		// Yield to give main thread chance to start other producers.
    135 		deSleep(1);
    136 
    137 		Random	rnd		(m_threadId);
    138 		int		msgNdx	= 0;
    139 		Message	tmpBuf[64];
    140 
    141 		while (msgNdx < m_numMessages)
    142 		{
    143 			int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
    144 			for (int ndx = 0; ndx < writeSize; ndx++)
    145 				tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
    146 
    147 			m_buffer.write(writeSize, &tmpBuf[0]);
    148 			if (rnd.getBool())
    149 				m_buffer.flush();
    150 		}
    151 	}
    152 
    153 private:
    154 	MessageBuffer&	m_buffer;
    155 	deUint16		m_threadId;
    156 	int				m_numMessages;
    157 };
    158 
    159 void runTest (void)
    160 {
    161 	const int numIterations = 8;
    162 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
    163 	{
    164 		Random							rnd				(iterNdx);
    165 		int								numBlocks		= rnd.getInt(2, 128);
    166 		int								blockSize		= rnd.getInt(1, 16);
    167 		int								numProducers	= rnd.getInt(1, 16);
    168 		int								numConsumers	= rnd.getInt(1, 16);
    169 		int								dataSize		= rnd.getInt(50, 200);
    170 		MessageBuffer					buffer			(blockSize, numBlocks);
    171 		vector<Producer*>				producers;
    172 		vector<Consumer*>				consumers;
    173 
    174 		for (int i = 0; i < numProducers; i++)
    175 			producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
    176 
    177 		for (int i = 0; i < numConsumers; i++)
    178 			consumers.push_back(new Consumer(buffer, numProducers));
    179 
    180 		// Start consumers.
    181 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
    182 			(*i)->start();
    183 
    184 		// Start producers.
    185 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
    186 			(*i)->start();
    187 
    188 		// Wait for producers.
    189 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
    190 			(*i)->join();
    191 
    192 		// Write end messages for consumers.
    193 		const Message endMsg(0xffff, 0);
    194 		for (int i = 0; i < numConsumers; i++)
    195 			buffer.write(1, &endMsg);
    196 		buffer.flush();
    197 
    198 		// Wait for consumers.
    199 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
    200 			(*i)->join();
    201 
    202 		// Verify payload sums.
    203 		deUint32 refSum = 0;
    204 		for (int i = 0; i < dataSize; i++)
    205 			refSum += (deUint32)(deUint16)i;
    206 
    207 		for (int i = 0; i < numProducers; i++)
    208 		{
    209 			deUint32 cmpSum = 0;
    210 			for (int j = 0; j < numConsumers; j++)
    211 				cmpSum += consumers[j]->getPayloadSum((deUint16)i);
    212 			DE_TEST_ASSERT(refSum == cmpSum);
    213 		}
    214 
    215 		// Free resources.
    216 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
    217 			delete *i;
    218 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
    219 			delete *i;
    220 	}
    221 }
    222 
    223 } // BlockBufferBasicTest
    224 
    225 namespace BlockBufferCancelTest
    226 {
    227 
    228 class Producer : public Thread
    229 {
    230 public:
    231 	Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
    232 		: m_buffer	(buffer)
    233 		, m_seed	(seed)
    234 	{
    235 	}
    236 
    237 	void run (void)
    238 	{
    239 		deUint8	tmp[1024];
    240 		Random	rnd(m_seed);
    241 
    242 		for (;;)
    243 		{
    244 			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
    245 
    246 			try
    247 			{
    248 				m_buffer->write(blockSize, &tmp[0]);
    249 
    250 				if (rnd.getBool())
    251 					m_buffer->flush();
    252 			}
    253 			catch (const BlockBuffer<deUint8>::CanceledException&)
    254 			{
    255 				break;
    256 			}
    257 		}
    258 	}
    259 
    260 private:
    261 	BlockBuffer<deUint8>*	m_buffer;
    262 	deUint32				m_seed;
    263 };
    264 
    265 class Consumer : public Thread
    266 {
    267 public:
    268 	Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
    269 		: m_buffer	(buffer)
    270 		, m_seed	(seed)
    271 	{
    272 	}
    273 
    274 	void run (void)
    275 	{
    276 		deUint8	tmp[1024];
    277 		Random	rnd(m_seed);
    278 
    279 		for (;;)
    280 		{
    281 			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
    282 
    283 			try
    284 			{
    285 				m_buffer->read(blockSize, &tmp[0]);
    286 			}
    287 			catch (const BlockBuffer<deUint8>::CanceledException&)
    288 			{
    289 				break;
    290 			}
    291 		}
    292 	}
    293 
    294 private:
    295 	BlockBuffer<deUint8>*	m_buffer;
    296 	deUint32				m_seed;
    297 };
    298 
    299 void runTest (void)
    300 {
    301 	BlockBuffer<deUint8>	buffer			(64, 16);
    302 	const int				numIterations	= 8;
    303 
    304 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
    305 	{
    306 		Random				rnd				(deInt32Hash(iterNdx));
    307 		int					numThreads		= rnd.getInt(1, 16);
    308 		int					sleepMs			= rnd.getInt(1, 200);
    309 		vector<Thread*>		threads;
    310 
    311 		for (int i = 0; i < numThreads; i++)
    312 		{
    313 			if (rnd.getBool())
    314 				threads.push_back(new Consumer(&buffer, rnd.getUint32()));
    315 			else
    316 				threads.push_back(new Producer(&buffer, rnd.getUint32()));
    317 		}
    318 
    319 		// Start threads.
    320 		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
    321 			(*i)->start();
    322 
    323 		// Sleep for a while.
    324 		deSleep(sleepMs);
    325 
    326 		// Cancel buffer.
    327 		buffer.cancel();
    328 
    329 		// Wait for threads to finish.
    330 		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
    331 			(*i)->join();
    332 
    333 		// Reset buffer.
    334 		buffer.clear();
    335 
    336 		// Delete threads
    337 		for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
    338 			delete *thread;
    339 	}
    340 }
    341 
    342 } // BlockBufferCancelTest
    343 
    344 void BlockBuffer_selfTest (void)
    345 {
    346 	BlockBufferBasicTest::runTest();
    347 	BlockBufferCancelTest::runTest();
    348 }
    349 
    350 } // de
    351