// Tests for the rxcpp take_while operator.
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-take_while.hpp>
      3 
      4 namespace {
      5     class not_equal_to {
      6         int value;
      7     public:
      8         not_equal_to(int value) : value(value) { }
      9         bool operator()(int i) const {
     10             return i != value;
     11         }
     12     };
     13 }
     14 
     15 SCENARIO("take_while not equal to 4", "[take_while][operators]"){
     16     GIVEN("a source"){
     17         auto sc = rxsc::make_test();
     18         auto w = sc.create_worker();
     19         const rxsc::test::messages<int> on;
     20 
     21         auto xs = sc.make_hot_observable({
     22             on.next(150, 1),
     23             on.next(210, 2),
     24             on.next(220, 3),
     25             on.next(230, 4),
     26             on.next(240, 5),
     27             on.completed(250)
     28         });
     29 
     30         WHEN("values before 4 are taken"){
     31 
     32             auto res = w.start(
     33                 [xs]() {
     34                     return xs
     35                         .take_while(not_equal_to(4))
     36                         .as_dynamic();
     37                 }
     38             );
     39 
     40             THEN("the output only contains items sent while subscribed"){
     41                 auto required = rxu::to_vector({
     42                     on.next(210, 2),
     43                     on.next(220, 3),
     44                     on.completed(230)
     45                 });
     46                 auto actual = res.get_observer().messages();
     47                 REQUIRE(required == actual);
     48             }
     49 
     50             THEN("there was 1 subscription/unsubscription to the source"){
     51                 auto required = rxu::to_vector({
     52                     on.subscribe(200, 230)
     53                 });
     54                 auto actual = xs.subscriptions();
     55                 REQUIRE(required == actual);
     56             }
     57 
     58         }
     59     }
     60 }
     61 
     62 SCENARIO("take_while, complete after", "[take_while][operators]"){
     63     GIVEN("a source"){
     64         auto sc = rxsc::make_test();
     65         auto w = sc.create_worker();
     66         const rxsc::test::messages<int> on;
     67 
     68         auto xs = sc.make_hot_observable({
     69             on.next(70, 6),
     70             on.next(150, 4),
     71             on.next(210, 9),
     72             on.next(230, 13),
     73             on.next(270, 7),
     74             on.next(280, 1),
     75             on.next(300, -1),
     76             on.next(310, 3),
     77             on.next(340, 8),
     78             on.next(370, 11),
     79             on.next(410, 15),
     80             on.next(415, 16),
     81             on.next(460, 72),
     82             on.next(510, 76),
     83             on.next(560, 32),
     84             on.next(570, -100),
     85             on.next(580, -3),
     86             on.next(590, 5),
     87             on.next(630, 10),
     88             on.completed(690)
     89         });
     90 
     91         WHEN("all are taken"){
     92 
     93             auto res = w.start(
     94                 [xs]() {
     95                     return xs
     96                         .take_while(not_equal_to(0))
     97                         // forget type to workaround lambda deduction bug on msvc 2013
     98                         .as_dynamic();
     99                 }
    100             );
    101 
    102             THEN("the output only contains items sent while subscribed"){
    103                 auto required = rxu::to_vector({
    104                     on.next(210, 9),
    105                     on.next(230, 13),
    106                     on.next(270, 7),
    107                     on.next(280, 1),
    108                     on.next(300, -1),
    109                     on.next(310, 3),
    110                     on.next(340, 8),
    111                     on.next(370, 11),
    112                     on.next(410, 15),
    113                     on.next(415, 16),
    114                     on.next(460, 72),
    115                     on.next(510, 76),
    116                     on.next(560, 32),
    117                     on.next(570, -100),
    118                     on.next(580, -3),
    119                     on.next(590, 5),
    120                     on.next(630, 10),
    121                     on.completed(690)
    122                 });
    123                 auto actual = res.get_observer().messages();
    124                 REQUIRE(required == actual);
    125             }
    126 
    127             THEN("there was 1 subscription/unsubscription to the source"){
    128                 auto required = rxu::to_vector({
    129                     on.subscribe(200, 690)
    130                 });
    131                 auto actual = xs.subscriptions();
    132                 REQUIRE(required == actual);
    133             }
    134 
    135         }
    136     }
    137 }
    138 
    139 SCENARIO("take_while, complete before", "[take_while][operators]"){
    140     GIVEN("a source"){
    141         auto sc = rxsc::make_test();
    142         auto w = sc.create_worker();
    143         const rxsc::test::messages<int> on;
    144 
    145         auto xs = sc.make_hot_observable({
    146             on.next(70, 6),
    147             on.next(150, 4),
    148             on.next(210, 9),
    149             on.next(230, 13),
    150             on.next(270, 7),
    151             on.next(280, 1),
    152             on.next(300, -1),
    153             on.next(310, 3),
    154             on.next(340, 8),
    155             on.next(370, 11),
    156             on.next(410, 15),
    157             on.next(415, 16),
    158             on.next(460, 72),
    159             on.next(510, 76),
    160             on.next(560, 32),
    161             on.next(570, -100),
    162             on.next(580, -3),
    163             on.next(590, 5),
    164             on.next(630, 10),
    165             on.completed(690)
    166         });
    167 
    168         WHEN("10 values are taken"){
    169 
    170             auto res = w.start(
    171                 [xs]() {
    172                     return xs
    173                         .take_while(not_equal_to(72))
    174                         // forget type to workaround lambda deduction bug on msvc 2013
    175                         .as_dynamic();
    176                 }
    177             );
    178 
    179             THEN("the output only contains items sent while subscribed"){
    180                 auto required = rxu::to_vector({
    181                     on.next(210, 9),
    182                     on.next(230, 13),
    183                     on.next(270, 7),
    184                     on.next(280, 1),
    185                     on.next(300, -1),
    186                     on.next(310, 3),
    187                     on.next(340, 8),
    188                     on.next(370, 11),
    189                     on.next(410, 15),
    190                     on.next(415, 16),
    191                     on.completed(460)
    192                 });
    193                 auto actual = res.get_observer().messages();
    194                 REQUIRE(required == actual);
    195             }
    196 
    197             THEN("there was 1 subscription/unsubscription to the source"){
    198                 auto required = rxu::to_vector({
    199                     on.subscribe(200, 460)
    200                 });
    201                 auto actual = xs.subscriptions();
    202                 REQUIRE(required == actual);
    203             }
    204 
    205         }
    206     }
    207 }
    208 
    209 SCENARIO("take_while, error after", "[take_while][operators]"){
    210     GIVEN("a source"){
    211         auto sc = rxsc::make_test();
    212         auto w = sc.create_worker();
    213         const rxsc::test::messages<int> on;
    214 
    215         std::runtime_error ex("take_while on_error from source");
    216 
    217         auto xs = sc.make_hot_observable({
    218             on.next(70, 6),
    219             on.next(150, 4),
    220             on.next(210, 9),
    221             on.next(230, 13),
    222             on.next(270, 7),
    223             on.next(280, 1),
    224             on.next(300, -1),
    225             on.next(310, 3),
    226             on.next(340, 8),
    227             on.next(370, 11),
    228             on.next(410, 15),
    229             on.next(415, 16),
    230             on.next(460, 72),
    231             on.next(510, 76),
    232             on.next(560, 32),
    233             on.next(570, -100),
    234             on.next(580, -3),
    235             on.next(590, 5),
    236             on.next(630, 10),
    237             on.error(690, ex)
    238         });
    239 
    240         WHEN("all values are taken"){
    241 
    242             auto res = w.start(
    243                 [xs]() {
    244                     return xs
    245                         .take_while(not_equal_to(0))
    246                         // forget type to workaround lambda deduction bug on msvc 2013
    247                         .as_dynamic();
    248                 }
    249             );
    250 
    251             THEN("the output only contains items sent while subscribed and the error"){
    252                 auto required = rxu::to_vector({
    253                     on.next(210, 9),
    254                     on.next(230, 13),
    255                     on.next(270, 7),
    256                     on.next(280, 1),
    257                     on.next(300, -1),
    258                     on.next(310, 3),
    259                     on.next(340, 8),
    260                     on.next(370, 11),
    261                     on.next(410, 15),
    262                     on.next(415, 16),
    263                     on.next(460, 72),
    264                     on.next(510, 76),
    265                     on.next(560, 32),
    266                     on.next(570, -100),
    267                     on.next(580, -3),
    268                     on.next(590, 5),
    269                     on.next(630, 10),
    270                     on.error(690, ex)
    271                 });
    272                 auto actual = res.get_observer().messages();
    273                 REQUIRE(required == actual);
    274             }
    275 
    276             THEN("there was 1 subscription/unsubscription to the source"){
    277                 auto required = rxu::to_vector({
    278                     on.subscribe(200, 690)
    279                 });
    280                 auto actual = xs.subscriptions();
    281                 REQUIRE(required == actual);
    282             }
    283 
    284         }
    285     }
    286 }
    287 
    288 SCENARIO("take_while, error same", "[take_while][operators]"){
    289     GIVEN("a source"){
    290         auto sc = rxsc::make_test();
    291         auto w = sc.create_worker();
    292         const rxsc::test::messages<int> on;
    293 
    294         auto xs = sc.make_hot_observable({
    295             on.next(70, 6),
    296             on.next(150, 4),
    297             on.next(210, 9),
    298             on.next(230, 13),
    299             on.next(270, 7),
    300             on.next(280, 1),
    301             on.next(300, -1),
    302             on.next(310, 3),
    303             on.next(340, 8),
    304             on.next(370, 11),
    305             on.next(410, 15),
    306             on.next(415, 16),
    307             on.next(460, 72),
    308             on.next(510, 76),
    309             on.next(560, 32),
    310             on.next(570, -100),
    311             on.next(580, -3),
    312             on.next(590, 5),
    313             on.next(630, 10),
    314             on.error(690, std::runtime_error("error in unsubscribed stream"))
    315         });
    316 
    317         WHEN("all but one values are taken"){
    318 
    319             auto res = w.start(
    320                 [xs]() {
    321                     return xs
    322                         .take_while(not_equal_to(10))
    323                         // forget type to workaround lambda deduction bug on msvc 2013
    324                         .as_dynamic();
    325                 }
    326             );
    327 
    328             THEN("the output only contains items sent while subscribed"){
    329                 auto required = rxu::to_vector({
    330                     on.next(210, 9),
    331                     on.next(230, 13),
    332                     on.next(270, 7),
    333                     on.next(280, 1),
    334                     on.next(300, -1),
    335                     on.next(310, 3),
    336                     on.next(340, 8),
    337                     on.next(370, 11),
    338                     on.next(410, 15),
    339                     on.next(415, 16),
    340                     on.next(460, 72),
    341                     on.next(510, 76),
    342                     on.next(560, 32),
    343                     on.next(570, -100),
    344                     on.next(580, -3),
    345                     on.next(590, 5),
    346                     on.completed(630)
    347                 });
    348                 auto actual = res.get_observer().messages();
    349                 REQUIRE(required == actual);
    350             }
    351 
    352             THEN("there was 1 subscription/unsubscription to the source"){
    353                 auto required = rxu::to_vector({
    354                     on.subscribe(200, 630)
    355                 });
    356                 auto actual = xs.subscriptions();
    357                 REQUIRE(required == actual);
    358             }
    359 
    360         }
    361     }
    362 }
    363 
    364 SCENARIO("take_while, error before", "[take_while][operators]"){
    365     GIVEN("a source"){
    366         auto sc = rxsc::make_test();
    367         auto w = sc.create_worker();
    368         const rxsc::test::messages<int> on;
    369 
    370         auto xs = sc.make_hot_observable({
    371             on.next(70, 6),
    372             on.next(150, 4),
    373             on.next(210, 9),
    374             on.next(230, 13),
    375             on.next(270, 7),
    376             on.next(280, 1),
    377             on.next(300, -1),
    378             on.next(310, 3),
    379             on.next(340, 8),
    380             on.next(370, 11),
    381             on.next(410, 15),
    382             on.next(415, 16),
    383             on.next(460, 72),
    384             on.next(510, 76),
    385             on.next(560, 32),
    386             on.next(570, -100),
    387             on.next(580, -3),
    388             on.next(590, 5),
    389             on.next(630, 10),
    390             on.error(690, std::runtime_error("error in unsubscribed stream"))
    391         });
    392 
    393         WHEN("3 values are taken"){
    394 
    395             auto res = w.start(
    396                 [xs]() {
    397                     return xs
    398                         .take_while(not_equal_to(1))
    399                         // forget type to workaround lambda deduction bug on msvc 2013
    400                         .as_dynamic();
    401                 }
    402             );
    403 
    404             THEN("the output only contains items sent while subscribed"){
    405                 auto required = rxu::to_vector({
    406                     on.next(210, 9),
    407                     on.next(230, 13),
    408                     on.next(270, 7),
    409                     on.completed(280)
    410                 });
    411                 auto actual = res.get_observer().messages();
    412                 REQUIRE(required == actual);
    413             }
    414 
    415             THEN("there was 1 subscription/unsubscription to the source"){
    416                 auto required = rxu::to_vector({
    417                     on.subscribe(200, 280)
    418                 });
    419                 auto actual = xs.subscriptions();
    420                 REQUIRE(required == actual);
    421             }
    422 
    423         }
    424     }
    425 }
    426 
    427 SCENARIO("take_while, dispose before", "[take_while][operators]"){
    428     GIVEN("a source"){
    429         auto sc = rxsc::make_test();
    430         auto w = sc.create_worker();
    431         const rxsc::test::messages<int> on;
    432 
    433         auto xs = sc.make_hot_observable({
    434             on.next(70, 6),
    435             on.next(150, 4),
    436             on.next(210, 9),
    437             on.next(230, 13),
    438             on.next(270, 7),
    439             on.next(280, 1),
    440             on.next(300, -1),
    441             on.next(310, 3),
    442             on.next(340, 8),
    443             on.next(370, 11),
    444             on.next(410, 15),
    445             on.next(415, 16),
    446             on.next(460, 72),
    447             on.next(510, 76),
    448             on.next(560, 32),
    449             on.next(570, -100),
    450             on.next(580, -3),
    451             on.next(590, 5),
    452             on.next(630, 10)
    453         });
    454 
    455         WHEN("3 values are taken"){
    456 
    457             auto res = w.start(
    458                 [xs]() {
    459                     return xs
    460                         .take_while(not_equal_to(100))
    461                         // forget type to workaround lambda deduction bug on msvc 2013
    462                         .as_dynamic();
    463                 },
    464                 250
    465             );
    466 
    467             THEN("the output only contains items sent while subscribed"){
    468                 auto required = rxu::to_vector({
    469                     on.next(210, 9),
    470                     on.next(230, 13)
    471                 });
    472                 auto actual = res.get_observer().messages();
    473                 REQUIRE(required == actual);
    474             }
    475 
    476             THEN("there was 1 subscription/unsubscription to the source"){
    477                 auto required = rxu::to_vector({
    478                     on.subscribe(200, 250)
    479                 });
    480                 auto actual = xs.subscriptions();
    481                 REQUIRE(required == actual);
    482             }
    483 
    484         }
    485     }
    486 }
    487 
    488 SCENARIO("take_while, dispose after", "[take_while][operators]"){
    489     GIVEN("a source"){
    490         auto sc = rxsc::make_test();
    491         auto w = sc.create_worker();
    492         const rxsc::test::messages<int> on;
    493 
    494         auto xs = sc.make_hot_observable({
    495             on.next(70, 6),
    496             on.next(150, 4),
    497             on.next(210, 9),
    498             on.next(230, 13),
    499             on.next(270, 7),
    500             on.next(280, 1),
    501             on.next(300, -1),
    502             on.next(310, 3),
    503             on.next(340, 8),
    504             on.next(370, 11),
    505             on.next(410, 15),
    506             on.next(415, 16),
    507             on.next(460, 72),
    508             on.next(510, 76),
    509             on.next(560, 32),
    510             on.next(570, -100),
    511             on.next(580, -3),
    512             on.next(590, 5),
    513             on.next(630, 10)
    514         });
    515 
    516         WHEN("3 values are taken"){
    517 
    518             auto res = w.start(
    519                 [xs]() {
    520                     return xs
    521                         .take_while(not_equal_to(1))
    522                         // forget type to workaround lambda deduction bug on msvc 2013
    523                         .as_dynamic();
    524                 },
    525                 400
    526             );
    527 
    528             THEN("the output only contains items sent while subscribed"){
    529                 auto required = rxu::to_vector({
    530                     on.next(210, 9),
    531                     on.next(230, 13),
    532                     on.next(270, 7),
    533                     on.completed(280)
    534                 });
    535                 auto actual = res.get_observer().messages();
    536                 REQUIRE(required == actual);
    537             }
    538 
    539             THEN("there was 1 subscription/unsubscription to the source"){
    540                 auto required = rxu::to_vector({
    541                     on.subscribe(200, 280)
    542                 });
    543                 auto actual = xs.subscriptions();
    544                 REQUIRE(required == actual);
    545             }
    546 
    547         }
    548     }
    549 }
    550 
    551