Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-zip.hpp>
      3 
      4 SCENARIO("zip never/never", "[zip][join][operators]"){
      5     GIVEN("2 hot observables of ints."){
      6         auto sc = rxsc::make_test();
      7         auto w = sc.create_worker();
      8         const rxsc::test::messages<int> on;
      9 
     10         auto n1 = sc.make_hot_observable({
     11             on.next(150, 1)
     12         });
     13 
     14         auto n2 = sc.make_hot_observable({
     15             on.next(150, 1)
     16         });
     17 
     18         WHEN("each int is combined with the latest from the other source"){
     19 
     20             auto res = w.start(
     21                 [&]() {
     22                     return n1
     23                         | rxo::zip(
     24                             [](int v2, int v1){
     25                                 return v2 + v1;
     26                             },
     27                             n2
     28                         )
     29                         // forget type to workaround lambda deduction bug on msvc 2013
     30                         | rxo::as_dynamic();
     31                 }
     32             );
     33 
     34             THEN("the output is empty"){
     35                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
     36                 auto actual = res.get_observer().messages();
     37                 REQUIRE(required == actual);
     38             }
     39 
     40             THEN("there was one subscription and one unsubscription to the n1"){
     41                 auto required = rxu::to_vector({
     42                     on.subscribe(200, 1000)
     43                 });
     44                 auto actual = n1.subscriptions();
     45                 REQUIRE(required == actual);
     46             }
     47 
     48             THEN("there was one subscription and one unsubscription to the n2"){
     49                 auto required = rxu::to_vector({
     50                     on.subscribe(200, 1000)
     51                 });
     52                 auto actual = n2.subscriptions();
     53                 REQUIRE(required == actual);
     54             }
     55         }
     56     }
     57 }
     58 
     59 SCENARIO("zip never N", "[zip][join][operators]"){
     60     GIVEN("N never completed hot observables of ints."){
     61         auto sc = rxsc::make_test();
     62         auto w = sc.create_worker();
     63         const rxsc::test::messages<int> on;
     64 
     65         const std::size_t N = 4;
     66 
     67         std::vector<rxcpp::test::testable_observable<int>> n;
     68         for (std::size_t i = 0; i < N; ++i) {
     69             n.push_back(
     70                 sc.make_hot_observable({
     71                     on.next(150, 1)
     72                 })
     73             );
     74         }
     75 
     76         WHEN("each int is combined with the latest from the other source"){
     77 
     78             auto res = w.start(
     79                 [&]() {
     80                     return n[0]
     81                         | rxo::zip(
     82                             [](int v0, int v1, int v2, int v3){
     83                                 return v0 + v1 + v2 + v3;
     84                             },
     85                             n[1], n[2], n[3]
     86                         )
     87                         // forget type to workaround lambda deduction bug on msvc 2013
     88                         | rxo::as_dynamic();
     89                 }
     90             );
     91 
     92             THEN("the output is empty"){
     93                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
     94                 auto actual = res.get_observer().messages();
     95                 REQUIRE(required == actual);
     96             }
     97 
     98             THEN("there was one subscription and one unsubscription to each observable"){
     99 
    100                 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){
    101                     auto required = rxu::to_vector({
    102                         on.subscribe(200, 1000)
    103                     });
    104                     auto actual = s.subscriptions();
    105                     REQUIRE(required == actual);
    106                 });
    107             }
    108         }
    109     }
    110 }
    111 
    112 SCENARIO("zip never/empty", "[zip][join][operators]"){
    113     GIVEN("2 hot observables of ints."){
    114         auto sc = rxsc::make_test();
    115         auto w = sc.create_worker();
    116         const rxsc::test::messages<int> on;
    117 
    118         auto n = sc.make_hot_observable({
    119             on.next(150, 1)
    120         });
    121 
    122         auto e = sc.make_hot_observable({
    123             on.next(150, 1),
    124             on.completed(210)
    125         });
    126 
    127         WHEN("each int is combined with the latest from the other source"){
    128 
    129             auto res = w.start(
    130                 [&]() {
    131                     return n
    132                         .zip(
    133                             [](int v2, int v1){
    134                                 return v2 + v1;
    135                             },
    136                             e
    137                         )
    138                         // forget type to workaround lambda deduction bug on msvc 2013
    139                         .as_dynamic();
    140                 }
    141             );
    142 
    143             THEN("the output is empty"){
    144                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    145                 auto actual = res.get_observer().messages();
    146                 REQUIRE(required == actual);
    147             }
    148 
    149             THEN("there was one subscription and one unsubscription to the n"){
    150                 auto required = rxu::to_vector({
    151                     on.subscribe(200, 1000)
    152                 });
    153                 auto actual = n.subscriptions();
    154                 REQUIRE(required == actual);
    155             }
    156 
    157             THEN("there was one subscription and one unsubscription to the e"){
    158                 auto required = rxu::to_vector({
    159                     on.subscribe(200, 210)
    160                 });
    161                 auto actual = e.subscriptions();
    162                 REQUIRE(required == actual);
    163             }
    164         }
    165     }
    166 }
    167 
    168 SCENARIO("zip empty/never", "[zip][join][operators]"){
    169     GIVEN("2 hot observables of ints."){
    170         auto sc = rxsc::make_test();
    171         auto w = sc.create_worker();
    172         const rxsc::test::messages<int> on;
    173 
    174         auto e = sc.make_hot_observable({
    175             on.next(150, 1),
    176             on.completed(210)
    177         });
    178 
    179         auto n = sc.make_hot_observable({
    180             on.next(150, 1)
    181         });
    182 
    183         WHEN("each int is combined with the latest from the other source"){
    184 
    185             auto res = w.start(
    186                 [&]() {
    187                     return e
    188                         .zip(
    189                             [](int v2, int v1){
    190                                 return v2 + v1;
    191                             },
    192                             n
    193                         )
    194                         // forget type to workaround lambda deduction bug on msvc 2013
    195                         .as_dynamic();
    196                 }
    197             );
    198 
    199             THEN("the output is empty"){
    200                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    201                 auto actual = res.get_observer().messages();
    202                 REQUIRE(required == actual);
    203             }
    204 
    205             THEN("there was one subscription and one unsubscription to the e"){
    206                 auto required = rxu::to_vector({
    207                     on.subscribe(200, 210)
    208                 });
    209                 auto actual = e.subscriptions();
    210                 REQUIRE(required == actual);
    211             }
    212 
    213             THEN("there was one subscription and one unsubscription to the n"){
    214                 auto required = rxu::to_vector({
    215                     on.subscribe(200, 1000)
    216                 });
    217                 auto actual = n.subscriptions();
    218                 REQUIRE(required == actual);
    219             }
    220         }
    221     }
    222 }
    223 
    224 SCENARIO("zip empty/empty", "[zip][join][operators]"){
    225     GIVEN("2 hot observables of ints."){
    226         auto sc = rxsc::make_test();
    227         auto w = sc.create_worker();
    228         const rxsc::test::messages<int> on;
    229 
    230         auto e1 = sc.make_hot_observable({
    231             on.next(150, 1),
    232             on.completed(210)
    233         });
    234 
    235         auto e2 = sc.make_hot_observable({
    236             on.next(150, 1),
    237             on.completed(210)
    238         });
    239 
    240         WHEN("each int is combined with the latest from the other source"){
    241 
    242             auto res = w.start(
    243                 [&]() {
    244                     return e1
    245                         .zip(
    246                             [](int v2, int v1){
    247                                 return v2 + v1;
    248                             },
    249                             e2
    250                         )
    251                         // forget type to workaround lambda deduction bug on msvc 2013
    252                         .as_dynamic();
    253                 }
    254             );
    255 
    256             THEN("the output contains only complete message"){
    257                 auto required = rxu::to_vector({
    258                     on.completed(210)
    259                 });
    260                 auto actual = res.get_observer().messages();
    261                 REQUIRE(required == actual);
    262             }
    263 
    264             THEN("there was one subscription and one unsubscription to the e"){
    265                 auto required = rxu::to_vector({
    266                     on.subscribe(200, 210)
    267                 });
    268                 auto actual = e1.subscriptions();
    269                 REQUIRE(required == actual);
    270             }
    271 
    272             THEN("there was one subscription and one unsubscription to the n"){
    273                 auto required = rxu::to_vector({
    274                     on.subscribe(200, 210)
    275                 });
    276                 auto actual = e2.subscriptions();
    277                 REQUIRE(required == actual);
    278             }
    279         }
    280     }
    281 }
    282 
    283 SCENARIO("zip empty N", "[zip][join][operators]"){
    284     GIVEN("N empty hot observables of ints."){
    285         auto sc = rxsc::make_test();
    286         auto w = sc.create_worker();
    287         const rxsc::test::messages<int> on;
    288 
    289         const int N = 4;
    290 
    291         std::vector<rxcpp::test::testable_observable<int>> e;
    292         for (int i = 0; i < N; ++i) {
    293             e.push_back(
    294                 sc.make_hot_observable({
    295                     on.next(150, 1),
    296                     on.completed(210 + 10 * i)
    297                 })
    298             );
    299         }
    300 
    301         WHEN("each int is combined with the latest from the other source"){
    302 
    303             auto res = w.start(
    304                 [&]() {
    305                     return e[0]
    306                         .zip(
    307                             [](int v0, int v1, int v2, int v3){
    308                                 return v0 + v1 + v2 + v3;
    309                             },
    310                             e[1], e[2], e[3]
    311                         )
    312                         // forget type to workaround lambda deduction bug on msvc 2013
    313                         .as_dynamic();
    314                 }
    315             );
    316 
    317             THEN("the output contains only complete message"){
    318                 auto required = rxu::to_vector({
    319                     on.completed(200 + 10 * N)
    320                 });
    321                 auto actual = res.get_observer().messages();
    322                 REQUIRE(required == actual);
    323             }
    324 
    325             THEN("there was one subscription and one unsubscription to each observable"){
    326 
    327                 int i = 0;
    328                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
    329                     auto required = rxu::to_vector({
    330                         on.subscribe(200, 200 + 10 * ++i)
    331                     });
    332                     auto actual = s.subscriptions();
    333                     REQUIRE(required == actual);
    334                 });
    335             }
    336         }
    337     }
    338 }
    339 
    340 SCENARIO("zip empty/return", "[zip][join][operators]"){
    341     GIVEN("2 hot observables of ints."){
    342         auto sc = rxsc::make_test();
    343         auto w = sc.create_worker();
    344         const rxsc::test::messages<int> on;
    345 
    346         auto e = sc.make_hot_observable({
    347             on.next(150, 1),
    348             on.completed(210)
    349         });
    350 
    351         auto o = sc.make_hot_observable({
    352             on.next(150, 1),
    353             on.next(215, 2),
    354             on.completed(220)
    355         });
    356 
    357         WHEN("each int is combined with the latest from the other source"){
    358 
    359             auto res = w.start(
    360                 [&]() {
    361                     return e
    362                         .zip(
    363                             [](int v2, int v1){
    364                                 return v2 + v1;
    365                             },
    366                             o
    367                         )
    368                         // forget type to workaround lambda deduction bug on msvc 2013
    369                         .as_dynamic();
    370                 }
    371             );
    372 
    373             THEN("the output contains only complete message"){
    374                 auto required = rxu::to_vector({
    375                     on.completed(215)
    376                 });
    377                 auto actual = res.get_observer().messages();
    378                 REQUIRE(required == actual);
    379             }
    380 
    381             THEN("there was one subscription and one unsubscription to the e"){
    382                 auto required = rxu::to_vector({
    383                     on.subscribe(200, 210)
    384                 });
    385                 auto actual = e.subscriptions();
    386                 REQUIRE(required == actual);
    387             }
    388 
    389             THEN("there was one subscription and one unsubscription to the o"){
    390                 auto required = rxu::to_vector({
    391                     on.subscribe(200, 215)
    392                 });
    393                 auto actual = o.subscriptions();
    394                 REQUIRE(required == actual);
    395             }
    396         }
    397     }
    398 }
    399 
    400 SCENARIO("zip return/empty", "[zip][join][operators]"){
    401     GIVEN("2 hot observables of ints."){
    402         auto sc = rxsc::make_test();
    403         auto w = sc.create_worker();
    404         const rxsc::test::messages<int> on;
    405 
    406         auto o = sc.make_hot_observable({
    407             on.next(150, 1),
    408             on.next(215, 2),
    409             on.completed(220)
    410         });
    411 
    412         auto e = sc.make_hot_observable({
    413             on.next(150, 1),
    414             on.completed(210)
    415         });
    416 
    417         WHEN("each int is combined with the latest from the other source"){
    418 
    419             auto res = w.start(
    420                 [&]() {
    421                     return o
    422                         .zip(
    423                             [](int v2, int v1){
    424                                 return v2 + v1;
    425                             },
    426                             e
    427                         )
    428                         // forget type to workaround lambda deduction bug on msvc 2013
    429                         .as_dynamic();
    430                 }
    431             );
    432 
    433             THEN("the output contains only complete message"){
    434                 auto required = rxu::to_vector({
    435                     on.completed(215)
    436                 });
    437                 auto actual = res.get_observer().messages();
    438                 REQUIRE(required == actual);
    439             }
    440 
    441             THEN("there was one subscription and one unsubscription to the o"){
    442                 auto required = rxu::to_vector({
    443                     on.subscribe(200, 215)
    444                 });
    445                 auto actual = o.subscriptions();
    446                 REQUIRE(required == actual);
    447             }
    448 
    449             THEN("there was one subscription and one unsubscription to the e"){
    450                 auto required = rxu::to_vector({
    451                     on.subscribe(200, 210)
    452                 });
    453                 auto actual = e.subscriptions();
    454                 REQUIRE(required == actual);
    455             }
    456         }
    457     }
    458 }
    459 
    460 SCENARIO("zip never/return", "[zip][join][operators]"){
    461     GIVEN("2 hot observables of ints."){
    462         auto sc = rxsc::make_test();
    463         auto w = sc.create_worker();
    464         const rxsc::test::messages<int> on;
    465 
    466         auto n = sc.make_hot_observable({
    467             on.next(150, 1)
    468         });
    469 
    470         auto o = sc.make_hot_observable({
    471             on.next(150, 1),
    472             on.next(215, 2),
    473             on.completed(220)
    474         });
    475 
    476         WHEN("each int is combined with the latest from the other source"){
    477 
    478             auto res = w.start(
    479                 [&]() {
    480                     return n
    481                         .zip(
    482                             [](int v2, int v1){
    483                                 return v2 + v1;
    484                             },
    485                             o
    486                         )
    487                         // forget type to workaround lambda deduction bug on msvc 2013
    488                         .as_dynamic();
    489                 }
    490             );
    491 
    492             THEN("the output is empty"){
    493                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    494                 auto actual = res.get_observer().messages();
    495                 REQUIRE(required == actual);
    496             }
    497 
    498             THEN("there was one subscription and one unsubscription to the n"){
    499                 auto required = rxu::to_vector({
    500                     on.subscribe(200, 1000)
    501                 });
    502                 auto actual = n.subscriptions();
    503                 REQUIRE(required == actual);
    504             }
    505 
    506             THEN("there was one subscription and one unsubscription to the o"){
    507                 auto required = rxu::to_vector({
    508                     on.subscribe(200, 220)
    509                 });
    510                 auto actual = o.subscriptions();
    511                 REQUIRE(required == actual);
    512             }
    513         }
    514     }
    515 }
    516 
    517 SCENARIO("zip return/never", "[zip][join][operators]"){
    518     GIVEN("2 hot observables of ints."){
    519         auto sc = rxsc::make_test();
    520         auto w = sc.create_worker();
    521         const rxsc::test::messages<int> on;
    522 
    523         auto o = sc.make_hot_observable({
    524             on.next(150, 1),
    525             on.next(215, 2),
    526             on.completed(220)
    527         });
    528 
    529         auto n = sc.make_hot_observable({
    530             on.next(150, 1)
    531         });
    532 
    533         WHEN("each int is combined with the latest from the other source"){
    534 
    535             auto res = w.start(
    536                 [&]() {
    537                     return o
    538                         .zip(
    539                             [](int v2, int v1){
    540                                 return v2 + v1;
    541                             },
    542                             n
    543                         )
    544                         // forget type to workaround lambda deduction bug on msvc 2013
    545                         .as_dynamic();
    546                 }
    547             );
    548 
    549             THEN("the output is empty"){
    550                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    551                 auto actual = res.get_observer().messages();
    552                 REQUIRE(required == actual);
    553             }
    554 
    555             THEN("there was one subscription and one unsubscription to the n"){
    556                 auto required = rxu::to_vector({
    557                     on.subscribe(200, 1000)
    558                 });
    559                 auto actual = n.subscriptions();
    560                 REQUIRE(required == actual);
    561             }
    562 
    563             THEN("there was one subscription and one unsubscription to the o"){
    564                 auto required = rxu::to_vector({
    565                     on.subscribe(200, 220)
    566                 });
    567                 auto actual = o.subscriptions();
    568                 REQUIRE(required == actual);
    569             }
    570         }
    571     }
    572 }
    573 
    574 SCENARIO("zip return/return", "[zip][join][operators]"){
    575     GIVEN("2 hot observables of ints."){
    576         auto sc = rxsc::make_test();
    577         auto w = sc.create_worker();
    578         const rxsc::test::messages<int> on;
    579 
    580         auto o1 = sc.make_hot_observable({
    581             on.next(150, 1),
    582             on.next(215, 2),
    583             on.completed(230)
    584         });
    585 
    586         auto o2 = sc.make_hot_observable({
    587             on.next(150, 1),
    588             on.next(220, 3),
    589             on.completed(240)
    590         });
    591 
    592         WHEN("each int is combined with the latest from the other source"){
    593 
    594             auto res = w.start(
    595                 [&]() {
    596                     return o1
    597                         .zip(
    598                             [](int v2, int v1){
    599                              return v2 + v1;
    600                             },
    601                             o2
    602                         )
    603                         // forget type to workaround lambda deduction bug on msvc 2013
    604                         .as_dynamic();
    605                 }
    606             );
    607 
    608             THEN("the output contains combined ints"){
    609                 auto required = rxu::to_vector({
    610                     on.next(220, 2 + 3),
    611                     on.completed(240)
    612                 });
    613                 auto actual = res.get_observer().messages();
    614                 REQUIRE(required == actual);
    615             }
    616 
    617             THEN("there was one subscription and one unsubscription to the o1"){
    618                 auto required = rxu::to_vector({
    619                     on.subscribe(200, 230)
    620                 });
    621                 auto actual = o1.subscriptions();
    622                 REQUIRE(required == actual);
    623             }
    624 
    625             THEN("there was one subscription and one unsubscription to the o2"){
    626                 auto required = rxu::to_vector({
    627                     on.subscribe(200, 240)
    628                 });
    629                 auto actual = o2.subscriptions();
    630                 REQUIRE(required == actual);
    631             }
    632         }
    633     }
    634 }
    635 
    636 SCENARIO("zip empty/error", "[zip][join][operators]"){
    637     GIVEN("2 hot observables of ints."){
    638         auto sc = rxsc::make_test();
    639         auto w = sc.create_worker();
    640         const rxsc::test::messages<int> on;
    641 
    642         std::runtime_error ex("zip on_error from source");
    643 
    644         auto emp = sc.make_hot_observable({
    645             on.next(150, 1),
    646             on.completed(230)
    647         });
    648 
    649         auto err = sc.make_hot_observable({
    650             on.next(150, 1),
    651             on.error(220, ex)
    652         });
    653 
    654         WHEN("each int is combined with the latest from the other source"){
    655 
    656             auto res = w.start(
    657                 [&]() {
    658                     return emp
    659                         .zip(
    660                             [](int v2, int v1){
    661                                 return v2 + v1;
    662                             },
    663                             err
    664                         )
    665                         // forget type to workaround lambda deduction bug on msvc 2013
    666                         .as_dynamic();
    667                 }
    668             );
    669 
    670             THEN("the output contains only error message"){
    671                 auto required = rxu::to_vector({
    672                     on.error(220, ex)
    673                 });
    674                 auto actual = res.get_observer().messages();
    675                 REQUIRE(required == actual);
    676             }
    677 
    678             THEN("there was one subscription and one unsubscription to the emp"){
    679                 auto required = rxu::to_vector({
    680                     on.subscribe(200, 220)
    681                 });
    682                 auto actual = emp.subscriptions();
    683                 REQUIRE(required == actual);
    684             }
    685 
    686             THEN("there was one subscription and one unsubscription to the err"){
    687                 auto required = rxu::to_vector({
    688                     on.subscribe(200, 220)
    689                 });
    690                 auto actual = err.subscriptions();
    691                 REQUIRE(required == actual);
    692             }
    693         }
    694     }
    695 }
    696 
    697 SCENARIO("zip error/empty", "[zip][join][operators]"){
    698     GIVEN("2 hot observables of ints."){
    699         auto sc = rxsc::make_test();
    700         auto w = sc.create_worker();
    701         const rxsc::test::messages<int> on;
    702 
    703         std::runtime_error ex("zip on_error from source");
    704 
    705         auto err = sc.make_hot_observable({
    706             on.next(150, 1),
    707             on.error(220, ex)
    708         });
    709 
    710         auto emp = sc.make_hot_observable({
    711             on.next(150, 1),
    712             on.completed(230)
    713         });
    714 
    715         WHEN("each int is combined with the latest from the other source"){
    716 
    717             auto res = w.start(
    718                 [&]() {
    719                     return err
    720                         .zip(
    721                             [](int v2, int v1){
    722                                 return v2 + v1;
    723                             },
    724                             emp
    725                         )
    726                         // forget type to workaround lambda deduction bug on msvc 2013
    727                         .as_dynamic();
    728                 }
    729             );
    730 
    731             THEN("the output contains only error message"){
    732                 auto required = rxu::to_vector({
    733                     on.error(220, ex)
    734                 });
    735                 auto actual = res.get_observer().messages();
    736                 REQUIRE(required == actual);
    737             }
    738 
    739             THEN("there was one subscription and one unsubscription to the emp"){
    740                 auto required = rxu::to_vector({
    741                     on.subscribe(200, 220)
    742                 });
    743                 auto actual = emp.subscriptions();
    744                 REQUIRE(required == actual);
    745             }
    746 
    747             THEN("there was one subscription and one unsubscription to the err"){
    748                 auto required = rxu::to_vector({
    749                     on.subscribe(200, 220)
    750                 });
    751                 auto actual = err.subscriptions();
    752                 REQUIRE(required == actual);
    753             }
    754         }
    755     }
    756 }
    757 
    758 SCENARIO("zip never/error", "[zip][join][operators]"){
    759     GIVEN("2 hot observables of ints."){
    760         auto sc = rxsc::make_test();
    761         auto w = sc.create_worker();
    762         const rxsc::test::messages<int> on;
    763 
    764         std::runtime_error ex("zip on_error from source");
    765 
    766         auto n = sc.make_hot_observable({
    767             on.next(150, 1)
    768         });
    769 
    770         auto err = sc.make_hot_observable({
    771             on.next(150, 1),
    772             on.error(220, ex)
    773         });
    774 
    775         WHEN("each int is combined with the latest from the other source"){
    776 
    777             auto res = w.start(
    778                 [&]() {
    779                     return n
    780                         .zip(
    781                             [](int v2, int v1){
    782                                 return v2 + v1;
    783                             },
    784                             err
    785                         )
    786                         // forget type to workaround lambda deduction bug on msvc 2013
    787                         .as_dynamic();
    788                 }
    789             );
    790 
    791             THEN("the output contains only error message"){
    792                 auto required = rxu::to_vector({
    793                     on.error(220, ex)
    794                 });
    795                 auto actual = res.get_observer().messages();
    796                 REQUIRE(required == actual);
    797             }
    798 
    799             THEN("there was one subscription and one unsubscription to the n"){
    800                 auto required = rxu::to_vector({
    801                     on.subscribe(200, 220)
    802                 });
    803                 auto actual = n.subscriptions();
    804                 REQUIRE(required == actual);
    805             }
    806 
    807             THEN("there was one subscription and one unsubscription to the err"){
    808                 auto required = rxu::to_vector({
    809                     on.subscribe(200, 220)
    810                 });
    811                 auto actual = err.subscriptions();
    812                 REQUIRE(required == actual);
    813             }
    814         }
    815     }
    816 }
    817 
    818 SCENARIO("zip error/never", "[zip][join][operators]"){
    819     GIVEN("2 hot observables of ints."){
    820         auto sc = rxsc::make_test();
    821         auto w = sc.create_worker();
    822         const rxsc::test::messages<int> on;
    823 
    824         std::runtime_error ex("zip on_error from source");
    825 
    826         auto err = sc.make_hot_observable({
    827             on.next(150, 1),
    828             on.error(220, ex)
    829         });
    830 
    831         auto n = sc.make_hot_observable({
    832             on.next(150, 1)
    833         });
    834 
    835         WHEN("each int is combined with the latest from the other source"){
    836 
    837             auto res = w.start(
    838                 [&]() {
    839                     return err
    840                         .zip(
    841                             [](int v2, int v1){
    842                                 return v2 + v1;
    843                             },
    844                             n
    845                         )
    846                         // forget type to workaround lambda deduction bug on msvc 2013
    847                         .as_dynamic();
    848                 }
    849             );
    850 
    851             THEN("the output contains only error message"){
    852                 auto required = rxu::to_vector({
    853                     on.error(220, ex)
    854                 });
    855                 auto actual = res.get_observer().messages();
    856                 REQUIRE(required == actual);
    857             }
    858 
    859             THEN("there was one subscription and one unsubscription to the n"){
    860                 auto required = rxu::to_vector({
    861                     on.subscribe(200, 220)
    862                 });
    863                 auto actual = n.subscriptions();
    864                 REQUIRE(required == actual);
    865             }
    866 
    867             THEN("there was one subscription and one unsubscription to the err"){
    868                 auto required = rxu::to_vector({
    869                     on.subscribe(200, 220)
    870                 });
    871                 auto actual = err.subscriptions();
    872                 REQUIRE(required == actual);
    873             }
    874         }
    875     }
    876 }
    877 
    878 SCENARIO("zip error/error", "[zip][join][operators]"){
    879     GIVEN("2 hot observables of ints."){
    880         auto sc = rxsc::make_test();
    881         auto w = sc.create_worker();
    882         const rxsc::test::messages<int> on;
    883 
    884         std::runtime_error ex1("zip on_error from source 1");
    885         std::runtime_error ex2("zip on_error from source 2");
    886 
    887         auto err1 = sc.make_hot_observable({
    888             on.next(150, 1),
    889             on.error(220, ex1)
    890         });
    891 
    892         auto err2 = sc.make_hot_observable({
    893             on.next(150, 1),
    894             on.error(230, ex2)
    895         });
    896 
    897         WHEN("each int is combined with the latest from the other source"){
    898 
    899             auto res = w.start(
    900                 [&]() {
    901                     return err1
    902                         .zip(
    903                             [](int v2, int v1){
    904                                 return v2 + v1;
    905                             },
    906                             err2
    907                         )
    908                         // forget type to workaround lambda deduction bug on msvc 2013
    909                         .as_dynamic();
    910                 }
    911             );
    912 
    913             THEN("the output contains only error message"){
    914                 auto required = rxu::to_vector({
    915                     on.error(220, ex1)
    916                 });
    917                 auto actual = res.get_observer().messages();
    918                 REQUIRE(required == actual);
    919             }
    920 
    921             THEN("there was one subscription and one unsubscription to the err1"){
    922                 auto required = rxu::to_vector({
    923                     on.subscribe(200, 220)
    924                 });
    925                 auto actual = err1.subscriptions();
    926                 REQUIRE(required == actual);
    927             }
    928 
    929             THEN("there was one subscription and one unsubscription to the err2"){
    930                 auto required = rxu::to_vector({
    931                     on.subscribe(200, 220)
    932                 });
    933                 auto actual = err2.subscriptions();
    934                 REQUIRE(required == actual);
    935             }
    936         }
    937     }
    938 }
    939 
    940 SCENARIO("zip return/error", "[zip][join][operators]"){
    941     GIVEN("2 hot observables of ints."){
    942         auto sc = rxsc::make_test();
    943         auto w = sc.create_worker();
    944         const rxsc::test::messages<int> on;
    945 
    946         std::runtime_error ex("zip on_error from source");
    947 
    948         auto o = sc.make_hot_observable({
    949             on.next(150, 1),
    950             on.next(210, 2),
    951             on.completed(230)
    952         });
    953 
    954         auto err = sc.make_hot_observable({
    955             on.next(150, 1),
    956             on.error(220, ex)
    957         });
    958 
    959         WHEN("each int is combined with the latest from the other source"){
    960 
    961             auto res = w.start(
    962                 [&]() {
    963                     return o
    964                         .zip(
    965                             [](int v2, int v1){
    966                                 return v2 + v1;
    967                             },
    968                             err
    969                         )
    970                         // forget type to workaround lambda deduction bug on msvc 2013
    971                         .as_dynamic();
    972                 }
    973             );
    974 
    975             THEN("the output contains only error message"){
    976                 auto required = rxu::to_vector({
    977                     on.error(220, ex)
    978                 });
    979                 auto actual = res.get_observer().messages();
    980                 REQUIRE(required == actual);
    981             }
    982 
    983             THEN("there was one subscription and one unsubscription to the ret"){
    984                 auto required = rxu::to_vector({
    985                     on.subscribe(200, 220)
    986                 });
    987                 auto actual = o.subscriptions();
    988                 REQUIRE(required == actual);
    989             }
    990 
    991             THEN("there was one subscription and one unsubscription to the err"){
    992                 auto required = rxu::to_vector({
    993                     on.subscribe(200, 220)
    994                 });
    995                 auto actual = err.subscriptions();
    996                 REQUIRE(required == actual);
    997             }
    998         }
    999     }
   1000 }
   1001 
   1002 SCENARIO("zip error/return", "[zip][join][operators]"){
   1003     GIVEN("2 hot observables of ints."){
   1004         auto sc = rxsc::make_test();
   1005         auto w = sc.create_worker();
   1006         const rxsc::test::messages<int> on;
   1007 
   1008         std::runtime_error ex("zip on_error from source");
   1009 
   1010         auto err = sc.make_hot_observable({
   1011             on.next(150, 1),
   1012             on.error(220, ex)
   1013         });
   1014 
   1015         auto ret = sc.make_hot_observable({
   1016             on.next(150, 1),
   1017             on.next(210, 2),
   1018             on.completed(230)
   1019         });
   1020 
   1021         WHEN("each int is combined with the latest from the other source"){
   1022 
   1023             auto res = w.start(
   1024                 [&]() {
   1025                     return err
   1026                         .zip(
   1027                             [](int v2, int v1){
   1028                                 return v2 + v1;
   1029                             },
   1030                             ret
   1031                         )
   1032                         // forget type to workaround lambda deduction bug on msvc 2013
   1033                         .as_dynamic();
   1034                 }
   1035             );
   1036 
   1037             THEN("the output contains only error message"){
   1038                 auto required = rxu::to_vector({
   1039                     on.error(220, ex)
   1040                 });
   1041                 auto actual = res.get_observer().messages();
   1042                 REQUIRE(required == actual);
   1043             }
   1044 
   1045             THEN("there was one subscription and one unsubscription to the ret"){
   1046                 auto required = rxu::to_vector({
   1047                     on.subscribe(200, 220)
   1048                 });
   1049                 auto actual = ret.subscriptions();
   1050                 REQUIRE(required == actual);
   1051             }
   1052 
   1053             THEN("there was one subscription and one unsubscription to the err"){
   1054                 auto required = rxu::to_vector({
   1055                     on.subscribe(200, 220)
   1056                 });
   1057                 auto actual = err.subscriptions();
   1058                 REQUIRE(required == actual);
   1059             }
   1060         }
   1061     }
   1062 }
   1063 
   1064 SCENARIO("zip left completes first", "[zip][join][operators]"){
   1065     GIVEN("2 hot observables of ints."){
   1066         auto sc = rxsc::make_test();
   1067         auto w = sc.create_worker();
   1068         const rxsc::test::messages<int> on;
   1069 
   1070         auto o1 = sc.make_hot_observable({
   1071             on.next(150, 1),
   1072             on.next(210, 2),
   1073             on.completed(220)
   1074         });
   1075 
   1076         auto o2 = sc.make_hot_observable({
   1077             on.next(150, 1),
   1078             on.next(215, 4),
   1079             on.completed(225)
   1080         });
   1081 
   1082         WHEN("each int is combined with the latest from the other source"){
   1083 
   1084             auto res = w.start(
   1085                 [&]() {
   1086                     return o2
   1087                         .zip(
   1088                             [](int v2, int v1){
   1089                                 return v2 + v1;
   1090                             },
   1091                             o1
   1092                         )
   1093                         // forget type to workaround lambda deduction bug on msvc 2013
   1094                         .as_dynamic();
   1095                 }
   1096             );
   1097 
   1098             THEN("the output contains combined ints"){
   1099                 auto required = rxu::to_vector({
   1100                     on.next(215, 2 + 4),
   1101                     on.completed(225)
   1102                 });
   1103                 auto actual = res.get_observer().messages();
   1104                 REQUIRE(required == actual);
   1105             }
   1106 
   1107             THEN("there was one subscription and one unsubscription to the o1"){
   1108                 auto required = rxu::to_vector({
   1109                     on.subscribe(200, 220)
   1110                 });
   1111                 auto actual = o1.subscriptions();
   1112                 REQUIRE(required == actual);
   1113             }
   1114 
   1115             THEN("there was one subscription and one unsubscription to the o2"){
   1116                 auto required = rxu::to_vector({
   1117                     on.subscribe(200, 225)
   1118                 });
   1119                 auto actual = o2.subscriptions();
   1120                 REQUIRE(required == actual);
   1121             }
   1122         }
   1123     }
   1124 }
   1125 
   1126 SCENARIO("zip right completes first", "[zip][join][operators]"){
   1127     GIVEN("2 hot observables of ints."){
   1128         auto sc = rxsc::make_test();
   1129         auto w = sc.create_worker();
   1130         const rxsc::test::messages<int> on;
   1131 
   1132         auto o1 = sc.make_hot_observable({
   1133             on.next(150, 1),
   1134             on.next(215, 4),
   1135             on.completed(225)
   1136         });
   1137 
   1138         auto o2 = sc.make_hot_observable({
   1139             on.next(150, 1),
   1140             on.next(210, 2),
   1141             on.completed(220)
   1142         });
   1143 
   1144         WHEN("each int is combined with the latest from the other source"){
   1145 
   1146             auto res = w.start(
   1147                 [&]() {
   1148                     return o2
   1149                         .zip(
   1150                             [](int v2, int v1){
   1151                                 return v2 + v1;
   1152                             },
   1153                             o1
   1154                         )
   1155                         // forget type to workaround lambda deduction bug on msvc 2013
   1156                         .as_dynamic();
   1157                 }
   1158             );
   1159 
   1160             THEN("the output contains combined ints"){
   1161                 auto required = rxu::to_vector({
   1162                     on.next(215, 2 + 4),
   1163                     on.completed(225)
   1164                 });
   1165                 auto actual = res.get_observer().messages();
   1166                 REQUIRE(required == actual);
   1167             }
   1168 
   1169             THEN("there was one subscription and one unsubscription to the o1"){
   1170                 auto required = rxu::to_vector({
   1171                     on.subscribe(200, 225)
   1172                 });
   1173                 auto actual = o1.subscriptions();
   1174                 REQUIRE(required == actual);
   1175             }
   1176 
   1177             THEN("there was one subscription and one unsubscription to the o2"){
   1178                 auto required = rxu::to_vector({
   1179                     on.subscribe(200, 220)
   1180                 });
   1181                 auto actual = o2.subscriptions();
   1182                 REQUIRE(required == actual);
   1183             }
   1184         }
   1185     }
   1186 }
   1187 
   1188 SCENARIO("zip selector throws", "[zip][join][operators][!throws]"){
   1189     GIVEN("2 hot observables of ints."){
   1190         auto sc = rxsc::make_test();
   1191         auto w = sc.create_worker();
   1192         const rxsc::test::messages<int> on;
   1193 
   1194         std::runtime_error ex("zip on_error from source");
   1195 
   1196         auto o1 = sc.make_hot_observable({
   1197             on.next(150, 1),
   1198             on.next(215, 2),
   1199             on.completed(230)
   1200         });
   1201 
   1202         auto o2 = sc.make_hot_observable({
   1203             on.next(150, 1),
   1204             on.next(220, 3),
   1205             on.completed(240)
   1206         });
   1207 
   1208         WHEN("each int is combined with the latest from the other source"){
   1209 
   1210             auto res = w.start(
   1211                 [&]() {
   1212                     return o1
   1213                         .zip(
   1214                             [&ex](int, int) -> int {
   1215                                 rxu::throw_exception(ex);
   1216                             },
   1217                             o2
   1218                         )
   1219                         // forget type to workaround lambda deduction bug on msvc 2013
   1220                         .as_dynamic();
   1221                 }
   1222             );
   1223 
   1224             THEN("the output contains only error"){
   1225                 auto required = rxu::to_vector({
   1226                     on.error(220, ex)
   1227                 });
   1228                 auto actual = res.get_observer().messages();
   1229                 REQUIRE(required == actual);
   1230             }
   1231 
   1232             THEN("there was one subscription and one unsubscription to the o1"){
   1233                 auto required = rxu::to_vector({
   1234                     on.subscribe(200, 220)
   1235                 });
   1236                 auto actual = o1.subscriptions();
   1237                 REQUIRE(required == actual);
   1238             }
   1239 
   1240             THEN("there was one subscription and one unsubscription to the o2"){
   1241                 auto required = rxu::to_vector({
   1242                     on.subscribe(200, 220)
   1243                 });
   1244                 auto actual = o2.subscriptions();
   1245                 REQUIRE(required == actual);
   1246             }
   1247         }
   1248     }
   1249 }
   1250 
   1251 SCENARIO("zip selector throws N", "[zip][join][operators][!throws]"){
   1252     GIVEN("N hot observables of ints."){
   1253         auto sc = rxsc::make_test();
   1254         auto w = sc.create_worker();
   1255         const rxsc::test::messages<int> on;
   1256 
   1257         const int N = 4;
   1258 
   1259         std::runtime_error ex("zip on_error from source");
   1260 
   1261         std::vector<rxcpp::test::testable_observable<int>> e;
   1262         for (int i = 0; i < N; ++i) {
   1263             e.push_back(
   1264                 sc.make_hot_observable({
   1265                     on.next(210 + 10 * i, 1),
   1266                     on.completed(500)
   1267                 })
   1268             );
   1269         }
   1270 
   1271         WHEN("each int is combined with the latest from the other source"){
   1272 
   1273             auto res = w.start(
   1274                 [&]() {
   1275                     return e[0]
   1276                         .zip(
   1277                             [&ex](int, int, int, int) -> int {
   1278                                 rxu::throw_exception(ex);
   1279                             },
   1280                             e[1], e[2], e[3]
   1281                         )
   1282                         // forget type to workaround lambda deduction bug on msvc 2013
   1283                         .as_dynamic();
   1284                 }
   1285             );
   1286 
   1287             THEN("the output contains only error"){
   1288                 auto required = rxu::to_vector({
   1289                     on.error(200 + 10 * N, ex)
   1290                 });
   1291                 auto actual = res.get_observer().messages();
   1292                 REQUIRE(required == actual);
   1293             }
   1294 
   1295             THEN("there was one subscription and one unsubscription to each observable"){
   1296 
   1297                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
   1298                     auto required = rxu::to_vector({
   1299                         on.subscribe(200, 200 + 10 * N)
   1300                     });
   1301                     auto actual = s.subscriptions();
   1302                     REQUIRE(required == actual);
   1303                 });
   1304             }
   1305         }
   1306     }
   1307 }
   1308 
   1309 SCENARIO("zip typical N", "[zip][join][operators]"){
   1310     GIVEN("N hot observables of ints."){
   1311         auto sc = rxsc::make_test();
   1312         auto w = sc.create_worker();
   1313         const rxsc::test::messages<int> on;
   1314 
   1315         const int N = 4;
   1316 
   1317         std::vector<rxcpp::test::testable_observable<int>> o;
   1318         for (int i = 0; i < N; ++i) {
   1319             o.push_back(
   1320                 sc.make_hot_observable({
   1321                     on.next(150, 1),
   1322                     on.next(210 + 10 * i, i + 1),
   1323                     on.next(410 + 10 * i, i + N + 1),
   1324                     on.completed(800)
   1325                 })
   1326             );
   1327         }
   1328 
   1329         WHEN("each int is combined with the latest from the other source"){
   1330 
   1331             auto res = w.start(
   1332                 [&]() {
   1333                     return o[0]
   1334                         .zip(
   1335                             [](int v0, int v1, int v2, int v3) {
   1336                                 return v0 + v1 + v2 + v3;
   1337                             },
   1338                             o[1], o[2], o[3]
   1339                         )
   1340                         // forget type to workaround lambda deduction bug on msvc 2013
   1341                         .as_dynamic();
   1342                 }
   1343             );
   1344 
   1345             THEN("the output contains combined ints"){
   1346                 auto required = rxu::to_vector({
   1347                     on.next(200 + 10 * N, N * (N + 1) / 2),
   1348                     on.next(400 + 10 * N, N * (3 * N + 1) / 2)
   1349                 });
   1350                 required.push_back(on.completed(800));
   1351                 auto actual = res.get_observer().messages();
   1352                 REQUIRE(required == actual);
   1353             }
   1354 
   1355             THEN("there was one subscription and one unsubscription to each observable"){
   1356 
   1357                 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){
   1358                     auto required = rxu::to_vector({
   1359                         on.subscribe(200, 800)
   1360                     });
   1361                     auto actual = s.subscriptions();
   1362                     REQUIRE(required == actual);
   1363                 });
   1364             }
   1365         }
   1366     }
   1367 }
   1368 
   1369 SCENARIO("zip interleaved with tail", "[zip][join][operators]"){
   1370     GIVEN("2 hot observables of ints."){
   1371         auto sc = rxsc::make_test();
   1372         auto w = sc.create_worker();
   1373         const rxsc::test::messages<int> on;
   1374 
   1375         auto o1 = sc.make_hot_observable({
   1376             on.next(150, 1),
   1377             on.next(215, 2),
   1378             on.next(225, 4),
   1379             on.completed(230)
   1380         });
   1381 
   1382         auto o2 = sc.make_hot_observable({
   1383             on.next(150, 1),
   1384             on.next(220, 3),
   1385             on.next(230, 5),
   1386             on.next(235, 6),
   1387             on.next(240, 7),
   1388             on.completed(250)
   1389         });
   1390 
   1391         WHEN("each int is combined with the latest from the other source"){
   1392 
   1393             auto res = w.start(
   1394                 [&]() {
   1395                     return o2
   1396                         .zip(
   1397                             [](int v2, int v1){
   1398                                 return v2 + v1;
   1399                             },
   1400                             o1
   1401                         )
   1402                         // forget type to workaround lambda deduction bug on msvc 2013
   1403                         .as_dynamic();
   1404                 }
   1405             );
   1406 
   1407             THEN("the output contains combined ints"){
   1408                 auto required = rxu::to_vector({
   1409                     on.next(220, 2 + 3),
   1410                     on.next(230, 4 + 5),
   1411                     on.completed(230)
   1412                 });
   1413                 auto actual = res.get_observer().messages();
   1414                 REQUIRE(required == actual);
   1415             }
   1416 
   1417             THEN("there was one subscription and one unsubscription to the o1"){
   1418                 auto required = rxu::to_vector({
   1419                     on.subscribe(200, 230)
   1420                 });
   1421                 auto actual = o1.subscriptions();
   1422                 REQUIRE(required == actual);
   1423             }
   1424 
   1425             THEN("there was one subscription and one unsubscription to the o2"){
   1426                 auto required = rxu::to_vector({
   1427                     on.subscribe(200, 230)
   1428                 });
   1429                 auto actual = o2.subscriptions();
   1430                 REQUIRE(required == actual);
   1431             }
   1432         }
   1433     }
   1434 }
   1435 
   1436 SCENARIO("zip consecutive", "[zip][join][operators]"){
   1437     GIVEN("2 hot observables of ints."){
   1438         auto sc = rxsc::make_test();
   1439         auto w = sc.create_worker();
   1440         const rxsc::test::messages<int> on;
   1441 
   1442         auto o1 = sc.make_hot_observable({
   1443             on.next(150, 1),
   1444             on.next(215, 2),
   1445             on.next(225, 4),
   1446             on.completed(230)
   1447         });
   1448 
   1449         auto o2 = sc.make_hot_observable({
   1450             on.next(150, 1),
   1451             on.next(235, 6),
   1452             on.next(240, 7),
   1453             on.completed(250)
   1454         });
   1455 
   1456         WHEN("each int is combined with the latest from the other source"){
   1457 
   1458             auto res = w.start(
   1459                 [&]() {
   1460                     return o2
   1461                         .zip(
   1462                             [](int v2, int v1){
   1463                                 return v2 + v1;
   1464                             },
   1465                             o1
   1466                         )
   1467                         // forget type to workaround lambda deduction bug on msvc 2013
   1468                         .as_dynamic();
   1469                 }
   1470             );
   1471 
   1472             THEN("the output contains combined ints"){
   1473                 auto required = rxu::to_vector({
   1474                     on.next(235, 2 + 6),
   1475                     on.next(240, 4 + 7),
   1476                     on.completed(240)
   1477                 });
   1478                 auto actual = res.get_observer().messages();
   1479                 REQUIRE(required == actual);
   1480             }
   1481 
   1482             THEN("there was one subscription and one unsubscription to the o1"){
   1483                 auto required = rxu::to_vector({
   1484                     on.subscribe(200, 230)
   1485                 });
   1486                 auto actual = o1.subscriptions();
   1487                 REQUIRE(required == actual);
   1488             }
   1489 
   1490             THEN("there was one subscription and one unsubscription to the o2"){
   1491                 auto required = rxu::to_vector({
   1492                     on.subscribe(200, 240)
   1493                 });
   1494                 auto actual = o2.subscriptions();
   1495                 REQUIRE(required == actual);
   1496             }
   1497         }
   1498     }
   1499 }
   1500 
   1501 SCENARIO("zip consecutive ends with error left", "[zip][join][operators]"){
   1502     GIVEN("2 hot observables of ints."){
   1503         auto sc = rxsc::make_test();
   1504         auto w = sc.create_worker();
   1505         const rxsc::test::messages<int> on;
   1506 
   1507         std::runtime_error ex("zip on_error from source");
   1508 
   1509         auto o1 = sc.make_hot_observable({
   1510             on.next(150, 1),
   1511             on.next(215, 2),
   1512             on.next(225, 4),
   1513             on.error(230, ex)
   1514         });
   1515 
   1516         auto o2 = sc.make_hot_observable({
   1517             on.next(150, 1),
   1518             on.next(235, 6),
   1519             on.next(240, 7),
   1520             on.completed(250)
   1521         });
   1522 
   1523         WHEN("each int is combined with the latest from the other source"){
   1524 
   1525             auto res = w.start(
   1526                 [&]() {
   1527                     return o2
   1528                         .zip(
   1529                             [](int v2, int v1){
   1530                                 return v2 + v1;
   1531                             },
   1532                             o1
   1533                         )
   1534                         // forget type to workaround lambda deduction bug on msvc 2013
   1535                         .as_dynamic();
   1536                 }
   1537             );
   1538 
   1539             THEN("the output contains only an error"){
   1540                 auto required = rxu::to_vector({
   1541                     on.error(230, ex)
   1542                 });
   1543                 auto actual = res.get_observer().messages();
   1544                 REQUIRE(required == actual);
   1545             }
   1546 
   1547             THEN("there was one subscription and one unsubscription to the o1"){
   1548                 auto required = rxu::to_vector({
   1549                     on.subscribe(200, 230)
   1550                 });
   1551                 auto actual = o1.subscriptions();
   1552                 REQUIRE(required == actual);
   1553             }
   1554 
   1555             THEN("there was one subscription and one unsubscription to the o2"){
   1556                 auto required = rxu::to_vector({
   1557                     on.subscribe(200, 230)
   1558                 });
   1559                 auto actual = o2.subscriptions();
   1560                 REQUIRE(required == actual);
   1561             }
   1562         }
   1563     }
   1564 }
   1565 
   1566 SCENARIO("zip consecutive ends with error right", "[zip][join][operators]"){
            // Checks zip when one source has already delivered all of its values
            // before the other starts emitting, and the later source then fails:
            // the expected output is the two sums followed by that error.
   1567     GIVEN("2 hot observables of ints."){
   1568         auto sc = rxsc::make_test();
   1569         auto w = sc.create_worker();
   1570         const rxsc::test::messages<int> on;
   1571 
   1572         std::runtime_error ex("zip on_error from source");
   1573 
                // o1 emits 2 and 4 early (ticks 215, 225) and would complete at 250.
                // The tick-150 value precedes the tick-200 subscription expected
                // below, so it does not appear in any expected output.
   1574         auto o1 = sc.make_hot_observable({
   1575             on.next(150, 1),
   1576             on.next(215, 2),
   1577             on.next(225, 4),
   1578             on.completed(250)
   1579         });
   1580 
                // o2 emits 6 and 7 later (ticks 235, 240) and then fails with ex at 245.
   1581         auto o2 = sc.make_hot_observable({
   1582             on.next(150, 1),
   1583             on.next(235, 6),
   1584             on.next(240, 7),
   1585             on.error(245, ex)
   1586         });
   1587 
                // NOTE(review): the WHEN text says "the latest from the other source",
                // but the expected sums below (2 + 6, then 4 + 7) pair values in
                // arrival order rather than with the latest o1 value (4) - confirm
                // whether the wording should be adjusted.
   1588         WHEN("each int is combined with the latest from the other source"){
   1589 
                    // zip is invoked on o2 with o1 as the additional source.
   1590             auto res = w.start(
   1591                 [&]() {
   1592                     return o2
   1593                         .zip(
                                    // Integer addition is commutative, so the v2/v1
                                    // parameter order does not affect the expected sums.
   1594                             [](int v2, int v1){
   1595                                 return v2 + v1;
   1596                             },
   1597                             o1
   1598                         )
   1599                         // forget type to workaround lambda deduction bug on msvc 2013
   1600                         .as_dynamic();
   1601                 }
   1602             );
   1603 
                    // Each sum is expected at the tick of the o2 value (235, 240),
                    // i.e. when the later of the two paired values arrives.
   1604             THEN("the output contains combined ints followed by an error"){
   1605                 auto required = rxu::to_vector({
   1606                     on.next(235, 2 + 6),
   1607                     on.next(240, 4 + 7),
   1608                     on.error(245, ex)
   1609                 });
   1610                 auto actual = res.get_observer().messages();
   1611                 REQUIRE(required == actual);
   1612             }
   1613 
                    // Both subscriptions are expected to end at 245, the error tick,
                    // which is before o1's scheduled completion at 250.
   1614             THEN("there was one subscription and one unsubscription to the o1"){
   1615                 auto required = rxu::to_vector({
   1616                     on.subscribe(200, 245)
   1617                 });
   1618                 auto actual = o1.subscriptions();
   1619                 REQUIRE(required == actual);
   1620             }
   1621 
   1622             THEN("there was one subscription and one unsubscription to the o2"){
   1623                 auto required = rxu::to_vector({
   1624                     on.subscribe(200, 245)
   1625                 });
   1626                 auto actual = o2.subscriptions();
   1627                 REQUIRE(required == actual);
   1628             }
   1629         }
   1630     }
   1631 }
   1632 
   1633 SCENARIO("zip next+error/error", "[zip][join][operators]"){
            // Both sources fail; the observable zip is called on (err1) emits one
            // value and fails first. Only err1's error is expected downstream.
   1634     GIVEN("2 hot observables of ints."){
   1635         auto sc = rxsc::make_test();
   1636         auto w = sc.create_worker();
   1637         const rxsc::test::messages<int> on;
   1638 
                // Two distinct exceptions so the assertion can tell which source's
                // error is forwarded.
   1639         std::runtime_error ex1("zip on_error from source 1");
   1640         std::runtime_error ex2("zip on_error from source 2");
   1641 
                // err1: one value (2 at tick 210) after the tick-200 subscription
                // expected below, then failure with ex1 at 220.
   1642         auto err1 = sc.make_hot_observable({
   1643             on.next(150, 1),
   1644             on.next(210, 2),
   1645             on.error(220, ex1)
   1646         });
   1647 
                // err2: no value after tick 200; fails with ex2 at 230, later than err1.
   1648         auto err2 = sc.make_hot_observable({
   1649             on.next(150, 1),
   1650             on.error(230, ex2)
   1651         });
   1652 
   1653         WHEN("each int is combined with the latest from the other source"){
   1654 
                    // zip is invoked on err1 with err2 as the additional source.
   1655             auto res = w.start(
   1656                 [&]() {
   1657                     return err1
   1658                         .zip(
   1659                             [](int v2, int v1){
   1660                                 return v2 + v1;
   1661                             },
   1662                             err2
   1663                         )
   1664                         // forget type to workaround lambda deduction bug on msvc 2013
   1665                         .as_dynamic();
   1666                 }
   1667             );
   1668 
                    // No sum is expected: err1's value at 210 has no counterpart from
                    // err2 before the error at 220.
   1669             THEN("the output contains only error message"){
   1670                 auto required = rxu::to_vector({
   1671                     on.error(220, ex1)
   1672                 });
   1673                 auto actual = res.get_observer().messages();
   1674                 REQUIRE(required == actual);
   1675             }
   1676 
                    // Both subscriptions are expected to end at 220, so err2's own
                    // error at 230 falls outside its expected subscription window.
   1677             THEN("there was one subscription and one unsubscription to the err1"){
   1678                 auto required = rxu::to_vector({
   1679                     on.subscribe(200, 220)
   1680                 });
   1681                 auto actual = err1.subscriptions();
   1682                 REQUIRE(required == actual);
   1683             }
   1684 
   1685             THEN("there was one subscription and one unsubscription to the err2"){
   1686                 auto required = rxu::to_vector({
   1687                     on.subscribe(200, 220)
   1688                 });
   1689                 auto actual = err2.subscriptions();
   1690                 REQUIRE(required == actual);
   1691             }
   1692         }
   1693     }
   1694 }
   1695 
   1696 SCENARIO("zip error/next+error", "[zip][join][operators]"){
            // Both sources fail; this time the additional source (err2) emits one
            // value and fails first. Only err2's error is expected downstream.
   1697     GIVEN("2 hot observables of ints."){
   1698         auto sc = rxsc::make_test();
   1699         auto w = sc.create_worker();
   1700         const rxsc::test::messages<int> on;
   1701 
                // Two distinct exceptions so the assertion can tell which source's
                // error is forwarded.
   1702         std::runtime_error ex1("zip on_error from source 1");
   1703         std::runtime_error ex2("zip on_error from source 2");
   1704 
                // err1: no value after the tick-200 subscription expected below;
                // fails with ex1 at 230, later than err2.
   1705         auto err1 = sc.make_hot_observable({
   1706             on.next(150, 1),
   1707             on.error(230, ex1)
   1708         });
   1709 
                // err2: one value (2 at tick 210), then failure with ex2 at 220.
   1710         auto err2 = sc.make_hot_observable({
   1711             on.next(150, 1),
   1712             on.next(210, 2),
   1713             on.error(220, ex2)
   1714         });
   1715 
   1716         WHEN("each int is combined with the latest from the other source"){
   1717 
                    // zip is invoked on err1 with err2 as the additional source.
   1718             auto res = w.start(
   1719                 [&]() {
   1720                     return err1
   1721                         .zip(
   1722                             [](int v2, int v1){
   1723                                 return v2 + v1;
   1724                             },
   1725                             err2
   1726                         )
   1727                         // forget type to workaround lambda deduction bug on msvc 2013
   1728                         .as_dynamic();
   1729                 }
   1730             );
   1731 
                    // No sum is expected: err2's value at 210 has no counterpart from
                    // err1 before the error at 220.
   1732             THEN("the output contains only error message"){
   1733                 auto required = rxu::to_vector({
   1734                     on.error(220, ex2)
   1735                 });
   1736                 auto actual = res.get_observer().messages();
   1737                 REQUIRE(required == actual);
   1738             }
   1739 
                    // Both subscriptions are expected to end at 220, so err1's own
                    // error at 230 falls outside its expected subscription window.
   1740             THEN("there was one subscription and one unsubscription to the err1"){
   1741                 auto required = rxu::to_vector({
   1742                     on.subscribe(200, 220)
   1743                 });
   1744                 auto actual = err1.subscriptions();
   1745                 REQUIRE(required == actual);
   1746             }
   1747 
   1748             THEN("there was one subscription and one unsubscription to the err2"){
   1749                 auto required = rxu::to_vector({
   1750                     on.subscribe(200, 220)
   1751                 });
   1752                 auto actual = err2.subscriptions();
   1753                 REQUIRE(required == actual);
   1754             }
   1755         }
   1756     }
   1757 }
   1758 
   1759 SCENARIO("zip error after completed left", "[zip][join][operators]"){
            // The observable zip is called on (ret) completes at 215 with one value
            // still unpaired; the additional source (err) fails afterwards at 220.
            // The error is still expected to reach the output.
   1760     GIVEN("2 hot observables of ints."){
   1761         auto sc = rxsc::make_test();
   1762         auto w = sc.create_worker();
   1763         const rxsc::test::messages<int> on;
   1764 
   1765         std::runtime_error ex("zip on_error from source");
   1766 
                // ret: one value (2 at tick 210) after the tick-200 subscription
                // expected below, then completion at 215.
   1767         auto ret = sc.make_hot_observable({
   1768             on.next(150, 1),
   1769             on.next(210, 2),
   1770             on.completed(215)
   1771         });
   1772 
                // err: no value after tick 200; fails with ex at 220.
   1773         auto err = sc.make_hot_observable({
   1774             on.next(150, 1),
   1775             on.error(220, ex)
   1776         });
   1777 
   1778         WHEN("each int is combined with the latest from the other source"){
   1779 
                    // zip is invoked on ret with err as the additional source.
   1780             auto res = w.start(
   1781                 [&]() {
   1782                     return ret
   1783                         .zip(
   1784                             [](int v2, int v1){
   1785                                 return v2 + v1;
   1786                             },
   1787                             err
   1788                         )
   1789                         // forget type to workaround lambda deduction bug on msvc 2013
   1790                         .as_dynamic();
   1791                 }
   1792             );
   1793 
                    // ret's completion at 215 is not expected to end the output; the
                    // only expected message is err's error at 220.
   1794             THEN("the output contains only error message"){
   1795                 auto required = rxu::to_vector({
   1796                     on.error(220, ex)
   1797                 });
   1798                 auto actual = res.get_observer().messages();
   1799                 REQUIRE(required == actual);
   1800             }
   1801 
                    // ret's subscription is expected to end at its own completion tick.
   1802             THEN("there was one subscription and one unsubscription to the ret"){
   1803                 auto required = rxu::to_vector({
   1804                     on.subscribe(200, 215)
   1805                 });
   1806                 auto actual = ret.subscriptions();
   1807                 REQUIRE(required == actual);
   1808             }
   1809 
                    // err's subscription is expected to last until its error tick.
   1810             THEN("there was one subscription and one unsubscription to the err"){
   1811                 auto required = rxu::to_vector({
   1812                     on.subscribe(200, 220)
   1813                 });
   1814                 auto actual = err.subscriptions();
   1815                 REQUIRE(required == actual);
   1816             }
   1817         }
   1818     }
   1819 }
   1820 
   1821 SCENARIO("zip error after completed right", "[zip][join][operators]"){
            // Same timelines as a completed source followed by a failing one, but
            // here zip is called on the failing observable (err) and the completing
            // one (ret) is the additional source. The error is still expected.
   1822     GIVEN("2 hot observables of ints."){
   1823         auto sc = rxsc::make_test();
   1824         auto w = sc.create_worker();
   1825         const rxsc::test::messages<int> on;
   1826 
   1827         std::runtime_error ex("zip on_error from source");
   1828 
                // err: no value after the tick-200 subscription expected below;
                // fails with ex at 220.
   1829         auto err = sc.make_hot_observable({
   1830             on.next(150, 1),
   1831             on.error(220, ex)
   1832         });
   1833 
                // ret: one value (2 at tick 210), then completion at 215.
   1834         auto ret = sc.make_hot_observable({
   1835             on.next(150, 1),
   1836             on.next(210, 2),
   1837             on.completed(215)
   1838         });
   1839 
   1840         WHEN("each int is combined with the latest from the other source"){
   1841 
                    // zip is invoked on err with ret as the additional source.
   1842             auto res = w.start(
   1843                 [&]() {
   1844                     return err
   1845                         .zip(
   1846                             [](int v2, int v1){
   1847                                 return v2 + v1;
   1848                             },
   1849                             ret
   1850                         )
   1851                         // forget type to workaround lambda deduction bug on msvc 2013
   1852                         .as_dynamic();
   1853                 }
   1854             );
   1855 
                    // ret's completion at 215 is not expected to end the output; the
                    // only expected message is err's error at 220.
   1856             THEN("the output contains only error message"){
   1857                 auto required = rxu::to_vector({
   1858                     on.error(220, ex)
   1859                 });
   1860                 auto actual = res.get_observer().messages();
   1861                 REQUIRE(required == actual);
   1862             }
   1863 
                    // ret's subscription is expected to end at its own completion tick.
   1864             THEN("there was one subscription and one unsubscription to the ret"){
   1865                 auto required = rxu::to_vector({
   1866                     on.subscribe(200, 215)
   1867                 });
   1868                 auto actual = ret.subscriptions();
   1869                 REQUIRE(required == actual);
   1870             }
   1871 
                    // err's subscription is expected to last until its error tick.
   1872             THEN("there was one subscription and one unsubscription to the err"){
   1873                 auto required = rxu::to_vector({
   1874                     on.subscribe(200, 220)
   1875                 });
   1876                 auto actual = err.subscriptions();
   1877                 REQUIRE(required == actual);
   1878             }
   1879         }
   1880     }
   1881 }
   1882