Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include "rxcpp/operators/rx-time_interval.hpp"
      3 
      4 using namespace std::chrono;
      5 
      6 SCENARIO("should not emit time intervals if the source never emits any items", "[time_interval][operators]"){
      7     GIVEN("a source"){
      8         typedef rxsc::detail::test_type::clock_type::time_point::duration duration;
      9 
     10         auto sc = rxsc::make_test();
     11         auto w = sc.create_worker();
     12         const rxsc::test::messages<int> on;
     13 
     14         auto xs = sc.make_hot_observable({
     15             on.next(150, 1)
     16         });
     17 
     18         WHEN("time_interval operator is invoked"){
     19 
     20             auto res = w.start(
     21                 [xs]() {
     22                     return xs
     23                         | rxo::time_interval();
     24                 }
     25             );
     26 
     27             THEN("the output is empty"){
     28                 auto required = std::vector<rxsc::test::messages<duration>::recorded_type>();
     29                 auto actual = res.get_observer().messages();
     30                 REQUIRE(required == actual);
     31             }
     32 
     33             THEN("there was 1 subscription/unsubscription to the source"){
     34                 auto required = rxu::to_vector({
     35                     on.subscribe(200, 1000)
     36                 });
     37                 auto actual = xs.subscriptions();
     38                 REQUIRE(required == actual);
     39             }
     40         }
     41     }
     42 }
     43 
     44 SCENARIO("should not emit time intervals if the source observable is empty", "[time_interval][operators]"){
     45     GIVEN("a source"){
     46         typedef rxsc::detail::test_type::clock_type::time_point::duration duration;
     47 
     48         auto sc = rxsc::make_test();
     49         auto so = rx::synchronize_in_one_worker(sc);
     50         auto w = sc.create_worker();
     51         const rxsc::test::messages<int> on;
     52         const rxsc::test::messages<duration> on_time_interval;
     53 
     54         auto xs = sc.make_hot_observable({
     55             on.next(150, 1),
     56             on.completed(250)
     57         });
     58 
     59         WHEN("time_interval operator is invoked"){
     60 
     61             auto res = w.start(
     62                 [so, xs]() {
     63                     return xs.time_interval();
     64                 }
     65             );
     66 
     67             THEN("the output only contains complete message"){
     68                 auto required = rxu::to_vector({
     69                     on_time_interval.completed(250)
     70                 });
     71                 auto actual = res.get_observer().messages();
     72                 REQUIRE(required == actual);
     73             }
     74 
     75             THEN("there was 1 subscription/unsubscription to the source"){
     76                 auto required = rxu::to_vector({
     77                     on.subscribe(200, 250)
     78                 });
     79                 auto actual = xs.subscriptions();
     80                 REQUIRE(required == actual);
     81             }
     82 
     83         }
     84     }
     85 }
     86 
     87 SCENARIO("should emit time intervals for every item in the source observable", "[time_interval][operators]"){
     88     GIVEN("a source"){
     89         typedef rxsc::detail::test_type::clock_type clock_type;
     90         typedef clock_type::time_point::duration duration;
     91 
     92         auto sc = rxsc::make_test();
     93         auto so = rx::synchronize_in_one_worker(sc);
     94         auto w = sc.create_worker();
     95         const rxsc::test::messages<int> on;
     96         const rxsc::test::messages<duration> on_time_interval;
     97 
     98         auto xs = sc.make_hot_observable({
     99             on.next(150, 1),
    100             on.next(210, 2),
    101             on.next(240, 3),
    102             on.completed(250)
    103         });
    104 
    105         WHEN("time_interval operator is invoked"){
    106 
    107             auto res = w.start(
    108                 [so, xs]() {
    109                     return xs.time_interval(so);
    110                 }
    111             );
    112 
    113             THEN("the output contains the emitted items while subscribed"){
    114                 auto required = rxu::to_vector({
    115                     on_time_interval.next(210, milliseconds(10)),
    116                     on_time_interval.next(240, milliseconds(30)),
    117                     on_time_interval.completed(250)
    118                 });
    119                 auto actual = res.get_observer().messages();
    120                 REQUIRE(required == actual);
    121             }
    122 
    123             THEN("there was 1 subscription/unsubscription to the source"){
    124                 auto required = rxu::to_vector({
    125                     on.subscribe(200, 250)
    126                 });
    127                 auto actual = xs.subscriptions();
    128                 REQUIRE(required == actual);
    129             }
    130 
    131         }
    132     }
    133 }
    134 
    135 SCENARIO("should emit time intervals and an error if there is an error", "[time_interval][operators]"){
    136     GIVEN("a source"){
    137         typedef rxsc::detail::test_type::clock_type clock_type;
    138         typedef clock_type::time_point::duration duration;
    139 
    140         auto sc = rxsc::make_test();
    141         auto so = rx::synchronize_in_one_worker(sc);
    142         auto w = sc.create_worker();
    143         const rxsc::test::messages<int> on;
    144         const rxsc::test::messages<duration> on_time_interval;
    145 
    146         std::runtime_error ex("on_error from source");
    147 
    148         auto xs = sc.make_hot_observable({
    149             on.next(150, 1),
    150             on.next(210, 2),
    151             on.next(240, 3),
    152             on.error(250, ex)
    153         });
    154 
    155         WHEN("time_interval operator is invoked"){
    156 
    157             auto res = w.start(
    158                 [so, xs]() {
    159                     return xs.time_interval(so);
    160                 }
    161             );
    162 
    163             THEN("the output contains emitted items and an error"){
    164                 auto required = rxu::to_vector({
    165                     on_time_interval.next(210, milliseconds(10)),
    166                     on_time_interval.next(240, milliseconds(30)),
    167                     on_time_interval.error(250, ex)
    168                 });
    169                 auto actual = res.get_observer().messages();
    170                 REQUIRE(required == actual);
    171             }
    172 
    173             THEN("there was 1 subscription/unsubscription to the source"){
    174                 auto required = rxu::to_vector({
    175                     on.subscribe(200, 250)
    176                 });
    177                 auto actual = xs.subscriptions();
    178                 REQUIRE(required == actual);
    179             }
    180 
    181         }
    182     }
    183 }
    184