Home | History | Annotate | Download | only in doxygen
      1 #include "rxcpp/rx.hpp"
      2 
      3 #include "rxcpp/rx-test.hpp"
      4 #include "catch.hpp"
      5 
      6 SCENARIO("take_until sample"){
      7     printf("//! [take_until sample]\n");
      8     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
      9     auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
     10     auto values = source.take_until(trigger);
     11     values.
     12         subscribe(
     13             [](long v){printf("OnNext: %ld\n", v);},
     14             [](){printf("OnCompleted\n");});
     15     printf("//! [take_until sample]\n");
     16 }
     17 
     18 SCENARIO("take_until time sample"){
     19     printf("//! [take_until time sample]\n");
     20     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
     21     auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25));
     22     values.
     23         subscribe(
     24             [](long v){printf("OnNext: %ld\n", v);},
     25             [](){printf("OnCompleted\n");});
     26     printf("//! [take_until time sample]\n");
     27 }
     28 
     29 #include "main.hpp"
     30 
     31 SCENARIO("threaded take_until sample"){
     32     printf("//! [threaded take_until sample]\n");
     33     printf("[thread %s] Start task\n", get_pid().c_str());
     34     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
     35         printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
     36         return v;
     37     });
     38     auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
     39         printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v);
     40         return v;
     41     });
     42     auto values = source.take_until(trigger, rxcpp::observe_on_new_thread());
     43     values.
     44         as_blocking().
     45         subscribe(
     46             [](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
     47             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
     48     printf("[thread %s] Finish task\n", get_pid().c_str());
     49     printf("//! [threaded take_until sample]\n");
     50 }
     51 
     52 SCENARIO("threaded take_until time sample"){
     53     printf("//! [threaded take_until time sample]\n");
     54     printf("[thread %s] Start task\n", get_pid().c_str());
     55     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
     56         printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
     57         return v;
     58     });
     59     auto scheduler = rxcpp::observe_on_new_thread();
     60     auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler);
     61     values.
     62         as_blocking().
     63         subscribe(
     64             [](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
     65             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
     66     printf("[thread %s] Finish task\n", get_pid().c_str());
     67     printf("//! [threaded take_until time sample]\n");
     68 }
     69