1 #include "../test.h" 2 #include "rxcpp/operators/rx-combine_latest.hpp" 3 4 SCENARIO("combine_latest interleaved with tail", "[combine_latest][join][operators]"){ 5 GIVEN("2 hot observables of ints."){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto o1 = sc.make_hot_observable({ 11 on.next(150, 1), 12 on.next(215, 2), 13 on.next(225, 4), 14 on.completed(230) 15 }); 16 17 auto o2 = sc.make_hot_observable({ 18 on.next(150, 1), 19 on.next(220, 3), 20 on.next(230, 5), 21 on.next(235, 6), 22 on.next(240, 7), 23 on.completed(250) 24 }); 25 26 WHEN("each int is combined with the latest from the other source"){ 27 28 auto res = w.start( 29 [&]() { 30 return o2 31 .combine_latest( 32 [](int v2, int v1){ 33 return v2 + v1; 34 }, 35 o1 36 ) 37 // forget type to workaround lambda deduction bug on msvc 2013 38 .as_dynamic(); 39 } 40 ); 41 42 THEN("the output contains combined ints"){ 43 auto required = rxu::to_vector({ 44 on.next(220, 2 + 3), 45 on.next(225, 4 + 3), 46 on.next(230, 4 + 5), 47 on.next(235, 4 + 6), 48 on.next(240, 4 + 7), 49 on.completed(250) 50 }); 51 auto actual = res.get_observer().messages(); 52 REQUIRE(required == actual); 53 } 54 55 THEN("there was one subscription and one unsubscription to the o1"){ 56 auto required = rxu::to_vector({ 57 on.subscribe(200, 230) 58 }); 59 auto actual = o1.subscriptions(); 60 REQUIRE(required == actual); 61 } 62 63 THEN("there was one subscription and one unsubscription to the o2"){ 64 auto required = rxu::to_vector({ 65 on.subscribe(200, 250) 66 }); 67 auto actual = o2.subscriptions(); 68 REQUIRE(required == actual); 69 } 70 } 71 } 72 } 73 74 SCENARIO("combine_latest consecutive", "[combine_latest][join][operators]"){ 75 GIVEN("2 hot observables of ints."){ 76 auto sc = rxsc::make_test(); 77 auto w = sc.create_worker(); 78 const rxsc::test::messages<int> on; 79 80 auto o1 = sc.make_hot_observable({ 81 on.next(150, 1), 82 on.next(215, 2), 83 on.next(225, 4), 84 on.completed(230) 85 }); 86 87 auto o2 = sc.make_hot_observable({ 88 on.next(150, 1), 89 on.next(235, 6), 90 on.next(240, 7), 91 on.completed(250) 92 }); 93 94 WHEN("each int is combined with the latest from the other source"){ 95 96 auto res = w.start( 97 [&]() { 98 return o2 99 .combine_latest( 100 [](int v2, int v1){ 101 return v2 + v1; 102 }, 103 o1 104 ) 105 // forget type to workaround lambda deduction bug on msvc 2013 106 .as_dynamic(); 107 } 108 ); 109 110 THEN("the output contains combined ints"){ 111 auto required = rxu::to_vector({ 112 on.next(235, 4 + 6), 113 on.next(240, 4 + 7), 114 on.completed(250) 115 }); 116 auto actual = res.get_observer().messages(); 117 REQUIRE(required == actual); 118 } 119 120 THEN("there was one subscription and one unsubscription to the o1"){ 121 auto required = rxu::to_vector({ 122 on.subscribe(200, 230) 123 }); 124 auto actual = o1.subscriptions(); 125 REQUIRE(required == actual); 126 } 127 128 THEN("there was one subscription and one unsubscription to the o2"){ 129 auto required = rxu::to_vector({ 130 on.subscribe(200, 250) 131 }); 132 auto actual = o2.subscriptions(); 133 REQUIRE(required == actual); 134 } 135 } 136 } 137 } 138 139 SCENARIO("combine_latest consecutive ends with error left", "[combine_latest][join][operators]"){ 140 GIVEN("2 hot observables of ints."){ 141 auto sc = rxsc::make_test(); 142 auto w = sc.create_worker(); 143 const rxsc::test::messages<int> on; 144 145 std::runtime_error ex("combine_latest on_error from source"); 146 147 auto o1 = sc.make_hot_observable({ 148 on.next(150, 1), 149 on.next(215, 2), 150 on.next(225, 4), 151 on.error(230, ex) 152 }); 153 154 auto o2 = sc.make_hot_observable({ 155 on.next(150, 1), 156 on.next(235, 6), 157 on.next(240, 7), 158 on.completed(250) 159 }); 160 161 WHEN("each int is combined with the latest from the other source"){ 162 163 auto res = w.start( 164 [&]() { 165 return o2 166 .combine_latest( 167 [](int v2, int v1){ 168 return v2 + v1; 169 }, 170 o1 171 ) 172 // forget type to workaround lambda deduction bug on msvc 2013 173 .as_dynamic(); 174 } 175 ); 176 177 THEN("the output contains only an error"){ 178 auto required = rxu::to_vector({ 179 on.error(230, ex) 180 }); 181 auto actual = res.get_observer().messages(); 182 REQUIRE(required == actual); 183 } 184 185 THEN("there was one subscription and one unsubscription to the o1"){ 186 auto required = rxu::to_vector({ 187 on.subscribe(200, 230) 188 }); 189 auto actual = o1.subscriptions(); 190 REQUIRE(required == actual); 191 } 192 193 THEN("there was one subscription and one unsubscription to the o2"){ 194 auto required = rxu::to_vector({ 195 on.subscribe(200, 230) 196 }); 197 auto actual = o2.subscriptions(); 198 REQUIRE(required == actual); 199 } 200 } 201 } 202 } 203 204 SCENARIO("combine_latest consecutive ends with error right", "[combine_latest][join][operators]"){ 205 GIVEN("2 hot observables of ints."){ 206 auto sc = rxsc::make_test(); 207 auto w = sc.create_worker(); 208 const rxsc::test::messages<int> on; 209 210 std::runtime_error ex("combine_latest on_error from source"); 211 212 auto o1 = sc.make_hot_observable({ 213 on.next(150, 1), 214 on.next(215, 2), 215 on.next(225, 4), 216 on.completed(250) 217 }); 218 219 auto o2 = sc.make_hot_observable({ 220 on.next(150, 1), 221 on.next(235, 6), 222 on.next(240, 7), 223 on.error(245, ex) 224 }); 225 226 WHEN("each int is combined with the latest from the other source"){ 227 228 auto res = w.start( 229 [&]() { 230 return o2 231 .combine_latest( 232 [](int v2, int v1){ 233 return v2 + v1; 234 }, 235 o1 236 ) 237 // forget type to workaround lambda deduction bug on msvc 2013 238 .as_dynamic(); 239 } 240 ); 241 242 THEN("the output contains combined ints followed by an error"){ 243 auto required = rxu::to_vector({ 244 on.next(235, 4 + 6), 245 on.next(240, 4 + 7), 246 on.error(245, ex) 247 }); 248 auto actual = res.get_observer().messages(); 249 REQUIRE(required == actual); 250 } 251 252 THEN("there was one subscription and one unsubscription to the o1"){ 253 auto required = rxu::to_vector({ 254 on.subscribe(200, 245) 255 }); 256 auto actual = o1.subscriptions(); 257 REQUIRE(required == actual); 258 } 259 260 THEN("there was one subscription and one unsubscription to the o2"){ 261 auto required = rxu::to_vector({ 262 on.subscribe(200, 245) 263 }); 264 auto actual = o2.subscriptions(); 265 REQUIRE(required == actual); 266 } 267 } 268 } 269 } 270 271 SCENARIO("combine_latest never N", "[combine_latest][join][operators]"){ 272 GIVEN("N never completed hot observables of ints."){ 273 auto sc = rxsc::make_test(); 274 auto w = sc.create_worker(); 275 const rxsc::test::messages<int> on; 276 277 const int N = 4; 278 279 std::vector<rxcpp::test::testable_observable<int>> n; 280 for (int i = 0; i < N; ++i) { 281 n.push_back( 282 sc.make_hot_observable({ 283 on.next(150, 1) 284 }) 285 ); 286 } 287 288 WHEN("each int is combined with the latest from the other source"){ 289 290 auto res = w.start( 291 [&]() { 292 return n[0] 293 .combine_latest( 294 [](int v0, int v1, int v2, int v3){ 295 return v0 + v1 + v2 + v3; 296 }, 297 n[1], n[2], n[3] 298 ) 299 // forget type to workaround lambda deduction bug on msvc 2013 300 .as_dynamic(); 301 } 302 ); 303 304 THEN("the output is empty"){ 305 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 306 auto actual = res.get_observer().messages(); 307 REQUIRE(required == actual); 308 } 309 310 THEN("there was one subscription and one unsubscription to each observable"){ 311 312 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){ 313 auto required = rxu::to_vector({ 314 on.subscribe(200, 1000) 315 }); 316 auto actual = s.subscriptions(); 317 REQUIRE(required == actual); 318 }); 319 } 320 } 321 } 322 } 323 324 SCENARIO("combine_latest empty N", "[combine_latest][join][operators]"){ 325 GIVEN("N empty hot observables of ints."){ 326 auto sc = rxsc::make_test(); 327 auto w = sc.create_worker(); 328 const rxsc::test::messages<int> on; 329 330 const int N = 4; 331 332 std::vector<rxcpp::test::testable_observable<int>> e; 333 for (int i = 0; i < N; ++i) { 334 e.push_back( 335 sc.make_hot_observable({ 336 on.next(150, 1), 337 on.completed(210 + 10 * i) 338 }) 339 ); 340 } 341 342 WHEN("each int is combined with the latest from the other source"){ 343 344 auto res = w.start( 345 [&]() { 346 return e[0] 347 .combine_latest( 348 [](int v0, int v1, int v2, int v3){ 349 return v0 + v1 + v2 + v3; 350 }, 351 e[1], e[2], e[3] 352 ) 353 // forget type to workaround lambda deduction bug on msvc 2013 354 .as_dynamic(); 355 } 356 ); 357 358 THEN("the output contains only complete message"){ 359 auto required = rxu::to_vector({ 360 on.completed(200 + 10 * N) 361 }); 362 auto actual = res.get_observer().messages(); 363 REQUIRE(required == actual); 364 } 365 366 THEN("there was one subscription and one unsubscription to each observable"){ 367 368 int i = 0; 369 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ 370 auto required = rxu::to_vector({ 371 on.subscribe(200, 200 + 10 * ++i) 372 }); 373 auto actual = s.subscriptions(); 374 REQUIRE(required == actual); 375 }); 376 } 377 } 378 } 379 } 380 381 SCENARIO("combine_latest never/empty", "[combine_latest][join][operators]"){ 382 GIVEN("2 hot observables of ints."){ 383 auto sc = rxsc::make_test(); 384 auto w = sc.create_worker(); 385 const rxsc::test::messages<int> on; 386 387 auto n = sc.make_hot_observable({ 388 on.next(150, 1) 389 }); 390 391 auto e = sc.make_hot_observable({ 392 on.next(150, 1), 393 on.completed(210) 394 }); 395 396 WHEN("each int is combined with the latest from the other source"){ 397 398 auto res = w.start( 399 [&]() { 400 return n 401 .combine_latest( 402 [](int v2, int v1){ 403 return v2 + v1; 404 }, 405 e 406 ) 407 // forget type to workaround lambda deduction bug on msvc 2013 408 .as_dynamic(); 409 } 410 ); 411 412 THEN("the output is empty"){ 413 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 414 auto actual = res.get_observer().messages(); 415 REQUIRE(required == actual); 416 } 417 418 THEN("there was one subscription and one unsubscription to the n"){ 419 auto required = rxu::to_vector({ 420 on.subscribe(200, 1000) 421 }); 422 auto actual = n.subscriptions(); 423 REQUIRE(required == actual); 424 } 425 426 THEN("there was one subscription and one unsubscription to the e"){ 427 auto required = rxu::to_vector({ 428 on.subscribe(200, 210) 429 }); 430 auto actual = e.subscriptions(); 431 REQUIRE(required == actual); 432 } 433 } 434 } 435 } 436 437 SCENARIO("combine_latest empty/never", "[combine_latest][join][operators]"){ 438 GIVEN("2 hot observables of ints."){ 439 auto sc = rxsc::make_test(); 440 auto w = sc.create_worker(); 441 const rxsc::test::messages<int> on; 442 443 auto e = sc.make_hot_observable({ 444 on.next(150, 1), 445 on.completed(210) 446 }); 447 448 auto n = sc.make_hot_observable({ 449 on.next(150, 1) 450 }); 451 452 WHEN("each int is combined with the latest from the other source"){ 453 454 auto res = w.start( 455 [&]() { 456 return e 457 .combine_latest( 458 [](int v2, int v1){ 459 return v2 + v1; 460 }, 461 n 462 ) 463 // forget type to workaround lambda deduction bug on msvc 2013 464 .as_dynamic(); 465 } 466 ); 467 468 THEN("the output is empty"){ 469 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 470 auto actual = res.get_observer().messages(); 471 REQUIRE(required == actual); 472 } 473 474 THEN("there was one subscription and one unsubscription to the e"){ 475 auto required = rxu::to_vector({ 476 on.subscribe(200, 210) 477 }); 478 auto actual = e.subscriptions(); 479 REQUIRE(required == actual); 480 } 481 482 THEN("there was one subscription and one unsubscription to the n"){ 483 auto required = rxu::to_vector({ 484 on.subscribe(200, 1000) 485 }); 486 auto actual = n.subscriptions(); 487 REQUIRE(required == actual); 488 } 489 } 490 } 491 } 492 493 SCENARIO("combine_latest empty/return", "[combine_latest][join][operators]"){ 494 GIVEN("2 hot observables of ints."){ 495 auto sc = rxsc::make_test(); 496 auto w = sc.create_worker(); 497 const rxsc::test::messages<int> on; 498 499 auto e = sc.make_hot_observable({ 500 on.next(150, 1), 501 on.completed(210) 502 }); 503 504 auto o = sc.make_hot_observable({ 505 on.next(150, 1), 506 on.next(215, 2), 507 on.completed(220) 508 }); 509 510 WHEN("each int is combined with the latest from the other source"){ 511 512 auto res = w.start( 513 [&]() { 514 return e 515 .combine_latest( 516 [](int v2, int v1){ 517 return v2 + v1; 518 }, 519 o 520 ) 521 // forget type to workaround lambda deduction bug on msvc 2013 522 .as_dynamic(); 523 } 524 ); 525 526 THEN("the output contains only complete message"){ 527 auto required = rxu::to_vector({ 528 on.completed(220) 529 }); 530 auto actual = res.get_observer().messages(); 531 REQUIRE(required == actual); 532 } 533 534 THEN("there was one subscription and one unsubscription to the e"){ 535 auto required = rxu::to_vector({ 536 on.subscribe(200, 210) 537 }); 538 auto actual = e.subscriptions(); 539 REQUIRE(required == actual); 540 } 541 542 THEN("there was one subscription and one unsubscription to the o"){ 543 auto required = rxu::to_vector({ 544 on.subscribe(200, 220) 545 }); 546 auto actual = o.subscriptions(); 547 REQUIRE(required == actual); 548 } 549 } 550 } 551 } 552 553 SCENARIO("combine_latest return/empty", "[combine_latest][join][operators]"){ 554 GIVEN("2 hot observables of ints."){ 555 auto sc = rxsc::make_test(); 556 auto w = sc.create_worker(); 557 const rxsc::test::messages<int> on; 558 559 auto o = sc.make_hot_observable({ 560 on.next(150, 1), 561 on.next(215, 2), 562 on.completed(220) 563 }); 564 565 auto e = sc.make_hot_observable({ 566 on.next(150, 1), 567 on.completed(210) 568 }); 569 570 WHEN("each int is combined with the latest from the other source"){ 571 572 auto res = w.start( 573 [&]() { 574 return o 575 .combine_latest( 576 [](int v2, int v1){ 577 return v2 + v1; 578 }, 579 e 580 ) 581 // forget type to workaround lambda deduction bug on msvc 2013 582 .as_dynamic(); 583 } 584 ); 585 586 THEN("the output contains only complete message"){ 587 auto required = rxu::to_vector({ 588 on.completed(220) 589 }); 590 auto actual = res.get_observer().messages(); 591 REQUIRE(required == actual); 592 } 593 594 THEN("there was one subscription and one unsubscription to the o"){ 595 auto required = rxu::to_vector({ 596 on.subscribe(200, 220) 597 }); 598 auto actual = o.subscriptions(); 599 REQUIRE(required == actual); 600 } 601 602 THEN("there was one subscription and one unsubscription to the e"){ 603 auto required = rxu::to_vector({ 604 on.subscribe(200, 210) 605 }); 606 auto actual = e.subscriptions(); 607 REQUIRE(required == actual); 608 } 609 } 610 } 611 } 612 613 SCENARIO("combine_latest never/return", "[combine_latest][join][operators]"){ 614 GIVEN("2 hot observables of ints."){ 615 auto sc = rxsc::make_test(); 616 auto w = sc.create_worker(); 617 const rxsc::test::messages<int> on; 618 619 auto n = sc.make_hot_observable({ 620 on.next(150, 1) 621 }); 622 623 auto o = sc.make_hot_observable({ 624 on.next(150, 1), 625 on.next(215, 2), 626 on.completed(220) 627 }); 628 629 WHEN("each int is combined with the latest from the other source"){ 630 631 auto res = w.start( 632 [&]() { 633 return n 634 .combine_latest( 635 [](int v2, int v1){ 636 return v2 + v1; 637 }, 638 o 639 ) 640 // forget type to workaround lambda deduction bug on msvc 2013 641 .as_dynamic(); 642 } 643 ); 644 645 THEN("the output is empty"){ 646 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 647 auto actual = res.get_observer().messages(); 648 REQUIRE(required == actual); 649 } 650 651 THEN("there was one subscription and one unsubscription to the n"){ 652 auto required = rxu::to_vector({ 653 on.subscribe(200, 1000) 654 }); 655 auto actual = n.subscriptions(); 656 REQUIRE(required == actual); 657 } 658 659 THEN("there was one subscription and one unsubscription to the o"){ 660 auto required = rxu::to_vector({ 661 on.subscribe(200, 220) 662 }); 663 auto actual = o.subscriptions(); 664 REQUIRE(required == actual); 665 } 666 } 667 } 668 } 669 670 SCENARIO("combine_latest return/never", "[combine_latest][join][operators]"){ 671 GIVEN("2 hot observables of ints."){ 672 auto sc = rxsc::make_test(); 673 auto w = sc.create_worker(); 674 const rxsc::test::messages<int> on; 675 676 auto o = sc.make_hot_observable({ 677 on.next(150, 1), 678 on.next(215, 2), 679 on.completed(220) 680 }); 681 682 auto n = sc.make_hot_observable({ 683 on.next(150, 1) 684 }); 685 686 WHEN("each int is combined with the latest from the other source"){ 687 688 auto res = w.start( 689 [&]() { 690 return o 691 .combine_latest( 692 [](int v2, int v1){ 693 return v2 + v1; 694 }, 695 n 696 ) 697 // forget type to workaround lambda deduction bug on msvc 2013 698 .as_dynamic(); 699 } 700 ); 701 702 THEN("the output is empty"){ 703 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 704 auto actual = res.get_observer().messages(); 705 REQUIRE(required == actual); 706 } 707 708 THEN("there was one subscription and one unsubscription to the n"){ 709 auto required = rxu::to_vector({ 710 on.subscribe(200, 1000) 711 }); 712 auto actual = n.subscriptions(); 713 REQUIRE(required == actual); 714 } 715 716 THEN("there was one subscription and one unsubscription to the o"){ 717 auto required = rxu::to_vector({ 718 on.subscribe(200, 220) 719 }); 720 auto actual = o.subscriptions(); 721 REQUIRE(required == actual); 722 } 723 } 724 } 725 } 726 727 728 SCENARIO("combine_latest return/return", "[combine_latest][join][operators]"){ 729 GIVEN("2 hot observables of ints."){ 730 auto sc = rxsc::make_test(); 731 auto w = sc.create_worker(); 732 const rxsc::test::messages<int> on; 733 734 auto o1 = sc.make_hot_observable({ 735 on.next(150, 1), 736 on.next(215, 2), 737 on.completed(230) 738 }); 739 740 auto o2 = sc.make_hot_observable({ 741 on.next(150, 1), 742 on.next(220, 3), 743 on.completed(240) 744 }); 745 746 WHEN("each int is combined with the latest from the other source"){ 747 748 auto res = w.start( 749 [&]() { 750 return o1 751 .combine_latest( 752 [](int v2, int v1){ 753 return v2 + v1; 754 }, 755 o2 756 ) 757 // forget type to workaround lambda deduction bug on msvc 2013 758 .as_dynamic(); 759 } 760 ); 761 762 THEN("the output contains combined ints"){ 763 auto required = rxu::to_vector({ 764 on.next(220, 2 + 3), 765 on.completed(240) 766 }); 767 auto actual = res.get_observer().messages(); 768 REQUIRE(required == actual); 769 } 770 771 THEN("there was one subscription and one unsubscription to the o1"){ 772 auto required = rxu::to_vector({ 773 on.subscribe(200, 230) 774 }); 775 auto actual = o1.subscriptions(); 776 REQUIRE(required == actual); 777 } 778 779 THEN("there was one subscription and one unsubscription to the o2"){ 780 auto required = rxu::to_vector({ 781 on.subscribe(200, 240) 782 }); 783 auto actual = o2.subscriptions(); 784 REQUIRE(required == actual); 785 } 786 } 787 } 788 } 789 790 SCENARIO("combine_latest empty/error", "[combine_latest][join][operators]"){ 791 GIVEN("2 hot observables of ints."){ 792 auto sc = rxsc::make_test(); 793 auto w = sc.create_worker(); 794 const rxsc::test::messages<int> on; 795 796 std::runtime_error ex("combine_latest on_error from source"); 797 798 auto emp = sc.make_hot_observable({ 799 on.next(150, 1), 800 on.completed(230) 801 }); 802 803 auto err = sc.make_hot_observable({ 804 on.next(150, 1), 805 on.error(220, ex) 806 }); 807 808 WHEN("each int is combined with the latest from the other source"){ 809 810 auto res = w.start( 811 [&]() { 812 return emp 813 .combine_latest( 814 [](int v2, int v1){ 815 return v2 + v1; 816 }, 817 err 818 ) 819 // forget type to workaround lambda deduction bug on msvc 2013 820 .as_dynamic(); 821 } 822 ); 823 824 THEN("the output contains only error message"){ 825 auto required = rxu::to_vector({ 826 on.error(220, ex) 827 }); 828 auto actual = res.get_observer().messages(); 829 REQUIRE(required == actual); 830 } 831 832 THEN("there was one subscription and one unsubscription to the emp"){ 833 auto required = rxu::to_vector({ 834 on.subscribe(200, 220) 835 }); 836 auto actual = emp.subscriptions(); 837 REQUIRE(required == actual); 838 } 839 840 THEN("there was one subscription and one unsubscription to the err"){ 841 auto required = rxu::to_vector({ 842 on.subscribe(200, 220) 843 }); 844 auto actual = err.subscriptions(); 845 REQUIRE(required == actual); 846 } 847 } 848 } 849 } 850 851 SCENARIO("combine_latest error/empty", "[combine_latest][join][operators]"){ 852 GIVEN("2 hot observables of ints."){ 853 auto sc = rxsc::make_test(); 854 auto w = sc.create_worker(); 855 const rxsc::test::messages<int> on; 856 857 std::runtime_error ex("combine_latest on_error from source"); 858 859 auto err = sc.make_hot_observable({ 860 on.next(150, 1), 861 on.error(220, ex) 862 }); 863 864 auto emp = sc.make_hot_observable({ 865 on.next(150, 1), 866 on.completed(230) 867 }); 868 869 WHEN("each int is combined with the latest from the other source"){ 870 871 auto res = w.start( 872 [&]() { 873 return err 874 .combine_latest( 875 [](int v2, int v1){ 876 return v2 + v1; 877 }, 878 emp 879 ) 880 // forget type to workaround lambda deduction bug on msvc 2013 881 .as_dynamic(); 882 } 883 ); 884 885 THEN("the output contains only error message"){ 886 auto required = rxu::to_vector({ 887 on.error(220, ex) 888 }); 889 auto actual = res.get_observer().messages(); 890 REQUIRE(required == actual); 891 } 892 893 THEN("there was one subscription and one unsubscription to the emp"){ 894 auto required = rxu::to_vector({ 895 on.subscribe(200, 220) 896 }); 897 auto actual = emp.subscriptions(); 898 REQUIRE(required == actual); 899 } 900 901 THEN("there was one subscription and one unsubscription to the err"){ 902 auto required = rxu::to_vector({ 903 on.subscribe(200, 220) 904 }); 905 auto actual = err.subscriptions(); 906 REQUIRE(required == actual); 907 } 908 } 909 } 910 } 911 912 SCENARIO("combine_latest return/error", "[combine_latest][join][operators]"){ 913 GIVEN("2 hot observables of ints."){ 914 auto sc = rxsc::make_test(); 915 auto w = sc.create_worker(); 916 const rxsc::test::messages<int> on; 917 918 std::runtime_error ex("combine_latest on_error from source"); 919 920 auto o = sc.make_hot_observable({ 921 on.next(150, 1), 922 on.next(210, 2), 923 on.completed(230) 924 }); 925 926 auto err = sc.make_hot_observable({ 927 on.next(150, 1), 928 on.error(220, ex) 929 }); 930 931 WHEN("each int is combined with the latest from the other source"){ 932 933 auto res = w.start( 934 [&]() { 935 return o 936 .combine_latest( 937 [](int v2, int v1){ 938 return v2 + v1; 939 }, 940 err 941 ) 942 // forget type to workaround lambda deduction bug on msvc 2013 943 .as_dynamic(); 944 } 945 ); 946 947 THEN("the output contains only error message"){ 948 auto required = rxu::to_vector({ 949 on.error(220, ex) 950 }); 951 auto actual = res.get_observer().messages(); 952 REQUIRE(required == actual); 953 } 954 955 THEN("there was one subscription and one unsubscription to the ret"){ 956 auto required = rxu::to_vector({ 957 on.subscribe(200, 220) 958 }); 959 auto actual = o.subscriptions(); 960 REQUIRE(required == actual); 961 } 962 963 THEN("there was one subscription and one unsubscription to the err"){ 964 auto required = rxu::to_vector({ 965 on.subscribe(200, 220) 966 }); 967 auto actual = err.subscriptions(); 968 REQUIRE(required == actual); 969 } 970 } 971 } 972 } 973 974 SCENARIO("combine_latest error/return", "[combine_latest][join][operators]"){ 975 GIVEN("2 hot observables of ints."){ 976 auto sc = rxsc::make_test(); 977 auto w = sc.create_worker(); 978 const rxsc::test::messages<int> on; 979 980 std::runtime_error ex("combine_latest on_error from source"); 981 982 auto err = sc.make_hot_observable({ 983 on.next(150, 1), 984 on.error(220, ex) 985 }); 986 987 auto ret = sc.make_hot_observable({ 988 on.next(150, 1), 989 on.next(210, 2), 990 on.completed(230) 991 }); 992 993 WHEN("each int is combined with the latest from the other source"){ 994 995 auto res = w.start( 996 [&]() { 997 return err 998 .combine_latest( 999 [](int v2, int v1){ 1000 return v2 + v1; 1001 }, 1002 ret 1003 ) 1004 // forget type to workaround lambda deduction bug on msvc 2013 1005 .as_dynamic(); 1006 } 1007 ); 1008 1009 THEN("the output contains only error message"){ 1010 auto required = rxu::to_vector({ 1011 on.error(220, ex) 1012 }); 1013 auto actual = res.get_observer().messages(); 1014 REQUIRE(required == actual); 1015 } 1016 1017 THEN("there was one subscription and one unsubscription to the ret"){ 1018 auto required = rxu::to_vector({ 1019 on.subscribe(200, 220) 1020 }); 1021 auto actual = ret.subscriptions(); 1022 REQUIRE(required == actual); 1023 } 1024 1025 THEN("there was one subscription and one unsubscription to the err"){ 1026 auto required = rxu::to_vector({ 1027 on.subscribe(200, 220) 1028 }); 1029 auto actual = err.subscriptions(); 1030 REQUIRE(required == actual); 1031 } 1032 } 1033 } 1034 } 1035 1036 SCENARIO("combine_latest error/error", "[combine_latest][join][operators]"){ 1037 GIVEN("2 hot observables of ints."){ 1038 auto sc = rxsc::make_test(); 1039 auto w = sc.create_worker(); 1040 const rxsc::test::messages<int> on; 1041 1042 std::runtime_error ex1("combine_latest on_error from source 1"); 1043 std::runtime_error ex2("combine_latest on_error from source 2"); 1044 1045 auto err1 = sc.make_hot_observable({ 1046 on.next(150, 1), 1047 on.error(220, ex1) 1048 }); 1049 1050 auto err2 = sc.make_hot_observable({ 1051 on.next(150, 1), 1052 on.error(230, ex2) 1053 }); 1054 1055 WHEN("each int is combined with the latest from the other source"){ 1056 1057 auto res = w.start( 1058 [&]() { 1059 return err1 1060 .combine_latest( 1061 [](int v2, int v1){ 1062 return v2 + v1; 1063 }, 1064 err2 1065 ) 1066 // forget type to workaround lambda deduction bug on msvc 2013 1067 .as_dynamic(); 1068 } 1069 ); 1070 1071 THEN("the output contains only error message"){ 1072 auto required = rxu::to_vector({ 1073 on.error(220, ex1) 1074 }); 1075 auto actual = res.get_observer().messages(); 1076 REQUIRE(required == actual); 1077 } 1078 1079 THEN("there was one subscription and one unsubscription to the err1"){ 1080 auto required = rxu::to_vector({ 1081 on.subscribe(200, 220) 1082 }); 1083 auto actual = err1.subscriptions(); 1084 REQUIRE(required == actual); 1085 } 1086 1087 THEN("there was one subscription and one unsubscription to the err2"){ 1088 auto required = rxu::to_vector({ 1089 on.subscribe(200, 220) 1090 }); 1091 auto actual = err2.subscriptions(); 1092 REQUIRE(required == actual); 1093 } 1094 } 1095 } 1096 } 1097 1098 SCENARIO("combine_latest next+error/error", "[combine_latest][join][operators]"){ 1099 GIVEN("2 hot observables of ints."){ 1100 auto sc = rxsc::make_test(); 1101 auto w = sc.create_worker(); 1102 const rxsc::test::messages<int> on; 1103 1104 std::runtime_error ex1("combine_latest on_error from source 1"); 1105 std::runtime_error ex2("combine_latest on_error from source 2"); 1106 1107 auto err1 = sc.make_hot_observable({ 1108 on.next(150, 1), 1109 on.next(210, 2), 1110 on.error(220, ex1) 1111 }); 1112 1113 auto err2 = sc.make_hot_observable({ 1114 on.next(150, 1), 1115 on.error(230, ex2) 1116 }); 1117 1118 WHEN("each int is combined with the latest from the other source"){ 1119 1120 auto res = w.start( 1121 [&]() { 1122 return err1 1123 .combine_latest( 1124 [](int v2, int v1){ 1125 return v2 + v1; 1126 }, 1127 err2 1128 ) 1129 // forget type to workaround lambda deduction bug on msvc 2013 1130 .as_dynamic(); 1131 } 1132 ); 1133 1134 THEN("the output contains only error message"){ 1135 auto required = rxu::to_vector({ 1136 on.error(220, ex1) 1137 }); 1138 auto actual = res.get_observer().messages(); 1139 REQUIRE(required == actual); 1140 } 1141 1142 THEN("there was one subscription and one unsubscription to the err1"){ 1143 auto required = rxu::to_vector({ 1144 on.subscribe(200, 220) 1145 }); 1146 auto actual = err1.subscriptions(); 1147 REQUIRE(required == actual); 1148 } 1149 1150 THEN("there was one subscription and one unsubscription to the err2"){ 1151 auto required = rxu::to_vector({ 1152 on.subscribe(200, 220) 1153 }); 1154 auto actual = err2.subscriptions(); 1155 REQUIRE(required == actual); 1156 } 1157 } 1158 } 1159 } 1160 1161 SCENARIO("combine_latest error/next+error", "[combine_latest][join][operators]"){ 1162 GIVEN("2 hot observables of ints."){ 1163 auto sc = rxsc::make_test(); 1164 auto w = sc.create_worker(); 1165 const rxsc::test::messages<int> on; 1166 1167 std::runtime_error ex1("combine_latest on_error from source 1"); 1168 std::runtime_error ex2("combine_latest on_error from source 2"); 1169 1170 auto err1 = sc.make_hot_observable({ 1171 on.next(150, 1), 1172 on.error(230, ex1) 1173 }); 1174 1175 auto err2 = sc.make_hot_observable({ 1176 on.next(150, 1), 1177 on.next(210, 2), 1178 on.error(220, ex2) 1179 }); 1180 1181 WHEN("each int is combined with the latest from the other source"){ 1182 1183 auto res = w.start( 1184 [&]() { 1185 return err1 1186 .combine_latest( 1187 [](int v2, int v1){ 1188 return v2 + v1; 1189 }, 1190 err2 1191 ) 1192 // forget type to workaround lambda deduction bug on msvc 2013 1193 .as_dynamic(); 1194 } 1195 ); 1196 1197 THEN("the output contains only error message"){ 1198 auto required = rxu::to_vector({ 1199 on.error(220, ex2) 1200 }); 1201 auto actual = res.get_observer().messages(); 1202 REQUIRE(required == actual); 1203 } 1204 1205 THEN("there was one subscription and one unsubscription to the err1"){ 1206 auto required = rxu::to_vector({ 1207 on.subscribe(200, 220) 1208 }); 1209 auto actual = err1.subscriptions(); 1210 REQUIRE(required == actual); 1211 } 1212 1213 THEN("there was one subscription and one unsubscription to the err2"){ 1214 auto required = rxu::to_vector({ 1215 on.subscribe(200, 220) 1216 }); 1217 auto actual = err2.subscriptions(); 1218 REQUIRE(required == actual); 1219 } 1220 } 1221 } 1222 } 1223 1224 SCENARIO("combine_latest never/error", "[combine_latest][join][operators]"){ 1225 GIVEN("2 hot observables of ints."){ 1226 auto sc = rxsc::make_test(); 1227 auto w = sc.create_worker(); 1228 const rxsc::test::messages<int> on; 1229 1230 std::runtime_error ex("combine_latest on_error from source"); 1231 1232 auto n = sc.make_hot_observable({ 1233 on.next(150, 1) 1234 }); 1235 1236 auto err = sc.make_hot_observable({ 1237 on.next(150, 1), 1238 on.error(220, ex) 1239 }); 1240 1241 WHEN("each int is combined with the latest from the other source"){ 1242 1243 auto res = w.start( 1244 [&]() { 1245 return n 1246 .combine_latest( 1247 [](int v2, int v1){ 1248 return v2 + v1; 1249 }, 1250 err 1251 ) 1252 // forget type to workaround lambda deduction bug on msvc 2013 1253 .as_dynamic(); 1254 } 1255 ); 1256 1257 THEN("the output contains only error message"){ 1258 auto required = rxu::to_vector({ 1259 on.error(220, ex) 1260 }); 1261 auto actual = res.get_observer().messages(); 1262 REQUIRE(required == actual); 1263 } 1264 1265 THEN("there was one subscription and one unsubscription to the n"){ 1266 auto required = rxu::to_vector({ 1267 on.subscribe(200, 220) 1268 }); 1269 auto actual = n.subscriptions(); 1270 REQUIRE(required == actual); 1271 } 1272 1273 THEN("there was one subscription and one unsubscription to the err"){ 1274 auto required = rxu::to_vector({ 1275 on.subscribe(200, 220) 1276 }); 1277 auto actual = err.subscriptions(); 1278 REQUIRE(required == actual); 1279 } 1280 } 1281 } 1282 } 1283 1284 SCENARIO("combine_latest error/never", "[combine_latest][join][operators]"){ 1285 GIVEN("2 hot observables of ints."){ 1286 auto sc = rxsc::make_test(); 1287 auto w = sc.create_worker(); 1288 const rxsc::test::messages<int> on; 1289 1290 std::runtime_error ex("combine_latest on_error from source"); 1291 1292 auto err = sc.make_hot_observable({ 1293 on.next(150, 1), 1294 on.error(220, ex) 1295 }); 1296 1297 auto n = sc.make_hot_observable({ 1298 on.next(150, 1) 1299 }); 1300 1301 WHEN("each int is combined with the latest from the other source"){ 1302 1303 auto res = w.start( 1304 [&]() { 1305 return err 1306 .combine_latest( 1307 [](int v2, int v1){ 1308 return v2 + v1; 1309 }, 1310 n 1311 ) 1312 // forget type to workaround lambda deduction bug on msvc 2013 1313 .as_dynamic(); 1314 } 1315 ); 1316 1317 THEN("the output contains only error message"){ 1318 auto required = rxu::to_vector({ 1319 on.error(220, ex) 1320 }); 1321 auto actual = res.get_observer().messages(); 1322 REQUIRE(required == actual); 1323 } 1324 1325 THEN("there was one subscription and one unsubscription to the n"){ 1326 auto required = rxu::to_vector({ 1327 on.subscribe(200, 220) 1328 }); 1329 auto actual = n.subscriptions(); 1330 REQUIRE(required == actual); 1331 } 1332 1333 THEN("there was one subscription and one unsubscription to the err"){ 1334 auto required = rxu::to_vector({ 1335 on.subscribe(200, 220) 1336 }); 1337 auto actual = err.subscriptions(); 1338 REQUIRE(required == actual); 1339 } 1340 } 1341 } 1342 } 1343 1344 SCENARIO("combine_latest error after completed left", "[combine_latest][join][operators]"){ 1345 GIVEN("2 hot observables of ints."){ 1346 auto sc = rxsc::make_test(); 1347 auto w = sc.create_worker(); 1348 const rxsc::test::messages<int> on; 1349 1350 std::runtime_error ex("combine_latest on_error from source"); 1351 1352 auto ret = sc.make_hot_observable({ 1353 on.next(150, 1), 1354 on.next(210, 2), 1355 on.completed(215) 1356 }); 1357 1358 auto err = sc.make_hot_observable({ 1359 on.next(150, 1), 1360 on.error(220, ex) 1361 }); 1362 1363 WHEN("each int is combined with the latest from the other source"){ 1364 1365 auto res = w.start( 1366 [&]() { 1367 return ret 1368 .combine_latest( 1369 [](int v2, int v1){ 1370 return v2 + v1; 1371 }, 1372 err 1373 ) 1374 // forget type to workaround lambda deduction bug on msvc 2013 1375 .as_dynamic(); 1376 } 1377 ); 1378 1379 THEN("the output contains only error message"){ 1380 auto required = rxu::to_vector({ 1381 on.error(220, ex) 1382 }); 1383 auto actual = res.get_observer().messages(); 1384 REQUIRE(required == actual); 1385 } 1386 1387 THEN("there was one subscription and one unsubscription to the ret"){ 1388 auto required = rxu::to_vector({ 1389 on.subscribe(200, 215) 1390 }); 1391 auto actual = ret.subscriptions(); 1392 REQUIRE(required == actual); 1393 } 1394 1395 THEN("there was one subscription and one unsubscription to the err"){ 1396 auto required = rxu::to_vector({ 1397 on.subscribe(200, 220) 1398 }); 1399 auto actual = err.subscriptions(); 1400 REQUIRE(required == actual); 1401 } 1402 } 1403 } 1404 } 1405 1406 SCENARIO("combine_latest error after completed right", "[combine_latest][join][operators]"){ 1407 GIVEN("2 hot observables of ints."){ 1408 auto sc = rxsc::make_test(); 1409 auto w = sc.create_worker(); 1410 const rxsc::test::messages<int> on; 1411 1412 std::runtime_error ex("combine_latest on_error from source"); 1413 1414 auto err = sc.make_hot_observable({ 1415 on.next(150, 1), 1416 on.error(220, ex) 1417 }); 1418 1419 auto ret = sc.make_hot_observable({ 1420 on.next(150, 1), 1421 on.next(210, 2), 1422 on.completed(215) 1423 }); 1424 1425 WHEN("each int is combined with the latest from the other source"){ 1426 1427 auto res = w.start( 1428 [&]() { 1429 return err 1430 .combine_latest( 1431 [](int v2, int v1){ 1432 return v2 + v1; 1433 }, 1434 ret 1435 ) 1436 // forget type to workaround lambda deduction bug on msvc 2013 1437 .as_dynamic(); 1438 } 1439 ); 1440 1441 THEN("the output contains only error message"){ 1442 auto required = rxu::to_vector({ 1443 on.error(220, ex) 1444 }); 1445 auto actual = res.get_observer().messages(); 1446 REQUIRE(required == actual); 1447 } 1448 1449 THEN("there was one subscription and one unsubscription to the ret"){ 1450 auto required = rxu::to_vector({ 1451 on.subscribe(200, 215) 1452 }); 1453 auto actual = ret.subscriptions(); 1454 REQUIRE(required == actual); 1455 } 1456 1457 THEN("there was one subscription and one unsubscription to the err"){ 1458 auto required = rxu::to_vector({ 1459 on.subscribe(200, 220) 1460 }); 1461 auto actual = err.subscriptions(); 1462 REQUIRE(required == actual); 1463 } 1464 } 1465 } 1466 } 1467 1468 SCENARIO("combine_latest selector throws", "[combine_latest][join][operators][!throws]"){ 1469 GIVEN("2 hot observables of ints."){ 1470 auto sc = rxsc::make_test(); 1471 auto w = sc.create_worker(); 1472 const rxsc::test::messages<int> on; 1473 1474 std::runtime_error ex("combine_latest on_error from source"); 1475 1476 auto o1 = sc.make_hot_observable({ 1477 on.next(150, 1), 1478 on.next(215, 2), 1479 on.completed(230) 1480 }); 1481 1482 auto o2 = sc.make_hot_observable({ 1483 on.next(150, 1), 1484 on.next(220, 3), 1485 on.completed(240) 1486 }); 1487 1488 WHEN("each int is combined with the latest from the other source"){ 1489 1490 auto res = w.start( 1491 [&]() { 1492 return o1 1493 .combine_latest( 1494 // Note for trying to handle this test case when exceptions are disabled 1495 // with RXCPP_USE_EXCEPTIONS == 0: 1496 // 1497 // It seems that this test is in particular testing that the 1498 // combine_latest selector (aggregate function) thrown exceptions 1499 // are being translated into an on_error. 1500 // 1501 // Since there appears to be no way to give combine_latest 1502 // an Observable that would call on_error directly (as opposed 1503 // to a regular function that's converted into an observable), 1504 // this test is meaningless when exceptions are disabled 1505 // since any selectors with 'throw' will not even compile. 1506 // 1507 // Attempting to change this to e.g. 1508 // o1.combineLatest(o2).map ... unconditional onError 1509 // would defeat the purpose of the test since its the combineLatest 1510 // implementation that's supposed to be doing the error forwarding. 1511 [&ex](int, int) -> int { 1512 rxu::throw_exception(ex); 1513 }, 1514 o2 1515 ) 1516 // forget type to workaround lambda deduction bug on msvc 2013 1517 .as_dynamic(); 1518 } 1519 ); 1520 1521 THEN("the output contains only error"){ 1522 auto required = rxu::to_vector({ 1523 on.error(220, ex) 1524 }); 1525 auto actual = res.get_observer().messages(); 1526 REQUIRE(required == actual); 1527 } 1528 1529 THEN("there was one subscription and one unsubscription to the o1"){ 1530 auto required = rxu::to_vector({ 1531 on.subscribe(200, 220) 1532 }); 1533 auto actual = o1.subscriptions(); 1534 REQUIRE(required == actual); 1535 } 1536 1537 THEN("there was one subscription and one unsubscription to the o2"){ 1538 auto required = rxu::to_vector({ 1539 on.subscribe(200, 220) 1540 }); 1541 auto actual = o2.subscriptions(); 1542 REQUIRE(required == actual); 1543 } 1544 } 1545 } 1546 } 1547 1548 SCENARIO("combine_latest selector throws N", "[combine_latest][join][operators][!throws]"){ 1549 GIVEN("N hot observables of ints."){ 1550 auto sc = rxsc::make_test(); 1551 auto w = sc.create_worker(); 1552 const rxsc::test::messages<int> on; 1553 1554 const int N = 4; 1555 1556 std::runtime_error ex("combine_latest on_error from source"); 1557 1558 std::vector<rxcpp::test::testable_observable<int>> e; 1559 for (int i = 0; i < N; ++i) { 1560 e.push_back( 1561 sc.make_hot_observable({ 1562 on.next(210 + 10 * i, 1), 1563 on.completed(500) 1564 }) 1565 ); 1566 } 1567 1568 WHEN("each int is combined with the latest from the other source"){ 1569 1570 auto res = w.start( 1571 [&]() { 1572 return e[0] 1573 .combine_latest( 1574 [&ex](int, int, int, int) -> int { 1575 rxu::throw_exception(ex); 1576 }, 1577 e[1], e[2], e[3] 1578 ) 1579 // forget type to workaround lambda deduction bug on msvc 2013 1580 .as_dynamic(); 1581 } 1582 ); 1583 1584 THEN("the output contains only error"){ 1585 auto required = rxu::to_vector({ 1586 on.error(200 + 10 * N, ex) 1587 }); 1588 auto actual = res.get_observer().messages(); 1589 REQUIRE(required == actual); 1590 } 1591 1592 THEN("there was one subscription and one unsubscription to each observable"){ 1593 1594 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ 1595 auto required = rxu::to_vector({ 1596 on.subscribe(200, 200 + 10 * N) 1597 }); 1598 auto actual = s.subscriptions(); 1599 REQUIRE(required == actual); 1600 }); 1601 } 1602 } 1603 } 1604 } 1605 1606 SCENARIO("combine_latest typical N", "[combine_latest][join][operators]"){ 1607 GIVEN("N hot observables of ints."){ 1608 auto sc = rxsc::make_test(); 1609 auto w = sc.create_worker(); 1610 const rxsc::test::messages<int> on; 1611 1612 const int N = 4; 1613 1614 std::vector<rxcpp::test::testable_observable<int>> o; 1615 for (int i = 0; i < N; ++i) { 1616 o.push_back( 1617 sc.make_hot_observable({ 1618 on.next(150, 1), 1619 on.next(210 + 10 * i, i + 1), 1620 on.next(410 + 10 * i, i + N + 1), 1621 on.completed(800) 1622 }) 1623 ); 1624 } 1625 1626 WHEN("each int is combined with the latest from the other source"){ 1627 1628 auto res = w.start( 1629 [&]() { 1630 return o[0] 1631 .combine_latest( 1632 [](int v0, int v1, int v2, int v3) { 1633 return v0 + v1 + v2 + v3; 1634 }, 1635 o[1], o[2], o[3] 1636 ) 1637 // forget type to workaround lambda deduction bug on msvc 2013 1638 .as_dynamic(); 1639 } 1640 ); 1641 1642 THEN("the output contains combined ints"){ 1643 auto required = rxu::to_vector({ 1644 on.next(200 + 10 * N, N * (N + 1) / 2) 1645 }); 1646 for (int i = 0; i < N; ++i) { 1647 required.push_back(on.next(410 + 10 * i, N * (N + 1) / 2 + N + N * i)); 1648 } 1649 required.push_back(on.completed(800)); 1650 auto actual = res.get_observer().messages(); 1651 REQUIRE(required == actual); 1652 } 1653 1654 THEN("there was one subscription and one unsubscription to each observable"){ 1655 1656 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){ 1657 auto required = rxu::to_vector({ 1658 on.subscribe(200, 800) 1659 }); 1660 auto actual = s.subscriptions(); 1661 REQUIRE(required == actual); 1662 }); 1663 } 1664 } 1665 } 1666 } 1667