Home | History | Annotate | Download | only in C
      1 /* MtCoder.c -- Multi-thread Coder
      2 2010-09-24 : Igor Pavlov : Public domain */
      3 
      4 #include "Precomp.h"
      5 
      6 #include <stdio.h>
      7 
      8 #include "MtCoder.h"
      9 
     10 void LoopThread_Construct(CLoopThread *p)
     11 {
     12   Thread_Construct(&p->thread);
     13   Event_Construct(&p->startEvent);
     14   Event_Construct(&p->finishedEvent);
     15 }
     16 
     17 void LoopThread_Close(CLoopThread *p)
     18 {
     19   Thread_Close(&p->thread);
     20   Event_Close(&p->startEvent);
     21   Event_Close(&p->finishedEvent);
     22 }
     23 
     24 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
     25 {
     26   CLoopThread *p = (CLoopThread *)pp;
     27   for (;;)
     28   {
     29     if (Event_Wait(&p->startEvent) != 0)
     30       return SZ_ERROR_THREAD;
     31     if (p->stop)
     32       return 0;
     33     p->res = p->func(p->param);
     34     if (Event_Set(&p->finishedEvent) != 0)
     35       return SZ_ERROR_THREAD;
     36   }
     37 }
     38 
     39 WRes LoopThread_Create(CLoopThread *p)
     40 {
     41   p->stop = 0;
     42   RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
     43   RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
     44   return Thread_Create(&p->thread, LoopThreadFunc, p);
     45 }
     46 
     47 WRes LoopThread_StopAndWait(CLoopThread *p)
     48 {
     49   p->stop = 1;
     50   if (Event_Set(&p->startEvent) != 0)
     51     return SZ_ERROR_THREAD;
     52   return Thread_Wait(&p->thread);
     53 }
     54 
     55 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
     56 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
     57 
     58 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
     59 {
     60   return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
     61 }
     62 
     63 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
     64 {
     65   unsigned i;
     66   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
     67     p->inSizes[i] = p->outSizes[i] = 0;
     68   p->totalInSize = p->totalOutSize = 0;
     69   p->progress = progress;
     70   p->res = SZ_OK;
     71 }
     72 
     73 static void MtProgress_Reinit(CMtProgress *p, unsigned index)
     74 {
     75   p->inSizes[index] = 0;
     76   p->outSizes[index] = 0;
     77 }
     78 
     79 #define UPDATE_PROGRESS(size, prev, total) \
     80   if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
     81 
     82 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
     83 {
     84   SRes res;
     85   CriticalSection_Enter(&p->cs);
     86   UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
     87   UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
     88   if (p->res == SZ_OK)
     89     p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
     90   res = p->res;
     91   CriticalSection_Leave(&p->cs);
     92   return res;
     93 }
     94 
     95 static void MtProgress_SetError(CMtProgress *p, SRes res)
     96 {
     97   CriticalSection_Enter(&p->cs);
     98   if (p->res == SZ_OK)
     99     p->res = res;
    100   CriticalSection_Leave(&p->cs);
    101 }
    102 
    103 static void MtCoder_SetError(CMtCoder* p, SRes res)
    104 {
    105   CriticalSection_Enter(&p->cs);
    106   if (p->res == SZ_OK)
    107     p->res = res;
    108   CriticalSection_Leave(&p->cs);
    109 }
    110 
    111 /* ---------- MtThread ---------- */
    112 
    113 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
    114 {
    115   p->mtCoder = mtCoder;
    116   p->outBuf = 0;
    117   p->inBuf = 0;
    118   Event_Construct(&p->canRead);
    119   Event_Construct(&p->canWrite);
    120   LoopThread_Construct(&p->thread);
    121 }
    122 
    123 #define RINOK_THREAD(x) { if((x) != 0) return SZ_ERROR_THREAD; }
    124 
    125 static void CMtThread_CloseEvents(CMtThread *p)
    126 {
    127   Event_Close(&p->canRead);
    128   Event_Close(&p->canWrite);
    129 }
    130 
    131 static void CMtThread_Destruct(CMtThread *p)
    132 {
    133   CMtThread_CloseEvents(p);
    134 
    135   if (Thread_WasCreated(&p->thread.thread))
    136   {
    137     LoopThread_StopAndWait(&p->thread);
    138     LoopThread_Close(&p->thread);
    139   }
    140 
    141   if (p->mtCoder->alloc)
    142     IAlloc_Free(p->mtCoder->alloc, p->outBuf);
    143   p->outBuf = 0;
    144 
    145   if (p->mtCoder->alloc)
    146     IAlloc_Free(p->mtCoder->alloc, p->inBuf);
    147   p->inBuf = 0;
    148 }
    149 
    150 #define MY_BUF_ALLOC(buf, size, newSize) \
    151   if (buf == 0 || size != newSize) \
    152   { IAlloc_Free(p->mtCoder->alloc, buf); \
    153     size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
    154     if (buf == 0) return SZ_ERROR_MEM; }
    155 
    156 static SRes CMtThread_Prepare(CMtThread *p)
    157 {
    158   MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
    159   MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
    160 
    161   p->stopReading = False;
    162   p->stopWriting = False;
    163   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
    164   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
    165 
    166   return SZ_OK;
    167 }
    168 
    169 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
    170 {
    171   size_t size = *processedSize;
    172   *processedSize = 0;
    173   while (size != 0)
    174   {
    175     size_t curSize = size;
    176     SRes res = stream->Read(stream, data, &curSize);
    177     *processedSize += curSize;
    178     data += curSize;
    179     size -= curSize;
    180     RINOK(res);
    181     if (curSize == 0)
    182       return SZ_OK;
    183   }
    184   return SZ_OK;
    185 }
    186 
    187 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads  - 1 ? 0 : p->index + 1]
    188 
    189 static SRes MtThread_Process(CMtThread *p, Bool *stop)
    190 {
    191   CMtThread *next;
    192   *stop = True;
    193   if (Event_Wait(&p->canRead) != 0)
    194     return SZ_ERROR_THREAD;
    195 
    196   next = GET_NEXT_THREAD(p);
    197 
    198   if (p->stopReading)
    199   {
    200     next->stopReading = True;
    201     return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    202   }
    203 
    204   {
    205     size_t size = p->mtCoder->blockSize;
    206     size_t destSize = p->outBufSize;
    207 
    208     RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
    209     next->stopReading = *stop = (size != p->mtCoder->blockSize);
    210     if (Event_Set(&next->canRead) != 0)
    211       return SZ_ERROR_THREAD;
    212 
    213     RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
    214         p->outBuf, &destSize, p->inBuf, size, *stop));
    215 
    216     MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
    217 
    218     if (Event_Wait(&p->canWrite) != 0)
    219       return SZ_ERROR_THREAD;
    220     if (p->stopWriting)
    221       return SZ_ERROR_FAIL;
    222     if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
    223       return SZ_ERROR_WRITE;
    224     return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    225   }
    226 }
    227 
    228 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
    229 {
    230   CMtThread *p = (CMtThread *)pp;
    231   for (;;)
    232   {
    233     Bool stop;
    234     CMtThread *next = GET_NEXT_THREAD(p);
    235     SRes res = MtThread_Process(p, &stop);
    236     if (res != SZ_OK)
    237     {
    238       MtCoder_SetError(p->mtCoder, res);
    239       MtProgress_SetError(&p->mtCoder->mtProgress, res);
    240       next->stopReading = True;
    241       next->stopWriting = True;
    242       Event_Set(&next->canRead);
    243       Event_Set(&next->canWrite);
    244       return res;
    245     }
    246     if (stop)
    247       return 0;
    248   }
    249 }
    250 
    251 void MtCoder_Construct(CMtCoder* p)
    252 {
    253   unsigned i;
    254   p->alloc = 0;
    255   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
    256   {
    257     CMtThread *t = &p->threads[i];
    258     t->index = i;
    259     CMtThread_Construct(t, p);
    260   }
    261   CriticalSection_Init(&p->cs);
    262   CriticalSection_Init(&p->mtProgress.cs);
    263 }
    264 
    265 void MtCoder_Destruct(CMtCoder* p)
    266 {
    267   unsigned i;
    268   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
    269     CMtThread_Destruct(&p->threads[i]);
    270   CriticalSection_Delete(&p->cs);
    271   CriticalSection_Delete(&p->mtProgress.cs);
    272 }
    273 
    274 SRes MtCoder_Code(CMtCoder *p)
    275 {
    276   unsigned i, numThreads = p->numThreads;
    277   SRes res = SZ_OK;
    278   p->res = SZ_OK;
    279 
    280   MtProgress_Init(&p->mtProgress, p->progress);
    281 
    282   for (i = 0; i < numThreads; i++)
    283   {
    284     RINOK(CMtThread_Prepare(&p->threads[i]));
    285   }
    286 
    287   for (i = 0; i < numThreads; i++)
    288   {
    289     CMtThread *t = &p->threads[i];
    290     CLoopThread *lt = &t->thread;
    291 
    292     if (!Thread_WasCreated(&lt->thread))
    293     {
    294       lt->func = ThreadFunc;
    295       lt->param = t;
    296 
    297       if (LoopThread_Create(lt) != SZ_OK)
    298       {
    299         res = SZ_ERROR_THREAD;
    300         break;
    301       }
    302     }
    303   }
    304 
    305   if (res == SZ_OK)
    306   {
    307     unsigned j;
    308     for (i = 0; i < numThreads; i++)
    309     {
    310       CMtThread *t = &p->threads[i];
    311       if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
    312       {
    313         res = SZ_ERROR_THREAD;
    314         p->threads[0].stopReading = True;
    315         break;
    316       }
    317     }
    318 
    319     Event_Set(&p->threads[0].canWrite);
    320     Event_Set(&p->threads[0].canRead);
    321 
    322     for (j = 0; j < i; j++)
    323       LoopThread_WaitSubThread(&p->threads[j].thread);
    324   }
    325 
    326   for (i = 0; i < numThreads; i++)
    327     CMtThread_CloseEvents(&p->threads[i]);
    328   return (res == SZ_OK) ? p->res : res;
    329 }
    330