Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-take.hpp>
      3 #include <rxcpp/operators/rx-map.hpp>
      4 #include <rxcpp/operators/rx-observe_on.hpp>
      5 
      6 const int static_onnextcalls = 100000;
      7 
      8 SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){
      9     const int& onnextcalls = static_onnextcalls;
     10     GIVEN("a range"){
     11         WHEN("multicasting a million ints"){
     12             using namespace std::chrono;
     13             typedef steady_clock clock;
     14 
     15             auto el = rx::observe_on_new_thread();
     16 
     17             for (int n = 0; n < 10; n++)
     18             {
     19                 std::atomic_bool disposed;
     20                 std::atomic_bool done;
     21                 auto c = std::make_shared<int>(0);
     22 
     23                 rx::composite_subscription cs;
     24                 cs.add([&](){
     25                     if (!done) {abort();}
     26                     disposed = true;
     27                 });
     28 
     29                 auto start = clock::now();
     30                 rxs::range<int>(1)
     31                     .take(onnextcalls)
     32                     .observe_on(el)
     33                     .as_blocking()
     34                     .subscribe(
     35                         cs,
     36                         [c](int){
     37                            ++(*c);
     38                         },
     39                         [&](){
     40                             done = true;
     41                         });
     42                 auto expected = onnextcalls;
     43                 REQUIRE(*c == expected);
     44                 auto finish = clock::now();
     45                 auto msElapsed = duration_cast<milliseconds>(finish-start);
     46                 std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl;
     47             }
     48         }
     49     }
     50 }
     51 
     52 SCENARIO("observe_on", "[observe][observe_on]"){
     53     GIVEN("a source"){
     54         auto sc = rxsc::make_test();
     55         auto so = rx::synchronize_in_one_worker(sc);
     56         auto w = sc.create_worker();
     57         const rxsc::test::messages<int> on;
     58 
     59         auto xs = sc.make_hot_observable({
     60             on.next(150, 1),
     61             on.next(210, 2),
     62             on.next(240, 3),
     63             on.completed(300)
     64         });
     65 
     66         WHEN("subscribe_on is specified"){
     67 
     68             auto res = w.start(
     69                 [so, xs]() {
     70                     return xs
     71                          .observe_on(so);
     72                 }
     73             );
     74 
     75             THEN("the output contains items sent while subscribed"){
     76                 auto required = rxu::to_vector({
     77                     on.next(211, 2),
     78                     on.next(241, 3),
     79                     on.completed(301)
     80                 });
     81                 auto actual = res.get_observer().messages();
     82                 REQUIRE(required == actual);
     83             }
     84 
     85             THEN("there was 1 subscription/unsubscription to the source"){
     86                 auto required = rxu::to_vector({
     87                     on.subscribe(200, 300)
     88                 });
     89                 auto actual = xs.subscriptions();
     90                 REQUIRE(required == actual);
     91             }
     92 
     93         }
     94     }
     95 }
     96 
     97 SCENARIO("stream observe_on", "[observe][observe_on]"){
     98     GIVEN("a source"){
     99         auto sc = rxsc::make_test();
    100         auto so = rx::synchronize_in_one_worker(sc);
    101         auto w = sc.create_worker();
    102         const rxsc::test::messages<int> on;
    103 
    104         auto xs = sc.make_hot_observable({
    105             on.next(150, 1),
    106             on.next(210, 2),
    107             on.next(240, 3),
    108             on.completed(300)
    109         });
    110 
    111         WHEN("observe_on is specified"){
    112 
    113             auto res = w.start(
    114                 [so, xs]() {
    115                     return xs
    116                          | rxo::observe_on(so);
    117                 }
    118             );
    119 
    120             THEN("the output contains items sent while subscribed"){
    121                 auto required = rxu::to_vector({
    122                     on.next(211, 2),
    123                     on.next(241, 3),
    124                     on.completed(301)
    125                 });
    126                 auto actual = res.get_observer().messages();
    127                 REQUIRE(required == actual);
    128             }
    129 
    130             THEN("there was 1 subscription/unsubscription to the source"){
    131                 auto required = rxu::to_vector({
    132                     on.subscribe(200, 300)
    133                 });
    134                 auto actual = xs.subscriptions();
    135                 REQUIRE(required == actual);
    136             }
    137 
    138         }
    139     }
    140 }
    141 
    142 class nocompare {
    143 public:
    144     int v;
    145 };
    146 
    147 SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
    148     GIVEN("a source"){
    149         auto sc = rxsc::make_test();
    150         auto so = rx::observe_on_one_worker(sc);
    151         auto w = sc.create_worker();
    152         const rxsc::test::messages<nocompare> in;
    153         const rxsc::test::messages<int> out;
    154 
    155         auto xs = sc.make_hot_observable({
    156             in.next(150, nocompare{1}),
    157             in.next(210, nocompare{2}),
    158             in.next(240, nocompare{3}),
    159             in.completed(300)
    160         });
    161 
    162         WHEN("observe_on is specified"){
    163 
    164             auto res = w.start(
    165                 [so, xs]() {
    166                     return xs
    167                          | rxo::observe_on(so)
    168                          | rxo::map([](nocompare v){ return v.v; })
    169                          | rxo::as_dynamic();
    170                 }
    171             );
    172 
    173             THEN("the output contains items sent while subscribed"){
    174                 auto required = rxu::to_vector({
    175                     out.next(211, 2),
    176                     out.next(241, 3),
    177                     out.completed(301)
    178                 });
    179                 auto actual = res.get_observer().messages();
    180                 REQUIRE(required == actual);
    181             }
    182 
    183             THEN("there was 1 subscription/unsubscription to the source"){
    184                 auto required = rxu::to_vector({
    185                     out.subscribe(200, 300)
    186                 });
    187                 auto actual = xs.subscriptions();
    188                 REQUIRE(required == actual);
    189             }
    190 
    191         }
    192     }
    193 }
    194