Home | History | Annotate | Download | only in C
      1 /* MtDec.c -- Multi-thread Decoder
      2 2018-07-04 : Igor Pavlov : Public domain */
      3 
      4 #include "Precomp.h"
      5 
      6 // #define SHOW_DEBUG_INFO
      7 
      8 // #include <stdio.h>
      9 
     10 #ifdef SHOW_DEBUG_INFO
     11 #include <stdio.h>
     12 #endif
     13 
     14 #ifdef SHOW_DEBUG_INFO
     15 #define PRF(x) x
     16 #else
     17 #define PRF(x)
     18 #endif
     19 
     20 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
     21 
     22 #include "MtDec.h"
     23 
     24 #ifndef _7ZIP_ST
     25 
     26 void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
     27 {
     28   p->progress = progress;
     29   p->res = SZ_OK;
     30   p->totalInSize = 0;
     31   p->totalOutSize = 0;
     32 }
     33 
     34 
     35 SRes MtProgress_Progress_ST(CMtProgress *p)
     36 {
     37   if (p->res == SZ_OK && p->progress)
     38     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
     39       p->res = SZ_ERROR_PROGRESS;
     40   return p->res;
     41 }
     42 
     43 
     44 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
     45 {
     46   SRes res;
     47   CriticalSection_Enter(&p->cs);
     48 
     49   p->totalInSize += inSize;
     50   p->totalOutSize += outSize;
     51   if (p->res == SZ_OK && p->progress)
     52     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
     53       p->res = SZ_ERROR_PROGRESS;
     54   res = p->res;
     55 
     56   CriticalSection_Leave(&p->cs);
     57   return res;
     58 }
     59 
     60 
     61 SRes MtProgress_GetError(CMtProgress *p)
     62 {
     63   SRes res;
     64   CriticalSection_Enter(&p->cs);
     65   res = p->res;
     66   CriticalSection_Leave(&p->cs);
     67   return res;
     68 }
     69 
     70 
     71 void MtProgress_SetError(CMtProgress *p, SRes res)
     72 {
     73   CriticalSection_Enter(&p->cs);
     74   if (p->res == SZ_OK)
     75     p->res = res;
     76   CriticalSection_Leave(&p->cs);
     77 }
     78 
     79 
/*
  Used around threading primitives (Event_Wait / Event_Set) in ThreadFunc2().
  Currently just an alias of RINOK; RINOK is defined elsewhere and presumably
  returns x from the enclosing function when it is non-zero (TODO confirm in 7zTypes.h).
*/
#define RINOK_THREAD(x) RINOK(x)
     81 
     82 
     83 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
     84 {
     85   if (Event_IsCreated(p))
     86     return Event_Reset(p);
     87   return AutoResetEvent_CreateNotSignaled(p);
     88 }
     89 
     90 
     91 
/*
  Header stored at the start of every input-buffer allocation; the allocations
  are made as ISzAlloc_Alloc(alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize).
  A thread's input buffers form a singly linked list through (next).
  (pad) only enlarges the header; presumably it keeps the payload that follows
  at a larger, alignment-friendly offset - TODO confirm intent.
*/
typedef struct
{
  void *next;
  void *pad[3];
} CMtDecBufLink;

/* Byte offset of the payload from the start of a link allocation. */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
/* (Byte *) pointer to the payload that follows the link header. */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)



/* Worker thread entry point passed to Thread_Create(); defined later in this file. */
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
    104 
    105 
    106 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
    107 {
    108   WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
    109   if (wres == 0)
    110   {
    111     wres = ArEvent_OptCreate_And_Reset(&t->canRead);
    112     if (wres == 0)
    113       return SZ_OK;
    114   }
    115   return wres;
    116 }
    117 
    118 
    119 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
    120 {
    121   WRes wres = MtDecThread_CreateEvents(t);
    122   // wres = 17; // for test
    123   if (wres == 0)
    124   {
    125     if (Thread_WasCreated(&t->thread))
    126       return SZ_OK;
    127     wres = Thread_Create(&t->thread, ThreadFunc, t);
    128     if (wres == 0)
    129       return SZ_OK;
    130   }
    131   return MY_SRes_HRESULT_FROM_WRes(wres);
    132 }
    133 
    134 
    135 void MtDecThread_FreeInBufs(CMtDecThread *t)
    136 {
    137   if (t->inBuf)
    138   {
    139     void *link = t->inBuf;
    140     t->inBuf = NULL;
    141     do
    142     {
    143       void *next = ((CMtDecBufLink *)link)->next;
    144       ISzAlloc_Free(t->mtDec->alloc, link);
    145       link = next;
    146     }
    147     while (link);
    148   }
    149 }
    150 
    151 
/*
  Shuts down the worker thread of slot (t), if one was created, then closes both events.
  canWrite and canRead are signalled so that a worker blocked in Event_Wait() on
  either of them wakes up. ThreadFunc2() returns after such a wake-up only when
  p->exitThread is set, so callers presumably set it first (MtDec_Free sets it
  before its thread loop) - NOTE(review): confirm for every caller.
  Thread_Wait() presumably blocks until the worker has exited, and only then is
  the thread handle closed. Return values of these calls are ignored.
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
    Event_Set(&t->canRead);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
    165 
    166 static void MtDec_CloseThreads(CMtDec *p)
    167 {
    168   unsigned i;
    169   for (i = 0; i < MTDEC__THREADS_MAX; i++)
    170     MtDecThread_CloseThread(&p->threads[i]);
    171 }
    172 
/*
  Releases everything owned by slot (t): first stops its thread and closes its
  events, then frees its chain of input buffers. The order presumably matters
  because a still-running worker may access t->inBuf.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);
  MtDecThread_FreeInBufs(t);
}
    178 
    179 
    180 
    181 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
    182 {
    183   size_t size = *processedSize;
    184   *processedSize = 0;
    185   while (size != 0)
    186   {
    187     size_t cur = size;
    188     SRes res = ISeqInStream_Read(stream, data, &cur);
    189     *processedSize += cur;
    190     data += cur;
    191     size -= cur;
    192     RINOK(res);
    193     if (cur == 0)
    194       return SZ_OK;
    195   }
    196   return SZ_OK;
    197 }
    198 
    199 
    200 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
    201 {
    202   SRes res;
    203   CriticalSection_Enter(&p->mtProgress.cs);
    204   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
    205   res = p->mtProgress.res;
    206   CriticalSection_Leave(&p->mtProgress.cs);
    207   return res;
    208 }
    209 
    210 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
    211 {
    212   SRes res;
    213   CriticalSection_Enter(&p->mtProgress.cs);
    214 
    215   p->mtProgress.totalInSize += inSize;
    216   p->mtProgress.totalOutSize += outSize;
    217   if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
    218     if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
    219       p->mtProgress.res = SZ_ERROR_PROGRESS;
    220 
    221   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
    222   res = p->mtProgress.res;
    223 
    224   CriticalSection_Leave(&p->mtProgress.cs);
    225 
    226   return res;
    227 }
    228 
    229 static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
    230 {
    231   CriticalSection_Enter(&p->mtProgress.cs);
    232   if (!p->needInterrupt || interruptIndex < p->interruptIndex)
    233   {
    234     p->interruptIndex = interruptIndex;
    235     p->needInterrupt = True;
    236   }
    237   CriticalSection_Leave(&p->mtProgress.cs);
    238 }
    239 
    240 Byte *MtDec_GetCrossBuff(CMtDec *p)
    241 {
    242   Byte *cr = p->crossBlock;
    243   if (!cr)
    244   {
    245     cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    246     if (!cr)
    247       return NULL;
    248     p->crossBlock = cr;
    249   }
    250   return MTDEC__DATA_PTR_FROM_LINK(cr);
    251 }
    252 
    253 
/*
  ThreadFunc2() - main loop of one decoder worker thread (t).

  Threads take turns in ring order (t->index + 1, wrapping to 0 at
  p->numStartedThreads). Each loop iteration handles one block:
    READ  - after t->canRead is signalled: takes leftover bytes from p->crossBlock
            and/or reads p->inStream into t's chain of input buffers, calling
            p->mtCallback->Parse() to find where the current block ends. Bytes that
            follow a block boundary (not a stream end) are moved into the cross block
            for the next reader. Then t may start one more worker thread and signals
            the next thread's canRead.
    CODE  - calls PreCode() and then Code() on the buffered data.
    WRITE - after t->canWrite is signalled: updates progress, calls Write() and,
            if the block must be processed again later (errors, interruption,
            earlier fallback), records t's buffered input in
            p->numFilledThreads / p->filledThreadStart for MtDec_Read().
            Then signals the next thread's canWrite (or thread 0 when finishing).

  Returns:
    0      - in all normal cases (even for stream error or memory allocation error)
    (!= 0) - WRes error returned by a system threading function
*/

// Minimum number of input/output bytes between progress/interrupt checks.
// With (1 << 0), a check is done whenever at least one byte was read or
// processed since the previous check.
// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)

static WRes ThreadFunc2(CMtDecThread *t)
{
  CMtDec *p = t->mtDec;

  PRF_STR_INT("ThreadFunc2", t->index);

  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);

  for (;;)
  {
    SRes res, codeRes;
    BoolInt wasInterrupted, isAllocError, overflow, finish;
    SRes threadingErrorSRes;
    BoolInt needCode, needWrite, needContinue;

    size_t inDataSize_Start;   // bytes of this block held in the first link of t->inBuf
    UInt64 inDataSize;         // total bytes of this block held in t->inBuf
    // UInt64 inDataSize_Full;

    UInt64 blockIndex;

    UInt64 inPrev = 0;
    UInt64 outPrev = 0;
    UInt64 inCodePos;
    UInt64 outCodePos;

    Byte *afterEndData = NULL;      // bytes read past the end reported by Parse (MTDEC_PARSE_END)
    size_t afterEndData_Size = 0;

    BoolInt canCreateNewThread = False;
    // CMtDecCallbackInfo parse;
    CMtDecThread *nextThread;

    PRF_STR_INT("Event_Wait(&t->canRead)", t->index);

    // ---------- READ ----------
    // Wait for our turn to read. canRead is passed from thread to thread below
    // (Event_Set(&nextThread->canRead)), so the read/parse section is presumably
    // executed by one thread at a time.
    RINOK_THREAD(Event_Wait(&t->canRead));
    if (p->exitThread)
      return 0;

    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);

    // if (t->index == 3) return 19; // for test

    // Sequential block number; used to order interrupt requests
    // (MtDec_Interrupt keeps the smallest index).
    blockIndex = p->blockIndex++;

    // PRF(printf("\ncanRead\n"))

    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);

    finish = p->readWasFinished;
    needCode = False;
    needWrite = False;
    isAllocError = False;
    overflow = False;

    inDataSize_Start = 0;
    inDataSize = 0;
    // inDataSize_Full = 0;

    if (res == SZ_OK && !wasInterrupted)
    {
      // if (p->inStream)
      {
        CMtDecBufLink *prev = NULL;
        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        // Bytes left in the cross block by the previous reader; they belong to this block.
        size_t crossSize = p->crossEnd - p->crossStart;

        // NOTE(review): debug-only output; %d is used with a size_t argument.
        PRF(printf("\ncrossSize = %d\n", crossSize));

        for (;;)
        {
          // Reuse the thread's existing buffer chain; extend it when it runs out.
          if (!link)
          {
            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
            if (!link)
            {
              finish = True;
              // p->allocError_for_Read_BlockIndex = blockIndex;
              isAllocError = True;
              break;
            }
            link->next = NULL;
            if (prev)
            {
              // static unsigned g_num = 0;
              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
              prev->next = link;
            }
            else
              t->inBuf = (void *)link;
          }

          {
            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
            Byte *parseData = data;
            size_t size;

            if (crossSize != 0)
            {
              // Parse the carried-over bytes directly from the cross block;
              // the parsed part is copied into this thread's buffer below.
              inDataSize = crossSize;
              // inDataSize_Full = inDataSize;
              inDataSize_Start = crossSize;
              size = crossSize;
              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
            }
            else
            {
              size = p->inBufSize;

              res = FullRead(p->inStream, data, &size);

              // size = 10; // test

              inDataSize += size;
              // inDataSize_Full = inDataSize;
              if (!prev)
                inDataSize_Start = size;

              p->readProcessed += size;
              // A short read means the input stream has ended.
              finish = (size != p->inBufSize);
              if (finish)
                p->readWasFinished = True;

              // res = E_INVALIDARG; // test

              if (res != SZ_OK)
              {
                // PRF(printf("\nRead error = %d\n", res))
                // we want to decode all data before error
                // (the error is kept in p->readRes and the data read so far is still parsed)
                p->readRes = res;
                // p->readError_BlockIndex = blockIndex;
                p->readWasFinished = True;
                finish = True;
                res = SZ_OK;
                // break;
              }

              if (inDataSize - inPrev >= MTDEC_ProgessStep)
              {
                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
                if (res != SZ_OK || wasInterrupted)
                  break;
                inPrev = inDataSize;
              }
            }

            {
              CMtDecCallbackInfo parse;

              parse.startCall = (prev == NULL);
              parse.src = parseData;
              parse.srcSize = size;
              parse.srcFinished = finish;
              parse.canCreateNewThread = True;

              // PRF(printf("\nParse size = %d\n", (unsigned)size))

              // After this call the code below treats parse.srcSize as the number of
              // bytes of (parseData) that belong to the current block.
              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);

              needWrite = True;
              canCreateNewThread = parse.canCreateNewThread;

              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);

              if (
                  // parseRes != SZ_OK ||
                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
                  // ||
                  parse.state == MTDEC_PARSE_OVERFLOW
                  // || wasInterrupted
                  )
              {
                // Overflow or Parse error - switch from MT decoding to ST decoding
                finish = True;
                overflow = True;

                {
                  PRF(printf("\n Overflow"));
                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
                }

                // Keep all bytes in this thread's buffer so they can be re-read later.
                if (crossSize != 0)
                  memcpy(data, parseData, size);
                p->crossStart = 0;
                p->crossEnd = 0;
                break;
              }

              if (crossSize != 0)
              {
                memcpy(data, parseData, parse.srcSize);
                p->crossStart += parse.srcSize;
              }

              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
              {
                // we don't need to parse in current thread anymore

                if (parse.state == MTDEC_PARSE_END)
                  finish = True;

                needCode = True;
                // p->crossFinished = finish;

                if (parse.srcSize == size)
                {
                  // full parsed - no cross transfer
                  p->crossStart = 0;
                  p->crossEnd = 0;
                  break;
                }

                if (parse.state == MTDEC_PARSE_END)
                {
                  p->crossStart = 0;
                  p->crossEnd = 0;

                  if (crossSize != 0)
                    memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
                  afterEndData_Size = size - parse.srcSize;
                  afterEndData = parseData + parse.srcSize;

                  // we reduce data size to required bytes (parsed only)
                  inDataSize -= (size - parse.srcSize);
                  if (!prev)
                    inDataSize_Start = parse.srcSize;
                  break;
                }

                {
                  // partial parsed - need cross transfer
                  if (crossSize != 0)
                    inDataSize = parse.srcSize; // it's only parsed now
                  else
                  {
                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
                    Byte *cr = MtDec_GetCrossBuff(p);
                    if (!cr)
                    {
                      {
                        PRF(printf("\ncross alloc error error\n"));
                        // res = SZ_ERROR_MEM;
                        finish = True;
                        // p->allocError_for_Read_BlockIndex = blockIndex;
                        isAllocError = True;
                        break;
                      }
                    }

                    {
                      // Move the bytes after the block boundary into the cross block
                      // for the next reader.
                      size_t crSize = size - parse.srcSize;
                      inDataSize -= crSize;
                      p->crossEnd = crSize;
                      p->crossStart = 0;
                      memcpy(cr, parseData + parse.srcSize, crSize);
                    }
                  }

                  // inDataSize_Full = inDataSize;
                  if (!prev)
                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)

                  finish = False;
                  break;
                }
              }

              // MTDEC_PARSE_CONTINUE must consume the whole buffer.
              if (parse.srcSize != size)
              {
                res = SZ_ERROR_FAIL;
                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
                break;
              }
            }
          }

          prev = link;
          link = link->next;

          // The cross data is used only for the first link of a block.
          if (crossSize != 0)
          {
            crossSize = 0;
            p->crossStart = 0;
            p->crossEnd = 0;
          }
        }
      }

      if (res == SZ_OK)
        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    }

    codeRes = SZ_OK;

    if (res == SZ_OK && needCode && !wasInterrupted)
    {
      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
      if (codeRes != SZ_OK)
      {
        needCode = False;
        finish = True;
        // SZ_ERROR_MEM is expected error here.
        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
      }
    }

    if (res != SZ_OK || wasInterrupted)
      finish = True;

    nextThread = NULL;
    threadingErrorSRes = SZ_OK;

    if (!finish)
    {
      // Start one more worker if allowed. On failure either cap the limit at the
      // current count, or (if only one thread exists) leave multithreaded mode.
      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
      {
        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
        if (res2 == SZ_OK)
        {
          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
          p->numStartedThreads++;
        }
        else
        {
          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
          if (p->numStartedThreads == 1)
          {
            // if only one thread is possible, we leave multi-threading code
            finish = True;
            needCode = False;
            threadingErrorSRes = res2;
          }
          else
            p->numStartedThreads_Limit = p->numStartedThreads;
        }
      }

      if (!finish)
      {
        unsigned nextIndex = t->index + 1;
        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
        RINOK_THREAD(Event_Set(&nextThread->canRead))
        // We have started executing for new iteration (with next thread)
        // And that next thread now is responsible for possible exit from decoding (threading_code)
      }
    }

    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    //   - otherwise we stop decoding and exit from ThreadFunc2()

    // Don't change (finish) variable in the further code


    // ---------- CODE ----------
    // Feed the buffered input to Code() one link at a time: the first link holds
    // inDataSize_Start bytes, each later link up to p->inBufSize bytes.

    inPrev = 0;
    outPrev = 0;
    inCodePos = 0;
    outCodePos = 0;

    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    {
      BoolInt isStartBlock = True;
      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

      for (;;)
      {
        size_t inSize;
        int stop;

        if (isStartBlock)
          inSize = inDataSize_Start;
        else
        {
          UInt64 rem = inDataSize - inCodePos;
          inSize = p->inBufSize;
          if (inSize > rem)
            inSize = (size_t)rem;
        }

        inCodePos += inSize;
        stop = True;

        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
            (inCodePos == inDataSize), // srcFinished
            &inCodePos, &outCodePos, &stop);

        if (codeRes != SZ_OK)
        {
          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
          // we interrupt only later blocks
          MtDec_Interrupt(p, blockIndex);
          break;
        }

        if (stop || inCodePos == inDataSize)
          break;

        {
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
          {
            // Sleep(1);
            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
            if (res != SZ_OK || wasInterrupted)
              break;
            inPrev = inCodePos;
            outPrev = outCodePos;
          }
        }

        link = link->next;
        isStartBlock = False;
      }
    }


    // ---------- WRITE ----------
    // Wait for our turn to write; canWrite is passed in the same ring order as canRead.

    RINOK_THREAD(Event_Wait(&t->canWrite));

  {
    BoolInt isErrorMode = False;
    BoolInt canRecode = True;
    BoolInt needWriteToStream = needWrite;

    if (p->exitThread) return 0; // it's never executed in normal cases

    // Record the first failure of this block in the shared decoder state.
    // Once p->wasInterrupted is set, later blocks only see (wasInterrupted = True).
    if (p->wasInterrupted)
      wasInterrupted = True;
    else
    {
      if (codeRes != SZ_OK) // || !needCode // check it !!!
      {
        p->wasInterrupted = True;
        p->codeRes = codeRes;
        if (codeRes == SZ_ERROR_MEM)
          isAllocError = True;
      }

      if (threadingErrorSRes)
      {
        p->wasInterrupted = True;
        p->threadingErrorSRes = threadingErrorSRes;
        needWriteToStream = False;
      }
      if (isAllocError)
      {
        p->wasInterrupted = True;
        p->isAllocError = True;
        needWriteToStream = False;
      }
      if (overflow)
      {
        p->wasInterrupted = True;
        p->overflow = True;
        needWriteToStream = False;
      }
    }

    if (needCode)
    {
      // Report the part of coding progress not yet reported in the CODE loop
      // (nothing beyond what was already reported if the block was interrupted).
      if (wasInterrupted)
      {
        inCodePos = 0;
        outCodePos = 0;
      }
      {
        const UInt64 inDelta = inCodePos - inPrev;
        const UInt64 outDelta = outCodePos - outPrev;
        // if (inDelta != 0 || outDelta != 0)
        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
      }
    }

    needContinue = (!finish);

    // if (res == SZ_OK && needWrite && !wasInterrupted)
    if (needWrite)
    {
      // p->inProcessed += inCodePos;

      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
          afterEndData, afterEndData_Size,
          &needContinue,
          &canRecode);

      // res= E_INVALIDARG; // for test

      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));

      if (res != SZ_OK)
      {
        PRF(printf("\nWrite error = %d\n", res));
        isErrorMode = True;
        p->wasInterrupted = True;
      }
      if (res != SZ_OK
          || (!needContinue && !finish))
      {
        PRF(printf("\nWrite Interrupt error = %x\n", res));
        MtDec_Interrupt(p, blockIndex);
      }
    }

    // Keep this thread's buffered input for later reading via MtDec_Read() when the
    // block was not fully coded and written here, or when an earlier thread has
    // already queued its data (p->numFilledThreads != 0).
    if (canRecode)
    if (!needCode
        || res != SZ_OK
        || p->wasInterrupted
        || codeRes != SZ_OK
        || wasInterrupted
        || p->numFilledThreads != 0
        || isErrorMode)
    {
      if (p->numFilledThreads == 0)
        p->filledThreadStart = t->index;
      if (inDataSize != 0 || !finish)
      {
        t->inDataSize_Start = inDataSize_Start;
        t->inDataSize = inDataSize;
        p->numFilledThreads++;
      }
      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
    }

    if (!finish)
    {
      RINOK_THREAD(Event_Set(&nextThread->canWrite));
    }
    else
    {
      if (needContinue)
      {
        // we restore decoding with new iteration
        RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
      }
      else
      {
        // we exit from decoding
        if (t->index == 0)
          return SZ_OK;
        p->exitThread = True;
      }
      RINOK_THREAD(Event_Set(&p->threads[0].canRead));
    }
  }
  }
}
    823 
/* alloca() is used by ThreadFunc() only on Windows builds. */
#ifdef _WIN32
#define USE_ALLOCA
#endif

#ifdef USE_ALLOCA
#ifdef _WIN32
#include <malloc.h>
#else
/* NOTE(review): unreachable while USE_ALLOCA is defined only for _WIN32 above.
   On POSIX systems alloca() is usually declared in <alloca.h>, not <stdlib.h>. */
#include <stdlib.h>
#endif
#endif
    835 
    836 
    837 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
    838 {
    839   WRes res;
    840 
    841   CMtDecThread *t = (CMtDecThread *)pp;
    842   CMtDec *p;
    843 
    844   // fprintf(stdout, "\n%d = %p\n", t->index, &t);
    845 
    846   res = ThreadFunc2(t);
    847   p = t->mtDec;
    848   if (res == 0)
    849     return p->exitThreadWRes;
    850   {
    851     // it's unexpected situation for some threading function error
    852     if (p->exitThreadWRes == 0)
    853       p->exitThreadWRes = res;
    854     PRF(printf("\nthread exit error = %d\n", res));
    855     p->exitThread = True;
    856     Event_Set(&p->threads[0].canRead);
    857     Event_Set(&p->threads[0].canWrite);
    858     MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
    859   }
    860   return res;
    861 }
    862 
    863 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
    864 {
    865   CMtDecThread *t = (CMtDecThread *)pp;
    866 
    867   // fprintf(stderr, "\n%d = %p - before", t->index, &t);
    868   #ifdef USE_ALLOCA
    869   t->allocaPtr = alloca(t->index * 128);
    870   #endif
    871   return ThreadFunc1(pp);
    872 }
    873 
    874 
    875 int MtDec_PrepareRead(CMtDec *p)
    876 {
    877   if (p->crossBlock && p->crossStart == p->crossEnd)
    878   {
    879     ISzAlloc_Free(p->alloc, p->crossBlock);
    880     p->crossBlock = NULL;
    881   }
    882 
    883   {
    884     unsigned i;
    885     for (i = 0; i < MTDEC__THREADS_MAX; i++)
    886       if (i > p->numStartedThreads
    887           || p->numFilledThreads <=
    888             (i >= p->filledThreadStart ?
    889               i - p->filledThreadStart :
    890               i + p->numStartedThreads - p->filledThreadStart))
    891         MtDecThread_FreeInBufs(&p->threads[i]);
    892   }
    893 
    894   return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
    895 }
    896 
    897 
    898 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
    899 {
    900   while (p->numFilledThreads != 0)
    901   {
    902     CMtDecThread *t = &p->threads[p->filledThreadStart];
    903 
    904     if (*inLim != 0)
    905     {
    906       {
    907         void *link = t->inBuf;
    908         void *next = ((CMtDecBufLink *)link)->next;
    909         ISzAlloc_Free(p->alloc, link);
    910         t->inBuf = next;
    911       }
    912 
    913       if (t->inDataSize == 0)
    914       {
    915         MtDecThread_FreeInBufs(t);
    916         if (--p->numFilledThreads == 0)
    917           break;
    918         if (++p->filledThreadStart == p->numStartedThreads)
    919           p->filledThreadStart = 0;
    920         t = &p->threads[p->filledThreadStart];
    921       }
    922     }
    923 
    924     {
    925       size_t lim = t->inDataSize_Start;
    926       if (lim != 0)
    927         t->inDataSize_Start = 0;
    928       else
    929       {
    930         UInt64 rem = t->inDataSize;
    931         lim = p->inBufSize;
    932         if (lim > rem)
    933           lim = (size_t)rem;
    934       }
    935       t->inDataSize -= lim;
    936       *inLim = lim;
    937       return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
    938     }
    939   }
    940 
    941   {
    942     size_t crossSize = p->crossEnd - p->crossStart;
    943     if (crossSize != 0)
    944     {
    945       const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
    946       *inLim = crossSize;
    947       p->crossStart = 0;
    948       p->crossEnd = 0;
    949       return data;
    950     }
    951     *inLim = 0;
    952     if (p->crossBlock)
    953     {
    954       ISzAlloc_Free(p->alloc, p->crossBlock);
    955       p->crossBlock = NULL;
    956     }
    957     return NULL;
    958   }
    959 }
    960 
    961 
    962 void MtDec_Construct(CMtDec *p)
    963 {
    964   unsigned i;
    965 
    966   p->inBufSize = (size_t)1 << 18;
    967 
    968   p->numThreadsMax = 0;
    969 
    970   p->inStream = NULL;
    971 
    972   // p->inData = NULL;
    973   // p->inDataSize = 0;
    974 
    975   p->crossBlock = NULL;
    976   p->crossStart = 0;
    977   p->crossEnd = 0;
    978 
    979   p->numFilledThreads = 0;
    980 
    981   p->progress = NULL;
    982   p->alloc = NULL;
    983 
    984   p->mtCallback = NULL;
    985   p->mtCallbackObject = NULL;
    986 
    987   p->allocatedBufsSize = 0;
    988 
    989   for (i = 0; i < MTDEC__THREADS_MAX; i++)
    990   {
    991     CMtDecThread *t = &p->threads[i];
    992     t->mtDec = p;
    993     t->index = i;
    994     t->inBuf = NULL;
    995     Event_Construct(&t->canRead);
    996     Event_Construct(&t->canWrite);
    997     Thread_Construct(&t->thread);
    998   }
    999 
   1000   // Event_Construct(&p->finishedEvent);
   1001 
   1002   CriticalSection_Init(&p->mtProgress.cs);
   1003 }
   1004 
   1005 
   1006 static void MtDec_Free(CMtDec *p)
   1007 {
   1008   unsigned i;
   1009 
   1010   p->exitThread = True;
   1011 
   1012   for (i = 0; i < MTDEC__THREADS_MAX; i++)
   1013     MtDecThread_Destruct(&p->threads[i]);
   1014 
   1015   // Event_Close(&p->finishedEvent);
   1016 
   1017   if (p->crossBlock)
   1018   {
   1019     ISzAlloc_Free(p->alloc, p->crossBlock);
   1020     p->crossBlock = NULL;
   1021   }
   1022 }
   1023 
   1024 
   1025 void MtDec_Destruct(CMtDec *p)
   1026 {
   1027   MtDec_Free(p);
   1028 
   1029   CriticalSection_Delete(&p->mtProgress.cs);
   1030 }
   1031 
   1032 
   1033 SRes MtDec_Code(CMtDec *p)
   1034 {
   1035   unsigned i;
   1036 
   1037   p->inProcessed = 0;
   1038 
   1039   p->blockIndex = 1; // it must be larger than not_defined index (0)
   1040   p->isAllocError = False;
   1041   p->overflow = False;
   1042   p->threadingErrorSRes = SZ_OK;
   1043 
   1044   p->needContinue = True;
   1045 
   1046   p->readWasFinished = False;
   1047   p->needInterrupt = False;
   1048   p->interruptIndex = (UInt64)(Int64)-1;
   1049 
   1050   p->readProcessed = 0;
   1051   p->readRes = SZ_OK;
   1052   p->codeRes = SZ_OK;
   1053   p->wasInterrupted = False;
   1054 
   1055   p->crossStart = 0;
   1056   p->crossEnd = 0;
   1057 
   1058   p->filledThreadStart = 0;
   1059   p->numFilledThreads = 0;
   1060 
   1061   {
   1062     unsigned numThreads = p->numThreadsMax;
   1063     if (numThreads > MTDEC__THREADS_MAX)
   1064       numThreads = MTDEC__THREADS_MAX;
   1065     p->numStartedThreads_Limit = numThreads;
   1066     p->numStartedThreads = 0;
   1067   }
   1068 
   1069   if (p->inBufSize != p->allocatedBufsSize)
   1070   {
   1071     for (i = 0; i < MTDEC__THREADS_MAX; i++)
   1072     {
   1073       CMtDecThread *t = &p->threads[i];
   1074       if (t->inBuf)
   1075         MtDecThread_FreeInBufs(t);
   1076     }
   1077     if (p->crossBlock)
   1078     {
   1079       ISzAlloc_Free(p->alloc, p->crossBlock);
   1080       p->crossBlock = NULL;
   1081     }
   1082 
   1083     p->allocatedBufsSize = p->inBufSize;
   1084   }
   1085 
   1086   MtProgress_Init(&p->mtProgress, p->progress);
   1087 
   1088   // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
   1089   p->exitThread = False;
   1090   p->exitThreadWRes = 0;
   1091 
   1092   {
   1093     WRes wres;
   1094     WRes sres;
   1095     CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
   1096     // wres = MtDecThread_CreateAndStart(nextThread);
   1097     wres = MtDecThread_CreateEvents(nextThread);
   1098     if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
   1099     if (wres == 0) { wres = Event_Set(&nextThread->canRead);
   1100     if (wres == 0) { wres = ThreadFunc(nextThread);
   1101     if (wres != 0)
   1102     {
   1103       p->needContinue = False;
   1104       MtDec_CloseThreads(p);
   1105     }}}}
   1106 
   1107     // wres = 17; // for test
   1108     // wres = Event_Wait(&p->finishedEvent);
   1109 
   1110     sres = MY_SRes_HRESULT_FROM_WRes(wres);
   1111 
   1112     if (sres != 0)
   1113       p->threadingErrorSRes = sres;
   1114 
   1115     if (
   1116         // wres == 0
   1117         // wres != 0
   1118         // || p->mtc.codeRes == SZ_ERROR_MEM
   1119         p->isAllocError
   1120         || p->threadingErrorSRes != SZ_OK
   1121         || p->overflow)
   1122     {
   1123       // p->needContinue = True;
   1124     }
   1125     else
   1126       p->needContinue = False;
   1127 
   1128     if (p->needContinue)
   1129       return SZ_OK;
   1130 
   1131     // if (sres != SZ_OK)
   1132       return sres;
   1133     // return E_FAIL;
   1134   }
   1135 }
   1136 
   1137 #endif
   1138