1 /* MtCoder.c -- Multi-thread Coder 2 2010-09-24 : Igor Pavlov : Public domain */ 3 4 #include <stdio.h> 5 6 #include "MtCoder.h" 7 8 void LoopThread_Construct(CLoopThread *p) 9 { 10 Thread_Construct(&p->thread); 11 Event_Construct(&p->startEvent); 12 Event_Construct(&p->finishedEvent); 13 } 14 15 void LoopThread_Close(CLoopThread *p) 16 { 17 Thread_Close(&p->thread); 18 Event_Close(&p->startEvent); 19 Event_Close(&p->finishedEvent); 20 } 21 22 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp) 23 { 24 CLoopThread *p = (CLoopThread *)pp; 25 for (;;) 26 { 27 if (Event_Wait(&p->startEvent) != 0) 28 return SZ_ERROR_THREAD; 29 if (p->stop) 30 return 0; 31 p->res = p->func(p->param); 32 if (Event_Set(&p->finishedEvent) != 0) 33 return SZ_ERROR_THREAD; 34 } 35 } 36 37 WRes LoopThread_Create(CLoopThread *p) 38 { 39 p->stop = 0; 40 RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent)); 41 RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent)); 42 return Thread_Create(&p->thread, LoopThreadFunc, p); 43 } 44 45 WRes LoopThread_StopAndWait(CLoopThread *p) 46 { 47 p->stop = 1; 48 if (Event_Set(&p->startEvent) != 0) 49 return SZ_ERROR_THREAD; 50 return Thread_Wait(&p->thread); 51 } 52 53 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); } 54 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); } 55 56 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize) 57 { 58 return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK; 59 } 60 61 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress) 62 { 63 unsigned i; 64 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) 65 p->inSizes[i] = p->outSizes[i] = 0; 66 p->totalInSize = p->totalOutSize = 0; 67 p->progress = progress; 68 p->res = SZ_OK; 69 } 70 71 static void MtProgress_Reinit(CMtProgress *p, unsigned index) 72 { 73 p->inSizes[index] = 0; 74 p->outSizes[index] = 0; 75 } 76 77 #define UPDATE_PROGRESS(size, prev, total) \ 78 if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; } 79 80 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize) 81 { 82 SRes res; 83 CriticalSection_Enter(&p->cs); 84 UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize) 85 UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize) 86 if (p->res == SZ_OK) 87 p->res = Progress(p->progress, p->totalInSize, p->totalOutSize); 88 res = p->res; 89 CriticalSection_Leave(&p->cs); 90 return res; 91 } 92 93 static void MtProgress_SetError(CMtProgress *p, SRes res) 94 { 95 CriticalSection_Enter(&p->cs); 96 if (p->res == SZ_OK) 97 p->res = res; 98 CriticalSection_Leave(&p->cs); 99 } 100 101 static void MtCoder_SetError(CMtCoder* p, SRes res) 102 { 103 CriticalSection_Enter(&p->cs); 104 if (p->res == SZ_OK) 105 p->res = res; 106 CriticalSection_Leave(&p->cs); 107 } 108 109 /* ---------- MtThread ---------- */ 110 111 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder) 112 { 113 p->mtCoder = mtCoder; 114 p->outBuf = 0; 115 p->inBuf = 0; 116 Event_Construct(&p->canRead); 117 Event_Construct(&p->canWrite); 118 LoopThread_Construct(&p->thread); 119 } 120 121 #define RINOK_THREAD(x) { if((x) != 0) return SZ_ERROR_THREAD; } 122 123 static void CMtThread_CloseEvents(CMtThread *p) 124 { 125 Event_Close(&p->canRead); 126 Event_Close(&p->canWrite); 127 } 128 129 static void CMtThread_Destruct(CMtThread *p) 130 { 131 CMtThread_CloseEvents(p); 132 133 if (Thread_WasCreated(&p->thread.thread)) 134 { 135 LoopThread_StopAndWait(&p->thread); 136 LoopThread_Close(&p->thread); 137 } 138 139 if (p->mtCoder->alloc) 140 IAlloc_Free(p->mtCoder->alloc, p->outBuf); 141 p->outBuf = 0; 142 143 if (p->mtCoder->alloc) 144 IAlloc_Free(p->mtCoder->alloc, p->inBuf); 145 p->inBuf = 0; 146 } 147 148 #define MY_BUF_ALLOC(buf, size, newSize) \ 149 if (buf == 0 || size != newSize) \ 150 { IAlloc_Free(p->mtCoder->alloc, buf); \ 151 size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \ 152 if (buf == 0) return SZ_ERROR_MEM; } 153 154 static SRes CMtThread_Prepare(CMtThread *p) 155 { 156 MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize) 157 MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize) 158 159 p->stopReading = False; 160 p->stopWriting = False; 161 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead)); 162 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite)); 163 164 return SZ_OK; 165 } 166 167 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) 168 { 169 size_t size = *processedSize; 170 *processedSize = 0; 171 while (size != 0) 172 { 173 size_t curSize = size; 174 SRes res = stream->Read(stream, data, &curSize); 175 *processedSize += curSize; 176 data += curSize; 177 size -= curSize; 178 RINOK(res); 179 if (curSize == 0) 180 return SZ_OK; 181 } 182 return SZ_OK; 183 } 184 185 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads - 1 ? 0 : p->index + 1] 186 187 static SRes MtThread_Process(CMtThread *p, Bool *stop) 188 { 189 CMtThread *next; 190 *stop = True; 191 if (Event_Wait(&p->canRead) != 0) 192 return SZ_ERROR_THREAD; 193 194 next = GET_NEXT_THREAD(p); 195 196 if (p->stopReading) 197 { 198 next->stopReading = True; 199 return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD; 200 } 201 202 { 203 size_t size = p->mtCoder->blockSize; 204 size_t destSize = p->outBufSize; 205 206 RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size)); 207 next->stopReading = *stop = (size != p->mtCoder->blockSize); 208 if (Event_Set(&next->canRead) != 0) 209 return SZ_ERROR_THREAD; 210 211 RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index, 212 p->outBuf, &destSize, p->inBuf, size, *stop)); 213 214 MtProgress_Reinit(&p->mtCoder->mtProgress, p->index); 215 216 if (Event_Wait(&p->canWrite) != 0) 217 return SZ_ERROR_THREAD; 218 if (p->stopWriting) 219 return SZ_ERROR_FAIL; 220 if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize) 221 return SZ_ERROR_WRITE; 222 return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD; 223 } 224 } 225 226 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) 227 { 228 CMtThread *p = (CMtThread *)pp; 229 for (;;) 230 { 231 Bool stop; 232 CMtThread *next = GET_NEXT_THREAD(p); 233 SRes res = MtThread_Process(p, &stop); 234 if (res != SZ_OK) 235 { 236 MtCoder_SetError(p->mtCoder, res); 237 MtProgress_SetError(&p->mtCoder->mtProgress, res); 238 next->stopReading = True; 239 next->stopWriting = True; 240 Event_Set(&next->canRead); 241 Event_Set(&next->canWrite); 242 return res; 243 } 244 if (stop) 245 return 0; 246 } 247 } 248 249 void MtCoder_Construct(CMtCoder* p) 250 { 251 unsigned i; 252 p->alloc = 0; 253 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) 254 { 255 CMtThread *t = &p->threads[i]; 256 t->index = i; 257 CMtThread_Construct(t, p); 258 } 259 CriticalSection_Init(&p->cs); 260 CriticalSection_Init(&p->mtProgress.cs); 261 } 262 263 void MtCoder_Destruct(CMtCoder* p) 264 { 265 unsigned i; 266 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) 267 CMtThread_Destruct(&p->threads[i]); 268 CriticalSection_Delete(&p->cs); 269 CriticalSection_Delete(&p->mtProgress.cs); 270 } 271 272 SRes MtCoder_Code(CMtCoder *p) 273 { 274 unsigned i, numThreads = p->numThreads; 275 SRes res = SZ_OK; 276 p->res = SZ_OK; 277 278 MtProgress_Init(&p->mtProgress, p->progress); 279 280 for (i = 0; i < numThreads; i++) 281 { 282 RINOK(CMtThread_Prepare(&p->threads[i])); 283 } 284 285 for (i = 0; i < numThreads; i++) 286 { 287 CMtThread *t = &p->threads[i]; 288 CLoopThread *lt = &t->thread; 289 290 if (!Thread_WasCreated(<->thread)) 291 { 292 lt->func = ThreadFunc; 293 lt->param = t; 294 295 if (LoopThread_Create(lt) != SZ_OK) 296 { 297 res = SZ_ERROR_THREAD; 298 break; 299 } 300 } 301 } 302 303 if (res == SZ_OK) 304 { 305 unsigned j; 306 for (i = 0; i < numThreads; i++) 307 { 308 CMtThread *t = &p->threads[i]; 309 if (LoopThread_StartSubThread(&t->thread) != SZ_OK) 310 { 311 res = SZ_ERROR_THREAD; 312 p->threads[0].stopReading = True; 313 break; 314 } 315 } 316 317 Event_Set(&p->threads[0].canWrite); 318 Event_Set(&p->threads[0].canRead); 319 320 for (j = 0; j < i; j++) 321 LoopThread_WaitSubThread(&p->threads[j].thread); 322 } 323 324 for (i = 0; i < numThreads; i++) 325 CMtThread_CloseEvents(&p->threads[i]); 326 return (res == SZ_OK) ? p->res : res; 327 } 328