Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-distinct_until_changed.hpp>
      3 
      4 SCENARIO("distinct_until_changed - never", "[distinct_until_changed][operators]"){
      5     GIVEN("a source"){
      6         auto sc = rxsc::make_test();
      7         auto w = sc.create_worker();
      8         const rxsc::test::messages<int> on;
      9 
     10         auto xs = sc.make_hot_observable({
     11             on.next(150, 1)
     12         });
     13 
     14         WHEN("distinct values are taken"){
     15 
     16             auto res = w.start(
     17                 [xs]() {
     18                     return xs | rxo::distinct_until_changed();
     19                 }
     20             );
     21 
     22             THEN("the output is empty"){
     23                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
     24                 auto actual = res.get_observer().messages();
     25                 REQUIRE(required == actual);
     26             }
     27 
     28             THEN("there was 1 subscription/unsubscription to the source"){
     29                 auto required = rxu::to_vector({
     30                     on.subscribe(200, 1000)
     31                 });
     32                 auto actual = xs.subscriptions();
     33                 REQUIRE(required == actual);
     34             }
     35         }
     36     }
     37 }
     38 
     39 SCENARIO("distinct_until_changed - empty", "[distinct_until_changed][operators]"){
     40     GIVEN("a source"){
     41         auto sc = rxsc::make_test();
     42         auto w = sc.create_worker();
     43         const rxsc::test::messages<int> on;
     44 
     45         auto xs = sc.make_hot_observable({
     46             on.next(150, 1),
     47             on.completed(250)
     48         });
     49 
     50         WHEN("distinct values are taken"){
     51 
     52             auto res = w.start(
     53                 [xs]() {
     54                     return xs.distinct_until_changed();
     55                 }
     56             );
     57 
     58             THEN("the output only contains complete message"){
     59                 auto required = rxu::to_vector({
     60                     on.completed(250)
     61                 });
     62                 auto actual = res.get_observer().messages();
     63                 REQUIRE(required == actual);
     64             }
     65 
     66             THEN("there was 1 subscription/unsubscription to the source"){
     67                 auto required = rxu::to_vector({
     68                     on.subscribe(200, 250)
     69                 });
     70                 auto actual = xs.subscriptions();
     71                 REQUIRE(required == actual);
     72             }
     73 
     74         }
     75     }
     76 }
     77 
     78 SCENARIO("distinct_until_changed - return", "[distinct_until_changed][operators]"){
     79     GIVEN("a source"){
     80         auto sc = rxsc::make_test();
     81         auto w = sc.create_worker();
     82         const rxsc::test::messages<int> on;
     83 
     84         auto xs = sc.make_hot_observable({
     85             on.next(150, 1),
     86             on.next(210, 2),
     87             on.completed(250)
     88         });
     89 
     90         WHEN("distinct values are taken"){
     91 
     92             auto res = w.start(
     93                 [xs]() {
     94                     return xs.distinct_until_changed();
     95                 }
     96             );
     97 
     98             THEN("the output only contains distinct items sent while subscribed"){
     99                 auto required = rxu::to_vector({
    100                     on.next(210, 2),
    101                     on.completed(250)
    102                 });
    103                 auto actual = res.get_observer().messages();
    104                 REQUIRE(required == actual);
    105             }
    106 
    107             THEN("there was 1 subscription/unsubscription to the source"){
    108                 auto required = rxu::to_vector({
    109                     on.subscribe(200, 250)
    110                 });
    111                 auto actual = xs.subscriptions();
    112                 REQUIRE(required == actual);
    113             }
    114 
    115         }
    116     }
    117 }
    118 
    119 SCENARIO("distinct_until_changed - throw", "[distinct_until_changed][operators]"){
    120     GIVEN("a source"){
    121         auto sc = rxsc::make_test();
    122         auto w = sc.create_worker();
    123         const rxsc::test::messages<int> on;
    124 
    125         std::runtime_error ex("distinct_until_changed on_error from source");
    126 
    127         auto xs = sc.make_hot_observable({
    128             on.next(150, 1),
    129             on.error(250, ex)
    130         });
    131 
    132         WHEN("distinct values are taken"){
    133 
    134             auto res = w.start(
    135                 [xs]() {
    136                     return xs.distinct_until_changed();
    137                 }
    138             );
    139 
    140             THEN("the output only contains only error"){
    141                 auto required = rxu::to_vector({
    142                     on.error(250, ex)
    143                 });
    144                 auto actual = res.get_observer().messages();
    145                 REQUIRE(required == actual);
    146             }
    147 
    148             THEN("there was 1 subscription/unsubscription to the source"){
    149                 auto required = rxu::to_vector({
    150                     on.subscribe(200, 250)
    151                 });
    152                 auto actual = xs.subscriptions();
    153                 REQUIRE(required == actual);
    154             }
    155 
    156         }
    157     }
    158 }
    159 
    160 SCENARIO("distinct_until_changed - all changes", "[distinct_until_changed][operators]"){
    161     GIVEN("a source"){
    162         auto sc = rxsc::make_test();
    163         auto w = sc.create_worker();
    164         const rxsc::test::messages<int> on;
    165 
    166         auto xs = sc.make_hot_observable({
    167             on.next(150, 1),
    168             on.next(210, 2),
    169             on.next(220, 3),
    170             on.next(230, 4),
    171             on.next(240, 5),
    172             on.completed(250)
    173         });
    174 
    175         WHEN("distinct values are taken"){
    176 
    177             auto res = w.start(
    178                 [xs]() {
    179                     return xs.distinct_until_changed();
    180                 }
    181             );
    182 
    183             THEN("the output only contains distinct items sent while subscribed"){
    184                 auto required = rxu::to_vector({
    185                     on.next(210, 2),
    186                     on.next(220, 3),
    187                     on.next(230, 4),
    188                     on.next(240, 5),
    189                     on.completed(250)
    190                 });
    191                 auto actual = res.get_observer().messages();
    192                 REQUIRE(required == actual);
    193             }
    194 
    195             THEN("there was 1 subscription/unsubscription to the source"){
    196                 auto required = rxu::to_vector({
    197                     on.subscribe(200, 250)
    198                 });
    199                 auto actual = xs.subscriptions();
    200                 REQUIRE(required == actual);
    201             }
    202 
    203         }
    204     }
    205 }
    206 
    207 SCENARIO("distinct_until_changed - all same", "[distinct_until_changed][operators]"){
    208     GIVEN("a source"){
    209         auto sc = rxsc::make_test();
    210         auto w = sc.create_worker();
    211         const rxsc::test::messages<int> on;
    212 
    213         auto xs = sc.make_hot_observable({
    214             on.next(150, 1),
    215             on.next(210, 2),
    216             on.next(220, 2),
    217             on.next(230, 2),
    218             on.next(240, 2),
    219             on.completed(250)
    220         });
    221 
    222         WHEN("distinct values are taken"){
    223 
    224             auto res = w.start(
    225                 [xs]() {
    226                     return xs.distinct_until_changed();
    227                 }
    228             );
    229 
    230             THEN("the output only contains distinct items sent while subscribed"){
    231                 auto required = rxu::to_vector({
    232                     on.next(210, 2),
    233                     on.completed(250)
    234                 });
    235                 auto actual = res.get_observer().messages();
    236                 REQUIRE(required == actual);
    237             }
    238 
    239             THEN("there was 1 subscription/unsubscription to the source"){
    240                 auto required = rxu::to_vector({
    241                     on.subscribe(200, 250)
    242                 });
    243                 auto actual = xs.subscriptions();
    244                 REQUIRE(required == actual);
    245             }
    246 
    247         }
    248     }
    249 }
    250 
    251 SCENARIO("distinct_until_changed - some changes", "[distinct_until_changed][operators]"){
    252     GIVEN("a source"){
    253         auto sc = rxsc::make_test();
    254         auto w = sc.create_worker();
    255         const rxsc::test::messages<int> on;
    256 
    257         auto xs = sc.make_hot_observable({
    258             on.next(150, 1),
    259             on.next(210, 2), //*
    260             on.next(215, 3), //*
    261             on.next(220, 3),
    262             on.next(225, 2), //*
    263             on.next(230, 2),
    264             on.next(230, 1), //*
    265             on.next(240, 2), //*
    266             on.completed(250)
    267         });
    268 
    269         WHEN("distinct values are taken"){
    270 
    271             auto res = w.start(
    272                 [xs]() {
    273                     return xs.distinct_until_changed();
    274                 }
    275             );
    276 
    277             THEN("the output only contains distinct items sent while subscribed"){
    278                 auto required = rxu::to_vector({
    279                     on.next(210, 2), //*
    280                     on.next(215, 3), //*
    281                     on.next(225, 2), //*
    282                     on.next(230, 1), //*
    283                     on.next(240, 2), //*
    284                     on.completed(250)
    285                 });
    286                 auto actual = res.get_observer().messages();
    287                 REQUIRE(required == actual);
    288             }
    289 
    290             THEN("there was 1 subscription/unsubscription to the source"){
    291                 auto required = rxu::to_vector({
    292                     on.subscribe(200, 250)
    293                 });
    294                 auto actual = xs.subscriptions();
    295                 REQUIRE(required == actual);
    296             }
    297 
    298         }
    299     }
    300 }
    301 
    302 struct A {
    303     int i;
    304 
    305     bool operator!=(const A& a) const {
    306         return i != a.i;
    307     }
    308 
    309     bool operator==(const A& a) const {
    310         return i == a.i;
    311     }
    312 };
    313 
    314 SCENARIO("distinct_until_changed - custom type", "[distinct_until_changed][operators]"){
    315     GIVEN("a source"){
    316         auto sc = rxsc::make_test();
    317         auto w = sc.create_worker();
    318         const rxsc::test::messages<A> on;
    319 
    320         auto xs = sc.make_hot_observable({
    321             on.next(150, A{1}),
    322             on.next(210, A{2}), //*
    323             on.next(215, A{3}), //*
    324             on.next(220, A{3}),
    325             on.next(225, A{2}), //*
    326             on.next(230, A{2}),
    327             on.next(230, A{1}), //*
    328             on.next(240, A{2}), //*
    329             on.completed(250)
    330         });
    331 
    332         WHEN("distinct values are taken"){
    333 
    334             auto res = w.start(
    335                 [xs]() {
    336                     return xs.distinct_until_changed();
    337                 }
    338             );
    339 
    340             THEN("the output only contains distinct items sent while subscribed"){
    341                 auto required = rxu::to_vector({
    342                     on.next(210, A{2}), //*
    343                     on.next(215, A{3}), //*
    344                     on.next(225, A{2}), //*
    345                     on.next(230, A{1}), //*
    346                     on.next(240, A{2}), //*
    347                     on.completed(250)
    348                 });
    349                 auto actual = res.get_observer().messages();
    350                 REQUIRE(required == actual);
    351             }
    352 
    353             THEN("there was 1 subscription/unsubscription to the source"){
    354                 auto required = rxu::to_vector({
    355                     on.subscribe(200, 250)
    356                 });
    357                 auto actual = xs.subscriptions();
    358                 REQUIRE(required == actual);
    359             }
    360 
    361         }
    362     }
    363 }
    364 
    365 SCENARIO("distinct_until_changed - custom predicate", "[distinct_until_changed][operators]"){
    366     GIVEN("a source"){
    367         auto sc = rxsc::make_test();
    368         auto w = sc.create_worker();
    369         const rxsc::test::messages<int> on;
    370 
    371         auto xs = sc.make_hot_observable({
    372             on.next(150, 1),
    373             on.next(210, 2), //*
    374             on.next(215, 3), //*
    375             on.next(220, 3),
    376             on.next(225, 2), //*
    377             on.next(230, 2),
    378             on.next(230, 1), //*
    379             on.next(240, 2), //*
    380             on.completed(250)
    381         });
    382 
    383         WHEN("distinct values are taken"){
    384 
    385             auto res = w.start(
    386                 [xs]() {
    387                     return xs
    388                             .distinct_until_changed([](int x, int y) { return x == y; })
    389                             // forget type to workaround lambda deduction bug on msvc 2013
    390                             .as_dynamic();
    391                 }
    392             );
    393 
    394             THEN("the output only contains distinct items sent while subscribed"){
    395                 auto required = rxu::to_vector({
    396                     on.next(210, 2), //*
    397                     on.next(215, 3), //*
    398                     on.next(225, 2), //*
    399                     on.next(230, 1), //*
    400                     on.next(240, 2), //*
    401                     on.completed(250)
    402                 });
    403                 auto actual = res.get_observer().messages();
    404                 REQUIRE(required == actual);
    405             }
    406 
    407             THEN("there was 1 subscription/unsubscription to the source"){
    408                 auto required = rxu::to_vector({
    409                     on.subscribe(200, 250)
    410                 });
    411                 auto actual = xs.subscriptions();
    412                 REQUIRE(required == actual);
    413             }
    414 
    415         }
    416     }
    417 }
    418