Home | History | Annotate | Download | only in destream
      1 /*-------------------------------------------------------------------------
      2  * drawElements Stream Library
      3  * ---------------------------
      4  *
      5  * Copyright 2014 The Android Open Source Project
      6  *
      7  * Licensed under the Apache License, Version 2.0 (the "License");
      8  * you may not use this file except in compliance with the License.
      9  * You may obtain a copy of the License at
     10  *
     11  *      http://www.apache.org/licenses/LICENSE-2.0
     12  *
     13  * Unless required by applicable law or agreed to in writing, software
     14  * distributed under the License is distributed on an "AS IS" BASIS,
     15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     16  * See the License for the specific language governing permissions and
     17  * limitations under the License.
     18  *
     19  *//*!
     20  * \file
     21  * \brief Buffered and threaded input and output streams
     22  *//*--------------------------------------------------------------------*/
     23 
     24 #include "deThreadStream.h"
     25 #include "deStreamCpyThread.h"
     26 #include "deRingbuffer.h"
     27 #include "stdlib.h"
     28 
/* State of a threaded input stream: a worker thread (inStreamCopy) reads from
 * `input` and pushes the data through `producerStream` into `ringbuffer`,
 * while readers of the public stream pull it back out via `consumerStream`.
 * NOTE: threadStream_deinit() accesses its argument through this layout. */
typedef struct deThreadInStream_s
{
	deRingbuffer*		ringbuffer;			/* Created in deThreadInStream_init() with deRingbuffer_create(). */
	deInStream*			input;				/* Wrapped source stream; read by inStreamCopy(). */
	deInStream			consumerStream;		/* Read side of the ringbuffer; backs read/getError/getStatus. */
	deOutStream			producerStream;		/* Write side of the ringbuffer; written by inStreamCopy(). */
	deThread			thread;				/* Handle returned by deThread_create() for inStreamCopy(). */
	int					bufferSize;			/* Size of the worker's temporary buffer (set to ringbufferBlockSize). */
} deThreadInStream;
     38 
/* State of a threaded output stream: writers push data through
 * `producerStream` into `ringbuffer`; a deStreamCpyThread is created with
 * `consumerStream` and the real output stream (see deThreadOutStream_init()). */
typedef struct deThreadOutStream_s
{
	deRingbuffer*		ringbuffer;			/* Created in deThreadOutStream_init() with deRingbuffer_create(). */
	deInStream			consumerStream;		/* Read side of the ringbuffer; passed to deStreamCpyThread_create(). */
	deOutStream			producerStream;		/* Write side of the ringbuffer; backs write/flush/getError/getStatus. */
	deStreamCpyThread*	thread;				/* Handle returned by deStreamCpyThread_create(). */
} deThreadOutStream;
     46 
     47 static void inStreamCopy (void* arg)
     48 {
     49 	deThreadInStream* threadStream = (deThreadInStream*)arg;
     50 
     51 	deUint8* buffer = malloc(sizeof(deUint8) * threadStream->bufferSize);
     52 
     53 	for(;;)
     54 	{
     55 		deInt32 read	= 0;
     56 		deInt32 written	= 0;
     57 		deStreamResult readResult = DE_STREAMRESULT_ERROR;
     58 
     59 		readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
     60 		DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
     61 		while (written < read)
     62 		{
     63 			deInt32 wrote = 0;
     64 
     65 			/* \todo [mika] Handle errors */
     66 			deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
     67 
     68 			written += wrote;
     69 		}
     70 
     71 		if (readResult == DE_STREAMRESULT_END_OF_STREAM)
     72 		{
     73 			break;
     74 		}
     75 	}
     76 
     77 	deOutStream_flush(&(threadStream->producerStream));
     78 	deRingbuffer_stop(threadStream->ringbuffer);
     79 	free(buffer);
     80 
     81 }
     82 
     83 static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
     84 {
     85 	deThreadInStream* threadStream = (deThreadInStream*)stream;
     86 	return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
     87 }
     88 
     89 static const char* threadInStream_getError (deStreamData* stream)
     90 {
     91 	deThreadInStream* threadStream = (deThreadInStream*)stream;
     92 
     93 	/* \todo [mika] Add handling for errors on thread stream */
     94 	return deInStream_getError(&(threadStream->consumerStream));
     95 }
     96 
     97 static deStreamStatus threadInStream_getStatus (deStreamData* stream)
     98 {
     99 	deThreadInStream* threadStream = (deThreadInStream*)stream;
    100 
    101 	/* \todo [mika] Add handling for status on thread stream */
    102 	return deInStream_getStatus(&(threadStream->consumerStream));
    103 }
    104 
    105 /* \note [mika] Used by both in and out stream */
    106 static deStreamResult threadStream_deinit (deStreamData* stream)
    107 {
    108 	deThreadInStream* threadStream = (deThreadInStream*)stream;
    109 
    110 	deRingbuffer_stop(threadStream->ringbuffer);
    111 
    112 	deThread_join(threadStream->thread);
    113 	deThread_destroy(threadStream->thread);
    114 
    115 	deOutStream_deinit(&(threadStream->producerStream));
    116 	deInStream_deinit(&(threadStream->consumerStream));
    117 
    118 	deRingbuffer_destroy(threadStream->ringbuffer);
    119 
    120 	return DE_STREAMRESULT_SUCCESS;
    121 }
    122 
/* Callback table installed on streams set up by deThreadInStream_init().
 * Entries are positional; the two DE_NULL slots are the ones that
 * threadOutStreamVFTable fills with its write and flush callbacks. */
static const deIOStreamVFTable threadInStreamVFTable = {
	threadInStream_read,		/* read */
	DE_NULL,					/* write slot: not provided for input streams */
	threadInStream_getError,	/* error string */
	DE_NULL,					/* flush slot: not provided for input streams */
	threadStream_deinit,		/* teardown */
	threadInStream_getStatus	/* status */
};
    131 
    132 void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
    133 {
    134 	deThreadInStream* threadStream = DE_NULL;
    135 
    136 	threadStream = malloc(sizeof(deThreadInStream));
    137 	DE_ASSERT(threadStream);
    138 
    139 	threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
    140 	DE_ASSERT(threadStream->ringbuffer);
    141 
    142 	threadStream->bufferSize = ringbufferBlockSize;
    143 	threadStream->input = input;
    144 	deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
    145 	deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
    146 
    147 	threadStream->thread		= deThread_create(inStreamCopy, threadStream, DE_NULL);
    148 	stream->ioStream.vfTable 	= &threadInStreamVFTable;
    149 	stream->ioStream.streamData = threadStream;
    150 }
    151 
    152 static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
    153 {
    154 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
    155 	return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
    156 }
    157 
    158 static const char* threadOutStream_getError (deStreamData* stream)
    159 {
    160 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
    161 
    162 	/* \todo [mika] Add handling for errors on thread stream */
    163 	return deOutStream_getError(&(threadStream->producerStream));
    164 }
    165 
    166 static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
    167 {
    168 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
    169 
    170 	/* \todo [mika] Add handling for errors on thread stream */
    171 	return deOutStream_getStatus(&(threadStream->producerStream));
    172 }
    173 
    174 static deStreamResult threadOutStream_flush (deStreamData* stream)
    175 {
    176 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
    177 
    178 	return deOutStream_flush(&(threadStream->producerStream));
    179 }
    180 
/* Callback table installed on streams set up by deThreadOutStream_init().
 * Entries are positional; the first slot is the one threadInStreamVFTable
 * fills with its read callback.
 * NOTE(review): threadStream_deinit() accesses its argument through the
 * deThreadInStream layout, which differs from deThreadOutStream. */
static const deIOStreamVFTable threadOutStreamVFTable = {
	DE_NULL,					/* read slot: not provided for output streams */
	threadOutStream_write,		/* write */
	threadOutStream_getError,	/* error string */
	threadOutStream_flush,		/* flush */
	threadStream_deinit,		/* teardown (shared with the input stream table) */
	threadOutStream_getStatus	/* status */
};
    189 
    190 void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
    191 {
    192 	deThreadOutStream* threadStream = DE_NULL;
    193 
    194 	threadStream = malloc(sizeof(deThreadOutStream));
    195 	DE_ASSERT(threadStream);
    196 
    197 	threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
    198 	DE_ASSERT(threadStream->ringbuffer);
    199 
    200 	deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
    201 	deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
    202 
    203 	threadStream->thread		= deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize);
    204 	stream->ioStream.vfTable 	= &threadOutStreamVFTable;
    205 	stream->ioStream.streamData = threadStream;
    206 }
    207 
    208