Home | History | Annotate | Download | only in doxygen
      1 #include "rxcpp/rx.hpp"
      2 
      3 #include "rxcpp/rx-test.hpp"
      4 #include "catch.hpp"
      5 
      6 SCENARIO("buffer count sample"){
      7     printf("//! [buffer count sample]\n");
      8     auto values = rxcpp::observable<>::range(1, 5).buffer(2);
      9     values.
     10         subscribe(
     11             [](std::vector<int> v){
     12                 printf("OnNext:");
     13                 std::for_each(v.begin(), v.end(), [](int a){
     14                     printf(" %d", a);
     15                 });
     16                 printf("\n");
     17             },
     18             [](){printf("OnCompleted\n");});
     19     printf("//! [buffer count sample]\n");
     20 }
     21 
     22 SCENARIO("buffer count+skip sample"){
     23     printf("//! [buffer count+skip sample]\n");
     24     auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3);
     25     values.
     26         subscribe(
     27             [](std::vector<int> v){
     28                 printf("OnNext:");
     29                 std::for_each(v.begin(), v.end(), [](int a){
     30                     printf(" %d", a);
     31                 });
     32                 printf("\n");
     33             },
     34             [](){printf("OnCompleted\n");});
     35     printf("//! [buffer count+skip sample]\n");
     36 }
     37 
     38 #include "main.hpp"
     39 
     40 SCENARIO("buffer period+skip+coordination sample"){
     41     printf("//! [buffer period+skip+coordination sample]\n");
     42     printf("[thread %s] Start task\n", get_pid().c_str());
     43     auto period = std::chrono::milliseconds(4);
     44     auto skip = std::chrono::milliseconds(6);
     45     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
     46         map([](long v){
     47             printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v);
     48             return v;
     49         }).
     50         take(7).
     51         buffer_with_time(period, skip, rxcpp::observe_on_new_thread());
     52     values.
     53         as_blocking().
     54         subscribe(
     55             [](std::vector<long> v){
     56                 printf("[thread %s] OnNext:", get_pid().c_str());
     57                 std::for_each(v.begin(), v.end(), [](long a){
     58                     printf(" %ld", a);
     59                 });
     60                 printf("\n");
     61             },
     62             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
     63     printf("[thread %s] Finish task\n", get_pid().c_str());
     64     printf("//! [buffer period+skip+coordination sample]\n");
     65 }
     66 
     67 SCENARIO("buffer period+skip sample"){
     68     printf("//! [buffer period+skip sample]\n");
     69     auto period = std::chrono::milliseconds(4);
     70     auto skip = std::chrono::milliseconds(6);
     71     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
     72         take(7).
     73         buffer_with_time(period, skip);
     74     values.
     75         subscribe(
     76             [](std::vector<long> v){
     77                 printf("OnNext:");
     78                 std::for_each(v.begin(), v.end(), [](long a){
     79                     printf(" %ld", a);
     80                 });
     81                 printf("\n");
     82             },
     83             [](){printf("OnCompleted\n");});
     84     printf("//! [buffer period+skip sample]\n");
     85 }
     86 
     87 SCENARIO("buffer period+skip overlapping sample"){
     88     printf("//! [buffer period+skip overlapping sample]\n");
     89     auto period = std::chrono::milliseconds(6);
     90     auto skip = std::chrono::milliseconds(4);
     91     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
     92         take(7).
     93         buffer_with_time(period, skip);
     94     values.
     95         subscribe(
     96             [](std::vector<long> v){
     97                 printf("OnNext:");
     98                 std::for_each(v.begin(), v.end(), [](long a){
     99                     printf(" %ld", a);
    100                 });
    101                 printf("\n");
    102             },
    103             [](){printf("OnCompleted\n");});
    104     printf("//! [buffer period+skip overlapping sample]\n");
    105 }
    106 
    107 SCENARIO("buffer period+skip empty sample"){
    108     printf("//! [buffer period+skip empty sample]\n");
    109     auto period = std::chrono::milliseconds(2);
    110     auto skip = std::chrono::milliseconds(4);
    111     auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
    112         buffer_with_time(period, skip);
    113     values.
    114         subscribe(
    115             [](std::vector<long> v){
    116                 printf("OnNext:");
    117                 std::for_each(v.begin(), v.end(), [](long a){
    118                     printf(" %ld", a);
    119                 });
    120                 printf("\n");
    121             },
    122             [](){printf("OnCompleted\n");});
    123     printf("//! [buffer period+skip empty sample]\n");
    124 }
    125 
    126 SCENARIO("buffer period+coordination sample"){
    127     printf("//! [buffer period+coordination sample]\n");
    128     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
    129         take(7).
    130         buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
    131     values.
    132         as_blocking().
    133         subscribe(
    134             [](std::vector<long> v){
    135                 printf("OnNext:");
    136                 std::for_each(v.begin(), v.end(), [](long a){
    137                     printf(" %ld", a);
    138                 });
    139                 printf("\n");
    140             },
    141             [](){printf("OnCompleted\n");});
    142     printf("//! [buffer period+coordination sample]\n");
    143 }
    144 
    145 SCENARIO("buffer period sample"){
    146     printf("//! [buffer period sample]\n");
    147     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
    148         take(7).
    149         buffer_with_time(std::chrono::milliseconds(4));
    150     values.
    151         subscribe(
    152             [](std::vector<long> v){
    153                 printf("OnNext:");
    154                 std::for_each(v.begin(), v.end(), [](long a){
    155                     printf(" %ld", a);
    156                 });
    157                 printf("\n");
    158             },
    159             [](){printf("OnCompleted\n");});
    160     printf("//! [buffer period sample]\n");
    161 }
    162 
    163 SCENARIO("buffer period+count+coordination sample"){
    164     printf("//! [buffer period+count+coordination sample]\n");
    165     auto int1 = rxcpp::observable<>::range(1L, 3L);
    166     auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
    167     auto values = int1.
    168         concat(int2).
    169         buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
    170     values.
    171         as_blocking().
    172         subscribe(
    173             [](std::vector<long> v){
    174                 printf("OnNext:");
    175                 std::for_each(v.begin(), v.end(), [](long a){
    176                     printf(" %ld", a);
    177                 });
    178                 printf("\n");
    179             },
    180             [](){printf("OnCompleted\n");});
    181     printf("//! [buffer period+count+coordination sample]\n");
    182 }
    183 
    184 SCENARIO("buffer period+count sample"){
    185     printf("//! [buffer period+count sample]\n");
    186     auto int1 = rxcpp::observable<>::range(1L, 3L);
    187     auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
    188     auto values = int1.
    189         concat(int2).
    190         buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
    191     values.
    192         subscribe(
    193             [](std::vector<long> v){
    194                 printf("OnNext:");
    195                 std::for_each(v.begin(), v.end(), [](long a){
    196                     printf(" %ld", a);
    197                 });
    198                 printf("\n");
    199             },
    200             [](){printf("OnCompleted\n");});
    201     printf("//! [buffer period+count sample]\n");
    202 }
    203