Home | History | Annotate | Download | only in destream
      1 /*-------------------------------------------------------------------------
      2  * drawElements Stream Library
      3  * ---------------------------
      4  *
      5  * Copyright 2014 The Android Open Source Project
      6  *
      7  * Licensed under the Apache License, Version 2.0 (the "License");
      8  * you may not use this file except in compliance with the License.
      9  * You may obtain a copy of the License at
     10  *
     11  *      http://www.apache.org/licenses/LICENSE-2.0
     12  *
     13  * Unless required by applicable law or agreed to in writing, software
     14  * distributed under the License is distributed on an "AS IS" BASIS,
     15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     16  * See the License for the specific language governing permissions and
     17  * limitations under the License.
     18  *
     19  *//*!
     20  * \file
     21  * \brief Thread safe ringbuffer
     22  *//*--------------------------------------------------------------------*/
     23 #include "deRingbuffer.h"
     24 
     25 #include "deInt32.h"
     26 #include "deMemory.h"
     27 #include "deSemaphore.h"
     28 
     29 #include <stdlib.h>
     30 #include <stdio.h>
     31 
     32 struct deRingbuffer_s
     33 {
     34 	deInt32			blockSize;
     35 	deInt32			blockCount;
     36 	deInt32*		blockUsage;
     37 	deUint8*		buffer;
     38 
     39 	deSemaphore		emptyCount;
     40 	deSemaphore		fullCount;
     41 
     42 	deInt32			outBlock;
     43 	deInt32			outPos;
     44 
     45 	deInt32			inBlock;
     46 	deInt32			inPos;
     47 
     48 	deBool			stopNotified;
     49 	deBool			consumerStopping;
     50 };
     51 
     52 deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
     53 {
     54 	deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
     55 
     56 	DE_ASSERT(ringbuffer);
     57 	DE_ASSERT(blockCount > 0);
     58 	DE_ASSERT(blockSize > 0);
     59 
     60 	ringbuffer->blockSize	= blockSize;
     61 	ringbuffer->blockCount	= blockCount;
     62 	ringbuffer->buffer		= (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount);
     63 	ringbuffer->blockUsage	= (deInt32*)deMalloc(sizeof(deUint32) * (size_t)blockCount);
     64 	ringbuffer->emptyCount	= deSemaphore_create(ringbuffer->blockCount, DE_NULL);
     65 	ringbuffer->fullCount	= deSemaphore_create(0, DE_NULL);
     66 
     67 	if (!ringbuffer->buffer		||
     68 		!ringbuffer->blockUsage	||
     69 		!ringbuffer->emptyCount	||
     70 		!ringbuffer->fullCount)
     71 	{
     72 		if (ringbuffer->emptyCount)
     73 			deSemaphore_destroy(ringbuffer->emptyCount);
     74 		if (ringbuffer->fullCount)
     75 			deSemaphore_destroy(ringbuffer->fullCount);
     76 		deFree(ringbuffer->buffer);
     77 		deFree(ringbuffer->blockUsage);
     78 		deFree(ringbuffer);
     79 		return DE_NULL;
     80 	}
     81 
     82 	memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * (size_t)blockCount);
     83 
     84 	ringbuffer->outBlock	= 0;
     85 	ringbuffer->outPos		= 0;
     86 
     87 	ringbuffer->inBlock		= 0;
     88 	ringbuffer->inPos		= 0;
     89 
     90 	ringbuffer->stopNotified		= DE_FALSE;
     91 	ringbuffer->consumerStopping	= DE_FALSE;
     92 
     93 	return ringbuffer;
     94 }
     95 
     96 void deRingbuffer_stop (deRingbuffer* ringbuffer)
     97 {
     98 	/* Set notify to true and increment fullCount to let consumer continue */
     99 	ringbuffer->stopNotified = DE_TRUE;
    100 	deSemaphore_increment(ringbuffer->fullCount);
    101 }
    102 
    103 void deRingbuffer_destroy (deRingbuffer* ringbuffer)
    104 {
    105 	deSemaphore_destroy(ringbuffer->emptyCount);
    106 	deSemaphore_destroy(ringbuffer->fullCount);
    107 
    108 	free(ringbuffer->buffer);
    109 	free(ringbuffer->blockUsage);
    110 	free(ringbuffer);
    111 }
    112 
    113 static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
    114 {
    115 	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
    116 
    117 	DE_ASSERT(stream);
    118 	/* If ringbuffer is stopping return error on write */
    119 	if (ringbuffer->stopNotified)
    120 	{
    121 		DE_ASSERT(DE_FALSE);
    122 		return DE_STREAMRESULT_ERROR;
    123 	}
    124 
    125 	*written = 0;
    126 
    127 	/* Write while more data available */
    128 	while (*written < bufSize)
    129 	{
    130 		deInt32		writeSize	= 0;
    131 		deUint8*	src			= DE_NULL;
    132 		deUint8*	dst			= DE_NULL;
    133 
    134 		/* If between blocks accuire new block */
    135 		if (ringbuffer->inPos == 0)
    136 		{
    137 			deSemaphore_decrement(ringbuffer->emptyCount);
    138 		}
    139 
    140 		writeSize	= deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
    141 		dst			= ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
    142 		src			= (deUint8*)buf + *written;
    143 
    144 		deMemcpy(dst, src, (size_t)writeSize);
    145 
    146 		ringbuffer->inPos += writeSize;
    147 		*written += writeSize;
    148 		ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
    149 
    150 		/* Block is full move to next one (or "between" this and next block) */
    151 		if (ringbuffer->inPos == ringbuffer->blockSize)
    152 		{
    153 			ringbuffer->inPos = 0;
    154 			ringbuffer->inBlock++;
    155 
    156 			if (ringbuffer->inBlock == ringbuffer->blockCount)
    157 				ringbuffer->inBlock = 0;
    158 			deSemaphore_increment(ringbuffer->fullCount);
    159 		}
    160 	}
    161 
    162 	return DE_STREAMRESULT_SUCCESS;
    163 }
    164 
    165 static deStreamResult producerStream_flush (deStreamData* stream)
    166 {
    167 	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
    168 
    169 	DE_ASSERT(stream);
    170 
    171 	/* No blocks reserved by producer */
    172 	if (ringbuffer->inPos == 0)
    173 		return DE_STREAMRESULT_SUCCESS;
    174 
    175 	ringbuffer->inPos		= 0;
    176 	ringbuffer->inBlock++;
    177 
    178 	if (ringbuffer->inBlock == ringbuffer->blockCount)
    179 		ringbuffer->inBlock = 0;
    180 
    181 	deSemaphore_increment(ringbuffer->fullCount);
    182 	return DE_STREAMRESULT_SUCCESS;
    183 }
    184 
    185 static deStreamResult producerStream_deinit (deStreamData* stream)
    186 {
    187 	DE_ASSERT(stream);
    188 
    189 	producerStream_flush(stream);
    190 
    191 	/* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
    192 	return DE_STREAMRESULT_SUCCESS;
    193 }
    194 
    195 static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
    196 {
    197 	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
    198 
    199 	DE_ASSERT(stream);
    200 
    201 	*read = 0;
    202 	DE_ASSERT(ringbuffer);
    203 
    204 	while (*read < bufSize)
    205 	{
    206 		deInt32		writeSize	= 0;
    207 		deUint8*	src			= DE_NULL;
    208 		deUint8*	dst			= DE_NULL;
    209 
    210 		/* If between blocks accuire new block */
    211 		if (ringbuffer->outPos == 0)
    212 		{
    213 			/* If consumer is set to stop after everything is consumed,
    214 			 * do not block if there is no more input left
    215 			 */
    216 			if (ringbuffer->consumerStopping)
    217 			{
    218 				/* Try to accuire new block, if can't there is no more input */
    219 				if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
    220 				{
    221 					return DE_STREAMRESULT_END_OF_STREAM;
    222 				}
    223 			}
    224 			else
    225 			{
    226 				/* If not stopping block until there is more input */
    227 				deSemaphore_decrement(ringbuffer->fullCount);
    228 				/* Ringbuffer was set to stop */
    229 				if (ringbuffer->stopNotified)
    230 				{
    231 					ringbuffer->consumerStopping = DE_TRUE;
    232 				}
    233 			}
    234 
    235 		}
    236 
    237 		writeSize	= deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
    238 		src			= ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
    239 		dst			= (deUint8*)buf + *read;
    240 
    241 		deMemcpy(dst, src, (size_t)writeSize);
    242 
    243 		ringbuffer->outPos += writeSize;
    244 		*read += writeSize;
    245 
    246 		/* Block is consumed move to next one (or "between" this and next block) */
    247 		if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
    248 		{
    249 			ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
    250 			ringbuffer->outPos = 0;
    251 			ringbuffer->outBlock++;
    252 
    253 			if (ringbuffer->outBlock == ringbuffer->blockCount)
    254 				ringbuffer->outBlock = 0;
    255 
    256 			deSemaphore_increment(ringbuffer->emptyCount);
    257 		}
    258 	}
    259 
    260 	return DE_STREAMRESULT_SUCCESS;
    261 }
    262 
    263 
    264 static deStreamResult consumerStream_deinit (deStreamData* stream)
    265 {
    266 	DE_ASSERT(stream);
    267 	DE_UNREF(stream);
    268 
    269 	return DE_STREAMRESULT_SUCCESS;
    270 }
    271 
    272 /* There are no sensible errors so status is always good */
    273 deStreamStatus dummy_getStatus (deStreamData* stream)
    274 {
    275 	DE_UNREF(stream);
    276 
    277 	return DE_STREAMSTATUS_GOOD;
    278 }
    279 
    280 /* There are no sensible errors in ringbuffer */
    281 static const char* dummy_getError (deStreamData* stream)
    282 {
    283 	DE_ASSERT(stream);
    284 	DE_UNREF(stream);
    285 	return DE_NULL;
    286 }
    287 
    288 static const deIOStreamVFTable producerStreamVFTable = {
    289 	DE_NULL,
    290 	producerStream_write,
    291 	dummy_getError,
    292 	producerStream_flush,
    293 	producerStream_deinit,
    294 	dummy_getStatus
    295 };
    296 
    297 static const deIOStreamVFTable consumerStreamVFTable = {
    298 	consumerStream_read,
    299 	DE_NULL,
    300 	dummy_getError,
    301 	DE_NULL,
    302 	consumerStream_deinit,
    303 	dummy_getStatus
    304 };
    305 
    306 void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
    307 {
    308 	stream->ioStream.streamData = (deStreamData*)buffer;
    309 	stream->ioStream.vfTable = &producerStreamVFTable;
    310 }
    311 
    312 void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
    313 {
    314 	stream->ioStream.streamData = (deStreamData*)buffer;
    315 	stream->ioStream.vfTable = &consumerStreamVFTable;
    316 }
    317