Home | History | Annotate | Download | only in C
      1 /* MtCoder.c -- Multi-thread Coder
      2 2010-09-24 : Igor Pavlov : Public domain */
      3 
      4 #include <stdio.h>
      5 
      6 #include "MtCoder.h"
      7 
      8 void LoopThread_Construct(CLoopThread *p)
      9 {
     10   Thread_Construct(&p->thread);
     11   Event_Construct(&p->startEvent);
     12   Event_Construct(&p->finishedEvent);
     13 }
     14 
     15 void LoopThread_Close(CLoopThread *p)
     16 {
     17   Thread_Close(&p->thread);
     18   Event_Close(&p->startEvent);
     19   Event_Close(&p->finishedEvent);
     20 }
     21 
     22 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
     23 {
     24   CLoopThread *p = (CLoopThread *)pp;
     25   for (;;)
     26   {
     27     if (Event_Wait(&p->startEvent) != 0)
     28       return SZ_ERROR_THREAD;
     29     if (p->stop)
     30       return 0;
     31     p->res = p->func(p->param);
     32     if (Event_Set(&p->finishedEvent) != 0)
     33       return SZ_ERROR_THREAD;
     34   }
     35 }
     36 
     37 WRes LoopThread_Create(CLoopThread *p)
     38 {
     39   p->stop = 0;
     40   RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
     41   RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
     42   return Thread_Create(&p->thread, LoopThreadFunc, p);
     43 }
     44 
     45 WRes LoopThread_StopAndWait(CLoopThread *p)
     46 {
     47   p->stop = 1;
     48   if (Event_Set(&p->startEvent) != 0)
     49     return SZ_ERROR_THREAD;
     50   return Thread_Wait(&p->thread);
     51 }
     52 
     53 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
     54 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
     55 
     56 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
     57 {
     58   return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
     59 }
     60 
     61 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
     62 {
     63   unsigned i;
     64   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
     65     p->inSizes[i] = p->outSizes[i] = 0;
     66   p->totalInSize = p->totalOutSize = 0;
     67   p->progress = progress;
     68   p->res = SZ_OK;
     69 }
     70 
     71 static void MtProgress_Reinit(CMtProgress *p, unsigned index)
     72 {
     73   p->inSizes[index] = 0;
     74   p->outSizes[index] = 0;
     75 }
     76 
     77 #define UPDATE_PROGRESS(size, prev, total) \
     78   if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
     79 
     80 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
     81 {
     82   SRes res;
     83   CriticalSection_Enter(&p->cs);
     84   UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
     85   UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
     86   if (p->res == SZ_OK)
     87     p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
     88   res = p->res;
     89   CriticalSection_Leave(&p->cs);
     90   return res;
     91 }
     92 
     93 static void MtProgress_SetError(CMtProgress *p, SRes res)
     94 {
     95   CriticalSection_Enter(&p->cs);
     96   if (p->res == SZ_OK)
     97     p->res = res;
     98   CriticalSection_Leave(&p->cs);
     99 }
    100 
    101 static void MtCoder_SetError(CMtCoder* p, SRes res)
    102 {
    103   CriticalSection_Enter(&p->cs);
    104   if (p->res == SZ_OK)
    105     p->res = res;
    106   CriticalSection_Leave(&p->cs);
    107 }
    108 
    109 /* ---------- MtThread ---------- */
    110 
    111 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
    112 {
    113   p->mtCoder = mtCoder;
    114   p->outBuf = 0;
    115   p->inBuf = 0;
    116   Event_Construct(&p->canRead);
    117   Event_Construct(&p->canWrite);
    118   LoopThread_Construct(&p->thread);
    119 }
    120 
    121 #define RINOK_THREAD(x) { if((x) != 0) return SZ_ERROR_THREAD; }
    122 
    123 static void CMtThread_CloseEvents(CMtThread *p)
    124 {
    125   Event_Close(&p->canRead);
    126   Event_Close(&p->canWrite);
    127 }
    128 
    129 static void CMtThread_Destruct(CMtThread *p)
    130 {
    131   CMtThread_CloseEvents(p);
    132 
    133   if (Thread_WasCreated(&p->thread.thread))
    134   {
    135     LoopThread_StopAndWait(&p->thread);
    136     LoopThread_Close(&p->thread);
    137   }
    138 
    139   if (p->mtCoder->alloc)
    140     IAlloc_Free(p->mtCoder->alloc, p->outBuf);
    141   p->outBuf = 0;
    142 
    143   if (p->mtCoder->alloc)
    144     IAlloc_Free(p->mtCoder->alloc, p->inBuf);
    145   p->inBuf = 0;
    146 }
    147 
    148 #define MY_BUF_ALLOC(buf, size, newSize) \
    149   if (buf == 0 || size != newSize) \
    150   { IAlloc_Free(p->mtCoder->alloc, buf); \
    151     size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
    152     if (buf == 0) return SZ_ERROR_MEM; }
    153 
    154 static SRes CMtThread_Prepare(CMtThread *p)
    155 {
    156   MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
    157   MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
    158 
    159   p->stopReading = False;
    160   p->stopWriting = False;
    161   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
    162   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
    163 
    164   return SZ_OK;
    165 }
    166 
    167 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
    168 {
    169   size_t size = *processedSize;
    170   *processedSize = 0;
    171   while (size != 0)
    172   {
    173     size_t curSize = size;
    174     SRes res = stream->Read(stream, data, &curSize);
    175     *processedSize += curSize;
    176     data += curSize;
    177     size -= curSize;
    178     RINOK(res);
    179     if (curSize == 0)
    180       return SZ_OK;
    181   }
    182   return SZ_OK;
    183 }
    184 
    185 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads  - 1 ? 0 : p->index + 1]
    186 
    187 static SRes MtThread_Process(CMtThread *p, Bool *stop)
    188 {
    189   CMtThread *next;
    190   *stop = True;
    191   if (Event_Wait(&p->canRead) != 0)
    192     return SZ_ERROR_THREAD;
    193 
    194   next = GET_NEXT_THREAD(p);
    195 
    196   if (p->stopReading)
    197   {
    198     next->stopReading = True;
    199     return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    200   }
    201 
    202   {
    203     size_t size = p->mtCoder->blockSize;
    204     size_t destSize = p->outBufSize;
    205 
    206     RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
    207     next->stopReading = *stop = (size != p->mtCoder->blockSize);
    208     if (Event_Set(&next->canRead) != 0)
    209       return SZ_ERROR_THREAD;
    210 
    211     RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
    212         p->outBuf, &destSize, p->inBuf, size, *stop));
    213 
    214     MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
    215 
    216     if (Event_Wait(&p->canWrite) != 0)
    217       return SZ_ERROR_THREAD;
    218     if (p->stopWriting)
    219       return SZ_ERROR_FAIL;
    220     if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
    221       return SZ_ERROR_WRITE;
    222     return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    223   }
    224 }
    225 
    226 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
    227 {
    228   CMtThread *p = (CMtThread *)pp;
    229   for (;;)
    230   {
    231     Bool stop;
    232     CMtThread *next = GET_NEXT_THREAD(p);
    233     SRes res = MtThread_Process(p, &stop);
    234     if (res != SZ_OK)
    235     {
    236       MtCoder_SetError(p->mtCoder, res);
    237       MtProgress_SetError(&p->mtCoder->mtProgress, res);
    238       next->stopReading = True;
    239       next->stopWriting = True;
    240       Event_Set(&next->canRead);
    241       Event_Set(&next->canWrite);
    242       return res;
    243     }
    244     if (stop)
    245       return 0;
    246   }
    247 }
    248 
    249 void MtCoder_Construct(CMtCoder* p)
    250 {
    251   unsigned i;
    252   p->alloc = 0;
    253   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
    254   {
    255     CMtThread *t = &p->threads[i];
    256     t->index = i;
    257     CMtThread_Construct(t, p);
    258   }
    259   CriticalSection_Init(&p->cs);
    260   CriticalSection_Init(&p->mtProgress.cs);
    261 }
    262 
    263 void MtCoder_Destruct(CMtCoder* p)
    264 {
    265   unsigned i;
    266   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
    267     CMtThread_Destruct(&p->threads[i]);
    268   CriticalSection_Delete(&p->cs);
    269   CriticalSection_Delete(&p->mtProgress.cs);
    270 }
    271 
    272 SRes MtCoder_Code(CMtCoder *p)
    273 {
    274   unsigned i, numThreads = p->numThreads;
    275   SRes res = SZ_OK;
    276   p->res = SZ_OK;
    277 
    278   MtProgress_Init(&p->mtProgress, p->progress);
    279 
    280   for (i = 0; i < numThreads; i++)
    281   {
    282     RINOK(CMtThread_Prepare(&p->threads[i]));
    283   }
    284 
    285   for (i = 0; i < numThreads; i++)
    286   {
    287     CMtThread *t = &p->threads[i];
    288     CLoopThread *lt = &t->thread;
    289 
    290     if (!Thread_WasCreated(&lt->thread))
    291     {
    292       lt->func = ThreadFunc;
    293       lt->param = t;
    294 
    295       if (LoopThread_Create(lt) != SZ_OK)
    296       {
    297         res = SZ_ERROR_THREAD;
    298         break;
    299       }
    300     }
    301   }
    302 
    303   if (res == SZ_OK)
    304   {
    305     unsigned j;
    306     for (i = 0; i < numThreads; i++)
    307     {
    308       CMtThread *t = &p->threads[i];
    309       if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
    310       {
    311         res = SZ_ERROR_THREAD;
    312         p->threads[0].stopReading = True;
    313         break;
    314       }
    315     }
    316 
    317     Event_Set(&p->threads[0].canWrite);
    318     Event_Set(&p->threads[0].canRead);
    319 
    320     for (j = 0; j < i; j++)
    321       LoopThread_WaitSubThread(&p->threads[j].thread);
    322   }
    323 
    324   for (i = 0; i < numThreads; i++)
    325     CMtThread_CloseEvents(&p->threads[i]);
    326   return (res == SZ_OK) ? p->res : res;
    327 }
    328