Home | History | Annotate | Download | only in decpp
      1 #ifndef _DEBLOCKBUFFER_HPP
      2 #define _DEBLOCKBUFFER_HPP
      3 /*-------------------------------------------------------------------------
      4  * drawElements C++ Base Library
      5  * -----------------------------
      6  *
      7  * Copyright 2014 The Android Open Source Project
      8  *
      9  * Licensed under the Apache License, Version 2.0 (the "License");
     10  * you may not use this file except in compliance with the License.
     11  * You may obtain a copy of the License at
     12  *
     13  *      http://www.apache.org/licenses/LICENSE-2.0
     14  *
     15  * Unless required by applicable law or agreed to in writing, software
     16  * distributed under the License is distributed on an "AS IS" BASIS,
     17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     18  * See the License for the specific language governing permissions and
     19  * limitations under the License.
     20  *
     21  *//*!
     22  * \file
     23  * \brief Block-based thread-safe queue.
     24  *//*--------------------------------------------------------------------*/
     25 
#include "deBlockBuffer.hpp"
#include "deMutex.hpp"
#include "deSemaphore.h"

#include <exception>
#include <new>
     31 
     32 namespace de
     33 {
     34 
     35 void BlockBuffer_selfTest (void);
     36 
     37 class BufferCanceledException : public std::exception
     38 {
     39 public:
     40 	inline BufferCanceledException	(void) {}
     41 	inline ~BufferCanceledException	(void) throw() {}
     42 
     43 	const char* what (void) const throw() { return "BufferCanceledException"; }
     44 };
     45 
     46 template <typename T>
     47 class BlockBuffer
     48 {
     49 public:
     50 	typedef BufferCanceledException CanceledException;
     51 
     52 					BlockBuffer			(int blockSize, int numBlocks);
     53 					~BlockBuffer		(void);
     54 
     55 	void			clear				(void); //!< Resets buffer. Will block until pending writes and reads have completed.
     56 
     57 	void			write				(int numElements, const T* elements);
     58 	int				tryWrite			(int numElements, const T* elements);
     59 	void			flush				(void);
     60 	bool			tryFlush			(void);
     61 
     62 	void			read				(int numElements, T* elements);
     63 	int				tryRead				(int numElements, T* elements);
     64 
     65 	void			cancel				(void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
     66 	bool			isCanceled			(void) const { return !!m_canceled; }
     67 
     68 private:
     69 					BlockBuffer			(const BlockBuffer& other);
     70 	BlockBuffer&	operator=			(const BlockBuffer& other);
     71 
     72 	int				writeToCurrentBlock	(int numElements, const T* elements, bool blocking);
     73 	int				readFromCurrentBlock(int numElements, T* elements, bool blocking);
     74 
     75 	void			flushWriteBlock		(void);
     76 
     77 	deSemaphore		m_fill;				//!< Block fill count.
     78 	deSemaphore		m_empty;			//!< Block empty count.
     79 
     80 	int				m_writeBlock;		//!< Current write block ndx.
     81 	int				m_writePos;			//!< Position in block. 0 if block is not yet acquired.
     82 
     83 	int				m_readBlock;		//!< Current read block ndx.
     84 	int				m_readPos;			//!< Position in block. 0 if block is not yet acquired.
     85 
     86 	int				m_blockSize;
     87 	int				m_numBlocks;
     88 
     89 	T*				m_elements;
     90 	int*			m_numUsedInBlock;
     91 
     92 	Mutex			m_writeLock;
     93 	Mutex			m_readLock;
     94 
     95 	volatile deUint32	m_canceled;
     96 } DE_WARN_UNUSED_TYPE;
     97 
     98 template <typename T>
     99 BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks)
    100 	: m_fill			(0)
    101 	, m_empty			(0)
    102 	, m_writeBlock		(0)
    103 	, m_writePos		(0)
    104 	, m_readBlock		(0)
    105 	, m_readPos			(0)
    106 	, m_blockSize		(blockSize)
    107 	, m_numBlocks		(numBlocks)
    108 	, m_elements		(DE_NULL)
    109 	, m_numUsedInBlock	(DE_NULL)
    110 	, m_writeLock		()
    111 	, m_readLock		()
    112 	, m_canceled		(DE_FALSE)
    113 {
    114 	DE_ASSERT(blockSize > 0);
    115 	DE_ASSERT(numBlocks > 0);
    116 
    117 	try
    118 	{
    119 		m_elements			= new T[m_numBlocks*m_blockSize];
    120 		m_numUsedInBlock	= new int[m_numBlocks];
    121 	}
    122 	catch (...)
    123 	{
    124 		delete[] m_elements;
    125 		delete[] m_numUsedInBlock;
    126 		throw;
    127 	}
    128 
    129 	m_fill	= deSemaphore_create(0, DE_NULL);
    130 	m_empty	= deSemaphore_create(numBlocks, DE_NULL);
    131 	DE_ASSERT(m_fill && m_empty);
    132 }
    133 
    134 template <typename T>
    135 BlockBuffer<T>::~BlockBuffer (void)
    136 {
    137 	delete[] m_elements;
    138 	delete[] m_numUsedInBlock;
    139 
    140 	deSemaphore_destroy(m_fill);
    141 	deSemaphore_destroy(m_empty);
    142 }
    143 
    144 template <typename T>
    145 void BlockBuffer<T>::clear (void)
    146 {
    147 	ScopedLock readLock		(m_readLock);
    148 	ScopedLock writeLock	(m_writeLock);
    149 
    150 	deSemaphore_destroy(m_fill);
    151 	deSemaphore_destroy(m_empty);
    152 
    153 	m_fill			= deSemaphore_create(0, DE_NULL);
    154 	m_empty			= deSemaphore_create(m_numBlocks, DE_NULL);
    155 	m_writeBlock	= 0;
    156 	m_writePos		= 0;
    157 	m_readBlock		= 0;
    158 	m_readPos		= 0;
    159 	m_canceled		= DE_FALSE;
    160 
    161 	DE_ASSERT(m_fill && m_empty);
    162 }
    163 
    164 template <typename T>
    165 void BlockBuffer<T>::cancel (void)
    166 {
    167 	DE_ASSERT(!m_canceled);
    168 	m_canceled = DE_TRUE;
    169 
    170 	deSemaphore_increment(m_empty);
    171 	deSemaphore_increment(m_fill);
    172 }
    173 
    174 template <typename T>
    175 int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking)
    176 {
    177 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
    178 
    179 	if (m_writePos == 0)
    180 	{
    181 		/* Write thread doesn't own current block - need to acquire. */
    182 		if (blocking)
    183 			deSemaphore_decrement(m_empty);
    184 		else
    185 		{
    186 			if (!deSemaphore_tryDecrement(m_empty))
    187 				return 0;
    188 		}
    189 
    190 		/* Check for canceled bit. */
    191 		if (m_canceled)
    192 		{
    193 			// \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
    194 			deSemaphore_increment(m_empty);
    195 			m_writeLock.unlock();
    196 			throw CanceledException();
    197 		}
    198 	}
    199 
    200 	/* Write thread owns current block. */
    201 	T*		block			= m_elements + m_writeBlock*m_blockSize;
    202 	int		numToWrite		= de::min(numElements, m_blockSize-m_writePos);
    203 
    204 	DE_ASSERT(numToWrite > 0);
    205 
    206 	for (int ndx = 0; ndx < numToWrite; ndx++)
    207 		block[m_writePos+ndx] = elements[ndx];
    208 
    209 	m_writePos += numToWrite;
    210 
    211 	if (m_writePos == m_blockSize)
    212 		flushWriteBlock(); /* Flush current write block. */
    213 
    214 	return numToWrite;
    215 }
    216 
    217 template <typename T>
    218 int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking)
    219 {
    220 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
    221 
    222 	if (m_readPos == 0)
    223 	{
    224 		/* Read thread doesn't own current block - need to acquire. */
    225 		if (blocking)
    226 			deSemaphore_decrement(m_fill);
    227 		else
    228 		{
    229 			if (!deSemaphore_tryDecrement(m_fill))
    230 				return 0;
    231 		}
    232 
    233 		/* Check for canceled bit. */
    234 		if (m_canceled)
    235 		{
    236 			// \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
    237 			deSemaphore_increment(m_fill);
    238 			m_readLock.unlock();
    239 			throw CanceledException();
    240 		}
    241 	}
    242 
    243 	/* Read thread now owns current block. */
    244 	const T*	block			= m_elements + m_readBlock*m_blockSize;
    245 	int			numUsedInBlock	= m_numUsedInBlock[m_readBlock];
    246 	int			numToRead		= de::min(numElements, numUsedInBlock-m_readPos);
    247 
    248 	DE_ASSERT(numToRead > 0);
    249 
    250 	for (int ndx = 0; ndx < numToRead; ndx++)
    251 		elements[ndx] = block[m_readPos+ndx];
    252 
    253 	m_readPos += numToRead;
    254 
    255 	if (m_readPos == numUsedInBlock)
    256 	{
    257 		/* Free current read block and advance. */
    258 		m_readBlock		= (m_readBlock+1) % m_numBlocks;
    259 		m_readPos		= 0;
    260 		deSemaphore_increment(m_empty);
    261 	}
    262 
    263 	return numToRead;
    264 }
    265 
    266 template <typename T>
    267 int BlockBuffer<T>::tryWrite (int numElements, const T* elements)
    268 {
    269 	int numWritten = 0;
    270 
    271 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
    272 
    273 	if (m_canceled)
    274 		throw CanceledException();
    275 
    276 	if (!m_writeLock.tryLock())
    277 		return numWritten;
    278 
    279 	while (numWritten < numElements)
    280 	{
    281 		int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */);
    282 
    283 		if (ret == 0)
    284 			break; /* Write failed. */
    285 
    286 		numWritten += ret;
    287 	}
    288 
    289 	m_writeLock.unlock();
    290 
    291 	return numWritten;
    292 }
    293 
    294 template <typename T>
    295 void BlockBuffer<T>::write (int numElements, const T* elements)
    296 {
    297 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
    298 
    299 	if (m_canceled)
    300 		throw CanceledException();
    301 
    302 	m_writeLock.lock();
    303 
    304 	int numWritten = 0;
    305 	while (numWritten < numElements)
    306 		numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */);
    307 
    308 	m_writeLock.unlock();
    309 }
    310 
    311 template <typename T>
    312 void BlockBuffer<T>::flush (void)
    313 {
    314 	m_writeLock.lock();
    315 
    316 	if (m_writePos > 0)
    317 		flushWriteBlock();
    318 
    319 	m_writeLock.unlock();
    320 }
    321 
    322 template <typename T>
    323 bool BlockBuffer<T>::tryFlush (void)
    324 {
    325 	if (!m_writeLock.tryLock())
    326 		return false;
    327 
    328 	if (m_writePos > 0)
    329 		flushWriteBlock();
    330 
    331 	m_writeLock.unlock();
    332 
    333 	return true;
    334 }
    335 
    336 template <typename T>
    337 void BlockBuffer<T>::flushWriteBlock (void)
    338 {
    339 	DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
    340 
    341 	m_numUsedInBlock[m_writeBlock]	= m_writePos;
    342 	m_writeBlock					= (m_writeBlock+1) % m_numBlocks;
    343 	m_writePos						= 0;
    344 	deSemaphore_increment(m_fill);
    345 }
    346 
    347 template <typename T>
    348 int BlockBuffer<T>::tryRead (int numElements, T* elements)
    349 {
    350 	int numRead = 0;
    351 
    352 	if (m_canceled)
    353 		throw CanceledException();
    354 
    355 	if (!m_readLock.tryLock())
    356 		return numRead;
    357 
    358 	while (numRead < numElements)
    359 	{
    360 		int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */);
    361 
    362 		if (ret == 0)
    363 			break; /* Failed. */
    364 
    365 		numRead += ret;
    366 	}
    367 
    368 	m_readLock.unlock();
    369 
    370 	return numRead;
    371 }
    372 
    373 template <typename T>
    374 void BlockBuffer<T>::read (int numElements, T* elements)
    375 {
    376 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
    377 
    378 	if (m_canceled)
    379 		throw CanceledException();
    380 
    381 	m_readLock.lock();
    382 
    383 	int numRead = 0;
    384 	while (numRead < numElements)
    385 		numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */);
    386 
    387 	m_readLock.unlock();
    388 }
    389 
    390 } // de
    391 
    392 #endif // _DEBLOCKBUFFER_HPP
    393