// BDD-style test scenarios for the variadic form of the rxcpp amb operator.
      1 #include "../test.h"
      2 #include "rxcpp/operators/rx-amb.hpp"
      3 
      4 SCENARIO("variadic amb never 3", "[amb][join][operators]"){
      5     GIVEN("3 hot observables of ints."){
      6         auto sc = rxsc::make_test();
      7         auto w = sc.create_worker();
      8         const rxsc::test::messages<int> on;
      9 
     10         auto ys1 = sc.make_hot_observable({
     11             on.next(100, 1)
     12         });
     13 
     14         auto ys2 = sc.make_hot_observable({
     15             on.next(110, 2)
     16         });
     17 
     18         auto ys3 = sc.make_hot_observable({
     19             on.next(120, 3)
     20         });
     21 
     22         WHEN("the first observable is selected to produce ints"){
     23 
     24             auto res = w.start(
     25                 [&]() {
     26                     return ys1
     27                         | rxo::amb(ys2, ys3)
     28                         // forget type to workaround lambda deduction bug on msvc 2013
     29                         | rxo::as_dynamic();
     30                 }
     31             );
     32 
     33             THEN("the output is empty"){
     34                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
     35                 auto actual = res.get_observer().messages();
     36                 REQUIRE(required == actual);
     37             }
     38 
     39             THEN("there was one subscription and one unsubscription to the ys1"){
     40                 auto required = rxu::to_vector({
     41                     on.subscribe(200, 1000)
     42                 });
     43                 auto actual = ys1.subscriptions();
     44                 REQUIRE(required == actual);
     45             }
     46 
     47             THEN("there was one subscription and one unsubscription to the ys2"){
     48                 auto required = rxu::to_vector({
     49                     on.subscribe(200, 1000)
     50                 });
     51                 auto actual = ys2.subscriptions();
     52                 REQUIRE(required == actual);
     53             }
     54 
     55             THEN("there was one subscription and one unsubscription to the ys3"){
     56                 auto required = rxu::to_vector({
     57                     on.subscribe(200, 1000)
     58                 });
     59                 auto actual = ys3.subscriptions();
     60                 REQUIRE(required == actual);
     61             }
     62         }
     63     }
     64 }
     65 
     66 SCENARIO("variadic amb never empty", "[amb][join][operators]"){
     67     GIVEN("2 hot observables of ints."){
     68         auto sc = rxsc::make_test();
     69         auto w = sc.create_worker();
     70         const rxsc::test::messages<int> on;
     71 
     72         auto ys1 = sc.make_hot_observable({
     73             on.next(100, 1)
     74         });
     75 
     76         auto ys2 = sc.make_hot_observable({
     77             on.next(110, 2),
     78             on.completed(400)
     79         });
     80 
     81         WHEN("the first observable is selected to produce ints"){
     82 
     83             auto res = w.start(
     84                 [&]() {
     85                     return ys1
     86                         .amb(ys2)
     87                         // forget type to workaround lambda deduction bug on msvc 2013
     88                         .as_dynamic();
     89                 }
     90             );
     91 
     92             THEN("the output contains only complete message"){
     93                 auto required = rxu::to_vector({
     94                     on.completed(400)
     95                 });
     96                 auto actual = res.get_observer().messages();
     97                 REQUIRE(required == actual);
     98             }
     99 
    100             THEN("there was one subscription and one unsubscription to the ys1"){
    101                 auto required = rxu::to_vector({
    102                     on.subscribe(200, 400)
    103                 });
    104                 auto actual = ys1.subscriptions();
    105                 REQUIRE(required == actual);
    106             }
    107 
    108             THEN("there was one subscription and one unsubscription to the ys2"){
    109                 auto required = rxu::to_vector({
    110                     on.subscribe(200, 400)
    111                 });
    112                 auto actual = ys2.subscriptions();
    113                 REQUIRE(required == actual);
    114             }
    115         }
    116     }
    117 }
    118 
    119 SCENARIO("variadic amb empty never", "[amb][join][operators]"){
    120     GIVEN("2 hot observables of ints."){
    121         auto sc = rxsc::make_test();
    122         auto w = sc.create_worker();
    123         const rxsc::test::messages<int> on;
    124 
    125         auto ys1 = sc.make_hot_observable({
    126             on.next(100, 1),
    127             on.completed(400)
    128         });
    129 
    130         auto ys2 = sc.make_hot_observable({
    131             on.next(110, 2)
    132         });
    133 
    134         WHEN("the first observable is selected to produce ints"){
    135 
    136             auto res = w.start(
    137                 [&]() {
    138                     return ys1
    139                         .amb(ys2)
    140                         // forget type to workaround lambda deduction bug on msvc 2013
    141                         .as_dynamic();
    142                 }
    143             );
    144 
    145             THEN("the output contains only complete message"){
    146                 auto required = rxu::to_vector({
    147                     on.completed(400)
    148                 });
    149                 auto actual = res.get_observer().messages();
    150                 REQUIRE(required == actual);
    151             }
    152 
    153             THEN("there was one subscription and one unsubscription to the ys1"){
    154                 auto required = rxu::to_vector({
    155                     on.subscribe(200, 400)
    156                 });
    157                 auto actual = ys1.subscriptions();
    158                 REQUIRE(required == actual);
    159             }
    160 
    161             THEN("there was one subscription and one unsubscription to the ys2"){
    162                 auto required = rxu::to_vector({
    163                     on.subscribe(200, 400)
    164                 });
    165                 auto actual = ys2.subscriptions();
    166                 REQUIRE(required == actual);
    167             }
    168         }
    169     }
    170 }
    171 
    172 SCENARIO("variadic amb completes", "[amb][join][operators]"){
    173     GIVEN("3 cold observables of ints."){
    174         auto sc = rxsc::make_test();
    175         auto w = sc.create_worker();
    176         const rxsc::test::messages<int> on;
    177 
    178         auto ys1 = sc.make_cold_observable({
    179             on.next(10, 101),
    180             on.next(110, 102),
    181             on.next(210, 103),
    182             on.completed(310)
    183         });
    184 
    185         auto ys2 = sc.make_cold_observable({
    186             on.next(20, 201),
    187             on.next(120, 202),
    188             on.next(220, 203),
    189             on.completed(320)
    190         });
    191 
    192         auto ys3 = sc.make_cold_observable({
    193             on.next(30, 301),
    194             on.next(130, 302),
    195             on.next(230, 303),
    196             on.completed(330)
    197         });
    198 
    199         WHEN("the first observable is selected to produce ints"){
    200 
    201             auto res = w.start(
    202                 [&]() {
    203                     return ys1
    204                         .amb(ys2, ys3)
    205                         // forget type to workaround lambda deduction bug on msvc 2013
    206                         .as_dynamic();
    207                 }
    208             );
    209 
    210             THEN("the output contains ints from the first observable"){
    211                 auto required = rxu::to_vector({
    212                     on.next(210, 101),
    213                     on.next(310, 102),
    214                     on.next(410, 103),
    215                     on.completed(510)
    216                 });
    217                 auto actual = res.get_observer().messages();
    218                 REQUIRE(required == actual);
    219             }
    220 
    221             THEN("there was one subscription and one unsubscription to the ys1"){
    222                 auto required = rxu::to_vector({
    223                     on.subscribe(200, 510)
    224                 });
    225                 auto actual = ys1.subscriptions();
    226                 REQUIRE(required == actual);
    227             }
    228 
    229             THEN("there was one subscription and one unsubscription to the ys2"){
    230                 auto required = rxu::to_vector({
    231                     on.subscribe(200, 210)
    232                 });
    233                 auto actual = ys2.subscriptions();
    234                 REQUIRE(required == actual);
    235             }
    236 
    237             THEN("there was one subscription and one unsubscription to the ys3"){
    238                 auto required = rxu::to_vector({
    239                     on.subscribe(200, 210)
    240                 });
    241                 auto actual = ys3.subscriptions();
    242                 REQUIRE(required == actual);
    243             }
    244         }
    245     }
    246 }
    247 
    248 SCENARIO("variadic amb winner&owner throws", "[amb][join][operators]"){
    249     GIVEN("3 cold observables of ints."){
    250         auto sc = rxsc::make_test();
    251         auto w = sc.create_worker();
    252         const rxsc::test::messages<int> on;
    253 
    254         std::runtime_error ex("amb on_error from source");
    255 
    256         auto ys1 = sc.make_cold_observable({
    257             on.next(10, 101),
    258             on.next(110, 102),
    259             on.next(210, 103),
    260             on.error(310, ex)
    261         });
    262 
    263         auto ys2 = sc.make_cold_observable({
    264             on.next(20, 201),
    265             on.next(120, 202),
    266             on.next(220, 203),
    267             on.completed(320)
    268         });
    269 
    270         auto ys3 = sc.make_cold_observable({
    271             on.next(30, 301),
    272             on.next(130, 302),
    273             on.next(230, 303),
    274             on.completed(330)
    275         });
    276 
    277         WHEN("the first observable is selected to produce ints"){
    278 
    279             auto res = w.start(
    280                 [&]() {
    281                     return ys1
    282                         .amb(ys2, ys3)
    283                         // forget type to workaround lambda deduction bug on msvc 2013
    284                         .as_dynamic();
    285                 }
    286             );
    287 
    288             THEN("the output contains ints from the first observable"){
    289                 auto required = rxu::to_vector({
    290                     on.next(210, 101),
    291                     on.next(310, 102),
    292                     on.next(410, 103),
    293                     on.error(510, ex)
    294                 });
    295                 auto actual = res.get_observer().messages();
    296                 REQUIRE(required == actual);
    297             }
    298 
    299             THEN("there was one subscription and one unsubscription to the ys1"){
    300                 auto required = rxu::to_vector({
    301                     on.subscribe(200, 510)
    302                 });
    303                 auto actual = ys1.subscriptions();
    304                 REQUIRE(required == actual);
    305             }
    306 
    307             THEN("there was one subscription and one unsubscription to the ys2"){
    308                 auto required = rxu::to_vector({
    309                     on.subscribe(200, 210)
    310                 });
    311                 auto actual = ys2.subscriptions();
    312                 REQUIRE(required == actual);
    313             }
    314 
    315             THEN("there was one subscription and one unsubscription to the ys3"){
    316                 auto required = rxu::to_vector({
    317                     on.subscribe(200, 210)
    318                 });
    319                 auto actual = ys3.subscriptions();
    320                 REQUIRE(required == actual);
    321             }
    322         }
    323     }
    324 }
    325 
    326 SCENARIO("variadic amb winner&non-owner throws", "[amb][join][operators]"){
    327     GIVEN("3 cold observables of ints."){
    328         auto sc = rxsc::make_test();
    329         auto w = sc.create_worker();
    330         const rxsc::test::messages<int> on;
    331 
    332         std::runtime_error ex("amb on_error from source");
    333 
    334         auto ys1 = sc.make_cold_observable({
    335             on.next(10, 101),
    336             on.next(110, 102),
    337             on.next(210, 103),
    338             on.error(310, ex)
    339         });
    340 
    341         auto ys2 = sc.make_cold_observable({
    342             on.next(20, 201),
    343             on.next(120, 202),
    344             on.next(220, 203),
    345             on.completed(320)
    346         });
    347 
    348         auto ys3 = sc.make_cold_observable({
    349             on.next(30, 301),
    350             on.next(130, 302),
    351             on.next(230, 303),
    352             on.completed(330)
    353         });
    354 
    355         WHEN("the first observable is selected to produce ints"){
    356 
    357             auto res = w.start(
    358                 [&]() {
    359                     return ys2
    360                         .amb(ys1, ys3)
    361                         // forget type to workaround lambda deduction bug on msvc 2013
    362                         .as_dynamic();
    363                 }
    364             );
    365 
    366             THEN("the output contains ints from the first observable"){
    367                 auto required = rxu::to_vector({
    368                     on.next(210, 101),
    369                     on.next(310, 102),
    370                     on.next(410, 103),
    371                     on.error(510, ex)
    372                 });
    373                 auto actual = res.get_observer().messages();
    374                 REQUIRE(required == actual);
    375             }
    376 
    377             THEN("there was one subscription and one unsubscription to the ys1"){
    378                 auto required = rxu::to_vector({
    379                     on.subscribe(200, 510)
    380                 });
    381                 auto actual = ys1.subscriptions();
    382                 REQUIRE(required == actual);
    383             }
    384 
    385             THEN("there was one subscription and one unsubscription to the ys2"){
    386                 auto required = rxu::to_vector({
    387                     on.subscribe(200, 210)
    388                 });
    389                 auto actual = ys2.subscriptions();
    390                 REQUIRE(required == actual);
    391             }
    392 
    393             THEN("there was one subscription and one unsubscription to the ys3"){
    394                 auto required = rxu::to_vector({
    395                     on.subscribe(200, 210)
    396                 });
    397                 auto actual = ys3.subscriptions();
    398                 REQUIRE(required == actual);
    399             }
    400         }
    401     }
    402 }
    403 
    404 SCENARIO("variadic amb loser&non-owner throws", "[amb][join][operators]"){
    405     GIVEN("3 cold observables of ints."){
    406         auto sc = rxsc::make_test();
    407         auto w = sc.create_worker();
    408         const rxsc::test::messages<int> on;
    409 
    410         std::runtime_error ex("amb on_error from source");
    411 
    412         auto ys1 = sc.make_cold_observable({
    413             on.next(10, 101),
    414             on.next(110, 102),
    415             on.next(210, 103),
    416             on.completed(310)
    417         });
    418 
    419         auto ys2 = sc.make_cold_observable({
    420             on.error(20, ex)
    421         });
    422 
    423         auto ys3 = sc.make_cold_observable({
    424             on.next(30, 301),
    425             on.next(130, 302),
    426             on.next(230, 303),
    427             on.completed(330)
    428         });
    429 
    430         WHEN("the first observable is selected to produce ints"){
    431 
    432             auto res = w.start(
    433                 [&]() {
    434                     return ys2
    435                         .amb(ys1, ys3)
    436                         // forget type to workaround lambda deduction bug on msvc 2013
    437                         .as_dynamic();
    438                 }
    439             );
    440 
    441             THEN("the output contains ints from the first observable"){
    442                 auto required = rxu::to_vector({
    443                     on.next(210, 101),
    444                     on.next(310, 102),
    445                     on.next(410, 103),
    446                     on.completed(510)
    447                 });
    448                 auto actual = res.get_observer().messages();
    449                 REQUIRE(required == actual);
    450             }
    451 
    452             THEN("there was one subscription and one unsubscription to the ys1"){
    453                 auto required = rxu::to_vector({
    454                     on.subscribe(200, 510)
    455                 });
    456                 auto actual = ys1.subscriptions();
    457                 REQUIRE(required == actual);
    458             }
    459 
    460             THEN("there was one subscription and one unsubscription to the ys2"){
    461                 auto required = rxu::to_vector({
    462                     on.subscribe(200, 210)
    463                 });
    464                 auto actual = ys2.subscriptions();
    465                 REQUIRE(required == actual);
    466             }
    467 
    468             THEN("there was one subscription and one unsubscription to the ys3"){
    469                 auto required = rxu::to_vector({
    470                     on.subscribe(200, 210)
    471                 });
    472                 auto actual = ys3.subscriptions();
    473                 REQUIRE(required == actual);
    474             }
    475         }
    476     }
    477 }
    478 
    479 SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){
    480     GIVEN("3 cold observables of ints."){
    481         auto sc = rxsc::make_test();
    482         auto w = sc.create_worker();
    483         const rxsc::test::messages<int> on;
    484 
    485         std::runtime_error ex("amb on_error from source");
    486 
    487         auto ys1 = sc.make_cold_observable({
    488             on.next(10, 101),
    489             on.next(110, 102),
    490             on.next(210, 103),
    491             on.completed(310)
    492         });
    493 
    494         auto ys2 = sc.make_cold_observable({
    495             on.error(20, ex)
    496         });
    497 
    498         auto ys3 = sc.make_cold_observable({
    499             on.next(30, 301),
    500             on.next(130, 302),
    501             on.next(230, 303),
    502             on.completed(330)
    503         });
    504 
    505         WHEN("the first observable is selected to produce ints"){
    506 
    507             auto res = w.start(
    508                 [&]() {
    509                     return ys1
    510                         .amb(ys2, ys3)
    511                         // forget type to workaround lambda deduction bug on msvc 2013
    512                         .as_dynamic();
    513                 }
    514             );
    515 
    516             THEN("the output contains ints from the first observable"){
    517                 auto required = rxu::to_vector({
    518                     on.next(210, 101),
    519                     on.next(310, 102),
    520                     on.next(410, 103),
    521                     on.completed(510)
    522                 });
    523                 auto actual = res.get_observer().messages();
    524                 REQUIRE(required == actual);
    525             }
    526 
    527             THEN("there was one subscription and one unsubscription to the ys1"){
    528                 auto required = rxu::to_vector({
    529                     on.subscribe(200, 510)
    530                 });
    531                 auto actual = ys1.subscriptions();
    532                 REQUIRE(required == actual);
    533             }
    534 
    535             THEN("there was one subscription and one unsubscription to the ys2"){
    536                 auto required = rxu::to_vector({
    537                     on.subscribe(200, 210)
    538                 });
    539                 auto actual = ys2.subscriptions();
    540                 REQUIRE(required == actual);
    541             }
    542 
    543             THEN("there was one subscription and one unsubscription to the ys3"){
    544                 auto required = rxu::to_vector({
    545                     on.subscribe(200, 210)
    546                 });
    547                 auto actual = ys3.subscriptions();
    548                 REQUIRE(required == actual);
    549             }
    550         }
    551     }
    552 }
    553 
    554 SCENARIO("variadic amb never empty, custom coordination", "[amb][join][operators]"){
    555     GIVEN("2 hot observables of ints."){
    556         auto sc = rxsc::make_test();
    557         auto so = rx::synchronize_in_one_worker(sc);
    558         auto w = sc.create_worker();
    559         const rxsc::test::messages<int> on;
    560 
    561         auto ys1 = sc.make_hot_observable({
    562             on.next(100, 1)
    563         });
    564 
    565         auto ys2 = sc.make_hot_observable({
    566             on.next(110, 2),
    567             on.completed(400)
    568         });
    569 
    570         WHEN("the first observable is selected to produce ints"){
    571 
    572             auto res = w.start(
    573                 [&]() {
    574                     return ys1
    575                         .amb(so, ys2)
    576                         // forget type to workaround lambda deduction bug on msvc 2013
    577                         .as_dynamic();
    578                 }
    579             );
    580 
    581             THEN("the output contains only complete message"){
    582                 auto required = rxu::to_vector({
    583                     on.completed(401)
    584                 });
    585                 auto actual = res.get_observer().messages();
    586                 REQUIRE(required == actual);
    587             }
    588         }
    589     }
    590 }
    591