1 #ifndef _DEBLOCKBUFFER_HPP 2 #define _DEBLOCKBUFFER_HPP 3 /*------------------------------------------------------------------------- 4 * drawElements C++ Base Library 5 * ----------------------------- 6 * 7 * Copyright 2014 The Android Open Source Project 8 * 9 * Licensed under the Apache License, Version 2.0 (the "License"); 10 * you may not use this file except in compliance with the License. 11 * You may obtain a copy of the License at 12 * 13 * http://www.apache.org/licenses/LICENSE-2.0 14 * 15 * Unless required by applicable law or agreed to in writing, software 16 * distributed under the License is distributed on an "AS IS" BASIS, 17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18 * See the License for the specific language governing permissions and 19 * limitations under the License. 20 * 21 *//*! 22 * \file 23 * \brief Block-based thread-safe queue. 24 *//*--------------------------------------------------------------------*/ 25 26 #include "deBlockBuffer.hpp" 27 #include "deMutex.hpp" 28 #include "deSemaphore.h" 29 30 #include <exception> 31 32 namespace de 33 { 34 35 void BlockBuffer_selfTest (void); 36 37 class BufferCanceledException : public std::exception 38 { 39 public: 40 inline BufferCanceledException (void) {} 41 inline ~BufferCanceledException (void) throw() {} 42 43 const char* what (void) const throw() { return "BufferCanceledException"; } 44 }; 45 46 template <typename T> 47 class BlockBuffer 48 { 49 public: 50 typedef BufferCanceledException CanceledException; 51 52 BlockBuffer (int blockSize, int numBlocks); 53 ~BlockBuffer (void); 54 55 void clear (void); //!< Resets buffer. Will block until pending writes and reads have completed. 56 57 void write (int numElements, const T* elements); 58 int tryWrite (int numElements, const T* elements); 59 void flush (void); 60 bool tryFlush (void); 61 62 void read (int numElements, T* elements); 63 int tryRead (int numElements, T* elements); 64 65 void cancel (void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException. 66 bool isCanceled (void) const { return !!m_canceled; } 67 68 private: 69 BlockBuffer (const BlockBuffer& other); 70 BlockBuffer& operator= (const BlockBuffer& other); 71 72 int writeToCurrentBlock (int numElements, const T* elements, bool blocking); 73 int readFromCurrentBlock(int numElements, T* elements, bool blocking); 74 75 void flushWriteBlock (void); 76 77 deSemaphore m_fill; //!< Block fill count. 78 deSemaphore m_empty; //!< Block empty count. 79 80 int m_writeBlock; //!< Current write block ndx. 81 int m_writePos; //!< Position in block. 0 if block is not yet acquired. 82 83 int m_readBlock; //!< Current read block ndx. 84 int m_readPos; //!< Position in block. 0 if block is not yet acquired. 85 86 int m_blockSize; 87 int m_numBlocks; 88 89 T* m_elements; 90 int* m_numUsedInBlock; 91 92 Mutex m_writeLock; 93 Mutex m_readLock; 94 95 volatile deUint32 m_canceled; 96 } DE_WARN_UNUSED_TYPE; 97 98 template <typename T> 99 BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks) 100 : m_fill (0) 101 , m_empty (0) 102 , m_writeBlock (0) 103 , m_writePos (0) 104 , m_readBlock (0) 105 , m_readPos (0) 106 , m_blockSize (blockSize) 107 , m_numBlocks (numBlocks) 108 , m_elements (DE_NULL) 109 , m_numUsedInBlock (DE_NULL) 110 , m_writeLock () 111 , m_readLock () 112 , m_canceled (DE_FALSE) 113 { 114 DE_ASSERT(blockSize > 0); 115 DE_ASSERT(numBlocks > 0); 116 117 try 118 { 119 m_elements = new T[m_numBlocks*m_blockSize]; 120 m_numUsedInBlock = new int[m_numBlocks]; 121 } 122 catch (...) 123 { 124 delete[] m_elements; 125 delete[] m_numUsedInBlock; 126 throw; 127 } 128 129 m_fill = deSemaphore_create(0, DE_NULL); 130 m_empty = deSemaphore_create(numBlocks, DE_NULL); 131 DE_ASSERT(m_fill && m_empty); 132 } 133 134 template <typename T> 135 BlockBuffer<T>::~BlockBuffer (void) 136 { 137 delete[] m_elements; 138 delete[] m_numUsedInBlock; 139 140 deSemaphore_destroy(m_fill); 141 deSemaphore_destroy(m_empty); 142 } 143 144 template <typename T> 145 void BlockBuffer<T>::clear (void) 146 { 147 ScopedLock readLock (m_readLock); 148 ScopedLock writeLock (m_writeLock); 149 150 deSemaphore_destroy(m_fill); 151 deSemaphore_destroy(m_empty); 152 153 m_fill = deSemaphore_create(0, DE_NULL); 154 m_empty = deSemaphore_create(m_numBlocks, DE_NULL); 155 m_writeBlock = 0; 156 m_writePos = 0; 157 m_readBlock = 0; 158 m_readPos = 0; 159 m_canceled = DE_FALSE; 160 161 DE_ASSERT(m_fill && m_empty); 162 } 163 164 template <typename T> 165 void BlockBuffer<T>::cancel (void) 166 { 167 DE_ASSERT(!m_canceled); 168 m_canceled = DE_TRUE; 169 170 deSemaphore_increment(m_empty); 171 deSemaphore_increment(m_fill); 172 } 173 174 template <typename T> 175 int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking) 176 { 177 DE_ASSERT(numElements > 0 && elements != DE_NULL); 178 179 if (m_writePos == 0) 180 { 181 /* Write thread doesn't own current block - need to acquire. */ 182 if (blocking) 183 deSemaphore_decrement(m_empty); 184 else 185 { 186 if (!deSemaphore_tryDecrement(m_empty)) 187 return 0; 188 } 189 190 /* Check for canceled bit. */ 191 if (m_canceled) 192 { 193 // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here. 194 deSemaphore_increment(m_empty); 195 m_writeLock.unlock(); 196 throw CanceledException(); 197 } 198 } 199 200 /* Write thread owns current block. */ 201 T* block = m_elements + m_writeBlock*m_blockSize; 202 int numToWrite = de::min(numElements, m_blockSize-m_writePos); 203 204 DE_ASSERT(numToWrite > 0); 205 206 for (int ndx = 0; ndx < numToWrite; ndx++) 207 block[m_writePos+ndx] = elements[ndx]; 208 209 m_writePos += numToWrite; 210 211 if (m_writePos == m_blockSize) 212 flushWriteBlock(); /* Flush current write block. */ 213 214 return numToWrite; 215 } 216 217 template <typename T> 218 int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking) 219 { 220 DE_ASSERT(numElements > 0 && elements != DE_NULL); 221 222 if (m_readPos == 0) 223 { 224 /* Read thread doesn't own current block - need to acquire. */ 225 if (blocking) 226 deSemaphore_decrement(m_fill); 227 else 228 { 229 if (!deSemaphore_tryDecrement(m_fill)) 230 return 0; 231 } 232 233 /* Check for canceled bit. */ 234 if (m_canceled) 235 { 236 // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here. 237 deSemaphore_increment(m_fill); 238 m_readLock.unlock(); 239 throw CanceledException(); 240 } 241 } 242 243 /* Read thread now owns current block. */ 244 const T* block = m_elements + m_readBlock*m_blockSize; 245 int numUsedInBlock = m_numUsedInBlock[m_readBlock]; 246 int numToRead = de::min(numElements, numUsedInBlock-m_readPos); 247 248 DE_ASSERT(numToRead > 0); 249 250 for (int ndx = 0; ndx < numToRead; ndx++) 251 elements[ndx] = block[m_readPos+ndx]; 252 253 m_readPos += numToRead; 254 255 if (m_readPos == numUsedInBlock) 256 { 257 /* Free current read block and advance. */ 258 m_readBlock = (m_readBlock+1) % m_numBlocks; 259 m_readPos = 0; 260 deSemaphore_increment(m_empty); 261 } 262 263 return numToRead; 264 } 265 266 template <typename T> 267 int BlockBuffer<T>::tryWrite (int numElements, const T* elements) 268 { 269 int numWritten = 0; 270 271 DE_ASSERT(numElements > 0 && elements != DE_NULL); 272 273 if (m_canceled) 274 throw CanceledException(); 275 276 if (!m_writeLock.tryLock()) 277 return numWritten; 278 279 while (numWritten < numElements) 280 { 281 int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */); 282 283 if (ret == 0) 284 break; /* Write failed. */ 285 286 numWritten += ret; 287 } 288 289 m_writeLock.unlock(); 290 291 return numWritten; 292 } 293 294 template <typename T> 295 void BlockBuffer<T>::write (int numElements, const T* elements) 296 { 297 DE_ASSERT(numElements > 0 && elements != DE_NULL); 298 299 if (m_canceled) 300 throw CanceledException(); 301 302 m_writeLock.lock(); 303 304 int numWritten = 0; 305 while (numWritten < numElements) 306 numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */); 307 308 m_writeLock.unlock(); 309 } 310 311 template <typename T> 312 void BlockBuffer<T>::flush (void) 313 { 314 m_writeLock.lock(); 315 316 if (m_writePos > 0) 317 flushWriteBlock(); 318 319 m_writeLock.unlock(); 320 } 321 322 template <typename T> 323 bool BlockBuffer<T>::tryFlush (void) 324 { 325 if (!m_writeLock.tryLock()) 326 return false; 327 328 if (m_writePos > 0) 329 flushWriteBlock(); 330 331 m_writeLock.unlock(); 332 333 return true; 334 } 335 336 template <typename T> 337 void BlockBuffer<T>::flushWriteBlock (void) 338 { 339 DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize)); 340 341 m_numUsedInBlock[m_writeBlock] = m_writePos; 342 m_writeBlock = (m_writeBlock+1) % m_numBlocks; 343 m_writePos = 0; 344 deSemaphore_increment(m_fill); 345 } 346 347 template <typename T> 348 int BlockBuffer<T>::tryRead (int numElements, T* elements) 349 { 350 int numRead = 0; 351 352 if (m_canceled) 353 throw CanceledException(); 354 355 if (!m_readLock.tryLock()) 356 return numRead; 357 358 while (numRead < numElements) 359 { 360 int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */); 361 362 if (ret == 0) 363 break; /* Failed. */ 364 365 numRead += ret; 366 } 367 368 m_readLock.unlock(); 369 370 return numRead; 371 } 372 373 template <typename T> 374 void BlockBuffer<T>::read (int numElements, T* elements) 375 { 376 DE_ASSERT(numElements > 0 && elements != DE_NULL); 377 378 if (m_canceled) 379 throw CanceledException(); 380 381 m_readLock.lock(); 382 383 int numRead = 0; 384 while (numRead < numElements) 385 numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */); 386 387 m_readLock.unlock(); 388 } 389 390 } // de 391 392 #endif // _DEBLOCKBUFFER_HPP 393