1 /* MtDec.c -- Multi-thread Decoder 2 2018-07-04 : Igor Pavlov : Public domain */ 3 4 #include "Precomp.h" 5 6 // #define SHOW_DEBUG_INFO 7 8 // #include <stdio.h> 9 10 #ifdef SHOW_DEBUG_INFO 11 #include <stdio.h> 12 #endif 13 14 #ifdef SHOW_DEBUG_INFO 15 #define PRF(x) x 16 #else 17 #define PRF(x) 18 #endif 19 20 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d)) 21 22 #include "MtDec.h" 23 24 #ifndef _7ZIP_ST 25 26 void MtProgress_Init(CMtProgress *p, ICompressProgress *progress) 27 { 28 p->progress = progress; 29 p->res = SZ_OK; 30 p->totalInSize = 0; 31 p->totalOutSize = 0; 32 } 33 34 35 SRes MtProgress_Progress_ST(CMtProgress *p) 36 { 37 if (p->res == SZ_OK && p->progress) 38 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK) 39 p->res = SZ_ERROR_PROGRESS; 40 return p->res; 41 } 42 43 44 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize) 45 { 46 SRes res; 47 CriticalSection_Enter(&p->cs); 48 49 p->totalInSize += inSize; 50 p->totalOutSize += outSize; 51 if (p->res == SZ_OK && p->progress) 52 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK) 53 p->res = SZ_ERROR_PROGRESS; 54 res = p->res; 55 56 CriticalSection_Leave(&p->cs); 57 return res; 58 } 59 60 61 SRes MtProgress_GetError(CMtProgress *p) 62 { 63 SRes res; 64 CriticalSection_Enter(&p->cs); 65 res = p->res; 66 CriticalSection_Leave(&p->cs); 67 return res; 68 } 69 70 71 void MtProgress_SetError(CMtProgress *p, SRes res) 72 { 73 CriticalSection_Enter(&p->cs); 74 if (p->res == SZ_OK) 75 p->res = res; 76 CriticalSection_Leave(&p->cs); 77 } 78 79 80 #define RINOK_THREAD(x) RINOK(x) 81 82 83 static WRes ArEvent_OptCreate_And_Reset(CEvent *p) 84 { 85 if (Event_IsCreated(p)) 86 return Event_Reset(p); 87 return AutoResetEvent_CreateNotSignaled(p); 88 } 89 90 91 92 typedef struct 93 { 94 void *next; 95 void *pad[3]; 96 } CMtDecBufLink; 97 98 #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink) 99 #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET) 100 101 102 103 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp); 104 105 106 static WRes MtDecThread_CreateEvents(CMtDecThread *t) 107 { 108 WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite); 109 if (wres == 0) 110 { 111 wres = ArEvent_OptCreate_And_Reset(&t->canRead); 112 if (wres == 0) 113 return SZ_OK; 114 } 115 return wres; 116 } 117 118 119 static SRes MtDecThread_CreateAndStart(CMtDecThread *t) 120 { 121 WRes wres = MtDecThread_CreateEvents(t); 122 // wres = 17; // for test 123 if (wres == 0) 124 { 125 if (Thread_WasCreated(&t->thread)) 126 return SZ_OK; 127 wres = Thread_Create(&t->thread, ThreadFunc, t); 128 if (wres == 0) 129 return SZ_OK; 130 } 131 return MY_SRes_HRESULT_FROM_WRes(wres); 132 } 133 134 135 void MtDecThread_FreeInBufs(CMtDecThread *t) 136 { 137 if (t->inBuf) 138 { 139 void *link = t->inBuf; 140 t->inBuf = NULL; 141 do 142 { 143 void *next = ((CMtDecBufLink *)link)->next; 144 ISzAlloc_Free(t->mtDec->alloc, link); 145 link = next; 146 } 147 while (link); 148 } 149 } 150 151 152 static void MtDecThread_CloseThread(CMtDecThread *t) 153 { 154 if (Thread_WasCreated(&t->thread)) 155 { 156 Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */ 157 Event_Set(&t->canRead); 158 Thread_Wait(&t->thread); 159 Thread_Close(&t->thread); 160 } 161 162 Event_Close(&t->canRead); 163 Event_Close(&t->canWrite); 164 } 165 166 static void MtDec_CloseThreads(CMtDec *p) 167 { 168 unsigned i; 169 for (i = 0; i < MTDEC__THREADS_MAX; i++) 170 MtDecThread_CloseThread(&p->threads[i]); 171 } 172 173 static void MtDecThread_Destruct(CMtDecThread *t) 174 { 175 MtDecThread_CloseThread(t); 176 MtDecThread_FreeInBufs(t); 177 } 178 179 180 181 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) 182 { 183 size_t size = *processedSize; 184 *processedSize = 0; 185 while (size != 0) 186 { 187 size_t cur = size; 188 SRes res = ISeqInStream_Read(stream, data, &cur); 189 *processedSize += cur; 190 data += cur; 191 size -= cur; 192 RINOK(res); 193 if (cur == 0) 194 return SZ_OK; 195 } 196 return SZ_OK; 197 } 198 199 200 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted) 201 { 202 SRes res; 203 CriticalSection_Enter(&p->mtProgress.cs); 204 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex); 205 res = p->mtProgress.res; 206 CriticalSection_Leave(&p->mtProgress.cs); 207 return res; 208 } 209 210 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted) 211 { 212 SRes res; 213 CriticalSection_Enter(&p->mtProgress.cs); 214 215 p->mtProgress.totalInSize += inSize; 216 p->mtProgress.totalOutSize += outSize; 217 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress) 218 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK) 219 p->mtProgress.res = SZ_ERROR_PROGRESS; 220 221 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex); 222 res = p->mtProgress.res; 223 224 CriticalSection_Leave(&p->mtProgress.cs); 225 226 return res; 227 } 228 229 static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex) 230 { 231 CriticalSection_Enter(&p->mtProgress.cs); 232 if (!p->needInterrupt || interruptIndex < p->interruptIndex) 233 { 234 p->interruptIndex = interruptIndex; 235 p->needInterrupt = True; 236 } 237 CriticalSection_Leave(&p->mtProgress.cs); 238 } 239 240 Byte *MtDec_GetCrossBuff(CMtDec *p) 241 { 242 Byte *cr = p->crossBlock; 243 if (!cr) 244 { 245 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize); 246 if (!cr) 247 return NULL; 248 p->crossBlock = cr; 249 } 250 return MTDEC__DATA_PTR_FROM_LINK(cr); 251 } 252 253 254 /* 255 ThreadFunc2() returns: 256 0 - in all normal cases (even for stream error or memory allocation error) 257 (!= 0) - WRes error return by system threading function 258 */ 259 260 // #define MTDEC_ProgessStep (1 << 22) 261 #define MTDEC_ProgessStep (1 << 0) 262 263 static WRes ThreadFunc2(CMtDecThread *t) 264 { 265 CMtDec *p = t->mtDec; 266 267 PRF_STR_INT("ThreadFunc2", t->index); 268 269 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index); 270 271 for (;;) 272 { 273 SRes res, codeRes; 274 BoolInt wasInterrupted, isAllocError, overflow, finish; 275 SRes threadingErrorSRes; 276 BoolInt needCode, needWrite, needContinue; 277 278 size_t inDataSize_Start; 279 UInt64 inDataSize; 280 // UInt64 inDataSize_Full; 281 282 UInt64 blockIndex; 283 284 UInt64 inPrev = 0; 285 UInt64 outPrev = 0; 286 UInt64 inCodePos; 287 UInt64 outCodePos; 288 289 Byte *afterEndData = NULL; 290 size_t afterEndData_Size = 0; 291 292 BoolInt canCreateNewThread = False; 293 // CMtDecCallbackInfo parse; 294 CMtDecThread *nextThread; 295 296 PRF_STR_INT("Event_Wait(&t->canRead)", t->index); 297 298 RINOK_THREAD(Event_Wait(&t->canRead)); 299 if (p->exitThread) 300 return 0; 301 302 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index); 303 304 // if (t->index == 3) return 19; // for test 305 306 blockIndex = p->blockIndex++; 307 308 // PRF(printf("\ncanRead\n")) 309 310 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted); 311 312 finish = p->readWasFinished; 313 needCode = False; 314 needWrite = False; 315 isAllocError = False; 316 overflow = False; 317 318 inDataSize_Start = 0; 319 inDataSize = 0; 320 // inDataSize_Full = 0; 321 322 if (res == SZ_OK && !wasInterrupted) 323 { 324 // if (p->inStream) 325 { 326 CMtDecBufLink *prev = NULL; 327 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf; 328 size_t crossSize = p->crossEnd - p->crossStart; 329 330 PRF(printf("\ncrossSize = %d\n", crossSize)); 331 332 for (;;) 333 { 334 if (!link) 335 { 336 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize); 337 if (!link) 338 { 339 finish = True; 340 // p->allocError_for_Read_BlockIndex = blockIndex; 341 isAllocError = True; 342 break; 343 } 344 link->next = NULL; 345 if (prev) 346 { 347 // static unsigned g_num = 0; 348 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev)); 349 prev->next = link; 350 } 351 else 352 t->inBuf = (void *)link; 353 } 354 355 { 356 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link); 357 Byte *parseData = data; 358 size_t size; 359 360 if (crossSize != 0) 361 { 362 inDataSize = crossSize; 363 // inDataSize_Full = inDataSize; 364 inDataSize_Start = crossSize; 365 size = crossSize; 366 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart; 367 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d", 368 (int)p->crossStart, (int)p->crossEnd, (int)finish)); 369 } 370 else 371 { 372 size = p->inBufSize; 373 374 res = FullRead(p->inStream, data, &size); 375 376 // size = 10; // test 377 378 inDataSize += size; 379 // inDataSize_Full = inDataSize; 380 if (!prev) 381 inDataSize_Start = size; 382 383 p->readProcessed += size; 384 finish = (size != p->inBufSize); 385 if (finish) 386 p->readWasFinished = True; 387 388 // res = E_INVALIDARG; // test 389 390 if (res != SZ_OK) 391 { 392 // PRF(printf("\nRead error = %d\n", res)) 393 // we want to decode all data before error 394 p->readRes = res; 395 // p->readError_BlockIndex = blockIndex; 396 p->readWasFinished = True; 397 finish = True; 398 res = SZ_OK; 399 // break; 400 } 401 402 if (inDataSize - inPrev >= MTDEC_ProgessStep) 403 { 404 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted); 405 if (res != SZ_OK || wasInterrupted) 406 break; 407 inPrev = inDataSize; 408 } 409 } 410 411 { 412 CMtDecCallbackInfo parse; 413 414 parse.startCall = (prev == NULL); 415 parse.src = parseData; 416 parse.srcSize = size; 417 parse.srcFinished = finish; 418 parse.canCreateNewThread = True; 419 420 // PRF(printf("\nParse size = %d\n", (unsigned)size)) 421 422 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse); 423 424 needWrite = True; 425 canCreateNewThread = parse.canCreateNewThread; 426 427 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize); 428 429 if ( 430 // parseRes != SZ_OK || 431 // inDataSize - (size - parse.srcSize) > p->inBlockMax 432 // || 433 parse.state == MTDEC_PARSE_OVERFLOW 434 // || wasInterrupted 435 ) 436 { 437 // Overflow or Parse error - switch from MT decoding to ST decoding 438 finish = True; 439 overflow = True; 440 441 { 442 PRF(printf("\n Overflow")); 443 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished)); 444 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize)); 445 } 446 447 if (crossSize != 0) 448 memcpy(data, parseData, size); 449 p->crossStart = 0; 450 p->crossEnd = 0; 451 break; 452 } 453 454 if (crossSize != 0) 455 { 456 memcpy(data, parseData, parse.srcSize); 457 p->crossStart += parse.srcSize; 458 } 459 460 if (parse.state != MTDEC_PARSE_CONTINUE || finish) 461 { 462 // we don't need to parse in current thread anymore 463 464 if (parse.state == MTDEC_PARSE_END) 465 finish = True; 466 467 needCode = True; 468 // p->crossFinished = finish; 469 470 if (parse.srcSize == size) 471 { 472 // full parsed - no cross transfer 473 p->crossStart = 0; 474 p->crossEnd = 0; 475 break; 476 } 477 478 if (parse.state == MTDEC_PARSE_END) 479 { 480 p->crossStart = 0; 481 p->crossEnd = 0; 482 483 if (crossSize != 0) 484 memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data 485 afterEndData_Size = size - parse.srcSize; 486 afterEndData = parseData + parse.srcSize; 487 488 // we reduce data size to required bytes (parsed only) 489 inDataSize -= (size - parse.srcSize); 490 if (!prev) 491 inDataSize_Start = parse.srcSize; 492 break; 493 } 494 495 { 496 // partial parsed - need cross transfer 497 if (crossSize != 0) 498 inDataSize = parse.srcSize; // it's only parsed now 499 else 500 { 501 // partial parsed - is not in initial cross block - we need to copy new data to cross block 502 Byte *cr = MtDec_GetCrossBuff(p); 503 if (!cr) 504 { 505 { 506 PRF(printf("\ncross alloc error error\n")); 507 // res = SZ_ERROR_MEM; 508 finish = True; 509 // p->allocError_for_Read_BlockIndex = blockIndex; 510 isAllocError = True; 511 break; 512 } 513 } 514 515 { 516 size_t crSize = size - parse.srcSize; 517 inDataSize -= crSize; 518 p->crossEnd = crSize; 519 p->crossStart = 0; 520 memcpy(cr, parseData + parse.srcSize, crSize); 521 } 522 } 523 524 // inDataSize_Full = inDataSize; 525 if (!prev) 526 inDataSize_Start = parse.srcSize; // it's partial size (parsed only) 527 528 finish = False; 529 break; 530 } 531 } 532 533 if (parse.srcSize != size) 534 { 535 res = SZ_ERROR_FAIL; 536 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res)); 537 break; 538 } 539 } 540 } 541 542 prev = link; 543 link = link->next; 544 545 if (crossSize != 0) 546 { 547 crossSize = 0; 548 p->crossStart = 0; 549 p->crossEnd = 0; 550 } 551 } 552 } 553 554 if (res == SZ_OK) 555 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted); 556 } 557 558 codeRes = SZ_OK; 559 560 if (res == SZ_OK && needCode && !wasInterrupted) 561 { 562 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index); 563 if (codeRes != SZ_OK) 564 { 565 needCode = False; 566 finish = True; 567 // SZ_ERROR_MEM is expected error here. 568 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later. 569 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding. 570 } 571 } 572 573 if (res != SZ_OK || wasInterrupted) 574 finish = True; 575 576 nextThread = NULL; 577 threadingErrorSRes = SZ_OK; 578 579 if (!finish) 580 { 581 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread) 582 { 583 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]); 584 if (res2 == SZ_OK) 585 { 586 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads)); 587 p->numStartedThreads++; 588 } 589 else 590 { 591 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads)); 592 if (p->numStartedThreads == 1) 593 { 594 // if only one thread is possible, we leave muti-threading code 595 finish = True; 596 needCode = False; 597 threadingErrorSRes = res2; 598 } 599 else 600 p->numStartedThreads_Limit = p->numStartedThreads; 601 } 602 } 603 604 if (!finish) 605 { 606 unsigned nextIndex = t->index + 1; 607 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex]; 608 RINOK_THREAD(Event_Set(&nextThread->canRead)) 609 // We have started executing for new iteration (with next thread) 610 // And that next thread now is responsible for possible exit from decoding (threading_code) 611 } 612 } 613 614 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite) 615 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case 616 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block): 617 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration 618 // - otherwise we stop decoding and exit from ThreadFunc2() 619 620 // Don't change (finish) variable in the further code 621 622 623 // ---------- CODE ---------- 624 625 inPrev = 0; 626 outPrev = 0; 627 inCodePos = 0; 628 outCodePos = 0; 629 630 if (res == SZ_OK && needCode && codeRes == SZ_OK) 631 { 632 BoolInt isStartBlock = True; 633 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf; 634 635 for (;;) 636 { 637 size_t inSize; 638 int stop; 639 640 if (isStartBlock) 641 inSize = inDataSize_Start; 642 else 643 { 644 UInt64 rem = inDataSize - inCodePos; 645 inSize = p->inBufSize; 646 if (inSize > rem) 647 inSize = (size_t)rem; 648 } 649 650 inCodePos += inSize; 651 stop = True; 652 653 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index, 654 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize, 655 (inCodePos == inDataSize), // srcFinished 656 &inCodePos, &outCodePos, &stop); 657 658 if (codeRes != SZ_OK) 659 { 660 PRF(printf("\nCode Interrupt error = %x\n", codeRes)); 661 // we interrupt only later blocks 662 MtDec_Interrupt(p, blockIndex); 663 break; 664 } 665 666 if (stop || inCodePos == inDataSize) 667 break; 668 669 { 670 const UInt64 inDelta = inCodePos - inPrev; 671 const UInt64 outDelta = outCodePos - outPrev; 672 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep) 673 { 674 // Sleep(1); 675 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted); 676 if (res != SZ_OK || wasInterrupted) 677 break; 678 inPrev = inCodePos; 679 outPrev = outCodePos; 680 } 681 } 682 683 link = link->next; 684 isStartBlock = False; 685 } 686 } 687 688 689 // ---------- WRITE ---------- 690 691 RINOK_THREAD(Event_Wait(&t->canWrite)); 692 693 { 694 BoolInt isErrorMode = False; 695 BoolInt canRecode = True; 696 BoolInt needWriteToStream = needWrite; 697 698 if (p->exitThread) return 0; // it's never executed in normal cases 699 700 if (p->wasInterrupted) 701 wasInterrupted = True; 702 else 703 { 704 if (codeRes != SZ_OK) // || !needCode // check it !!! 705 { 706 p->wasInterrupted = True; 707 p->codeRes = codeRes; 708 if (codeRes == SZ_ERROR_MEM) 709 isAllocError = True; 710 } 711 712 if (threadingErrorSRes) 713 { 714 p->wasInterrupted = True; 715 p->threadingErrorSRes = threadingErrorSRes; 716 needWriteToStream = False; 717 } 718 if (isAllocError) 719 { 720 p->wasInterrupted = True; 721 p->isAllocError = True; 722 needWriteToStream = False; 723 } 724 if (overflow) 725 { 726 p->wasInterrupted = True; 727 p->overflow = True; 728 needWriteToStream = False; 729 } 730 } 731 732 if (needCode) 733 { 734 if (wasInterrupted) 735 { 736 inCodePos = 0; 737 outCodePos = 0; 738 } 739 { 740 const UInt64 inDelta = inCodePos - inPrev; 741 const UInt64 outDelta = outCodePos - outPrev; 742 // if (inDelta != 0 || outDelta != 0) 743 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta); 744 } 745 } 746 747 needContinue = (!finish); 748 749 // if (res == SZ_OK && needWrite && !wasInterrupted) 750 if (needWrite) 751 { 752 // p->inProcessed += inCodePos; 753 754 res = p->mtCallback->Write(p->mtCallbackObject, t->index, 755 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite 756 afterEndData, afterEndData_Size, 757 &needContinue, 758 &canRecode); 759 760 // res= E_INVALIDARG; // for test 761 762 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue)); 763 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed)); 764 765 if (res != SZ_OK) 766 { 767 PRF(printf("\nWrite error = %d\n", res)); 768 isErrorMode = True; 769 p->wasInterrupted = True; 770 } 771 if (res != SZ_OK 772 || (!needContinue && !finish)) 773 { 774 PRF(printf("\nWrite Interrupt error = %x\n", res)); 775 MtDec_Interrupt(p, blockIndex); 776 } 777 } 778 779 if (canRecode) 780 if (!needCode 781 || res != SZ_OK 782 || p->wasInterrupted 783 || codeRes != SZ_OK 784 || wasInterrupted 785 || p->numFilledThreads != 0 786 || isErrorMode) 787 { 788 if (p->numFilledThreads == 0) 789 p->filledThreadStart = t->index; 790 if (inDataSize != 0 || !finish) 791 { 792 t->inDataSize_Start = inDataSize_Start; 793 t->inDataSize = inDataSize; 794 p->numFilledThreads++; 795 } 796 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads)); 797 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart)); 798 } 799 800 if (!finish) 801 { 802 RINOK_THREAD(Event_Set(&nextThread->canWrite)); 803 } 804 else 805 { 806 if (needContinue) 807 { 808 // we restore decoding with new iteration 809 RINOK_THREAD(Event_Set(&p->threads[0].canWrite)); 810 } 811 else 812 { 813 // we exit from decoding 814 if (t->index == 0) 815 return SZ_OK; 816 p->exitThread = True; 817 } 818 RINOK_THREAD(Event_Set(&p->threads[0].canRead)); 819 } 820 } 821 } 822 } 823 824 #ifdef _WIN32 825 #define USE_ALLOCA 826 #endif 827 828 #ifdef USE_ALLOCA 829 #ifdef _WIN32 830 #include <malloc.h> 831 #else 832 #include <stdlib.h> 833 #endif 834 #endif 835 836 837 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp) 838 { 839 WRes res; 840 841 CMtDecThread *t = (CMtDecThread *)pp; 842 CMtDec *p; 843 844 // fprintf(stdout, "\n%d = %p\n", t->index, &t); 845 846 res = ThreadFunc2(t); 847 p = t->mtDec; 848 if (res == 0) 849 return p->exitThreadWRes; 850 { 851 // it's unexpected situation for some threading function error 852 if (p->exitThreadWRes == 0) 853 p->exitThreadWRes = res; 854 PRF(printf("\nthread exit error = %d\n", res)); 855 p->exitThread = True; 856 Event_Set(&p->threads[0].canRead); 857 Event_Set(&p->threads[0].canWrite); 858 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res)); 859 } 860 return res; 861 } 862 863 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) 864 { 865 CMtDecThread *t = (CMtDecThread *)pp; 866 867 // fprintf(stderr, "\n%d = %p - before", t->index, &t); 868 #ifdef USE_ALLOCA 869 t->allocaPtr = alloca(t->index * 128); 870 #endif 871 return ThreadFunc1(pp); 872 } 873 874 875 int MtDec_PrepareRead(CMtDec *p) 876 { 877 if (p->crossBlock && p->crossStart == p->crossEnd) 878 { 879 ISzAlloc_Free(p->alloc, p->crossBlock); 880 p->crossBlock = NULL; 881 } 882 883 { 884 unsigned i; 885 for (i = 0; i < MTDEC__THREADS_MAX; i++) 886 if (i > p->numStartedThreads 887 || p->numFilledThreads <= 888 (i >= p->filledThreadStart ? 889 i - p->filledThreadStart : 890 i + p->numStartedThreads - p->filledThreadStart)) 891 MtDecThread_FreeInBufs(&p->threads[i]); 892 } 893 894 return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd); 895 } 896 897 898 const Byte *MtDec_Read(CMtDec *p, size_t *inLim) 899 { 900 while (p->numFilledThreads != 0) 901 { 902 CMtDecThread *t = &p->threads[p->filledThreadStart]; 903 904 if (*inLim != 0) 905 { 906 { 907 void *link = t->inBuf; 908 void *next = ((CMtDecBufLink *)link)->next; 909 ISzAlloc_Free(p->alloc, link); 910 t->inBuf = next; 911 } 912 913 if (t->inDataSize == 0) 914 { 915 MtDecThread_FreeInBufs(t); 916 if (--p->numFilledThreads == 0) 917 break; 918 if (++p->filledThreadStart == p->numStartedThreads) 919 p->filledThreadStart = 0; 920 t = &p->threads[p->filledThreadStart]; 921 } 922 } 923 924 { 925 size_t lim = t->inDataSize_Start; 926 if (lim != 0) 927 t->inDataSize_Start = 0; 928 else 929 { 930 UInt64 rem = t->inDataSize; 931 lim = p->inBufSize; 932 if (lim > rem) 933 lim = (size_t)rem; 934 } 935 t->inDataSize -= lim; 936 *inLim = lim; 937 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf); 938 } 939 } 940 941 { 942 size_t crossSize = p->crossEnd - p->crossStart; 943 if (crossSize != 0) 944 { 945 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart; 946 *inLim = crossSize; 947 p->crossStart = 0; 948 p->crossEnd = 0; 949 return data; 950 } 951 *inLim = 0; 952 if (p->crossBlock) 953 { 954 ISzAlloc_Free(p->alloc, p->crossBlock); 955 p->crossBlock = NULL; 956 } 957 return NULL; 958 } 959 } 960 961 962 void MtDec_Construct(CMtDec *p) 963 { 964 unsigned i; 965 966 p->inBufSize = (size_t)1 << 18; 967 968 p->numThreadsMax = 0; 969 970 p->inStream = NULL; 971 972 // p->inData = NULL; 973 // p->inDataSize = 0; 974 975 p->crossBlock = NULL; 976 p->crossStart = 0; 977 p->crossEnd = 0; 978 979 p->numFilledThreads = 0; 980 981 p->progress = NULL; 982 p->alloc = NULL; 983 984 p->mtCallback = NULL; 985 p->mtCallbackObject = NULL; 986 987 p->allocatedBufsSize = 0; 988 989 for (i = 0; i < MTDEC__THREADS_MAX; i++) 990 { 991 CMtDecThread *t = &p->threads[i]; 992 t->mtDec = p; 993 t->index = i; 994 t->inBuf = NULL; 995 Event_Construct(&t->canRead); 996 Event_Construct(&t->canWrite); 997 Thread_Construct(&t->thread); 998 } 999 1000 // Event_Construct(&p->finishedEvent); 1001 1002 CriticalSection_Init(&p->mtProgress.cs); 1003 } 1004 1005 1006 static void MtDec_Free(CMtDec *p) 1007 { 1008 unsigned i; 1009 1010 p->exitThread = True; 1011 1012 for (i = 0; i < MTDEC__THREADS_MAX; i++) 1013 MtDecThread_Destruct(&p->threads[i]); 1014 1015 // Event_Close(&p->finishedEvent); 1016 1017 if (p->crossBlock) 1018 { 1019 ISzAlloc_Free(p->alloc, p->crossBlock); 1020 p->crossBlock = NULL; 1021 } 1022 } 1023 1024 1025 void MtDec_Destruct(CMtDec *p) 1026 { 1027 MtDec_Free(p); 1028 1029 CriticalSection_Delete(&p->mtProgress.cs); 1030 } 1031 1032 1033 SRes MtDec_Code(CMtDec *p) 1034 { 1035 unsigned i; 1036 1037 p->inProcessed = 0; 1038 1039 p->blockIndex = 1; // it must be larger than not_defined index (0) 1040 p->isAllocError = False; 1041 p->overflow = False; 1042 p->threadingErrorSRes = SZ_OK; 1043 1044 p->needContinue = True; 1045 1046 p->readWasFinished = False; 1047 p->needInterrupt = False; 1048 p->interruptIndex = (UInt64)(Int64)-1; 1049 1050 p->readProcessed = 0; 1051 p->readRes = SZ_OK; 1052 p->codeRes = SZ_OK; 1053 p->wasInterrupted = False; 1054 1055 p->crossStart = 0; 1056 p->crossEnd = 0; 1057 1058 p->filledThreadStart = 0; 1059 p->numFilledThreads = 0; 1060 1061 { 1062 unsigned numThreads = p->numThreadsMax; 1063 if (numThreads > MTDEC__THREADS_MAX) 1064 numThreads = MTDEC__THREADS_MAX; 1065 p->numStartedThreads_Limit = numThreads; 1066 p->numStartedThreads = 0; 1067 } 1068 1069 if (p->inBufSize != p->allocatedBufsSize) 1070 { 1071 for (i = 0; i < MTDEC__THREADS_MAX; i++) 1072 { 1073 CMtDecThread *t = &p->threads[i]; 1074 if (t->inBuf) 1075 MtDecThread_FreeInBufs(t); 1076 } 1077 if (p->crossBlock) 1078 { 1079 ISzAlloc_Free(p->alloc, p->crossBlock); 1080 p->crossBlock = NULL; 1081 } 1082 1083 p->allocatedBufsSize = p->inBufSize; 1084 } 1085 1086 MtProgress_Init(&p->mtProgress, p->progress); 1087 1088 // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent)); 1089 p->exitThread = False; 1090 p->exitThreadWRes = 0; 1091 1092 { 1093 WRes wres; 1094 WRes sres; 1095 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++]; 1096 // wres = MtDecThread_CreateAndStart(nextThread); 1097 wres = MtDecThread_CreateEvents(nextThread); 1098 if (wres == 0) { wres = Event_Set(&nextThread->canWrite); 1099 if (wres == 0) { wres = Event_Set(&nextThread->canRead); 1100 if (wres == 0) { wres = ThreadFunc(nextThread); 1101 if (wres != 0) 1102 { 1103 p->needContinue = False; 1104 MtDec_CloseThreads(p); 1105 }}}} 1106 1107 // wres = 17; // for test 1108 // wres = Event_Wait(&p->finishedEvent); 1109 1110 sres = MY_SRes_HRESULT_FROM_WRes(wres); 1111 1112 if (sres != 0) 1113 p->threadingErrorSRes = sres; 1114 1115 if ( 1116 // wres == 0 1117 // wres != 0 1118 // || p->mtc.codeRes == SZ_ERROR_MEM 1119 p->isAllocError 1120 || p->threadingErrorSRes != SZ_OK 1121 || p->overflow) 1122 { 1123 // p->needContinue = True; 1124 } 1125 else 1126 p->needContinue = False; 1127 1128 if (p->needContinue) 1129 return SZ_OK; 1130 1131 // if (sres != SZ_OK) 1132 return sres; 1133 // return E_FAIL; 1134 } 1135 } 1136 1137 #endif 1138