Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-map.hpp>
      3 #include <rxcpp/operators/rx-take_until.hpp>
      4 
      5 SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
      6     GIVEN("2 sources"){
      7         auto sc = rxsc::make_test();
      8         auto w = sc.create_worker();
      9         const rxsc::test::messages<int> on;
     10 
     11         auto xs = sc.make_hot_observable({
     12             on.next(150, 1),
     13             on.next(210, 2),
     14             on.next(220, 3),
     15             on.next(230, 4),
     16             on.next(240, 5),
     17             on.completed(250)
     18         });
     19 
     20         auto ys = sc.make_hot_observable({
     21             on.next(150, 1),
     22             on.next(225, 99),
     23             on.completed(230)
     24         });
     25 
     26         WHEN("one is taken until the other emits a marble"){
     27 
     28             auto res = w.start(
     29                 [xs, ys]() {
     30                 return xs
     31                     | rxo::take_until(ys)
     32                     // forget type to workaround lambda deduction bug on msvc 2013
     33                     | rxo::as_dynamic();
     34             }
     35             );
     36 
     37             THEN("the output only contains items sent while subscribed"){
     38                 auto required = rxu::to_vector({
     39                     on.next(210, 2),
     40                     on.next(220, 3),
     41                     on.completed(225)
     42                 });
     43                 auto actual = res.get_observer().messages();
     44                 REQUIRE(required == actual);
     45             }
     46 
     47             THEN("there was 1 subscription/unsubscription to the source"){
     48                 auto required = rxu::to_vector({
     49                     on.subscribe(200, 225)
     50                 });
     51                 auto actual = xs.subscriptions();
     52                 REQUIRE(required == actual);
     53             }
     54 
     55             THEN("there was 1 subscription/unsubscription to the trigger"){
     56                 auto required = rxu::to_vector({
     57                     on.subscribe(200, 225)
     58                 });
     59                 auto actual = ys.subscriptions();
     60                 REQUIRE(required == actual);
     61             }
     62 
     63         }
     64     }
     65 }
     66 
     67 SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
     68     GIVEN("2 sources"){
     69         auto sc = rxsc::make_test();
     70         auto w = sc.create_worker();
     71         const rxsc::test::messages<int> on;
     72 
     73         auto l = sc.make_hot_observable({
     74             on.next(150, 1),
     75             on.next(210, 2),
     76             on.next(220, 3),
     77             on.next(230, 4),
     78             on.next(240, 5),
     79             on.completed(250)
     80         });
     81 
     82         auto r = sc.make_hot_observable({
     83             on.next(150, 1),
     84             on.next(225, 99),
     85             on.completed(230)
     86         });
     87 
     88         WHEN("one is taken until the other emits a marble"){
     89 
     90             auto res = w.start(
     91                 [l, r]() {
     92                 return l
     93                     .take_until(r)
     94                     // forget type to workaround lambda deduction bug on msvc 2013
     95                     .as_dynamic();
     96             }
     97             );
     98 
     99             THEN("the output only contains items sent while subscribed"){
    100                 auto required = rxu::to_vector({
    101                     on.next(210, 2),
    102                     on.next(220, 3),
    103                     on.completed(225)
    104                 });
    105                 auto actual = res.get_observer().messages();
    106                 REQUIRE(required == actual);
    107             }
    108 
    109             THEN("there was 1 subscription/unsubscription to the source"){
    110                 auto required = rxu::to_vector({
    111                     on.subscribe(200, 225)
    112                 });
    113                 auto actual = l.subscriptions();
    114                 REQUIRE(required == actual);
    115             }
    116 
    117             THEN("there was 1 subscription/unsubscription to the trigger"){
    118                 auto required = rxu::to_vector({
    119                     on.subscribe(200, 225)
    120                 });
    121                 auto actual = r.subscriptions();
    122                 REQUIRE(required == actual);
    123             }
    124 
    125         }
    126     }
    127 }
    128 
    129 SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){
    130     GIVEN("2 sources"){
    131         auto sc = rxsc::make_test();
    132         auto w = sc.create_worker();
    133         const rxsc::test::messages<int> on;
    134 
    135         std::runtime_error ex("take_until on_error from source");
    136 
    137         auto l = sc.make_hot_observable({
    138             on.next(150, 1),
    139             on.next(210, 2),
    140             on.next(220, 3),
    141             on.next(230, 4),
    142             on.next(240, 5),
    143             on.completed(250)
    144         });
    145 
    146         auto r = sc.make_hot_observable({
    147             on.next(150, 1),
    148             on.error(225, ex)
    149         });
    150 
    151         WHEN("one is taken until the other emits a marble"){
    152 
    153             auto res = w.start(
    154                 [l, r]() {
    155                 return l
    156                     .take_until(r)
    157                     // forget type to workaround lambda deduction bug on msvc 2013
    158                     .as_dynamic();
    159             }
    160             );
    161 
    162             THEN("the output only contains items sent while subscribed"){
    163                 auto required = rxu::to_vector({
    164                     on.next(210, 2),
    165                     on.next(220, 3),
    166                     on.error(225, ex)
    167                 });
    168                 auto actual = res.get_observer().messages();
    169                 REQUIRE(required == actual);
    170             }
    171 
    172             THEN("there was 1 subscription/unsubscription to the source"){
    173                 auto required = rxu::to_vector({
    174                     on.subscribe(200, 225)
    175                 });
    176                 auto actual = l.subscriptions();
    177                 REQUIRE(required == actual);
    178             }
    179 
    180             THEN("there was 1 subscription/unsubscription to the trigger"){
    181                 auto required = rxu::to_vector({
    182                     on.subscribe(200, 225)
    183                 });
    184                 auto actual = r.subscriptions();
    185                 REQUIRE(required == actual);
    186             }
    187 
    188         }
    189     }
    190 }
    191 
    192 SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){
    193     GIVEN("2 sources"){
    194         auto sc = rxsc::make_test();
    195         auto w = sc.create_worker();
    196         const rxsc::test::messages<int> on;
    197 
    198         auto l = sc.make_hot_observable({
    199             on.next(150, 1),
    200             on.next(210, 2),
    201             on.next(220, 3),
    202             on.next(230, 4),
    203             on.next(240, 5),
    204             on.completed(250)
    205         });
    206 
    207         auto r = sc.make_hot_observable({
    208             on.next(150, 1),
    209             on.completed(225)
    210         });
    211 
    212         WHEN("one is taken until the other emits a marble"){
    213 
    214             auto res = w.start(
    215                 [l, r]() {
    216                 return l
    217                     .take_until(r)
    218                     // forget type to workaround lambda deduction bug on msvc 2013
    219                     .as_dynamic();
    220             }
    221             );
    222 
    223             THEN("the output only contains items sent while subscribed"){
    224                 auto required = rxu::to_vector({
    225                     on.next(210, 2),
    226                     on.next(220, 3),
    227                     on.next(230, 4),
    228                     on.next(240, 5),
    229                     on.completed(250)
    230                 });
    231                 auto actual = res.get_observer().messages();
    232                 REQUIRE(required == actual);
    233             }
    234 
    235             THEN("there was 1 subscription/unsubscription to the source"){
    236                 auto required = rxu::to_vector({
    237                     on.subscribe(200, 250)
    238                 });
    239                 auto actual = l.subscriptions();
    240                 REQUIRE(required == actual);
    241             }
    242 
    243             THEN("there was 1 subscription/unsubscription to the trigger"){
    244                 auto required = rxu::to_vector({
    245                     on.subscribe(200, 225)
    246                 });
    247                 auto actual = r.subscriptions();
    248                 REQUIRE(required == actual);
    249             }
    250 
    251         }
    252     }
    253 }
    254 
    255 SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){
    256     GIVEN("2 sources"){
    257         auto sc = rxsc::make_test();
    258         auto w = sc.create_worker();
    259         const rxsc::test::messages<int> on;
    260 
    261         auto l = sc.make_hot_observable({
    262             on.next(150, 1),
    263             on.next(210, 2),
    264             on.next(220, 3),
    265             on.next(230, 4),
    266             on.next(240, 5),
    267             on.completed(250)
    268         });
    269 
    270         auto r = sc.make_hot_observable({
    271             on.next(150, 1)
    272         });
    273 
    274         WHEN("one is taken until the other emits a marble"){
    275 
    276             auto res = w.start(
    277                 [l, r]() {
    278                 return l
    279                     .take_until(r)
    280                     // forget type to workaround lambda deduction bug on msvc 2013
    281                     .as_dynamic();
    282             }
    283             );
    284 
    285             THEN("the output only contains items sent while subscribed"){
    286                 auto required = rxu::to_vector({
    287                     on.next(210, 2),
    288                     on.next(220, 3),
    289                     on.next(230, 4),
    290                     on.next(240, 5),
    291                     on.completed(250)
    292                 });
    293                 auto actual = res.get_observer().messages();
    294                 REQUIRE(required == actual);
    295             }
    296 
    297             THEN("there was 1 subscription/unsubscription to the source"){
    298                 auto required = rxu::to_vector({
    299                     on.subscribe(200, 250)
    300                 });
    301                 auto actual = l.subscriptions();
    302                 REQUIRE(required == actual);
    303             }
    304 
    305             THEN("there was 1 subscription/unsubscription to the trigger"){
    306                 auto required = rxu::to_vector({
    307                     on.subscribe(200, 250)
    308                 });
    309                 auto actual = r.subscriptions();
    310                 REQUIRE(required == actual);
    311             }
    312 
    313         }
    314     }
    315 }
    316 
    317 SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
    318     GIVEN("2 sources"){
    319         auto sc = rxsc::make_test();
    320         auto w = sc.create_worker();
    321         const rxsc::test::messages<int> on;
    322 
    323         auto l = sc.make_hot_observable({
    324             on.next(150, 1)
    325         });
    326 
    327         auto r = sc.make_hot_observable({
    328             on.next(150, 1),
    329             on.next(225, 2), //!
    330             on.completed(250)
    331         });
    332 
    333         WHEN("one is taken until the other emits a marble"){
    334 
    335             auto res = w.start(
    336                 [l, r]() {
    337                 return l
    338                     .take_until(r)
    339                     // forget type to workaround lambda deduction bug on msvc 2013
    340                     .as_dynamic();
    341             }
    342             );
    343 
    344             THEN("the output only contains items sent while subscribed"){
    345                 auto required = rxu::to_vector({
    346                     on.completed(225)
    347                 });
    348                 auto actual = res.get_observer().messages();
    349                 REQUIRE(required == actual);
    350             }
    351 
    352             THEN("there was 1 subscription/unsubscription to the source"){
    353                 auto required = rxu::to_vector({
    354                     on.subscribe(200, 225)
    355                 });
    356                 auto actual = l.subscriptions();
    357                 REQUIRE(required == actual);
    358             }
    359 
    360             THEN("there was 1 subscription/unsubscription to the trigger"){
    361                 auto required = rxu::to_vector({
    362                     on.subscribe(200, 225)
    363                 });
    364                 auto actual = r.subscriptions();
    365                 REQUIRE(required == actual);
    366             }
    367 
    368         }
    369     }
    370 }
    371 
    372 SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
    373     GIVEN("2 sources"){
    374         auto sc = rxsc::make_test();
    375         auto w = sc.create_worker();
    376         const rxsc::test::messages<int> on;
    377 
    378         std::runtime_error ex("take_until on_error from source");
    379 
    380         auto l = sc.make_hot_observable({
    381             on.next(150, 1)
    382         });
    383 
    384         auto r = sc.make_hot_observable({
    385             on.next(150, 1),
    386             on.error(225, ex)
    387         });
    388 
    389         WHEN("one is taken until the other emits a marble"){
    390 
    391             auto res = w.start(
    392                 [l, r]() {
    393                 return l
    394                     .take_until(r)
    395                     // forget type to workaround lambda deduction bug on msvc 2013
    396                     .as_dynamic();
    397             }
    398             );
    399 
    400             THEN("the output only contains items sent while subscribed"){
    401                 auto required = rxu::to_vector({
    402                     on.error(225, ex)
    403                 });
    404                 auto actual = res.get_observer().messages();
    405                 REQUIRE(required == actual);
    406             }
    407 
    408             THEN("there was 1 subscription/unsubscription to the source"){
    409                 auto required = rxu::to_vector({
    410                     on.subscribe(200, 225)
    411                 });
    412                 auto actual = l.subscriptions();
    413                 REQUIRE(required == actual);
    414             }
    415 
    416             THEN("there was 1 subscription/unsubscription to the trigger"){
    417                 auto required = rxu::to_vector({
    418                     on.subscribe(200, 225)
    419                 });
    420                 auto actual = r.subscriptions();
    421                 REQUIRE(required == actual);
    422             }
    423 
    424         }
    425     }
    426 }
    427 
    428 SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
    429     GIVEN("2 sources"){
    430         auto sc = rxsc::make_test();
    431         auto w = sc.create_worker();
    432         const rxsc::test::messages<int> on;
    433 
    434         auto l = sc.make_hot_observable({
    435             on.next(150, 1)
    436         });
    437 
    438         auto r = sc.make_hot_observable({
    439             on.next(150, 1),
    440             on.completed(225)
    441         });
    442 
    443         WHEN("one is taken until the other emits a marble"){
    444 
    445             auto res = w.start(
    446                 [l, r]() {
    447                 return l
    448                     .take_until(r)
    449                     // forget type to workaround lambda deduction bug on msvc 2013
    450                     .as_dynamic();
    451             }
    452             );
    453 
    454             THEN("the output only contains items sent while subscribed"){
    455                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    456                 auto actual = res.get_observer().messages();
    457                 REQUIRE(required == actual);
    458             }
    459 
    460             THEN("there was 1 subscription/unsubscription to the source"){
    461                 auto required = rxu::to_vector({
    462                     on.subscribe(200, 1000 /* can't dispose prematurely, could be in flight to dispatch OnError */)
    463                 });
    464                 auto actual = l.subscriptions();
    465                 REQUIRE(required == actual);
    466             }
    467 
    468             THEN("there was 1 subscription/unsubscription to the trigger"){
    469                 auto required = rxu::to_vector({
    470                     on.subscribe(200, 225)
    471                 });
    472                 auto actual = r.subscriptions();
    473                 REQUIRE(required == actual);
    474             }
    475 
    476         }
    477     }
    478 }
    479 
    480 SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
    481     GIVEN("2 sources"){
    482         auto sc = rxsc::make_test();
    483         auto w = sc.create_worker();
    484         const rxsc::test::messages<int> on;
    485 
    486         auto l = sc.make_hot_observable({
    487             on.next(150, 1)
    488         });
    489 
    490         auto r = sc.make_hot_observable({
    491             on.next(150, 1)
    492         });
    493 
    494         WHEN("one is taken until the other emits a marble"){
    495 
    496             auto res = w.start(
    497                 [l, r]() {
    498                 return l
    499                     .take_until(r)
    500                     // forget type to workaround lambda deduction bug on msvc 2013
    501                     .as_dynamic();
    502             }
    503             );
    504 
    505             THEN("the output only contains items sent while subscribed"){
    506                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
    507                 auto actual = res.get_observer().messages();
    508                 REQUIRE(required == actual);
    509             }
    510 
    511             THEN("there was 1 subscription/unsubscription to the source"){
    512                 auto required = rxu::to_vector({
    513                     on.subscribe(200, 1000)
    514                 });
    515                 auto actual = l.subscriptions();
    516                 REQUIRE(required == actual);
    517             }
    518 
    519             THEN("there was 1 subscription/unsubscription to the trigger"){
    520                 auto required = rxu::to_vector({
    521                     on.subscribe(200, 1000)
    522                 });
    523                 auto actual = r.subscriptions();
    524                 REQUIRE(required == actual);
    525             }
    526 
    527         }
    528     }
    529 }
    530 
    531 SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){
    532     GIVEN("2 sources"){
    533         auto sc = rxsc::make_test();
    534         auto w = sc.create_worker();
    535         const rxsc::test::messages<int> on;
    536 
    537         auto l = sc.make_hot_observable({
    538             on.next(150, 1),
    539             on.next(230, 2),
    540             on.completed(240)
    541         });
    542 
    543         auto r = sc.make_hot_observable({
    544             on.next(150, 1),
    545             on.next(210, 2), //!
    546             on.completed(220)
    547         });
    548 
    549         WHEN("one is taken until the other emits a marble"){
    550 
    551             auto res = w.start(
    552                 [l, r]() {
    553                 return l
    554                     .take_until(r)
    555                     // forget type to workaround lambda deduction bug on msvc 2013
    556                     .as_dynamic();
    557             }
    558             );
    559 
    560             THEN("the output only contains items sent while subscribed"){
    561                 auto required = rxu::to_vector({
    562                     on.completed(210)
    563                 });
    564                 auto actual = res.get_observer().messages();
    565                 REQUIRE(required == actual);
    566             }
    567 
    568             THEN("there was 1 subscription/unsubscription to the source"){
    569                 auto required = rxu::to_vector({
    570                     on.subscribe(200, 210)
    571                 });
    572                 auto actual = l.subscriptions();
    573                 REQUIRE(required == actual);
    574             }
    575 
    576             THEN("there was 1 subscription/unsubscription to the trigger"){
    577                 auto required = rxu::to_vector({
    578                     on.subscribe(200, 210)
    579                 });
    580                 auto actual = r.subscriptions();
    581                 REQUIRE(required == actual);
    582             }
    583 
    584         }
    585     }
    586 }
    587 
    588 SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){
    589     GIVEN("2 sources"){
    590         auto sc = rxsc::make_test();
    591         auto w = sc.create_worker();
    592         const rxsc::test::messages<int> on;
    593 
    594         bool sourceNotDisposed = false;
    595 
    596         auto l = sc.make_hot_observable({
    597             on.next(150, 1),
    598             on.error(215, std::runtime_error("error in unsubscribed stream")), // should not come
    599             on.completed(240)
    600         });
    601 
    602         auto r = sc.make_hot_observable({
    603             on.next(150, 1),
    604             on.next(210, 2), //!
    605             on.completed(220)
    606         });
    607 
    608         WHEN("one is taken until the other emits a marble"){
    609 
    610             auto res = w.start(
    611                 [l, r, &sourceNotDisposed]() {
    612                 return l
    613                     .map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v; })
    614                     .take_until(r)
    615                     // forget type to workaround lambda deduction bug on msvc 2013
    616                     .as_dynamic();
    617             }
    618             );
    619 
    620             THEN("the output only contains items sent while subscribed"){
    621                 auto required = rxu::to_vector({
    622                     on.completed(210)
    623                 });
    624                 auto actual = res.get_observer().messages();
    625                 REQUIRE(required == actual);
    626             }
    627 
    628             THEN("signal disposed"){
    629                 auto required = false;
    630                 auto actual = sourceNotDisposed;
    631                 REQUIRE(required == actual);
    632             }
    633 
    634         }
    635     }
    636 }
    637 
    638 SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){
    639     GIVEN("2 sources"){
    640         auto sc = rxsc::make_test();
    641         auto w = sc.create_worker();
    642         const rxsc::test::messages<int> on;
    643 
    644         bool signalNotDisposed = false;
    645 
    646         auto l = sc.make_hot_observable({
    647             on.next(150, 1),
    648             on.next(230, 2),
    649             on.completed(240)
    650         });
    651 
    652         auto r = sc.make_hot_observable({
    653             on.next(150, 1),
    654             on.next(250, 2),
    655             on.completed(260)
    656         });
    657 
    658         WHEN("one is taken until the other emits a marble"){
    659 
    660             auto res = w.start(
    661                 [l, r, &signalNotDisposed]() {
    662                 return l
    663                     .take_until(r
    664                     .map([&signalNotDisposed](int v){signalNotDisposed = true; return v; }))
    665                     // forget type to workaround lambda deduction bug on msvc 2013
    666                     .as_dynamic();
    667             }
    668             );
    669 
    670             THEN("the output only contains items sent while subscribed"){
    671                 auto required = rxu::to_vector({
    672                     on.next(230, 2),
    673                     on.completed(240)
    674                 });
    675                 auto actual = res.get_observer().messages();
    676                 REQUIRE(required == actual);
    677             }
    678 
    679             THEN("signal disposed"){
    680                 auto required = false;
    681                 auto actual = signalNotDisposed;
    682                 REQUIRE(required == actual);
    683             }
    684 
    685         }
    686     }
    687 }
    688 
    689 SCENARIO("take_until, error some", "[take_until][take][operators]"){
    690     GIVEN("2 sources"){
    691         auto sc = rxsc::make_test();
    692         auto w = sc.create_worker();
    693         const rxsc::test::messages<int> on;
    694 
    695         std::runtime_error ex("take_until on_error from source");
    696 
    697         auto l = sc.make_hot_observable({
    698             on.next(150, 1),
    699             on.error(225, ex)
    700         });
    701 
    702         auto r = sc.make_hot_observable({
    703             on.next(150, 1),
    704             on.next(240, 2)
    705         });
    706 
    707         WHEN("one is taken until the other emits a marble"){
    708 
    709             auto res = w.start(
    710                 [l, r]() {
    711                 return l
    712                     .take_until(r)
    713                     // forget type to workaround lambda deduction bug on msvc 2013
    714                     .as_dynamic();
    715             }
    716             );
    717 
    718             THEN("the output only contains items sent while subscribed"){
    719                 auto required = rxu::to_vector({
    720                     on.error(225, ex)
    721                 });
    722                 auto actual = res.get_observer().messages();
    723                 REQUIRE(required == actual);
    724             }
    725 
    726             THEN("there was 1 subscription/unsubscription to the source"){
    727                 auto required = rxu::to_vector({
    728                     on.subscribe(200, 225)
    729                 });
    730                 auto actual = l.subscriptions();
    731                 REQUIRE(required == actual);
    732             }
    733 
    734             THEN("there was 1 subscription/unsubscription to the trigger"){
    735                 auto required = rxu::to_vector({
    736                     on.subscribe(200, 225)
    737                 });
    738                 auto actual = r.subscriptions();
    739                 REQUIRE(required == actual);
    740             }
    741 
    742         }
    743     }
    744 }
    745 
    746 SCENARIO("take_until trigger on time point", "[take_until][take][operators]"){
    747     GIVEN("a source and a time point"){
    748         auto sc = rxsc::make_test();
    749         auto so = rx::synchronize_in_one_worker(sc);
    750         auto w = sc.create_worker();
    751         const rxsc::test::messages<int> on;
    752 
    753         auto xs = sc.make_hot_observable({
    754             on.next(150, 1),
    755             on.next(210, 2),
    756             on.next(220, 3),
    757             on.next(230, 4),
    758             on.next(240, 5),
    759             on.completed(250)
    760         });
    761 
    762         auto t = sc.to_time_point(225);
    763 
    764         WHEN("invoked with a time point"){
    765 
    766             auto res = w.start(
    767                 [&]() {
    768                 return xs
    769                     | rxo::take_until(t, so)
    770                     // forget type to workaround lambda deduction bug on msvc 2013
    771                     | rxo::as_dynamic();
    772             }
    773             );
    774 
    775             THEN("the output only contains items sent while subscribed"){
    776                 auto required = rxu::to_vector({
    777                     on.next(211, 2),
    778                     on.next(221, 3),
    779                     on.completed(226)
    780                 });
    781                 auto actual = res.get_observer().messages();
    782                 REQUIRE(required == actual);
    783             }
    784 
    785             THEN("there was 1 subscription/unsubscription to the source"){
    786                 auto required = rxu::to_vector({
    787                     on.subscribe(200, 226)
    788                 });
    789                 auto actual = xs.subscriptions();
    790                 REQUIRE(required == actual);
    791             }
    792         }
    793     }
    794 }
    795