Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include "rxcpp/operators/rx-reduce.hpp"
      3 
      4 SCENARIO("reduce some data with seed", "[reduce][operators]"){
      5     GIVEN("a test hot observable of ints"){
      6         auto sc = rxsc::make_test();
      7         auto w = sc.create_worker();
      8         const rxsc::test::messages<int> on;
      9 
     10         int seed = 42;
     11 
     12         auto xs = sc.make_hot_observable({
     13             on.next(150, 1),
     14             on.next(210, 0),
     15             on.next(220, 1),
     16             on.next(230, 2),
     17             on.next(240, 3),
     18             on.next(250, 4),
     19             on.completed(260)
     20         });
     21 
     22         WHEN("mapped to ints that are one larger"){
     23 
     24             auto res = w.start(
     25                 [&]() {
     26                     return xs
     27                         .reduce(seed,
     28                             [](int sum, int x) {
     29                                 return sum + x;
     30                             },
     31                             [](int sum) {
     32                                 return sum * 5;
     33                             })
     34                         // forget type to workaround lambda deduction bug on msvc 2013
     35                         .as_dynamic();
     36                 }
     37             );
     38 
     39             THEN("the output stops on completion"){
     40                 auto required = rxu::to_vector({
     41                     on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
     42                     on.completed(260)
     43                 });
     44                 auto actual = res.get_observer().messages();
     45                 REQUIRE(required == actual);
     46             }
     47 
     48             THEN("there was one subscription and one unsubscription"){
     49                 auto required = rxu::to_vector({
     50                     on.subscribe(200, 260)
     51                 });
     52                 auto actual = xs.subscriptions();
     53                 REQUIRE(required == actual);
     54             }
     55         }
     56     }
     57 }
     58 
     59 SCENARIO("accumulate some data with seed", "[accumulate][reduce][operators]"){
     60     GIVEN("a test hot observable of ints"){
     61         auto sc = rxsc::make_test();
     62         auto w = sc.create_worker();
     63         const rxsc::test::messages<int> on;
     64 
     65         int seed = 42;
     66 
     67         auto xs = sc.make_hot_observable({
     68             on.next(150, 1),
     69             on.next(210, 0),
     70             on.next(220, 1),
     71             on.next(230, 2),
     72             on.next(240, 3),
     73             on.next(250, 4),
     74             on.completed(260)
     75         });
     76 
     77         WHEN("mapped to ints that are one larger"){
     78 
     79             auto res = w.start(
     80                 [&]() {
     81                     return xs
     82                         .accumulate(seed,
     83                             [](int sum, int x) {
     84                                 return sum + x;
     85                             },
     86                             [](int sum) {
     87                                 return sum * 5;
     88                             })
     89                         // forget type to workaround lambda deduction bug on msvc 2013
     90                         .as_dynamic();
     91                 }
     92             );
     93 
     94             THEN("the output stops on completion"){
     95                 auto required = rxu::to_vector({
     96                     on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
     97                     on.completed(260)
     98                 });
     99                 auto actual = res.get_observer().messages();
    100                 REQUIRE(required == actual);
    101             }
    102 
    103             THEN("there was one subscription and one unsubscription"){
    104                 auto required = rxu::to_vector({
    105                     on.subscribe(200, 260)
    106                 });
    107                 auto actual = xs.subscriptions();
    108                 REQUIRE(required == actual);
    109             }
    110         }
    111     }
    112 }
    113 
    114 SCENARIO("average some data", "[reduce][average][operators]"){
    115     GIVEN("a test hot observable of ints"){
    116         auto sc = rxsc::make_test();
    117         auto w = sc.create_worker();
    118         const rxsc::test::messages<int> on;
    119         const rxsc::test::messages<double> d_on;
    120 
    121         auto xs = sc.make_hot_observable({
    122             on.next(150, 1),
    123             on.next(210, 3),
    124             on.next(220, 4),
    125             on.next(230, 2),
    126             on.completed(250)
    127         });
    128 
    129         WHEN("mapped to ints that are one larger"){
    130 
    131             auto res = w.start(
    132                 [&]() {
    133                     return xs.average();
    134                 }
    135             );
    136 
    137             THEN("the output stops on completion"){
    138                 auto required = rxu::to_vector({
    139                     d_on.next(250, 3.0),
    140                     d_on.completed(250)
    141                 });
    142                 auto actual = res.get_observer().messages();
    143                 REQUIRE(required == actual);
    144             }
    145 
    146             THEN("there was one subscription and one unsubscription"){
    147                 auto required = rxu::to_vector({
    148                     on.subscribe(200, 250)
    149                 });
    150                 auto actual = xs.subscriptions();
    151                 REQUIRE(required == actual);
    152             }
    153         }
    154     }
    155 }
    156 
    157 SCENARIO("sum some data", "[reduce][sum][operators]"){
    158     GIVEN("a test hot observable of ints"){
    159         auto sc = rxsc::make_test();
    160         auto w = sc.create_worker();
    161         const rxsc::test::messages<int> on;
    162         const rxsc::test::messages<int> d_on;
    163 
    164         auto xs = sc.make_hot_observable({
    165              on.next(150, 1),
    166              on.next(210, 3),
    167              on.next(220, 4),
    168              on.next(230, 2),
    169              on.completed(250)
    170          });
    171 
    172         WHEN("sum is calculated"){
    173 
    174             auto res = w.start(
    175                 [&]() {
    176                     return xs.sum();
    177                 }
    178             );
    179 
    180             THEN("the output contains the sum of source values"){
    181                 auto required = rxu::to_vector({
    182                     d_on.next(250, 9),
    183                     d_on.completed(250)
    184                 });
    185                 auto actual = res.get_observer().messages();
    186                 REQUIRE(required == actual);
    187             }
    188 
    189             THEN("there was one subscription and one unsubscription"){
    190                 auto required = rxu::to_vector({
    191                     on.subscribe(200, 250)
    192                 });
    193                 auto actual = xs.subscriptions();
    194                 REQUIRE(required == actual);
    195             }
    196         }
    197     }
    198 }
    199 
    200 SCENARIO("max", "[reduce][max][operators]"){
    201     GIVEN("a test hot observable of ints"){
    202         auto sc = rxsc::make_test();
    203         auto w = sc.create_worker();
    204         const rxsc::test::messages<int> on;
    205         const rxsc::test::messages<int> d_on;
    206 
    207         auto xs = sc.make_hot_observable({
    208             on.next(150, 1),
    209             on.next(210, 3),
    210             on.next(220, 4),
    211             on.next(230, 2),
    212             on.completed(250)
    213         });
    214 
    215         WHEN("max is calculated"){
    216 
    217             auto res = w.start(
    218                 [&]() {
    219                     return xs.max();
    220                 }
    221             );
    222 
    223             THEN("the output contains the max of source values"){
    224                 auto required = rxu::to_vector({
    225                     d_on.next(250, 4),
    226                     d_on.completed(250)
    227                 });
    228                 auto actual = res.get_observer().messages();
    229                 REQUIRE(required == actual);
    230             }
    231 
    232             THEN("there was one subscription and one unsubscription"){
    233                 auto required = rxu::to_vector({
    234                     on.subscribe(200, 250)
    235                 });
    236                 auto actual = xs.subscriptions();
    237                 REQUIRE(required == actual);
    238             }
    239         }
    240     }
    241 }
    242 
// This scenario cannot run in builds with exceptions disabled: calling max()
// on an empty stream throws an exception, which crashes the process there.
//
// TODO: the internal max implementation should be rewritten so that it does
// not throw exceptions.
    247 SCENARIO("max, empty", "[reduce][max][operators][!throws]"){
    248     GIVEN("a test hot observable of ints"){
    249         auto sc = rxsc::make_test();
    250         auto w = sc.create_worker();
    251         const rxsc::test::messages<int> on;
    252         const rxsc::test::messages<int> d_on;
    253 
    254         std::runtime_error ex("max on_error");
    255 
    256         auto xs = sc.make_hot_observable({
    257             on.next(150, 1),
    258             on.completed(250)
    259         });
    260 
    261         WHEN("max is calculated"){
    262 
    263             auto res = w.start(
    264                 [&]() {
    265                   return xs.max();
    266                 }
    267             );
    268 
    269             THEN("the output contains only error message"){
    270                 auto required = rxu::to_vector({
    271                     d_on.error(250, ex)
    272                 });
    273                 auto actual = res.get_observer().messages();
    274                 REQUIRE(required == actual);
    275             }
    276 
    277             THEN("there was one subscription and one unsubscription"){
    278                 auto required = rxu::to_vector({
    279                     on.subscribe(200, 250)
    280                 });
    281                 auto actual = xs.subscriptions();
    282                 REQUIRE(required == actual);
    283             }
    284         }
    285     }
    286 }
    287 
    288 SCENARIO("max, error", "[reduce][max][operators]"){
    289     GIVEN("a test hot observable of ints"){
    290         auto sc = rxsc::make_test();
    291         auto w = sc.create_worker();
    292         const rxsc::test::messages<int> on;
    293         const rxsc::test::messages<int> d_on;
    294 
    295         std::runtime_error ex("max on_error from source");
    296 
    297         auto xs = sc.make_hot_observable({
    298             on.next(150, 1),
    299             on.error(250, ex)
    300         });
    301 
    302         WHEN("max is calculated"){
    303 
    304             auto res = w.start(
    305                 [&]() {
    306                   return xs.max();
    307                 }
    308             );
    309 
    310             THEN("the output contains only error message"){
    311                 auto required = rxu::to_vector({
    312                     d_on.error(250, ex)
    313                 });
    314                 auto actual = res.get_observer().messages();
    315                 REQUIRE(required == actual);
    316             }
    317 
    318             THEN("there was one subscription and one unsubscription"){
    319                 auto required = rxu::to_vector({
    320                     on.subscribe(200, 250)
    321                 });
    322                 auto actual = xs.subscriptions();
    323                 REQUIRE(required == actual);
    324             }
    325         }
    326     }
    327 }
    328 
    329 SCENARIO("min", "[reduce][min][operators]"){
    330     GIVEN("a test hot observable of ints"){
    331         auto sc = rxsc::make_test();
    332         auto w = sc.create_worker();
    333         const rxsc::test::messages<int> on;
    334         const rxsc::test::messages<int> d_on;
    335 
    336         auto xs = sc.make_hot_observable({
    337             on.next(150, 1),
    338             on.next(210, 3),
    339             on.next(220, 4),
    340             on.next(230, 2),
    341             on.completed(250)
    342         });
    343 
    344         WHEN("min is calculated"){
    345 
    346             auto res = w.start(
    347                 [&]() {
    348                   return xs.min();
    349                 }
    350             );
    351 
    352             THEN("the output contains the min of source values"){
    353                 auto required = rxu::to_vector({
    354                     d_on.next(250, 2),
    355                     d_on.completed(250)
    356                 });
    357                 auto actual = res.get_observer().messages();
    358                 REQUIRE(required == actual);
    359             }
    360 
    361             THEN("there was one subscription and one unsubscription"){
    362                 auto required = rxu::to_vector({
    363                     on.subscribe(200, 250)
    364                 });
    365                 auto actual = xs.subscriptions();
    366                 REQUIRE(required == actual);
    367             }
    368         }
    369     }
    370 }
    371 
// This scenario cannot run in builds with exceptions disabled: min() throws
// when the stream is empty, which crashes immediately.
// TODO: the min implementation should be rewritten so that it does not throw
// exceptions.
    375 SCENARIO("min, empty", "[reduce][min][operators][!throws]"){
    376     GIVEN("a test hot observable of ints"){
    377         auto sc = rxsc::make_test();
    378         auto w = sc.create_worker();
    379         const rxsc::test::messages<int> on;
    380         const rxsc::test::messages<int> d_on;
    381 
    382         std::runtime_error ex("min on_error");
    383 
    384         auto xs = sc.make_hot_observable({
    385             on.next(150, 1),
    386             on.completed(250)
    387         });
    388 
    389         WHEN("min is calculated"){
    390 
    391             auto res = w.start(
    392                 [&]() {
    393                   return xs.min();
    394                 }
    395             );
    396 
    397             THEN("the output contains only error message"){
    398                 auto required = rxu::to_vector({
    399                     d_on.error(250, ex)
    400                 });
    401                 auto actual = res.get_observer().messages();
    402                 REQUIRE(required == actual);
    403             }
    404 
    405             THEN("there was one subscription and one unsubscription"){
    406                 auto required = rxu::to_vector({
    407                     on.subscribe(200, 250)
    408                 });
    409                 auto actual = xs.subscriptions();
    410                 REQUIRE(required == actual);
    411             }
    412         }
    413     }
    414 }
    415 
    416 SCENARIO("min, error", "[reduce][min][operators]"){
    417     GIVEN("a test hot observable of ints"){
    418         auto sc = rxsc::make_test();
    419         auto w = sc.create_worker();
    420         const rxsc::test::messages<int> on;
    421         const rxsc::test::messages<int> d_on;
    422 
    423         std::runtime_error ex("min on_error from source");
    424 
    425         auto xs = sc.make_hot_observable({
    426             on.next(150, 1),
    427             on.error(250, ex)
    428         });
    429 
    430         WHEN("min is calculated"){
    431 
    432             auto res = w.start(
    433                 [&]() {
    434                   return xs.min();
    435                 }
    436             );
    437 
    438             THEN("the output contains only error message"){
    439                 auto required = rxu::to_vector({
    440                     d_on.error(250, ex)
    441                 });
    442                 auto actual = res.get_observer().messages();
    443                 REQUIRE(required == actual);
    444             }
    445 
    446             THEN("there was one subscription and one unsubscription"){
    447                 auto required = rxu::to_vector({
    448                     on.subscribe(200, 250)
    449                 });
    450                 auto actual = xs.subscriptions();
    451                 REQUIRE(required == actual);
    452             }
    453         }
    454     }
    455 }
    456