1 /*------------------------------------------------------------------------- 2 * drawElements Stream Library 3 * --------------------------- 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Buffered and threaded input and output streams 22 *//*--------------------------------------------------------------------*/ 23 24 #include "deThreadStream.h" 25 #include "deStreamCpyThread.h" 26 #include "deRingbuffer.h" 27 #include "stdlib.h" 28 29 typedef struct deThreadInStream_s 30 { 31 deRingbuffer* ringbuffer; 32 deInStream* input; 33 deInStream consumerStream; 34 deOutStream producerStream; 35 deThread thread; 36 int bufferSize; 37 } deThreadInStream; 38 39 typedef struct deThreadOutStream_s 40 { 41 deRingbuffer* ringbuffer; 42 deInStream consumerStream; 43 deOutStream producerStream; 44 deStreamCpyThread* thread; 45 } deThreadOutStream; 46 47 static void inStreamCopy (void* arg) 48 { 49 deThreadInStream* threadStream = (deThreadInStream*)arg; 50 51 deUint8* buffer = malloc(sizeof(deUint8) * threadStream->bufferSize); 52 53 for(;;) 54 { 55 deInt32 read = 0; 56 deInt32 written = 0; 57 deStreamResult readResult = DE_STREAMRESULT_ERROR; 58 59 readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read); 60 DE_ASSERT(readResult != DE_STREAMRESULT_ERROR); 61 while (written < read) 62 { 63 deInt32 wrote = 0; 64 65 /* \todo [mika] Handle errors */ 66 deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote); 67 68 written += wrote; 69 } 70 71 if (readResult == DE_STREAMRESULT_END_OF_STREAM) 72 { 73 break; 74 } 75 } 76 77 deOutStream_flush(&(threadStream->producerStream)); 78 deRingbuffer_stop(threadStream->ringbuffer); 79 free(buffer); 80 81 } 82 83 static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead) 84 { 85 deThreadInStream* threadStream = (deThreadInStream*)stream; 86 return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead); 87 } 88 89 static const char* threadInStream_getError (deStreamData* stream) 90 { 91 deThreadInStream* threadStream = (deThreadInStream*)stream; 92 93 /* \todo [mika] Add handling for errors on thread stream */ 94 return deInStream_getError(&(threadStream->consumerStream)); 95 } 96 97 static deStreamStatus threadInStream_getStatus (deStreamData* stream) 98 { 99 deThreadInStream* threadStream = (deThreadInStream*)stream; 100 101 /* \todo [mika] Add handling for status on thread stream */ 102 return deInStream_getStatus(&(threadStream->consumerStream)); 103 } 104 105 /* \note [mika] Used by both in and out stream */ 106 static deStreamResult threadStream_deinit (deStreamData* stream) 107 { 108 deThreadInStream* threadStream = (deThreadInStream*)stream; 109 110 deRingbuffer_stop(threadStream->ringbuffer); 111 112 deThread_join(threadStream->thread); 113 deThread_destroy(threadStream->thread); 114 115 deOutStream_deinit(&(threadStream->producerStream)); 116 deInStream_deinit(&(threadStream->consumerStream)); 117 118 deRingbuffer_destroy(threadStream->ringbuffer); 119 120 return DE_STREAMRESULT_SUCCESS; 121 } 122 123 static const deIOStreamVFTable threadInStreamVFTable = { 124 threadInStream_read, 125 DE_NULL, 126 threadInStream_getError, 127 DE_NULL, 128 threadStream_deinit, 129 threadInStream_getStatus 130 }; 131 132 void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount) 133 { 134 deThreadInStream* threadStream = DE_NULL; 135 136 threadStream = malloc(sizeof(deThreadInStream)); 137 DE_ASSERT(threadStream); 138 139 threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount); 140 DE_ASSERT(threadStream->ringbuffer); 141 142 threadStream->bufferSize = ringbufferBlockSize; 143 threadStream->input = input; 144 deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer); 145 deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer); 146 147 threadStream->thread = deThread_create(inStreamCopy, threadStream, DE_NULL); 148 stream->ioStream.vfTable = &threadInStreamVFTable; 149 stream->ioStream.streamData = threadStream; 150 } 151 152 static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten) 153 { 154 deThreadOutStream* threadStream = (deThreadOutStream*)stream; 155 return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten); 156 } 157 158 static const char* threadOutStream_getError (deStreamData* stream) 159 { 160 deThreadOutStream* threadStream = (deThreadOutStream*)stream; 161 162 /* \todo [mika] Add handling for errors on thread stream */ 163 return deOutStream_getError(&(threadStream->producerStream)); 164 } 165 166 static deStreamStatus threadOutStream_getStatus (deStreamData* stream) 167 { 168 deThreadOutStream* threadStream = (deThreadOutStream*)stream; 169 170 /* \todo [mika] Add handling for errors on thread stream */ 171 return deOutStream_getStatus(&(threadStream->producerStream)); 172 } 173 174 static deStreamResult threadOutStream_flush (deStreamData* stream) 175 { 176 deThreadOutStream* threadStream = (deThreadOutStream*)stream; 177 178 return deOutStream_flush(&(threadStream->producerStream)); 179 } 180 181 static const deIOStreamVFTable threadOutStreamVFTable = { 182 DE_NULL, 183 threadOutStream_write, 184 threadOutStream_getError, 185 threadOutStream_flush, 186 threadStream_deinit, 187 threadOutStream_getStatus 188 }; 189 190 void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount) 191 { 192 deThreadOutStream* threadStream = DE_NULL; 193 194 threadStream = malloc(sizeof(deThreadOutStream)); 195 DE_ASSERT(threadStream); 196 197 threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount); 198 DE_ASSERT(threadStream->ringbuffer); 199 200 deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer); 201 deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer); 202 203 threadStream->thread = deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize); 204 stream->ioStream.vfTable = &threadOutStreamVFTable; 205 stream->ioStream.streamData = threadStream; 206 } 207 208