Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include "rxcpp/operators/rx-timeout.hpp"
      3 
      4 using namespace std::chrono;
      5 
      6 SCENARIO("should timeout if the source never emits any items", "[timeout][operators]"){
      7     GIVEN("a source"){
      8         auto sc = rxsc::make_test();
      9         auto so = rx::synchronize_in_one_worker(sc);
     10         auto w = sc.create_worker();
     11         const rxsc::test::messages<int> on;
     12 
     13         rxcpp::timeout_error ex("timeout has occurred");
     14 
     15         auto xs = sc.make_hot_observable({
     16             on.next(150, 1)
     17         });
     18 
     19         WHEN("timeout is set"){
     20 
     21             auto res = w.start(
     22                 [so, xs]() {
     23                     return xs
     24                         | rxo::timeout(milliseconds(10), so);
     25                 }
     26             );
     27 
     28             THEN("the error notification message is captured"){
     29                 auto required = rxu::to_vector({
     30                     on.error(211, ex)
     31                 });
     32                 auto actual = res.get_observer().messages();
     33                 REQUIRE(required == actual);
     34             }
     35 
     36             THEN("there was 1 subscription/unsubscription to the source"){
     37                 auto required = rxu::to_vector({
     38                     on.subscribe(200, 212)
     39                 });
     40                 auto actual = xs.subscriptions();
     41                 REQUIRE(required == actual);
     42             }
     43         }
     44     }
     45 }
     46 
     47 SCENARIO("should not timeout if completed before the specified timeout duration", "[timeout][operators]"){
     48     GIVEN("a source"){
     49         auto sc = rxsc::make_test();
     50         auto so = rx::synchronize_in_one_worker(sc);
     51         auto w = sc.create_worker();
     52         const rxsc::test::messages<int> on;
     53 
     54         auto xs = sc.make_hot_observable({
     55             on.next(150, 1),
     56             on.completed(250)
     57         });
     58 
     59         WHEN("timeout is set"){
     60 
     61             auto res = w.start(
     62                 [so, xs]() {
     63                     return xs.timeout(so, milliseconds(100));
     64                 }
     65             );
     66 
     67             THEN("the output only contains complete message"){
     68                 auto required = rxu::to_vector({
     69                     on.completed(251)
     70                 });
     71                 auto actual = res.get_observer().messages();
     72                 REQUIRE(required == actual);
     73             }
     74 
     75             THEN("there was 1 subscription/unsubscription to the source"){
     76                 auto required = rxu::to_vector({
     77                     on.subscribe(200, 250)
     78                 });
     79                 auto actual = xs.subscriptions();
     80                 REQUIRE(required == actual);
     81             }
     82 
     83         }
     84     }
     85 }
     86 
     87 SCENARIO("should not timeout if all items are emitted within the specified timeout duration", "[timeout][operators]"){
     88     GIVEN("a source"){
     89         auto sc = rxsc::make_test();
     90         auto so = rx::synchronize_in_one_worker(sc);
     91         auto w = sc.create_worker();
     92         const rxsc::test::messages<int> on;
     93 
     94         auto xs = sc.make_hot_observable({
     95             on.next(150, 1),
     96             on.next(210, 2),
     97             on.next(240, 3),
     98             on.completed(250)
     99         });
    100 
    101         WHEN("timeout is set"){
    102 
    103             auto res = w.start(
    104                 [so, xs]() {
    105                     return xs.timeout(milliseconds(40), so);
    106                 }
    107             );
    108 
    109             THEN("the output contains the emitted items while subscribed"){
    110                 auto required = rxu::to_vector({
    111                     on.next(211, 2),
    112                     on.next(241, 3),
    113                     on.completed(251)
    114                 });
    115                 auto actual = res.get_observer().messages();
    116                 REQUIRE(required == actual);
    117             }
    118 
    119             THEN("there was 1 subscription/unsubscription to the source"){
    120                 auto required = rxu::to_vector({
    121                     on.subscribe(200, 250)
    122                 });
    123                 auto actual = xs.subscriptions();
    124                 REQUIRE(required == actual);
    125             }
    126 
    127         }
    128     }
    129 }
    130 
    131 SCENARIO("should timeout if there are no emitted items within the timeout duration", "[timeout][operators]"){
    132     GIVEN("a source"){
    133         auto sc = rxsc::make_test();
    134         auto so = rx::synchronize_in_one_worker(sc);
    135         auto w = sc.create_worker();
    136         const rxsc::test::messages<int> on;
    137 
    138         rxcpp::timeout_error ex("timeout has occurred");
    139 
    140         auto xs = sc.make_hot_observable({
    141             on.next(150, 1),
    142             on.next(210, 2),
    143             on.next(240, 3),
    144             // -- no emissions
    145             on.completed(300)
    146         });
    147 
    148         WHEN("timeout is set"){
    149 
    150             auto res = w.start(
    151                 [so, xs]() {
    152                     return xs.timeout(milliseconds(40), so);
    153                 }
    154             );
    155 
    156             THEN("an error notification message is captured"){
    157                 auto required = rxu::to_vector({
    158                     on.next(211, 2),
    159                     on.next(241, 3),
    160                     on.error(281, ex)
    161                 });
    162                 auto actual = res.get_observer().messages();
    163                 REQUIRE(required == actual);
    164             }
    165 
    166             THEN("there was 1 subscription/unsubscription to the source"){
    167                 auto required = rxu::to_vector({
    168                     on.subscribe(200, 282)
    169                 });
    170                 auto actual = xs.subscriptions();
    171                 REQUIRE(required == actual);
    172             }
    173 
    174         }
    175     }
    176 }
    177 
    178 SCENARIO("should not timeout if there is an error", "[timeout][operators]"){
    179     GIVEN("a source"){
    180         auto sc = rxsc::make_test();
    181         auto so = rx::synchronize_in_one_worker(sc);
    182         auto w = sc.create_worker();
    183         const rxsc::test::messages<int> on;
    184 
    185         std::runtime_error ex("on_error from source");
    186 
    187         auto xs = sc.make_hot_observable({
    188             on.next(150, 1),
    189             on.error(250, ex)
    190         });
    191 
    192         WHEN("timeout is set"){
    193 
    194             auto res = w.start(
    195                 [so, xs]() {
    196                     return xs.timeout(milliseconds(100), so);
    197                 }
    198             );
    199 
    200             THEN("the output contains only an error message"){
    201                 auto required = rxu::to_vector({
    202                     on.error(251, ex)
    203                 });
    204                 auto actual = res.get_observer().messages();
    205                 REQUIRE(required == actual);
    206             }
    207 
    208             THEN("there was 1 subscription/unsubscription to the source"){
    209                 auto required = rxu::to_vector({
    210                     on.subscribe(200, 250)
    211                 });
    212                 auto actual = xs.subscriptions();
    213                 REQUIRE(required == actual);
    214             }
    215 
    216         }
    217     }
    218 }
    219