Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include "rxcpp/operators/rx-combine_latest.hpp"
      3 
      4 SCENARIO("combine_latest interleaved with tail", "[combine_latest][join][operators]"){
      5     GIVEN("2 hot observables of ints."){
      6         auto sc = rxsc::make_test();
      7         auto w = sc.create_worker();
      8         const rxsc::test::messages<int> on;
      9 
     10         auto o1 = sc.make_hot_observable({
     11             on.next(150, 1),
     12             on.next(215, 2),
     13             on.next(225, 4),
     14             on.completed(230)
     15         });
     16 
     17         auto o2 = sc.make_hot_observable({
     18             on.next(150, 1),
     19             on.next(220, 3),
     20             on.next(230, 5),
     21             on.next(235, 6),
     22             on.next(240, 7),
     23             on.completed(250)
     24         });
     25 
     26         WHEN("each int is combined with the latest from the other source"){
     27 
     28             auto res = w.start(
     29                 [&]() {
     30                     return o2
     31                         .combine_latest(
     32                             [](int v2, int v1){
     33                                 return v2 + v1;
     34                             },
     35                             o1
     36                         )
     37                         // forget type to workaround lambda deduction bug on msvc 2013
     38                         .as_dynamic();
     39                 }
     40             );
     41 
     42             THEN("the output contains combined ints"){
     43                 auto required = rxu::to_vector({
     44                     on.next(220, 2 + 3),
     45                     on.next(225, 4 + 3),
     46                     on.next(230, 4 + 5),
     47                     on.next(235, 4 + 6),
     48                     on.next(240, 4 + 7),
     49                     on.completed(250)
     50                 });
     51                 auto actual = res.get_observer().messages();
     52                 REQUIRE(required == actual);
     53             }
     54 
     55             THEN("there was one subscription and one unsubscription to the o1"){
     56                 auto required = rxu::to_vector({
     57                     on.subscribe(200, 230)
     58                 });
     59                 auto actual = o1.subscriptions();
     60                 REQUIRE(required == actual);
     61             }
     62 
     63             THEN("there was one subscription and one unsubscription to the o2"){
     64                 auto required = rxu::to_vector({
     65                     on.subscribe(200, 250)
     66                 });
     67                 auto actual = o2.subscriptions();
     68                 REQUIRE(required == actual);
     69             }
     70         }
     71     }
     72 }
     73 
     74 SCENARIO("combine_latest consecutive", "[combine_latest][join][operators]"){
     75     GIVEN("2 hot observables of ints."){
     76         auto sc = rxsc::make_test();
     77         auto w = sc.create_worker();
     78         const rxsc::test::messages<int> on;
     79 
     80         auto o1 = sc.make_hot_observable({
     81             on.next(150, 1),
     82             on.next(215, 2),
     83             on.next(225, 4),
     84             on.completed(230)
     85         });
     86 
     87         auto o2 = sc.make_hot_observable({
     88             on.next(150, 1),
     89             on.next(235, 6),
     90             on.next(240, 7),
     91             on.completed(250)
     92         });
     93 
     94         WHEN("each int is combined with the latest from the other source"){
     95 
     96             auto res = w.start(
     97                 [&]() {
     98                     return o2
     99                         .combine_latest(
    100                             [](int v2, int v1){
    101                                 return v2 + v1;
    102                             },
    103                             o1
    104                         )
    105                         // forget type to workaround lambda deduction bug on msvc 2013
    106                         .as_dynamic();
    107                 }
    108             );
    109 
    110             THEN("the output contains combined ints"){
    111                 auto required = rxu::to_vector({
    112                     on.next(235, 4 + 6),
    113                     on.next(240, 4 + 7),
    114                     on.completed(250)
    115                 });
    116                 auto actual = res.get_observer().messages();
    117                 REQUIRE(required == actual);
    118             }
    119 
    120             THEN("there was one subscription and one unsubscription to the o1"){
    121                 auto required = rxu::to_vector({
    122                     on.subscribe(200, 230)
    123                 });
    124                 auto actual = o1.subscriptions();
    125                 REQUIRE(required == actual);
    126             }
    127 
    128             THEN("there was one subscription and one unsubscription to the o2"){
    129                 auto required = rxu::to_vector({
    130                     on.subscribe(200, 250)
    131                 });
    132                 auto actual = o2.subscriptions();
    133                 REQUIRE(required == actual);
    134             }
    135         }
    136     }
    137 }
    138 
    139 SCENARIO("combine_latest consecutive ends with error left", "[combine_latest][join][operators]"){
    140     GIVEN("2 hot observables of ints."){
    141         auto sc = rxsc::make_test();
    142         auto w = sc.create_worker();
    143         const rxsc::test::messages<int> on;
    144 
    145         std::runtime_error ex("combine_latest on_error from source");
    146 
    147         auto o1 = sc.make_hot_observable({
    148             on.next(150, 1),
    149             on.next(215, 2),
    150             on.next(225, 4),
    151             on.error(230, ex)
    152         });
    153 
    154         auto o2 = sc.make_hot_observable({
    155             on.next(150, 1),
    156             on.next(235, 6),
    157             on.next(240, 7),
    158             on.completed(250)
    159         });
    160 
    161         WHEN("each int is combined with the latest from the other source"){
    162 
    163             auto res = w.start(
    164                 [&]() {
    165                     return o2
    166                         .combine_latest(
    167                             [](int v2, int v1){
    168                                 return v2 + v1;
    169                             },
    170                             o1
    171                         )
    172                         // forget type to workaround lambda deduction bug on msvc 2013
    173                         .as_dynamic();
    174                 }
    175             );
    176 
    177             THEN("the output contains only an error"){
    178                 auto required = rxu::to_vector({
    179                     on.error(230, ex)
    180                 });
    181                 auto actual = res.get_observer().messages();
    182                 REQUIRE(required == actual);
    183             }
    184 
    185             THEN("there was one subscription and one unsubscription to the o1"){
    186                 auto required = rxu::to_vector({
    187                     on.subscribe(200, 230)
    188                 });
    189                 auto actual = o1.subscriptions();
    190                 REQUIRE(required == actual);
    191             }
    192 
    193             THEN("there was one subscription and one unsubscription to the o2"){
    194                 auto required = rxu::to_vector({
    195                     on.subscribe(200, 230)
    196                 });
    197                 auto actual = o2.subscriptions();
    198                 REQUIRE(required == actual);
    199             }
    200         }
    201     }
    202 }
    203 
    204 SCENARIO("combine_latest consecutive ends with error right", "[combine_latest][join][operators]"){
    205     GIVEN("2 hot observables of ints."){
    206         auto sc = rxsc::make_test();
    207         auto w = sc.create_worker();
    208         const rxsc::test::messages<int> on;
    209 
    210         std::runtime_error ex("combine_latest on_error from source");
    211 
    212         auto o1 = sc.make_hot_observable({
    213             on.next(150, 1),
    214             on.next(215, 2),
    215             on.next(225, 4),
    216             on.completed(250)
    217         });
    218 
    219         auto o2 = sc.make_hot_observable({
    220             on.next(150, 1),
    221             on.next(235, 6),
    222             on.next(240, 7),
    223             on.error(245, ex)
    224         });
    225 
    226         WHEN("each int is combined with the latest from the other source"){
    227 
    228             auto res = w.start(
    229                 [&]() {
    230                     return o2
    231                         .combine_latest(
    232                             [](int v2, int v1){
    233                                 return v2 + v1;
    234                             },
    235                             o1
    236                         )
    237                         // forget type to workaround lambda deduction bug on msvc 2013
    238                         .as_dynamic();
    239                 }
    240             );
    241 
    242             THEN("the output contains combined ints followed by an error"){
    243                 auto required = rxu::to_vector({
    244                     on.next(235, 4 + 6),
    245                     on.next(240, 4 + 7),
    246                     on.error(245, ex)
    247                 });
    248                 auto actual = res.get_observer().messages();
    249                 REQUIRE(required == actual);
    250             }
    251 
    252             THEN("there was one subscription and one unsubscription to the o1"){
    253                 auto required = rxu::to_vector({
    254                     on.subscribe(200, 245)
    255                 });
    256                 auto actual = o1.subscriptions();
    257                 REQUIRE(required == actual);
    258             }
    259 
    260             THEN("there was one subscription and one unsubscription to the o2"){
    261                 auto required = rxu::to_vector({
    262                     on.subscribe(200, 245)
    263                 });
    264                 auto actual = o2.subscriptions();
    265                 REQUIRE(required == actual);
    266             }
    267         }
    268     }
    269 }
    270 
    271 SCENARIO("combine_latest never N", "[combine_latest][join][operators]"){
    272     GIVEN("N never completed hot observables of ints."){
    273         auto sc = rxsc::make_test();
    274         auto w = sc.create_worker();
    275         const rxsc::test::messages<int> on;
    276 
    277         const int N = 4;
    278 
    279         std::vector<rxcpp::test::testable_observable<int>> n;
    280         for (int i = 0; i < N; ++i) {
    281             n.push_back(
    282                 sc.make_hot_observable({
    283                     on.next(150, 1)
    284                 })
    285             );
    286         }
    287 
    288         WHEN("each int is combined with the latest from the other source"){
    289 
    290             auto res = w.start(
    291                 [&]() {
    292                     return n[0]
    293                         .combine_latest(
    294                             [](int v0, int v1, int v2, int v3){
    295                                 return v0 + v1 + v2 + v3;
    296                             },
    297                             n[1], n[2], n[3]
    298                         )
    299                         // forget type to workaround lambda deduction bug on msvc 2013
    300                         .as_dynamic();
    301                 }
    302             );
    303 
    304             THEN("the output is empty"){
    305                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    306                 auto actual = res.get_observer().messages();
    307                 REQUIRE(required == actual);
    308             }
    309 
    310             THEN("there was one subscription and one unsubscription to each observable"){
    311 
    312                 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){
    313                     auto required = rxu::to_vector({
    314                         on.subscribe(200, 1000)
    315                     });
    316                     auto actual = s.subscriptions();
    317                     REQUIRE(required == actual);
    318                 });
    319             }
    320         }
    321     }
    322 }
    323 
    324 SCENARIO("combine_latest empty N", "[combine_latest][join][operators]"){
    325     GIVEN("N empty hot observables of ints."){
    326         auto sc = rxsc::make_test();
    327         auto w = sc.create_worker();
    328         const rxsc::test::messages<int> on;
    329 
    330         const int N = 4;
    331 
    332         std::vector<rxcpp::test::testable_observable<int>> e;
    333         for (int i = 0; i < N; ++i) {
    334             e.push_back(
    335                 sc.make_hot_observable({
    336                     on.next(150, 1),
    337                     on.completed(210 + 10 * i)
    338                 })
    339             );
    340         }
    341 
    342         WHEN("each int is combined with the latest from the other source"){
    343 
    344             auto res = w.start(
    345                 [&]() {
    346                     return e[0]
    347                         .combine_latest(
    348                             [](int v0, int v1, int v2, int v3){
    349                                 return v0 + v1 + v2 + v3;
    350                             },
    351                             e[1], e[2], e[3]
    352                         )
    353                         // forget type to workaround lambda deduction bug on msvc 2013
    354                         .as_dynamic();
    355                 }
    356             );
    357 
    358             THEN("the output contains only complete message"){
    359                 auto required = rxu::to_vector({
    360                     on.completed(200 + 10 * N)
    361                 });
    362                 auto actual = res.get_observer().messages();
    363                 REQUIRE(required == actual);
    364             }
    365 
    366             THEN("there was one subscription and one unsubscription to each observable"){
    367 
    368                 int i = 0;
    369                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
    370                     auto required = rxu::to_vector({
    371                         on.subscribe(200, 200 + 10 * ++i)
    372                     });
    373                     auto actual = s.subscriptions();
    374                     REQUIRE(required == actual);
    375                 });
    376             }
    377         }
    378     }
    379 }
    380 
    381 SCENARIO("combine_latest never/empty", "[combine_latest][join][operators]"){
    382     GIVEN("2 hot observables of ints."){
    383         auto sc = rxsc::make_test();
    384         auto w = sc.create_worker();
    385         const rxsc::test::messages<int> on;
    386 
    387         auto n = sc.make_hot_observable({
    388             on.next(150, 1)
    389         });
    390 
    391         auto e = sc.make_hot_observable({
    392             on.next(150, 1),
    393             on.completed(210)
    394         });
    395 
    396         WHEN("each int is combined with the latest from the other source"){
    397 
    398             auto res = w.start(
    399                 [&]() {
    400                     return n
    401                         .combine_latest(
    402                             [](int v2, int v1){
    403                                 return v2 + v1;
    404                             },
    405                             e
    406                         )
    407                         // forget type to workaround lambda deduction bug on msvc 2013
    408                         .as_dynamic();
    409                 }
    410             );
    411 
    412             THEN("the output is empty"){
    413                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    414                 auto actual = res.get_observer().messages();
    415                 REQUIRE(required == actual);
    416             }
    417 
    418             THEN("there was one subscription and one unsubscription to the n"){
    419                 auto required = rxu::to_vector({
    420                     on.subscribe(200, 1000)
    421                 });
    422                 auto actual = n.subscriptions();
    423                 REQUIRE(required == actual);
    424             }
    425 
    426             THEN("there was one subscription and one unsubscription to the e"){
    427                 auto required = rxu::to_vector({
    428                     on.subscribe(200, 210)
    429                 });
    430                 auto actual = e.subscriptions();
    431                 REQUIRE(required == actual);
    432             }
    433         }
    434     }
    435 }
    436 
    437 SCENARIO("combine_latest empty/never", "[combine_latest][join][operators]"){
    438     GIVEN("2 hot observables of ints."){
    439         auto sc = rxsc::make_test();
    440         auto w = sc.create_worker();
    441         const rxsc::test::messages<int> on;
    442 
    443         auto e = sc.make_hot_observable({
    444             on.next(150, 1),
    445             on.completed(210)
    446         });
    447 
    448         auto n = sc.make_hot_observable({
    449             on.next(150, 1)
    450         });
    451 
    452         WHEN("each int is combined with the latest from the other source"){
    453 
    454             auto res = w.start(
    455                 [&]() {
    456                     return e
    457                         .combine_latest(
    458                             [](int v2, int v1){
    459                                 return v2 + v1;
    460                             },
    461                             n
    462                         )
    463                         // forget type to workaround lambda deduction bug on msvc 2013
    464                         .as_dynamic();
    465                 }
    466             );
    467 
    468             THEN("the output is empty"){
    469                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    470                 auto actual = res.get_observer().messages();
    471                 REQUIRE(required == actual);
    472             }
    473 
    474             THEN("there was one subscription and one unsubscription to the e"){
    475                 auto required = rxu::to_vector({
    476                     on.subscribe(200, 210)
    477                 });
    478                 auto actual = e.subscriptions();
    479                 REQUIRE(required == actual);
    480             }
    481 
    482             THEN("there was one subscription and one unsubscription to the n"){
    483                 auto required = rxu::to_vector({
    484                     on.subscribe(200, 1000)
    485                 });
    486                 auto actual = n.subscriptions();
    487                 REQUIRE(required == actual);
    488             }
    489         }
    490     }
    491 }
    492 
    493 SCENARIO("combine_latest empty/return", "[combine_latest][join][operators]"){
    494     GIVEN("2 hot observables of ints."){
    495         auto sc = rxsc::make_test();
    496         auto w = sc.create_worker();
    497         const rxsc::test::messages<int> on;
    498 
    499         auto e = sc.make_hot_observable({
    500             on.next(150, 1),
    501             on.completed(210)
    502         });
    503 
    504         auto o = sc.make_hot_observable({
    505             on.next(150, 1),
    506             on.next(215, 2),
    507             on.completed(220)
    508         });
    509 
    510         WHEN("each int is combined with the latest from the other source"){
    511 
    512             auto res = w.start(
    513                 [&]() {
    514                     return e
    515                         .combine_latest(
    516                             [](int v2, int v1){
    517                                 return v2 + v1;
    518                             },
    519                             o
    520                         )
    521                         // forget type to workaround lambda deduction bug on msvc 2013
    522                         .as_dynamic();
    523                 }
    524             );
    525 
    526             THEN("the output contains only complete message"){
    527                 auto required = rxu::to_vector({
    528                     on.completed(220)
    529                 });
    530                 auto actual = res.get_observer().messages();
    531                 REQUIRE(required == actual);
    532             }
    533 
    534             THEN("there was one subscription and one unsubscription to the e"){
    535                 auto required = rxu::to_vector({
    536                     on.subscribe(200, 210)
    537                 });
    538                 auto actual = e.subscriptions();
    539                 REQUIRE(required == actual);
    540             }
    541 
    542             THEN("there was one subscription and one unsubscription to the o"){
    543                 auto required = rxu::to_vector({
    544                     on.subscribe(200, 220)
    545                 });
    546                 auto actual = o.subscriptions();
    547                 REQUIRE(required == actual);
    548             }
    549         }
    550     }
    551 }
    552 
    553 SCENARIO("combine_latest return/empty", "[combine_latest][join][operators]"){
    554     GIVEN("2 hot observables of ints."){
    555         auto sc = rxsc::make_test();
    556         auto w = sc.create_worker();
    557         const rxsc::test::messages<int> on;
    558 
    559         auto o = sc.make_hot_observable({
    560             on.next(150, 1),
    561             on.next(215, 2),
    562             on.completed(220)
    563         });
    564 
    565         auto e = sc.make_hot_observable({
    566             on.next(150, 1),
    567             on.completed(210)
    568         });
    569 
    570         WHEN("each int is combined with the latest from the other source"){
    571 
    572             auto res = w.start(
    573                 [&]() {
    574                     return o
    575                         .combine_latest(
    576                             [](int v2, int v1){
    577                                 return v2 + v1;
    578                             },
    579                             e
    580                         )
    581                         // forget type to workaround lambda deduction bug on msvc 2013
    582                         .as_dynamic();
    583                 }
    584             );
    585 
    586             THEN("the output contains only complete message"){
    587                 auto required = rxu::to_vector({
    588                     on.completed(220)
    589                 });
    590                 auto actual = res.get_observer().messages();
    591                 REQUIRE(required == actual);
    592             }
    593 
    594             THEN("there was one subscription and one unsubscription to the o"){
    595                 auto required = rxu::to_vector({
    596                     on.subscribe(200, 220)
    597                 });
    598                 auto actual = o.subscriptions();
    599                 REQUIRE(required == actual);
    600             }
    601 
    602             THEN("there was one subscription and one unsubscription to the e"){
    603                 auto required = rxu::to_vector({
    604                     on.subscribe(200, 210)
    605                 });
    606                 auto actual = e.subscriptions();
    607                 REQUIRE(required == actual);
    608             }
    609         }
    610     }
    611 }
    612 
    613 SCENARIO("combine_latest never/return", "[combine_latest][join][operators]"){
    614     GIVEN("2 hot observables of ints."){
    615         auto sc = rxsc::make_test();
    616         auto w = sc.create_worker();
    617         const rxsc::test::messages<int> on;
    618 
    619         auto n = sc.make_hot_observable({
    620             on.next(150, 1)
    621         });
    622 
    623         auto o = sc.make_hot_observable({
    624             on.next(150, 1),
    625             on.next(215, 2),
    626             on.completed(220)
    627         });
    628 
    629         WHEN("each int is combined with the latest from the other source"){
    630 
    631             auto res = w.start(
    632                 [&]() {
    633                     return n
    634                         .combine_latest(
    635                             [](int v2, int v1){
    636                                 return v2 + v1;
    637                             },
    638                             o
    639                         )
    640                         // forget type to workaround lambda deduction bug on msvc 2013
    641                         .as_dynamic();
    642                 }
    643             );
    644 
    645             THEN("the output is empty"){
    646                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    647                 auto actual = res.get_observer().messages();
    648                 REQUIRE(required == actual);
    649             }
    650 
    651             THEN("there was one subscription and one unsubscription to the n"){
    652                 auto required = rxu::to_vector({
    653                     on.subscribe(200, 1000)
    654                 });
    655                 auto actual = n.subscriptions();
    656                 REQUIRE(required == actual);
    657             }
    658 
    659             THEN("there was one subscription and one unsubscription to the o"){
    660                 auto required = rxu::to_vector({
    661                     on.subscribe(200, 220)
    662                 });
    663                 auto actual = o.subscriptions();
    664                 REQUIRE(required == actual);
    665             }
    666         }
    667     }
    668 }
    669 
    670 SCENARIO("combine_latest return/never", "[combine_latest][join][operators]"){
    671     GIVEN("2 hot observables of ints."){
    672         auto sc = rxsc::make_test();
    673         auto w = sc.create_worker();
    674         const rxsc::test::messages<int> on;
    675 
    676         auto o = sc.make_hot_observable({
    677             on.next(150, 1),
    678             on.next(215, 2),
    679             on.completed(220)
    680         });
    681 
    682         auto n = sc.make_hot_observable({
    683             on.next(150, 1)
    684         });
    685 
    686         WHEN("each int is combined with the latest from the other source"){
    687 
    688             auto res = w.start(
    689                 [&]() {
    690                     return o
    691                         .combine_latest(
    692                             [](int v2, int v1){
    693                                 return v2 + v1;
    694                             },
    695                             n
    696                         )
    697                         // forget type to workaround lambda deduction bug on msvc 2013
    698                         .as_dynamic();
    699                 }
    700             );
    701 
    702             THEN("the output is empty"){
    703                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    704                 auto actual = res.get_observer().messages();
    705                 REQUIRE(required == actual);
    706             }
    707 
    708             THEN("there was one subscription and one unsubscription to the n"){
    709                 auto required = rxu::to_vector({
    710                     on.subscribe(200, 1000)
    711                 });
    712                 auto actual = n.subscriptions();
    713                 REQUIRE(required == actual);
    714             }
    715 
    716             THEN("there was one subscription and one unsubscription to the o"){
    717                 auto required = rxu::to_vector({
    718                     on.subscribe(200, 220)
    719                 });
    720                 auto actual = o.subscriptions();
    721                 REQUIRE(required == actual);
    722             }
    723         }
    724     }
    725 }
    726 
    727 
    728 SCENARIO("combine_latest return/return", "[combine_latest][join][operators]"){
    729     GIVEN("2 hot observables of ints."){
    730         auto sc = rxsc::make_test();
    731         auto w = sc.create_worker();
    732         const rxsc::test::messages<int> on;
    733 
    734         auto o1 = sc.make_hot_observable({
    735             on.next(150, 1),
    736             on.next(215, 2),
    737             on.completed(230)
    738         });
    739 
    740         auto o2 = sc.make_hot_observable({
    741             on.next(150, 1),
    742             on.next(220, 3),
    743             on.completed(240)
    744         });
    745 
    746         WHEN("each int is combined with the latest from the other source"){
    747 
    748             auto res = w.start(
    749                 [&]() {
    750                     return o1
    751                         .combine_latest(
    752                             [](int v2, int v1){
    753                              return v2 + v1;
    754                             },
    755                             o2
    756                         )
    757                         // forget type to workaround lambda deduction bug on msvc 2013
    758                         .as_dynamic();
    759                 }
    760             );
    761 
    762             THEN("the output contains combined ints"){
    763                 auto required = rxu::to_vector({
    764                     on.next(220, 2 + 3),
    765                     on.completed(240)
    766                 });
    767                 auto actual = res.get_observer().messages();
    768                 REQUIRE(required == actual);
    769             }
    770 
    771             THEN("there was one subscription and one unsubscription to the o1"){
    772                 auto required = rxu::to_vector({
    773                     on.subscribe(200, 230)
    774                 });
    775                 auto actual = o1.subscriptions();
    776                 REQUIRE(required == actual);
    777             }
    778 
    779             THEN("there was one subscription and one unsubscription to the o2"){
    780                 auto required = rxu::to_vector({
    781                     on.subscribe(200, 240)
    782                 });
    783                 auto actual = o2.subscriptions();
    784                 REQUIRE(required == actual);
    785             }
    786         }
    787     }
    788 }
    789 
    790 SCENARIO("combine_latest empty/error", "[combine_latest][join][operators]"){
    791     GIVEN("2 hot observables of ints."){
    792         auto sc = rxsc::make_test();
    793         auto w = sc.create_worker();
    794         const rxsc::test::messages<int> on;
    795 
    796         std::runtime_error ex("combine_latest on_error from source");
    797 
    798         auto emp = sc.make_hot_observable({
    799             on.next(150, 1),
    800             on.completed(230)
    801         });
    802 
    803         auto err = sc.make_hot_observable({
    804             on.next(150, 1),
    805             on.error(220, ex)
    806         });
    807 
    808         WHEN("each int is combined with the latest from the other source"){
    809 
    810             auto res = w.start(
    811                 [&]() {
    812                     return emp
    813                         .combine_latest(
    814                             [](int v2, int v1){
    815                                 return v2 + v1;
    816                             },
    817                             err
    818                         )
    819                         // forget type to workaround lambda deduction bug on msvc 2013
    820                         .as_dynamic();
    821                 }
    822             );
    823 
    824             THEN("the output contains only error message"){
    825                 auto required = rxu::to_vector({
    826                     on.error(220, ex)
    827                 });
    828                 auto actual = res.get_observer().messages();
    829                 REQUIRE(required == actual);
    830             }
    831 
    832             THEN("there was one subscription and one unsubscription to the emp"){
    833                 auto required = rxu::to_vector({
    834                     on.subscribe(200, 220)
    835                 });
    836                 auto actual = emp.subscriptions();
    837                 REQUIRE(required == actual);
    838             }
    839 
    840             THEN("there was one subscription and one unsubscription to the err"){
    841                 auto required = rxu::to_vector({
    842                     on.subscribe(200, 220)
    843                 });
    844                 auto actual = err.subscriptions();
    845                 REQUIRE(required == actual);
    846             }
    847         }
    848     }
    849 }
    850 
    851 SCENARIO("combine_latest error/empty", "[combine_latest][join][operators]"){
    852     GIVEN("2 hot observables of ints."){
    853         auto sc = rxsc::make_test();
    854         auto w = sc.create_worker();
    855         const rxsc::test::messages<int> on;
    856 
    857         std::runtime_error ex("combine_latest on_error from source");
    858 
    859         auto err = sc.make_hot_observable({
    860             on.next(150, 1),
    861             on.error(220, ex)
    862         });
    863 
    864         auto emp = sc.make_hot_observable({
    865             on.next(150, 1),
    866             on.completed(230)
    867         });
    868 
    869         WHEN("each int is combined with the latest from the other source"){
    870 
    871             auto res = w.start(
    872                 [&]() {
    873                     return err
    874                         .combine_latest(
    875                             [](int v2, int v1){
    876                                 return v2 + v1;
    877                             },
    878                             emp
    879                         )
    880                         // forget type to workaround lambda deduction bug on msvc 2013
    881                         .as_dynamic();
    882                 }
    883             );
    884 
    885             THEN("the output contains only error message"){
    886                 auto required = rxu::to_vector({
    887                     on.error(220, ex)
    888                 });
    889                 auto actual = res.get_observer().messages();
    890                 REQUIRE(required == actual);
    891             }
    892 
    893             THEN("there was one subscription and one unsubscription to the emp"){
    894                 auto required = rxu::to_vector({
    895                     on.subscribe(200, 220)
    896                 });
    897                 auto actual = emp.subscriptions();
    898                 REQUIRE(required == actual);
    899             }
    900 
    901             THEN("there was one subscription and one unsubscription to the err"){
    902                 auto required = rxu::to_vector({
    903                     on.subscribe(200, 220)
    904                 });
    905                 auto actual = err.subscriptions();
    906                 REQUIRE(required == actual);
    907             }
    908         }
    909     }
    910 }
    911 
    912 SCENARIO("combine_latest return/error", "[combine_latest][join][operators]"){
    913     GIVEN("2 hot observables of ints."){
    914         auto sc = rxsc::make_test();
    915         auto w = sc.create_worker();
    916         const rxsc::test::messages<int> on;
    917 
    918         std::runtime_error ex("combine_latest on_error from source");
    919 
    920         auto o = sc.make_hot_observable({
    921             on.next(150, 1),
    922             on.next(210, 2),
    923             on.completed(230)
    924         });
    925 
    926         auto err = sc.make_hot_observable({
    927             on.next(150, 1),
    928             on.error(220, ex)
    929         });
    930 
    931         WHEN("each int is combined with the latest from the other source"){
    932 
    933             auto res = w.start(
    934                 [&]() {
    935                     return o
    936                         .combine_latest(
    937                             [](int v2, int v1){
    938                                 return v2 + v1;
    939                             },
    940                             err
    941                         )
    942                         // forget type to workaround lambda deduction bug on msvc 2013
    943                         .as_dynamic();
    944                 }
    945             );
    946 
    947             THEN("the output contains only error message"){
    948                 auto required = rxu::to_vector({
    949                     on.error(220, ex)
    950                 });
    951                 auto actual = res.get_observer().messages();
    952                 REQUIRE(required == actual);
    953             }
    954 
    955             THEN("there was one subscription and one unsubscription to the ret"){
    956                 auto required = rxu::to_vector({
    957                     on.subscribe(200, 220)
    958                 });
    959                 auto actual = o.subscriptions();
    960                 REQUIRE(required == actual);
    961             }
    962 
    963             THEN("there was one subscription and one unsubscription to the err"){
    964                 auto required = rxu::to_vector({
    965                     on.subscribe(200, 220)
    966                 });
    967                 auto actual = err.subscriptions();
    968                 REQUIRE(required == actual);
    969             }
    970         }
    971     }
    972 }
    973 
    974 SCENARIO("combine_latest error/return", "[combine_latest][join][operators]"){
    975     GIVEN("2 hot observables of ints."){
    976         auto sc = rxsc::make_test();
    977         auto w = sc.create_worker();
    978         const rxsc::test::messages<int> on;
    979 
    980         std::runtime_error ex("combine_latest on_error from source");
    981 
    982         auto err = sc.make_hot_observable({
    983             on.next(150, 1),
    984             on.error(220, ex)
    985         });
    986 
    987         auto ret = sc.make_hot_observable({
    988             on.next(150, 1),
    989             on.next(210, 2),
    990             on.completed(230)
    991         });
    992 
    993         WHEN("each int is combined with the latest from the other source"){
    994 
    995             auto res = w.start(
    996                 [&]() {
    997                     return err
    998                         .combine_latest(
    999                             [](int v2, int v1){
   1000                                 return v2 + v1;
   1001                             },
   1002                             ret
   1003                         )
   1004                         // forget type to workaround lambda deduction bug on msvc 2013
   1005                         .as_dynamic();
   1006                 }
   1007             );
   1008 
   1009             THEN("the output contains only error message"){
   1010                 auto required = rxu::to_vector({
   1011                     on.error(220, ex)
   1012                 });
   1013                 auto actual = res.get_observer().messages();
   1014                 REQUIRE(required == actual);
   1015             }
   1016 
   1017             THEN("there was one subscription and one unsubscription to the ret"){
   1018                 auto required = rxu::to_vector({
   1019                     on.subscribe(200, 220)
   1020                 });
   1021                 auto actual = ret.subscriptions();
   1022                 REQUIRE(required == actual);
   1023             }
   1024 
   1025             THEN("there was one subscription and one unsubscription to the err"){
   1026                 auto required = rxu::to_vector({
   1027                     on.subscribe(200, 220)
   1028                 });
   1029                 auto actual = err.subscriptions();
   1030                 REQUIRE(required == actual);
   1031             }
   1032         }
   1033     }
   1034 }
   1035 
   1036 SCENARIO("combine_latest error/error", "[combine_latest][join][operators]"){
   1037     GIVEN("2 hot observables of ints."){
   1038         auto sc = rxsc::make_test();
   1039         auto w = sc.create_worker();
   1040         const rxsc::test::messages<int> on;
   1041 
   1042         std::runtime_error ex1("combine_latest on_error from source 1");
   1043         std::runtime_error ex2("combine_latest on_error from source 2");
   1044 
   1045         auto err1 = sc.make_hot_observable({
   1046             on.next(150, 1),
   1047             on.error(220, ex1)
   1048         });
   1049 
   1050         auto err2 = sc.make_hot_observable({
   1051             on.next(150, 1),
   1052             on.error(230, ex2)
   1053         });
   1054 
   1055         WHEN("each int is combined with the latest from the other source"){
   1056 
   1057             auto res = w.start(
   1058                 [&]() {
   1059                     return err1
   1060                         .combine_latest(
   1061                             [](int v2, int v1){
   1062                                 return v2 + v1;
   1063                             },
   1064                             err2
   1065                         )
   1066                         // forget type to workaround lambda deduction bug on msvc 2013
   1067                         .as_dynamic();
   1068                 }
   1069             );
   1070 
   1071             THEN("the output contains only error message"){
   1072                 auto required = rxu::to_vector({
   1073                     on.error(220, ex1)
   1074                 });
   1075                 auto actual = res.get_observer().messages();
   1076                 REQUIRE(required == actual);
   1077             }
   1078 
   1079             THEN("there was one subscription and one unsubscription to the err1"){
   1080                 auto required = rxu::to_vector({
   1081                     on.subscribe(200, 220)
   1082                 });
   1083                 auto actual = err1.subscriptions();
   1084                 REQUIRE(required == actual);
   1085             }
   1086 
   1087             THEN("there was one subscription and one unsubscription to the err2"){
   1088                 auto required = rxu::to_vector({
   1089                     on.subscribe(200, 220)
   1090                 });
   1091                 auto actual = err2.subscriptions();
   1092                 REQUIRE(required == actual);
   1093             }
   1094         }
   1095     }
   1096 }
   1097 
   1098 SCENARIO("combine_latest next+error/error", "[combine_latest][join][operators]"){
   1099     GIVEN("2 hot observables of ints."){
   1100         auto sc = rxsc::make_test();
   1101         auto w = sc.create_worker();
   1102         const rxsc::test::messages<int> on;
   1103 
   1104         std::runtime_error ex1("combine_latest on_error from source 1");
   1105         std::runtime_error ex2("combine_latest on_error from source 2");
   1106 
   1107         auto err1 = sc.make_hot_observable({
   1108             on.next(150, 1),
   1109             on.next(210, 2),
   1110             on.error(220, ex1)
   1111         });
   1112 
   1113         auto err2 = sc.make_hot_observable({
   1114             on.next(150, 1),
   1115             on.error(230, ex2)
   1116         });
   1117 
   1118         WHEN("each int is combined with the latest from the other source"){
   1119 
   1120             auto res = w.start(
   1121                 [&]() {
   1122                     return err1
   1123                         .combine_latest(
   1124                             [](int v2, int v1){
   1125                                 return v2 + v1;
   1126                             },
   1127                             err2
   1128                         )
   1129                         // forget type to workaround lambda deduction bug on msvc 2013
   1130                         .as_dynamic();
   1131                 }
   1132             );
   1133 
   1134             THEN("the output contains only error message"){
   1135                 auto required = rxu::to_vector({
   1136                     on.error(220, ex1)
   1137                 });
   1138                 auto actual = res.get_observer().messages();
   1139                 REQUIRE(required == actual);
   1140             }
   1141 
   1142             THEN("there was one subscription and one unsubscription to the err1"){
   1143                 auto required = rxu::to_vector({
   1144                     on.subscribe(200, 220)
   1145                 });
   1146                 auto actual = err1.subscriptions();
   1147                 REQUIRE(required == actual);
   1148             }
   1149 
   1150             THEN("there was one subscription and one unsubscription to the err2"){
   1151                 auto required = rxu::to_vector({
   1152                     on.subscribe(200, 220)
   1153                 });
   1154                 auto actual = err2.subscriptions();
   1155                 REQUIRE(required == actual);
   1156             }
   1157         }
   1158     }
   1159 }
   1160 
   1161 SCENARIO("combine_latest error/next+error", "[combine_latest][join][operators]"){
   1162     GIVEN("2 hot observables of ints."){
   1163         auto sc = rxsc::make_test();
   1164         auto w = sc.create_worker();
   1165         const rxsc::test::messages<int> on;
   1166 
   1167         std::runtime_error ex1("combine_latest on_error from source 1");
   1168         std::runtime_error ex2("combine_latest on_error from source 2");
   1169 
   1170         auto err1 = sc.make_hot_observable({
   1171             on.next(150, 1),
   1172             on.error(230, ex1)
   1173         });
   1174 
   1175         auto err2 = sc.make_hot_observable({
   1176             on.next(150, 1),
   1177             on.next(210, 2),
   1178             on.error(220, ex2)
   1179         });
   1180 
   1181         WHEN("each int is combined with the latest from the other source"){
   1182 
   1183             auto res = w.start(
   1184                 [&]() {
   1185                     return err1
   1186                         .combine_latest(
   1187                             [](int v2, int v1){
   1188                                 return v2 + v1;
   1189                             },
   1190                             err2
   1191                         )
   1192                         // forget type to workaround lambda deduction bug on msvc 2013
   1193                         .as_dynamic();
   1194                 }
   1195             );
   1196 
   1197             THEN("the output contains only error message"){
   1198                 auto required = rxu::to_vector({
   1199                     on.error(220, ex2)
   1200                 });
   1201                 auto actual = res.get_observer().messages();
   1202                 REQUIRE(required == actual);
   1203             }
   1204 
   1205             THEN("there was one subscription and one unsubscription to the err1"){
   1206                 auto required = rxu::to_vector({
   1207                     on.subscribe(200, 220)
   1208                 });
   1209                 auto actual = err1.subscriptions();
   1210                 REQUIRE(required == actual);
   1211             }
   1212 
   1213             THEN("there was one subscription and one unsubscription to the err2"){
   1214                 auto required = rxu::to_vector({
   1215                     on.subscribe(200, 220)
   1216                 });
   1217                 auto actual = err2.subscriptions();
   1218                 REQUIRE(required == actual);
   1219             }
   1220         }
   1221     }
   1222 }
   1223 
   1224 SCENARIO("combine_latest never/error", "[combine_latest][join][operators]"){
   1225     GIVEN("2 hot observables of ints."){
   1226         auto sc = rxsc::make_test();
   1227         auto w = sc.create_worker();
   1228         const rxsc::test::messages<int> on;
   1229 
   1230         std::runtime_error ex("combine_latest on_error from source");
   1231 
   1232         auto n = sc.make_hot_observable({
   1233             on.next(150, 1)
   1234         });
   1235 
   1236         auto err = sc.make_hot_observable({
   1237             on.next(150, 1),
   1238             on.error(220, ex)
   1239         });
   1240 
   1241         WHEN("each int is combined with the latest from the other source"){
   1242 
   1243             auto res = w.start(
   1244                 [&]() {
   1245                     return n
   1246                         .combine_latest(
   1247                             [](int v2, int v1){
   1248                                 return v2 + v1;
   1249                             },
   1250                             err
   1251                         )
   1252                         // forget type to workaround lambda deduction bug on msvc 2013
   1253                         .as_dynamic();
   1254                 }
   1255             );
   1256 
   1257             THEN("the output contains only error message"){
   1258                 auto required = rxu::to_vector({
   1259                     on.error(220, ex)
   1260                 });
   1261                 auto actual = res.get_observer().messages();
   1262                 REQUIRE(required == actual);
   1263             }
   1264 
   1265             THEN("there was one subscription and one unsubscription to the n"){
   1266                 auto required = rxu::to_vector({
   1267                     on.subscribe(200, 220)
   1268                 });
   1269                 auto actual = n.subscriptions();
   1270                 REQUIRE(required == actual);
   1271             }
   1272 
   1273             THEN("there was one subscription and one unsubscription to the err"){
   1274                 auto required = rxu::to_vector({
   1275                     on.subscribe(200, 220)
   1276                 });
   1277                 auto actual = err.subscriptions();
   1278                 REQUIRE(required == actual);
   1279             }
   1280         }
   1281     }
   1282 }
   1283 
   1284 SCENARIO("combine_latest error/never", "[combine_latest][join][operators]"){
   1285     GIVEN("2 hot observables of ints."){
   1286         auto sc = rxsc::make_test();
   1287         auto w = sc.create_worker();
   1288         const rxsc::test::messages<int> on;
   1289 
   1290         std::runtime_error ex("combine_latest on_error from source");
   1291 
   1292         auto err = sc.make_hot_observable({
   1293             on.next(150, 1),
   1294             on.error(220, ex)
   1295         });
   1296 
   1297         auto n = sc.make_hot_observable({
   1298             on.next(150, 1)
   1299         });
   1300 
   1301         WHEN("each int is combined with the latest from the other source"){
   1302 
   1303             auto res = w.start(
   1304                 [&]() {
   1305                     return err
   1306                         .combine_latest(
   1307                             [](int v2, int v1){
   1308                                 return v2 + v1;
   1309                             },
   1310                             n
   1311                         )
   1312                         // forget type to workaround lambda deduction bug on msvc 2013
   1313                         .as_dynamic();
   1314                 }
   1315             );
   1316 
   1317             THEN("the output contains only error message"){
   1318                 auto required = rxu::to_vector({
   1319                     on.error(220, ex)
   1320                 });
   1321                 auto actual = res.get_observer().messages();
   1322                 REQUIRE(required == actual);
   1323             }
   1324 
   1325             THEN("there was one subscription and one unsubscription to the n"){
   1326                 auto required = rxu::to_vector({
   1327                     on.subscribe(200, 220)
   1328                 });
   1329                 auto actual = n.subscriptions();
   1330                 REQUIRE(required == actual);
   1331             }
   1332 
   1333             THEN("there was one subscription and one unsubscription to the err"){
   1334                 auto required = rxu::to_vector({
   1335                     on.subscribe(200, 220)
   1336                 });
   1337                 auto actual = err.subscriptions();
   1338                 REQUIRE(required == actual);
   1339             }
   1340         }
   1341     }
   1342 }
   1343 
   1344 SCENARIO("combine_latest error after completed left", "[combine_latest][join][operators]"){
   1345     GIVEN("2 hot observables of ints."){
   1346         auto sc = rxsc::make_test();
   1347         auto w = sc.create_worker();
   1348         const rxsc::test::messages<int> on;
   1349 
   1350         std::runtime_error ex("combine_latest on_error from source");
   1351 
   1352         auto ret = sc.make_hot_observable({
   1353             on.next(150, 1),
   1354             on.next(210, 2),
   1355             on.completed(215)
   1356         });
   1357 
   1358         auto err = sc.make_hot_observable({
   1359             on.next(150, 1),
   1360             on.error(220, ex)
   1361         });
   1362 
   1363         WHEN("each int is combined with the latest from the other source"){
   1364 
   1365             auto res = w.start(
   1366                 [&]() {
   1367                     return ret
   1368                         .combine_latest(
   1369                             [](int v2, int v1){
   1370                                 return v2 + v1;
   1371                             },
   1372                             err
   1373                         )
   1374                         // forget type to workaround lambda deduction bug on msvc 2013
   1375                         .as_dynamic();
   1376                 }
   1377             );
   1378 
   1379             THEN("the output contains only error message"){
   1380                 auto required = rxu::to_vector({
   1381                     on.error(220, ex)
   1382                 });
   1383                 auto actual = res.get_observer().messages();
   1384                 REQUIRE(required == actual);
   1385             }
   1386 
   1387             THEN("there was one subscription and one unsubscription to the ret"){
   1388                 auto required = rxu::to_vector({
   1389                     on.subscribe(200, 215)
   1390                 });
   1391                 auto actual = ret.subscriptions();
   1392                 REQUIRE(required == actual);
   1393             }
   1394 
   1395             THEN("there was one subscription and one unsubscription to the err"){
   1396                 auto required = rxu::to_vector({
   1397                     on.subscribe(200, 220)
   1398                 });
   1399                 auto actual = err.subscriptions();
   1400                 REQUIRE(required == actual);
   1401             }
   1402         }
   1403     }
   1404 }
   1405 
   1406 SCENARIO("combine_latest error after completed right", "[combine_latest][join][operators]"){
   1407     GIVEN("2 hot observables of ints."){
   1408         auto sc = rxsc::make_test();
   1409         auto w = sc.create_worker();
   1410         const rxsc::test::messages<int> on;
   1411 
   1412         std::runtime_error ex("combine_latest on_error from source");
   1413 
   1414         auto err = sc.make_hot_observable({
   1415             on.next(150, 1),
   1416             on.error(220, ex)
   1417         });
   1418 
   1419         auto ret = sc.make_hot_observable({
   1420             on.next(150, 1),
   1421             on.next(210, 2),
   1422             on.completed(215)
   1423         });
   1424 
   1425         WHEN("each int is combined with the latest from the other source"){
   1426 
   1427             auto res = w.start(
   1428                 [&]() {
   1429                     return err
   1430                         .combine_latest(
   1431                             [](int v2, int v1){
   1432                                 return v2 + v1;
   1433                             },
   1434                             ret
   1435                         )
   1436                         // forget type to workaround lambda deduction bug on msvc 2013
   1437                         .as_dynamic();
   1438                 }
   1439             );
   1440 
   1441             THEN("the output contains only error message"){
   1442                 auto required = rxu::to_vector({
   1443                     on.error(220, ex)
   1444                 });
   1445                 auto actual = res.get_observer().messages();
   1446                 REQUIRE(required == actual);
   1447             }
   1448 
   1449             THEN("there was one subscription and one unsubscription to the ret"){
   1450                 auto required = rxu::to_vector({
   1451                     on.subscribe(200, 215)
   1452                 });
   1453                 auto actual = ret.subscriptions();
   1454                 REQUIRE(required == actual);
   1455             }
   1456 
   1457             THEN("there was one subscription and one unsubscription to the err"){
   1458                 auto required = rxu::to_vector({
   1459                     on.subscribe(200, 220)
   1460                 });
   1461                 auto actual = err.subscriptions();
   1462                 REQUIRE(required == actual);
   1463             }
   1464         }
   1465     }
   1466 }
   1467 
   1468 SCENARIO("combine_latest selector throws", "[combine_latest][join][operators][!throws]"){
   1469     GIVEN("2 hot observables of ints."){
   1470         auto sc = rxsc::make_test();
   1471         auto w = sc.create_worker();
   1472         const rxsc::test::messages<int> on;
   1473 
   1474         std::runtime_error ex("combine_latest on_error from source");
   1475 
   1476         auto o1 = sc.make_hot_observable({
   1477             on.next(150, 1),
   1478             on.next(215, 2),
   1479             on.completed(230)
   1480         });
   1481 
   1482         auto o2 = sc.make_hot_observable({
   1483             on.next(150, 1),
   1484             on.next(220, 3),
   1485             on.completed(240)
   1486         });
   1487 
   1488         WHEN("each int is combined with the latest from the other source"){
   1489 
   1490             auto res = w.start(
   1491                 [&]() {
   1492                     return o1
   1493                         .combine_latest(
   1494                             // Note for trying to handle this test case when exceptions are disabled
   1495                             // with RXCPP_USE_EXCEPTIONS == 0:
   1496                             //
   1497                             // It seems that this test is in particular testing that the
   1498                             // combine_latest selector (aggregate function) thrown exceptions
   1499                             // are being translated into an on_error.
   1500                             //
   1501                             // Since there appears to be no way to give combine_latest
   1502                             // an Observable that would call on_error directly (as opposed
   1503                             // to a regular function that's converted into an observable),
   1504                             // this test is meaningless when exceptions are disabled
   1505                             // since any selectors with 'throw' will not even compile.
   1506                             //
   1507                             // Attempting to change this to e.g.
   1508                             //    o1.combineLatest(o2).map ... unconditional onError
   1509                             // would defeat the purpose of the test since its the combineLatest
   1510                             // implementation that's supposed to be doing the error forwarding.
   1511                             [&ex](int, int) -> int {
   1512                                 rxu::throw_exception(ex);
   1513                             },
   1514                             o2
   1515                         )
   1516                         // forget type to workaround lambda deduction bug on msvc 2013
   1517                         .as_dynamic();
   1518                 }
   1519             );
   1520 
   1521             THEN("the output contains only error"){
   1522                 auto required = rxu::to_vector({
   1523                     on.error(220, ex)
   1524                 });
   1525                 auto actual = res.get_observer().messages();
   1526                 REQUIRE(required == actual);
   1527             }
   1528 
   1529             THEN("there was one subscription and one unsubscription to the o1"){
   1530                 auto required = rxu::to_vector({
   1531                     on.subscribe(200, 220)
   1532                 });
   1533                 auto actual = o1.subscriptions();
   1534                 REQUIRE(required == actual);
   1535             }
   1536 
   1537             THEN("there was one subscription and one unsubscription to the o2"){
   1538                 auto required = rxu::to_vector({
   1539                     on.subscribe(200, 220)
   1540                 });
   1541                 auto actual = o2.subscriptions();
   1542                 REQUIRE(required == actual);
   1543             }
   1544         }
   1545     }
   1546 }
   1547 
   1548 SCENARIO("combine_latest selector throws N", "[combine_latest][join][operators][!throws]"){
   1549     GIVEN("N hot observables of ints."){
   1550         auto sc = rxsc::make_test();
   1551         auto w = sc.create_worker();
   1552         const rxsc::test::messages<int> on;
   1553 
   1554         const int N = 4;
   1555 
   1556         std::runtime_error ex("combine_latest on_error from source");
   1557 
   1558         std::vector<rxcpp::test::testable_observable<int>> e;
   1559         for (int i = 0; i < N; ++i) {
   1560             e.push_back(
   1561                 sc.make_hot_observable({
   1562                     on.next(210 + 10 * i, 1),
   1563                     on.completed(500)
   1564                 })
   1565             );
   1566         }
   1567 
   1568         WHEN("each int is combined with the latest from the other source"){
   1569 
   1570             auto res = w.start(
   1571                 [&]() {
   1572                     return e[0]
   1573                         .combine_latest(
   1574                             [&ex](int, int, int, int) -> int {
   1575                                 rxu::throw_exception(ex);
   1576                             },
   1577                             e[1], e[2], e[3]
   1578                         )
   1579                         // forget type to workaround lambda deduction bug on msvc 2013
   1580                         .as_dynamic();
   1581                 }
   1582             );
   1583 
   1584             THEN("the output contains only error"){
   1585                 auto required = rxu::to_vector({
   1586                     on.error(200 + 10 * N, ex)
   1587                 });
   1588                 auto actual = res.get_observer().messages();
   1589                 REQUIRE(required == actual);
   1590             }
   1591 
   1592             THEN("there was one subscription and one unsubscription to each observable"){
   1593 
   1594                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
   1595                     auto required = rxu::to_vector({
   1596                         on.subscribe(200, 200 + 10 * N)
   1597                     });
   1598                     auto actual = s.subscriptions();
   1599                     REQUIRE(required == actual);
   1600                 });
   1601             }
   1602         }
   1603     }
   1604 }
   1605 
   1606 SCENARIO("combine_latest typical N", "[combine_latest][join][operators]"){
   1607     GIVEN("N hot observables of ints."){
   1608         auto sc = rxsc::make_test();
   1609         auto w = sc.create_worker();
   1610         const rxsc::test::messages<int> on;
   1611 
   1612         const int N = 4;
   1613 
   1614         std::vector<rxcpp::test::testable_observable<int>> o;
   1615         for (int i = 0; i < N; ++i) {
   1616             o.push_back(
   1617                 sc.make_hot_observable({
   1618                     on.next(150, 1),
   1619                     on.next(210 + 10 * i, i + 1),
   1620                     on.next(410 + 10 * i, i + N + 1),
   1621                     on.completed(800)
   1622                 })
   1623             );
   1624         }
   1625 
   1626         WHEN("each int is combined with the latest from the other source"){
   1627 
   1628             auto res = w.start(
   1629                 [&]() {
   1630                     return o[0]
   1631                         .combine_latest(
   1632                             [](int v0, int v1, int v2, int v3) {
   1633                                 return v0 + v1 + v2 + v3;
   1634                             },
   1635                             o[1], o[2], o[3]
   1636                         )
   1637                         // forget type to workaround lambda deduction bug on msvc 2013
   1638                         .as_dynamic();
   1639                 }
   1640             );
   1641 
   1642             THEN("the output contains combined ints"){
   1643                 auto required = rxu::to_vector({
   1644                     on.next(200 + 10 * N, N * (N + 1) / 2)
   1645                 });
   1646                 for (int i = 0; i < N; ++i) {
   1647                     required.push_back(on.next(410 + 10 * i, N * (N + 1) / 2 + N + N * i));
   1648                 }
   1649                 required.push_back(on.completed(800));
   1650                 auto actual = res.get_observer().messages();
   1651                 REQUIRE(required == actual);
   1652             }
   1653 
   1654             THEN("there was one subscription and one unsubscription to each observable"){
   1655 
   1656                 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){
   1657                     auto required = rxu::to_vector({
   1658                         on.subscribe(200, 800)
   1659                     });
   1660                     auto actual = s.subscriptions();
   1661                     REQUIRE(required == actual);
   1662                 });
   1663             }
   1664         }
   1665     }
   1666 }
   1667