Home | History | Annotate | Download | only in subscriptions
      1 #include "../test.h"
      2 
      3 #include <rxcpp/rx-coroutine.hpp>
      4 
      5 #ifdef _RESUMABLE_FUNCTIONS_SUPPORTED
      6 
      7 SCENARIO("coroutine completes", "[coroutine]"){
      8     GIVEN("a source") {
      9         auto sc = rxsc::make_test();
     10         auto w = sc.create_worker();
     11         const rxsc::test::messages<int> on;
     12 
     13         auto xs = sc.make_hot_observable({
     14             on.next(110, 1),
     15             on.next(210, 2),
     16             on.next(310, 10),
     17             on.completed(350)
     18         });
     19 
     20         WHEN("for co_await"){
     21 
     22             std::vector<typename rxsc::test::messages<int>::recorded_type> messages;
     23 
     24             w.advance_to(rxsc::test::subscribed_time);
     25 
     26             auto d = [&]() -> std::future<void> {
     27                 RXCPP_TRY {
     28                     for co_await (auto n : xs | rxo::as_dynamic()) {
     29                         messages.push_back(on.next(w.clock(), n));
     30                     }
     31                     messages.push_back(on.completed(w.clock()));
     32                 } RXCPP_CATCH(...) {
     33                     messages.push_back(on.error(w.clock(), rxu::current_exception()));
     34                 }
     35             }();
     36 
     37             w.advance_to(rxsc::test::unsubscribed_time);
     38 
     39             THEN("the function completed"){
     40                 REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
     41             }
     42 
     43             THEN("the output only contains true"){
     44                 auto required = rxu::to_vector({
     45                     on.next(210, 2),
     46                     on.next(310, 10),
     47                     on.completed(350)
     48                 });
     49                 auto actual = messages;
     50                 REQUIRE(required == actual);
     51             }
     52 
     53             THEN("there was 1 subscription/unsubscription to the source"){
     54                 auto required = rxu::to_vector({
     55                     on.subscribe(200, 350)
     56                 });
     57                 auto actual = xs.subscriptions();
     58                 REQUIRE(required == actual);
     59             }
     60 
     61         }
     62     }
     63 }
     64 
     65 SCENARIO("coroutine errors", "[coroutine]"){
     66     GIVEN("a source") {
     67         auto sc = rxsc::make_test();
     68         auto w = sc.create_worker();
     69         const rxsc::test::messages<int> on;
     70 
     71         std::runtime_error ex("error in source");
     72 
     73         auto xs = sc.make_hot_observable({
     74             on.next(110, 1),
     75             on.next(210, 2),
     76             on.error(310, ex),
     77             on.next(310, 10),
     78             on.completed(350)
     79         });
     80 
     81         WHEN("for co_await"){
     82 
     83             std::vector<typename rxsc::test::messages<int>::recorded_type> messages;
     84 
     85             w.advance_to(rxsc::test::subscribed_time);
     86 
     87             auto d = [&]() -> std::future<void> {
     88                 RXCPP_TRY {
     89                     for co_await (auto n : xs | rxo::as_dynamic()) {
     90                         messages.push_back(on.next(w.clock(), n));
     91                     }
     92                     messages.push_back(on.completed(w.clock()));
     93                 } RXCPP_CATCH(...) {
     94                     messages.push_back(on.error(w.clock(), rxu::current_exception()));
     95                 }
     96             }();
     97 
     98             w.advance_to(rxsc::test::unsubscribed_time);
     99 
    100             THEN("the function completed"){
    101                 REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
    102             }
    103 
    104             THEN("the output only contains true"){
    105                 auto required = rxu::to_vector({
    106                     on.next(210, 2),
    107                     on.error(310, ex)
    108                 });
    109                 auto actual = messages;
    110                 REQUIRE(required == actual);
    111             }
    112 
    113             THEN("there was 1 subscription/unsubscription to the source"){
    114                 auto required = rxu::to_vector({
    115                     on.subscribe(200, 310)
    116                 });
    117                 auto actual = xs.subscriptions();
    118                 REQUIRE(required == actual);
    119             }
    120 
    121         }
    122     }
    123 }
    124 
    125 #endif
    126