Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-concat.hpp>
      3 #include <rxcpp/operators/rx-group_by.hpp>
      4 #include <rxcpp/operators/rx-reduce.hpp>
      5 #include <rxcpp/operators/rx-map.hpp>
      6 #include <rxcpp/operators/rx-merge.hpp>
      7 #include <rxcpp/operators/rx-take.hpp>
      8 #include <rxcpp/operators/rx-start_with.hpp>
      9 #include <rxcpp/operators/rx-observe_on.hpp>
     10 
     11 #include <locale>
     12 #include <sstream>
     13 
     14 SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){
     15     GIVEN("a for loop"){
     16         WHEN("partitioning pi series across all hardware threads"){
     17 
     18             std::atomic_int c;
     19             c = 0;
     20             auto pi = [&](int k) {
     21                 ++c;
     22                 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
     23             };
     24 
     25             using namespace std::chrono;
     26             auto start = steady_clock::now();
     27 
     28             // share an output thread across all the producer threads
     29             auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
     30 
     31             struct work
     32             {
     33                 int index;
     34                 int first;
     35                 int last;
     36             };
     37 
     38             // use all available hardware threads
     39             auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
     40                 map(
     41                     [](int index){
     42                         static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
     43                         int first = (chunk * index) + 1;
     44                         int last =   chunk * (index + 1);
     45                         return work{index, first, last};}
     46                     ).
     47                 group_by(
     48                     [](work w) -> int {return w.index % std::thread::hardware_concurrency();},
     49                     [](work w){return w;}).
     50                 map(
     51                     [=](rxcpp::grouped_observable<int, work> onproc) {
     52                         auto key = onproc.get_key();
     53                         // share a producer thread across all the ranges in this group of chunks
     54                         auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
     55                         return onproc.
     56                             map(
     57                                 [=](work w){
     58                                     std::stringstream message;
     59                                     message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
     60 
     61                                     return rxcpp::observable<>::range(w.first, w.last, producerthread).
     62                                         map(pi).
     63                                         sum(). // each thread maps and reduces its contribution to the answer
     64                                         map(
     65                                             [=](long double v){
     66                                                 std::stringstream message;
     67                                                 message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
     68                                                 return std::make_tuple(message.str(), v);
     69                                             }).
     70                                         start_with(std::make_tuple(message.str(), 0.0L)).
     71                                         as_dynamic();
     72                                 }).
     73                             concat(). // only subscribe to one range at a time in this group.
     74                             observe_on(outputthread).
     75                             map(rxcpp::util::apply_to(
     76                                 [](std::string message, long double v){
     77                                     std::cout << message << std::endl;
     78                                     return v;
     79                                 })).
     80                             as_dynamic();
     81                     }).
     82                 merge().
     83                 sum(). // reduces the contributions from all the threads to the answer
     84                 as_blocking().
     85                 last();
     86 
     87             std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
     88             auto finish = steady_clock::now();
     89             auto msElapsed = duration_cast<milliseconds>(finish-start);
     90             std::cout << "pi using group_by and concat to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     91 
     92         }
     93     }
     94 }
     95 
     96 SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){
     97     GIVEN("a for loop"){
     98         WHEN("partitioning pi series across all hardware threads"){
     99 
    100             std::atomic_int c;
    101             c = 0;
    102             auto pi = [&](int k) {
    103                 ++c;
    104                 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
    105             };
    106 
    107             using namespace std::chrono;
    108             auto start = steady_clock::now();
    109 
    110             struct work
    111             {
    112                 int index;
    113                 int first;
    114                 int last;
    115             };
    116 
    117             // use all available hardware threads
    118             auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
    119                 map(
    120                     [](int index){
    121                         static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
    122                         int first = (chunk * index) + 1;
    123                         int last =   chunk * (index + 1);
    124                         return work{index, first, last};
    125                     }).
    126                 map(
    127                     [=](work w){
    128                         std::stringstream message;
    129                         message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
    130 
    131                         // create a new thread for every chunk
    132                         return rxcpp::observable<>::range(w.first, w.last, rxcpp::observe_on_new_thread()).
    133                             map(pi).
    134                             sum(). // each thread maps and reduces its contribution to the answer
    135                             map(
    136                                 [=](long double v){
    137                                     std::stringstream message;
    138                                     message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
    139                                     return std::make_tuple(message.str(), v);
    140                                 }).
    141                             start_with(std::make_tuple(message.str(), 0.0L)).
    142                             as_dynamic();
    143                     }).
    144                 merge(rxcpp::observe_on_new_thread()).
    145                 map(rxcpp::util::apply_to(
    146                     [](std::string message, long double v){
    147                         std::cout << message << std::endl;
    148                         return v;
    149                     })).
    150                 sum(). // reduces the contributions from all the threads to the answer
    151                 as_blocking().
    152                 last();
    153 
    154             std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
    155             auto finish = steady_clock::now();
    156             auto msElapsed = duration_cast<milliseconds>(finish-start);
    157             std::cout << "pi using division of the whole range to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    158 
    159         }
    160     }
    161 }
    162 
    163 char whitespace(char c) {
    164     return std::isspace<char>(c, std::locale::classic());
    165 }
    166 
    167 std::string trim(std::string s) {
    168     auto first = std::find_if_not(s.begin(), s.end(), whitespace);
    169     auto last = std::find_if_not(s.rbegin(), s.rend(), whitespace);
    170     if (last != s.rend()) {
    171         s.erase(s.end() - (last-s.rbegin()), s.end());
    172     }
    173     s.erase(s.begin(), first);
    174     return s;
    175 }
    176 
    177 bool tolowerLess(char lhs, char rhs) {
    178     return std::tolower(lhs, std::locale::classic()) < std::tolower(rhs, std::locale::classic());
    179 }
    180 
    181 bool tolowerStringLess(const std::string& lhs, const std::string& rhs) {
    182     return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), tolowerLess);
    183 }
    184 
    185 SCENARIO("group_by", "[group_by][operators]"){
    186     GIVEN("1 hot observable of ints."){
    187         auto sc = rxsc::make_test();
    188         auto w = sc.create_worker();
    189         const rxsc::test::messages<std::string> on;
    190         int keyInvoked = 0;
    191         int marbleInvoked = 0;
    192 
    193         auto xs = sc.make_hot_observable({
    194             on.next(90, "error"),
    195             on.next(110, "error"),
    196             on.next(130, "error"),
    197             on.next(220, "  foo"),
    198             on.next(240, " FoO "),
    199             on.next(270, "baR  "),
    200             on.next(310, "foO "),
    201             on.next(350, " Baz   "),
    202             on.next(360, "  qux "),
    203             on.next(390, "   bar"),
    204             on.next(420, " BAR  "),
    205             on.next(470, "FOO "),
    206             on.next(480, "baz  "),
    207             on.next(510, " bAZ "),
    208             on.next(530, "    fOo    "),
    209             on.completed(570),
    210             on.next(580, "error"),
    211             on.completed(600),
    212             on.error(650, std::runtime_error("error in completed sequence"))
    213         });
    214 
    215         WHEN("group normalized strings"){
    216 
    217             auto res = w.start(
    218                 [&]() {
    219                     return xs
    220                         .group_by(
    221                             [&](std::string v){
    222                                 ++keyInvoked;
    223                                 return trim(std::move(v));
    224                             },
    225                             [&](std::string v){
    226                                 ++marbleInvoked;
    227                                 std::reverse(v.begin(), v.end());
    228                                 return v;
    229                             },
    230                             tolowerStringLess)
    231                         .map([](const rxcpp::grouped_observable<std::string, std::string>& g){return g.get_key();})
    232                         // forget type to workaround lambda deduction bug on msvc 2013
    233                         .as_dynamic();
    234                 }
    235             );
    236 
    237             THEN("the output contains groups of group keys"){
    238                 auto required = rxu::to_vector({
    239                     on.next(220, "foo"),
    240                     on.next(270, "baR"),
    241                     on.next(350, "Baz"),
    242                     on.next(360, "qux"),
    243                     on.completed(570)
    244                 });
    245                 auto actual = res.get_observer().messages();
    246                 REQUIRE(required == actual);
    247             }
    248 
    249             THEN("there was one subscription and one unsubscription to the xs"){
    250                 auto required = rxu::to_vector({
    251                     on.subscribe(200, 570)
    252                 });
    253                 auto actual = xs.subscriptions();
    254                 REQUIRE(required == actual);
    255             }
    256 
    257             THEN("key selector was invoked for each value"){
    258                 REQUIRE(12 == keyInvoked);
    259             }
    260 
    261             THEN("marble selector was invoked for each value"){
    262                 REQUIRE(12 == marbleInvoked);
    263             }
    264         }
    265     }
    266 }
    267 
    268 SCENARIO("group_by take 1", "[group_by][take][operators]"){
    269     GIVEN("1 hot observable of ints."){
    270         auto sc = rxsc::make_test();
    271         auto w = sc.create_worker();
    272         const rxsc::test::messages<long> on;
    273         int keyInvoked = 0;
    274         int marbleInvoked = 0;
    275         int groupEmitted = 0;
    276 
    277         auto xs = sc.make_hot_observable({
    278             on.next(130, -1),
    279             on.next(220, 0),
    280             on.next(240, -1),
    281             on.next(270, 2),
    282             on.next(310, -3),
    283             on.next(350, 4),
    284             on.next(360, -5),
    285             on.next(390, 6),
    286             on.next(420, -7),
    287             on.next(470, 8),
    288             on.next(480, -9),
    289             on.completed(570)
    290         });
    291 
    292         WHEN("1 group of ints is emitted"){
    293 
    294             auto res = w.start(
    295                 [&]() {
    296                     return xs
    297                         | rxo::group_by(
    298                             [&](long v) {
    299                                 ++keyInvoked;
    300                                 return v % 2;
    301                             },
    302                             [&](long v){
    303                                 ++marbleInvoked;
    304                                 return v;
    305                             })
    306                         | rxo::take(1)
    307                         | rxo::map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
    308                             ++groupEmitted;
    309                             return g;
    310                         })
    311                         | rxo::merge()
    312                         // forget type to workaround lambda deduction bug on msvc 2013
    313                         | rxo::as_dynamic();
    314                 }
    315             );
    316 
    317             THEN("the output contains groups of ints"){
    318                 auto required = rxu::to_vector({
    319                     on.next(220, 0),
    320                     on.next(270, 2),
    321                     on.next(350, 4),
    322                     on.next(390, 6),
    323                     on.next(470, 8),
    324                     on.completed(570)
    325                 });
    326                 auto actual = res.get_observer().messages();
    327                 REQUIRE(required == actual);
    328             }
    329 
    330             THEN("there was one subscription and one unsubscription to the xs"){
    331                 auto required = rxu::to_vector({
    332                     on.subscribe(200, 570)
    333                 });
    334                 auto actual = xs.subscriptions();
    335                 REQUIRE(required == actual);
    336             }
    337 
    338             THEN("key selector was invoked for each value"){
    339                 REQUIRE(10 == keyInvoked);
    340             }
    341 
    342             THEN("marble selector was invoked for each value"){
    343                 REQUIRE(5 == marbleInvoked);
    344             }
    345 
    346             THEN("1 group emitted"){
    347                 REQUIRE(1 == groupEmitted);
    348             }
    349         }
    350     }
    351 }
    352 
    353 SCENARIO("group_by take 1 take 4", "[group_by][take][operators]"){
    354     GIVEN("1 hot observable of ints."){
    355         auto sc = rxsc::make_test();
    356         auto w = sc.create_worker();
    357         const rxsc::test::messages<long> on;
    358         int keyInvoked = 0;
    359         int marbleInvoked = 0;
    360         int groupEmitted = 0;
    361 
    362         auto xs = sc.make_hot_observable({
    363             on.next(130, -1),
    364             on.next(220, 0),
    365             on.next(240, -1),
    366             on.next(270, 2),
    367             on.next(310, -3),
    368             on.next(350, 4),
    369             on.next(360, -5),
    370             on.next(390, 6),
    371             on.next(420, -7),
    372         });
    373 
    374         WHEN("1 group of ints is emitted"){
    375 
    376             auto res = w.start(
    377                 [&]() {
    378                     return xs
    379                         .group_by(
    380                             [&](long v) {
    381                                 ++keyInvoked;
    382                                 return v % 2;
    383                             },
    384                             [&](long v){
    385                                 ++marbleInvoked;
    386                                 return v;
    387                             })
    388                         .take(1)
    389                         .map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
    390                             ++groupEmitted;
    391                             return g.take(4);
    392                         })
    393                         .merge()
    394                         // forget type to workaround lambda deduction bug on msvc 2013
    395                         .as_dynamic();
    396                 }
    397             );
    398 
    399             THEN("the output contains groups of ints"){
    400                 auto required = rxu::to_vector({
    401                     on.next(220, 0),
    402                     on.next(270, 2),
    403                     on.next(350, 4),
    404                     on.next(390, 6),
    405                     on.completed(390)
    406                 });
    407                 auto actual = res.get_observer().messages();
    408                 REQUIRE(required == actual);
    409             }
    410 
    411             THEN("there was one subscription and one unsubscription to the xs"){
    412                 auto required = rxu::to_vector({
    413                     on.subscribe(200, 390)
    414                 });
    415                 auto actual = xs.subscriptions();
    416                 REQUIRE(required == actual);
    417             }
    418 
    419             THEN("key selector was invoked for each value"){
    420                 REQUIRE(7 == keyInvoked);
    421             }
    422 
    423             THEN("marble selector was invoked for each value"){
    424                 REQUIRE(4 == marbleInvoked);
    425             }
    426 
    427             THEN("1 group emitted"){
    428                 REQUIRE(1 == groupEmitted);
    429             }
    430         }
    431     }
    432 }