1 /* MtCoder.c -- Multi-thread Coder 2 2010-09-24 : Igor Pavlov : Public domain */ 3 4 #include "Precomp.h" 5 6 #include <stdio.h> 7 8 #include "MtCoder.h" 9 10 void LoopThread_Construct(CLoopThread *p) 11 { 12 Thread_Construct(&p->thread); 13 Event_Construct(&p->startEvent); 14 Event_Construct(&p->finishedEvent); 15 } 16 17 void LoopThread_Close(CLoopThread *p) 18 { 19 Thread_Close(&p->thread); 20 Event_Close(&p->startEvent); 21 Event_Close(&p->finishedEvent); 22 } 23 24 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp) 25 { 26 CLoopThread *p = (CLoopThread *)pp; 27 for (;;) 28 { 29 if (Event_Wait(&p->startEvent) != 0) 30 return SZ_ERROR_THREAD; 31 if (p->stop) 32 return 0; 33 p->res = p->func(p->param); 34 if (Event_Set(&p->finishedEvent) != 0) 35 return SZ_ERROR_THREAD; 36 } 37 } 38 39 WRes LoopThread_Create(CLoopThread *p) 40 { 41 p->stop = 0; 42 RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent)); 43 RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent)); 44 return Thread_Create(&p->thread, LoopThreadFunc, p); 45 } 46 47 WRes LoopThread_StopAndWait(CLoopThread *p) 48 { 49 p->stop = 1; 50 if (Event_Set(&p->startEvent) != 0) 51 return SZ_ERROR_THREAD; 52 return Thread_Wait(&p->thread); 53 } 54 55 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); } 56 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); } 57 58 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize) 59 { 60 return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK; 61 } 62 63 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress) 64 { 65 unsigned i; 66 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) 67 p->inSizes[i] = p->outSizes[i] = 0; 68 p->totalInSize = p->totalOutSize = 0; 69 p->progress = progress; 70 p->res = SZ_OK; 71 } 72 73 static void MtProgress_Reinit(CMtProgress *p, unsigned index) 74 { 75 p->inSizes[index] = 0; 76 p->outSizes[index] = 0; 77 } 78 79 #define UPDATE_PROGRESS(size, prev, total) \ 80 if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; } 81 82 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize) 83 { 84 SRes res; 85 CriticalSection_Enter(&p->cs); 86 UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize) 87 UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize) 88 if (p->res == SZ_OK) 89 p->res = Progress(p->progress, p->totalInSize, p->totalOutSize); 90 res = p->res; 91 CriticalSection_Leave(&p->cs); 92 return res; 93 } 94 95 static void MtProgress_SetError(CMtProgress *p, SRes res) 96 { 97 CriticalSection_Enter(&p->cs); 98 if (p->res == SZ_OK) 99 p->res = res; 100 CriticalSection_Leave(&p->cs); 101 } 102 103 static void MtCoder_SetError(CMtCoder* p, SRes res) 104 { 105 CriticalSection_Enter(&p->cs); 106 if (p->res == SZ_OK) 107 p->res = res; 108 CriticalSection_Leave(&p->cs); 109 } 110 111 /* ---------- MtThread ---------- */ 112 113 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder) 114 { 115 p->mtCoder = mtCoder; 116 p->outBuf = 0; 117 p->inBuf = 0; 118 Event_Construct(&p->canRead); 119 Event_Construct(&p->canWrite); 120 LoopThread_Construct(&p->thread); 121 } 122 123 #define RINOK_THREAD(x) { if((x) != 0) return SZ_ERROR_THREAD; } 124 125 static void CMtThread_CloseEvents(CMtThread *p) 126 { 127 Event_Close(&p->canRead); 128 Event_Close(&p->canWrite); 129 } 130 131 static void CMtThread_Destruct(CMtThread *p) 132 { 133 CMtThread_CloseEvents(p); 134 135 if (Thread_WasCreated(&p->thread.thread)) 136 { 137 LoopThread_StopAndWait(&p->thread); 138 LoopThread_Close(&p->thread); 139 } 140 141 if (p->mtCoder->alloc) 142 IAlloc_Free(p->mtCoder->alloc, p->outBuf); 143 p->outBuf = 0; 144 145 if (p->mtCoder->alloc) 146 IAlloc_Free(p->mtCoder->alloc, p->inBuf); 147 p->inBuf = 0; 148 } 149 150 #define MY_BUF_ALLOC(buf, size, newSize) \ 151 if (buf == 0 || size != newSize) \ 152 { IAlloc_Free(p->mtCoder->alloc, buf); \ 153 size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \ 154 if (buf == 0) return SZ_ERROR_MEM; } 155 156 static SRes CMtThread_Prepare(CMtThread *p) 157 { 158 MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize) 159 MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize) 160 161 p->stopReading = False; 162 p->stopWriting = False; 163 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead)); 164 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite)); 165 166 return SZ_OK; 167 } 168 169 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) 170 { 171 size_t size = *processedSize; 172 *processedSize = 0; 173 while (size != 0) 174 { 175 size_t curSize = size; 176 SRes res = stream->Read(stream, data, &curSize); 177 *processedSize += curSize; 178 data += curSize; 179 size -= curSize; 180 RINOK(res); 181 if (curSize == 0) 182 return SZ_OK; 183 } 184 return SZ_OK; 185 } 186 187 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads - 1 ? 0 : p->index + 1] 188 189 static SRes MtThread_Process(CMtThread *p, Bool *stop) 190 { 191 CMtThread *next; 192 *stop = True; 193 if (Event_Wait(&p->canRead) != 0) 194 return SZ_ERROR_THREAD; 195 196 next = GET_NEXT_THREAD(p); 197 198 if (p->stopReading) 199 { 200 next->stopReading = True; 201 return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD; 202 } 203 204 { 205 size_t size = p->mtCoder->blockSize; 206 size_t destSize = p->outBufSize; 207 208 RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size)); 209 next->stopReading = *stop = (size != p->mtCoder->blockSize); 210 if (Event_Set(&next->canRead) != 0) 211 return SZ_ERROR_THREAD; 212 213 RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index, 214 p->outBuf, &destSize, p->inBuf, size, *stop)); 215 216 MtProgress_Reinit(&p->mtCoder->mtProgress, p->index); 217 218 if (Event_Wait(&p->canWrite) != 0) 219 return SZ_ERROR_THREAD; 220 if (p->stopWriting) 221 return SZ_ERROR_FAIL; 222 if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize) 223 return SZ_ERROR_WRITE; 224 return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD; 225 } 226 } 227 228 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) 229 { 230 CMtThread *p = (CMtThread *)pp; 231 for (;;) 232 { 233 Bool stop; 234 CMtThread *next = GET_NEXT_THREAD(p); 235 SRes res = MtThread_Process(p, &stop); 236 if (res != SZ_OK) 237 { 238 MtCoder_SetError(p->mtCoder, res); 239 MtProgress_SetError(&p->mtCoder->mtProgress, res); 240 next->stopReading = True; 241 next->stopWriting = True; 242 Event_Set(&next->canRead); 243 Event_Set(&next->canWrite); 244 return res; 245 } 246 if (stop) 247 return 0; 248 } 249 } 250 251 void MtCoder_Construct(CMtCoder* p) 252 { 253 unsigned i; 254 p->alloc = 0; 255 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) 256 { 257 CMtThread *t = &p->threads[i]; 258 t->index = i; 259 CMtThread_Construct(t, p); 260 } 261 CriticalSection_Init(&p->cs); 262 CriticalSection_Init(&p->mtProgress.cs); 263 } 264 265 void MtCoder_Destruct(CMtCoder* p) 266 { 267 unsigned i; 268 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) 269 CMtThread_Destruct(&p->threads[i]); 270 CriticalSection_Delete(&p->cs); 271 CriticalSection_Delete(&p->mtProgress.cs); 272 } 273 274 SRes MtCoder_Code(CMtCoder *p) 275 { 276 unsigned i, numThreads = p->numThreads; 277 SRes res = SZ_OK; 278 p->res = SZ_OK; 279 280 MtProgress_Init(&p->mtProgress, p->progress); 281 282 for (i = 0; i < numThreads; i++) 283 { 284 RINOK(CMtThread_Prepare(&p->threads[i])); 285 } 286 287 for (i = 0; i < numThreads; i++) 288 { 289 CMtThread *t = &p->threads[i]; 290 CLoopThread *lt = &t->thread; 291 292 if (!Thread_WasCreated(<->thread)) 293 { 294 lt->func = ThreadFunc; 295 lt->param = t; 296 297 if (LoopThread_Create(lt) != SZ_OK) 298 { 299 res = SZ_ERROR_THREAD; 300 break; 301 } 302 } 303 } 304 305 if (res == SZ_OK) 306 { 307 unsigned j; 308 for (i = 0; i < numThreads; i++) 309 { 310 CMtThread *t = &p->threads[i]; 311 if (LoopThread_StartSubThread(&t->thread) != SZ_OK) 312 { 313 res = SZ_ERROR_THREAD; 314 p->threads[0].stopReading = True; 315 break; 316 } 317 } 318 319 Event_Set(&p->threads[0].canWrite); 320 Event_Set(&p->threads[0].canRead); 321 322 for (j = 0; j < i; j++) 323 LoopThread_WaitSubThread(&p->threads[j].thread); 324 } 325 326 for (i = 0; i < numThreads; i++) 327 CMtThread_CloseEvents(&p->threads[i]); 328 return (res == SZ_OK) ? p->res : res; 329 } 330