1 /*------------------------------------------------------------------------- 2 * drawElements Stream Library 3 * --------------------------- 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Thread safe ringbuffer 22 *//*--------------------------------------------------------------------*/ 23 #include "deRingbuffer.h" 24 25 #include "deInt32.h" 26 #include "deMemory.h" 27 #include "deSemaphore.h" 28 29 #include <stdlib.h> 30 #include <stdio.h> 31 32 struct deRingbuffer_s 33 { 34 deInt32 blockSize; 35 deInt32 blockCount; 36 deInt32* blockUsage; 37 deUint8* buffer; 38 39 deSemaphore emptyCount; 40 deSemaphore fullCount; 41 42 deInt32 outBlock; 43 deInt32 outPos; 44 45 deInt32 inBlock; 46 deInt32 inPos; 47 48 deBool stopNotified; 49 deBool consumerStopping; 50 }; 51 52 deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount) 53 { 54 deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer)); 55 56 DE_ASSERT(ringbuffer); 57 DE_ASSERT(blockCount > 0); 58 DE_ASSERT(blockSize > 0); 59 60 ringbuffer->blockSize = blockSize; 61 ringbuffer->blockCount = blockCount; 62 ringbuffer->buffer = (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount); 63 ringbuffer->blockUsage = (deInt32*)deMalloc(sizeof(deUint32) * (size_t)blockCount); 64 ringbuffer->emptyCount = deSemaphore_create(ringbuffer->blockCount, DE_NULL); 65 ringbuffer->fullCount = deSemaphore_create(0, DE_NULL); 66 67 if (!ringbuffer->buffer || 68 !ringbuffer->blockUsage || 69 !ringbuffer->emptyCount || 70 !ringbuffer->fullCount) 71 { 72 if (ringbuffer->emptyCount) 73 deSemaphore_destroy(ringbuffer->emptyCount); 74 if (ringbuffer->fullCount) 75 deSemaphore_destroy(ringbuffer->fullCount); 76 deFree(ringbuffer->buffer); 77 deFree(ringbuffer->blockUsage); 78 deFree(ringbuffer); 79 return DE_NULL; 80 } 81 82 memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * (size_t)blockCount); 83 84 ringbuffer->outBlock = 0; 85 ringbuffer->outPos = 0; 86 87 ringbuffer->inBlock = 0; 88 ringbuffer->inPos = 0; 89 90 ringbuffer->stopNotified = DE_FALSE; 91 ringbuffer->consumerStopping = DE_FALSE; 92 93 return ringbuffer; 94 } 95 96 void deRingbuffer_stop (deRingbuffer* ringbuffer) 97 { 98 /* Set notify to true and increment fullCount to let consumer continue */ 99 ringbuffer->stopNotified = DE_TRUE; 100 deSemaphore_increment(ringbuffer->fullCount); 101 } 102 103 void deRingbuffer_destroy (deRingbuffer* ringbuffer) 104 { 105 deSemaphore_destroy(ringbuffer->emptyCount); 106 deSemaphore_destroy(ringbuffer->fullCount); 107 108 free(ringbuffer->buffer); 109 free(ringbuffer->blockUsage); 110 free(ringbuffer); 111 } 112 113 static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written) 114 { 115 deRingbuffer* ringbuffer = (deRingbuffer*)stream; 116 117 DE_ASSERT(stream); 118 /* If ringbuffer is stopping return error on write */ 119 if (ringbuffer->stopNotified) 120 { 121 DE_ASSERT(DE_FALSE); 122 return DE_STREAMRESULT_ERROR; 123 } 124 125 *written = 0; 126 127 /* Write while more data available */ 128 while (*written < bufSize) 129 { 130 deInt32 writeSize = 0; 131 deUint8* src = DE_NULL; 132 deUint8* dst = DE_NULL; 133 134 /* If between blocks accuire new block */ 135 if (ringbuffer->inPos == 0) 136 { 137 deSemaphore_decrement(ringbuffer->emptyCount); 138 } 139 140 writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written); 141 dst = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos; 142 src = (deUint8*)buf + *written; 143 144 deMemcpy(dst, src, (size_t)writeSize); 145 146 ringbuffer->inPos += writeSize; 147 *written += writeSize; 148 ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize; 149 150 /* Block is full move to next one (or "between" this and next block) */ 151 if (ringbuffer->inPos == ringbuffer->blockSize) 152 { 153 ringbuffer->inPos = 0; 154 ringbuffer->inBlock++; 155 156 if (ringbuffer->inBlock == ringbuffer->blockCount) 157 ringbuffer->inBlock = 0; 158 deSemaphore_increment(ringbuffer->fullCount); 159 } 160 } 161 162 return DE_STREAMRESULT_SUCCESS; 163 } 164 165 static deStreamResult producerStream_flush (deStreamData* stream) 166 { 167 deRingbuffer* ringbuffer = (deRingbuffer*)stream; 168 169 DE_ASSERT(stream); 170 171 /* No blocks reserved by producer */ 172 if (ringbuffer->inPos == 0) 173 return DE_STREAMRESULT_SUCCESS; 174 175 ringbuffer->inPos = 0; 176 ringbuffer->inBlock++; 177 178 if (ringbuffer->inBlock == ringbuffer->blockCount) 179 ringbuffer->inBlock = 0; 180 181 deSemaphore_increment(ringbuffer->fullCount); 182 return DE_STREAMRESULT_SUCCESS; 183 } 184 185 static deStreamResult producerStream_deinit (deStreamData* stream) 186 { 187 DE_ASSERT(stream); 188 189 producerStream_flush(stream); 190 191 /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */ 192 return DE_STREAMRESULT_SUCCESS; 193 } 194 195 static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read) 196 { 197 deRingbuffer* ringbuffer = (deRingbuffer*)stream; 198 199 DE_ASSERT(stream); 200 201 *read = 0; 202 DE_ASSERT(ringbuffer); 203 204 while (*read < bufSize) 205 { 206 deInt32 writeSize = 0; 207 deUint8* src = DE_NULL; 208 deUint8* dst = DE_NULL; 209 210 /* If between blocks accuire new block */ 211 if (ringbuffer->outPos == 0) 212 { 213 /* If consumer is set to stop after everything is consumed, 214 * do not block if there is no more input left 215 */ 216 if (ringbuffer->consumerStopping) 217 { 218 /* Try to accuire new block, if can't there is no more input */ 219 if (!deSemaphore_tryDecrement(ringbuffer->fullCount)) 220 { 221 return DE_STREAMRESULT_END_OF_STREAM; 222 } 223 } 224 else 225 { 226 /* If not stopping block until there is more input */ 227 deSemaphore_decrement(ringbuffer->fullCount); 228 /* Ringbuffer was set to stop */ 229 if (ringbuffer->stopNotified) 230 { 231 ringbuffer->consumerStopping = DE_TRUE; 232 } 233 } 234 235 } 236 237 writeSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read); 238 src = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos; 239 dst = (deUint8*)buf + *read; 240 241 deMemcpy(dst, src, (size_t)writeSize); 242 243 ringbuffer->outPos += writeSize; 244 *read += writeSize; 245 246 /* Block is consumed move to next one (or "between" this and next block) */ 247 if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock]) 248 { 249 ringbuffer->blockUsage[ringbuffer->outBlock] = 0; 250 ringbuffer->outPos = 0; 251 ringbuffer->outBlock++; 252 253 if (ringbuffer->outBlock == ringbuffer->blockCount) 254 ringbuffer->outBlock = 0; 255 256 deSemaphore_increment(ringbuffer->emptyCount); 257 } 258 } 259 260 return DE_STREAMRESULT_SUCCESS; 261 } 262 263 264 static deStreamResult consumerStream_deinit (deStreamData* stream) 265 { 266 DE_ASSERT(stream); 267 DE_UNREF(stream); 268 269 return DE_STREAMRESULT_SUCCESS; 270 } 271 272 /* There are no sensible errors so status is always good */ 273 deStreamStatus dummy_getStatus (deStreamData* stream) 274 { 275 DE_UNREF(stream); 276 277 return DE_STREAMSTATUS_GOOD; 278 } 279 280 /* There are no sensible errors in ringbuffer */ 281 static const char* dummy_getError (deStreamData* stream) 282 { 283 DE_ASSERT(stream); 284 DE_UNREF(stream); 285 return DE_NULL; 286 } 287 288 static const deIOStreamVFTable producerStreamVFTable = { 289 DE_NULL, 290 producerStream_write, 291 dummy_getError, 292 producerStream_flush, 293 producerStream_deinit, 294 dummy_getStatus 295 }; 296 297 static const deIOStreamVFTable consumerStreamVFTable = { 298 consumerStream_read, 299 DE_NULL, 300 dummy_getError, 301 DE_NULL, 302 consumerStream_deinit, 303 dummy_getStatus 304 }; 305 306 void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer) 307 { 308 stream->ioStream.streamData = (deStreamData*)buffer; 309 stream->ioStream.vfTable = &producerStreamVFTable; 310 } 311 312 void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer) 313 { 314 stream->ioStream.streamData = (deStreamData*)buffer; 315 stream->ioStream.vfTable = &consumerStreamVFTable; 316 } 317