Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-reduce.hpp>
      3 #include <rxcpp/operators/rx-filter.hpp>
      4 #include <rxcpp/operators/rx-map.hpp>
      5 #include <rxcpp/operators/rx-take.hpp>
      6 #include <rxcpp/operators/rx-concat_map.hpp>
      7 #include <rxcpp/operators/rx-observe_on.hpp>
      8 
      9 static const int static_tripletCount = 100; // how many triplets each perf scenario below passes to take()
     10 
     11 SCENARIO("concat_transform pythagorian ranges", "[!hide][range][concat_transform][pythagorian][perf]"){
            // Perf benchmark, nothing is asserted. Nested concat_transform calls walk
            // z = 1.., x = 1..z, y = x..z on the coordination built from
            // rxsc::make_immediate(), keep the (x, y, z) tuples with x*x + y*y == z*z,
            // take the first tripletCount of them and print counters and throughput.
     12     const int& tripletCount = static_tripletCount;
     13     GIVEN("some ranges"){
     14         WHEN("generating pythagorian triplets"){
     15             using namespace std::chrono;
     16             typedef steady_clock clock;
     17 
     18             auto sc = rxsc::make_immediate();
     19             //auto sc = rxsc::make_current_thread();
     20             auto so = rx::identity_one_worker(sc);
     21 
                    // c:  incremented every time the filter predicate runs.
                    // ct: incremented for every triplet delivered to the subscriber.
                    // n:  never modified; only printed as the "subscribed" figure.
     22             int c = 0;
     23             int ct = 0;
     24             int n = 1;
     25             auto start = clock::now();
     26             auto triples =
     27                 rxs::range(1, so)
     28                     .concat_transform(
     29                         [&c, so](int z){
     30                             return rxs::range(1, z, 1, so)
     31                                 .concat_transform(
     32                                     [&c, so, z](int x){
     33                                         return rxs::range(x, z, 1, so)
     34                                             .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
     35                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
     36                                             // forget type to workaround lambda deduction bug on msvc 2013
     37                                             .as_dynamic();},
     38                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
     39                                 // forget type to workaround lambda deduction bug on msvc 2013
     40                                 .as_dynamic();},
     41                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;});
                    // The outer range has no explicit upper bound; take() is what ends the run.
     42             triples
     43                 .take(tripletCount)
     44                 .subscribe(
     45                     rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){++ct;}),
     46                     [](rxu::error_ptr){abort();});
     47             auto finish = clock::now();
                    // Subtract the time points first and convert afterwards. Truncating each
                    // time_since_epoch() to milliseconds separately can misreport the elapsed
                    // time by up to 1ms, and dividing by the truncated count reports inf
                    // ops/sec for any run shorter than 1ms.
     48             auto elapsed = finish - start;
     49             auto msElapsed = duration_cast<milliseconds>(elapsed);
     50             std::cout << "concat pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / std::chrono::duration<double>(elapsed).count() << " ops/sec" << std::endl;
     51 
     52         }
     53     }
     54 }
     55 
     56 SCENARIO("synchronize concat_transform pythagorian ranges", "[!hide][range][concat_transform][synchronize][pythagorian][perf]"){
            // Perf benchmark, nothing is asserted. Same nested-range triplet search as
            // the plain variant, but every range and both concat_transform calls are
            // given the coordination returned by rx::synchronize_event_loop(), and the
            // result is consumed with as_blocking().count().
     57     const int& tripletCount = static_tripletCount;
     58     GIVEN("some ranges"){
     59         WHEN("generating pythagorian triplets"){
     60             using namespace std::chrono;
     61             typedef steady_clock clock;
     62 
     63             auto so = rx::synchronize_event_loop();
     64 
                    // c: incremented every time the filter predicate runs.
                    // n: never modified; only printed as the "subscribed" figure.
                    // NOTE(review): c is a plain int captured by reference; presumably the
                    // predicate runs on event-loop threads - confirm the coordination
                    // orders those increments before c is read below.
     65             int c = 0;
     66             int n = 1;
     67             auto start = clock::now();
     68             auto triples =
     69                 rxs::range(1, so)
     70                     .concat_transform(
     71                         [&c, so](int z){
     72                             return rxs::range(1, z, 1, so)
     73                                 .concat_transform(
     74                                     [&c, so, z](int x){
     75                                         return rxs::range(x, z, 1, so)
     76                                             .filter([&c, z, x](int y){
     77                                                 ++c;
     78                                                 if (x*x + y*y == z*z) {
     79                                                     return true;}
     80                                                 else {
     81                                                     return false;}})
     82                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
     83                                             // forget type to workaround lambda deduction bug on msvc 2013
     84                                             .as_dynamic();},
     85                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
     86                                     so)
     87                                 // forget type to workaround lambda deduction bug on msvc 2013
     88                                 .as_dynamic();},
     89                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
     90                         so);
                    // ct is the number of triplets counted by the blocking count() after take().
     91             int ct = triples
     92                 .take(tripletCount)
     93                 .as_blocking()
     94                 .count();
     95 
     96             auto finish = clock::now();
                    // Subtract the time points first and convert afterwards. Truncating each
                    // time_since_epoch() to milliseconds separately can misreport the elapsed
                    // time by up to 1ms, and dividing by the truncated count reports inf
                    // ops/sec for any run shorter than 1ms.
     97             auto elapsed = finish - start;
     98             auto msElapsed = duration_cast<milliseconds>(elapsed);
     99             std::cout << "concat sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / std::chrono::duration<double>(elapsed).count() << " ops/sec" << std::endl;
    100         }
    101     }
    102 }
    103 
    104 SCENARIO("observe_on concat_transform pythagorian ranges", "[!hide][range][concat_transform][observe_on][pythagorian][perf]"){
            // Perf benchmark, nothing is asserted. Same nested-range triplet search as
            // the plain variant, but every range and both concat_transform calls are
            // given the coordination returned by rx::observe_on_event_loop(), and the
            // result is consumed with as_blocking().count().
    105     const int& tripletCount = static_tripletCount;
    106     GIVEN("some ranges"){
    107         WHEN("generating pythagorian triplets"){
    108             using namespace std::chrono;
    109             typedef steady_clock clock;
    110 
    111             auto so = rx::observe_on_event_loop();
    112 
                    // c: incremented every time the filter predicate runs.
                    // n: never modified; only printed as the "subscribed" figure.
                    // NOTE(review): c is a plain int captured by reference; presumably the
                    // predicate runs on event-loop threads - confirm the coordination
                    // orders those increments before c is read below.
    113             int c = 0;
    114             int n = 1;
    115             auto start = clock::now();
    116             auto triples =
    117                 rxs::range(1, so)
    118                     .concat_transform(
    119                         [&c, so](int z){
    120                             return rxs::range(1, z, 1, so)
    121                                 .concat_transform(
    122                                     [&c, so, z](int x){
    123                                         return rxs::range(x, z, 1, so)
    124                                             .filter([&c, z, x](int y){
    125                                                 ++c;
    126                                                 if (x*x + y*y == z*z) {
    127                                                     return true;}
    128                                                 else {
    129                                                     return false;}})
    130                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
    131                                             // forget type to workaround lambda deduction bug on msvc 2013
    132                                             .as_dynamic();},
    133                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
    134                                     so)
    135                                 // forget type to workaround lambda deduction bug on msvc 2013
    136                                 .as_dynamic();},
    137                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
    138                         so);
    139 
                    // ct is the number of triplets counted by the blocking count() after take().
    140             int ct = triples
    141                 .take(tripletCount)
    142                 .as_blocking()
    143                 .count();
    144 
    145             auto finish = clock::now();
                    // Subtract the time points first and convert afterwards. Truncating each
                    // time_since_epoch() to milliseconds separately can misreport the elapsed
                    // time by up to 1ms, and dividing by the truncated count reports inf
                    // ops/sec for any run shorter than 1ms.
    146             auto elapsed = finish - start;
    147             auto msElapsed = duration_cast<milliseconds>(elapsed);
    148             std::cout << "concat observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / std::chrono::duration<double>(elapsed).count() << " ops/sec" << std::endl;
    149         }
    150     }
    151 }
    152 
    153 SCENARIO("serialize concat_transform pythagorian ranges", "[!hide][range][concat_transform][serialize][pythagorian][perf]"){
            // Perf benchmark, nothing is asserted. Same nested-range triplet search as
            // the plain variant, but every range and both concat_transform calls are
            // given the coordination returned by rx::serialize_event_loop(), and the
            // result is consumed with as_blocking().count().
    154     const int& tripletCount = static_tripletCount;
    155     GIVEN("some ranges"){
    156         WHEN("generating pythagorian triplets"){
    157             using namespace std::chrono;
    158             typedef steady_clock clock;
    159 
    160             auto so = rx::serialize_event_loop();
    161 
                    // c: incremented every time the filter predicate runs.
                    // n: never modified; only printed as the "subscribed" figure.
                    // NOTE(review): c is a plain int captured by reference; presumably the
                    // predicate runs on event-loop threads - confirm the coordination
                    // orders those increments before c is read below.
    162             int c = 0;
    163             int n = 1;
    164             auto start = clock::now();
    165             auto triples =
    166                 rxs::range(1, so)
    167                     .concat_transform(
    168                         [&c, so](int z){
    169                             return rxs::range(1, z, 1, so)
    170                                 .concat_transform(
    171                                     [&c, so, z](int x){
    172                                         return rxs::range(x, z, 1, so)
    173                                             .filter([&c, z, x](int y){
    174                                                 ++c;
    175                                                 if (x*x + y*y == z*z) {
    176                                                     return true;}
    177                                                 else {
    178                                                     return false;}})
    179                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
    180                                             // forget type to workaround lambda deduction bug on msvc 2013
    181                                             .as_dynamic();},
    182                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
    183                                     so)
    184                                 // forget type to workaround lambda deduction bug on msvc 2013
    185                                 .as_dynamic();},
    186                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
    187                         so);
    188 
                    // ct is the number of triplets counted by the blocking count() after take().
    189             int ct = triples
    190                 .take(tripletCount)
    191                 .as_blocking()
    192                 .count();
    193 
    194             auto finish = clock::now();
                    // Subtract the time points first and convert afterwards. Truncating each
                    // time_since_epoch() to milliseconds separately can misreport the elapsed
                    // time by up to 1ms, and dividing by the truncated count reports inf
                    // ops/sec for any run shorter than 1ms.
    195             auto elapsed = finish - start;
    196             auto msElapsed = duration_cast<milliseconds>(elapsed);
    197             std::cout << "concat serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / std::chrono::duration<double>(elapsed).count() << " ops/sec" << std::endl;
    198         }
    199     }
    200 }
    201 
    202 SCENARIO("concat_map completes", "[concat_map][transform][map][operators]"){
            // Exercises rxo::concat_map in pipe form with a collection selector and a
            // result selector on the test scheduler: every int from xs is mapped to the
            // same cold string observable ys, and the result selector keeps the string.
    203     GIVEN("two cold observables. one of ints. one of strings."){
    204         auto sc = rxsc::make_test();
    205         auto w = sc.create_worker();
    206         const rxsc::test::messages<int> i_on;
    207         const rxsc::test::messages<std::string> s_on;
    208 
                // Source of ints: two values, then completion.
    209         auto xs = sc.make_cold_observable({
    210             i_on.next(100, 4),
    211             i_on.next(200, 2),
    212             i_on.completed(500)
    213         });
    214 
                // Inner sequence returned for every int: four strings, then completion.
    215         auto ys = sc.make_cold_observable({
    216             s_on.next(50, "foo"),
    217             s_on.next(100, "bar"),
    218             s_on.next(150, "baz"),
    219             s_on.next(200, "qux"),
    220             s_on.completed(250)
    221         });
    222 
    223         WHEN("each int is mapped to the strings"){
    224 
    225             auto res = w.start(
    226                 [&]() {
    227                     return xs
    228                         | rxo::concat_map(
                                    // collection selector: ignores the int, always yields ys
    229                             [&](int){
    230                                 return ys;},
                                    // result selector: drops the int, forwards the string
    231                             [](int, std::string s){
    232                                 return s;})
    233                         // forget type to workaround lambda deduction bug on msvc 2013
    234                         | rxo::as_dynamic();
    235                 }
    236             );
    237 
    238             THEN("the output contains strings repeated for each int"){
                        // Two back-to-back runs of ys: 350-500 and 600-750, one completion at 800.
    239                 auto required = rxu::to_vector({
    240                     s_on.next(350, "foo"),
    241                     s_on.next(400, "bar"),
    242                     s_on.next(450, "baz"),
    243                     s_on.next(500, "qux"),
    244                     s_on.next(600, "foo"),
    245                     s_on.next(650, "bar"),
    246                     s_on.next(700, "baz"),
    247                     s_on.next(750, "qux"),
    248                     s_on.completed(800)
    249                 });
    250                 auto actual = res.get_observer().messages();
    251                 REQUIRE(required == actual);
    252             }
    253 
    254             THEN("there was one subscription and one unsubscription to the ints"){
                        // 700 is the subscribe time 200 plus the completion offset 500 of xs.
    255                 auto required = rxu::to_vector({
    256                     i_on.subscribe(200, 700)
    257                 });
    258                 auto actual = xs.subscriptions();
    259                 REQUIRE(required == actual);
    260             }
    261 
    262             THEN("there were 2 subscription and unsubscription to the strings"){
                        // The second subscription to ys begins at 550, exactly where the first ends.
    263                 auto required = rxu::to_vector({
    264                     s_on.subscribe(300, 550),
    265                     s_on.subscribe(550, 800)
    266                 });
    267                 auto actual = ys.subscriptions();
    268                 REQUIRE(required == actual);
    269             }
    270         }
    271     }
    272 }
    273 
    274 SCENARIO("concat_transform completes", "[concat_transform][transform][map][operators]"){
            // Exercises concat_transform with a collection selector and a result
            // selector on the test scheduler, twice: once in pipe form without a
            // coordination and once as a member call passing
            // rx::identity_current_thread(). Both WHEN sections assert the same
            // messages and subscription windows.
    275     GIVEN("two cold observables. one of ints. one of strings."){
    276         auto sc = rxsc::make_test();
    277         auto w = sc.create_worker();
    278         const rxsc::test::messages<int> i_on;
    279         const rxsc::test::messages<std::string> s_on;
    280 
                // Source of ints: two values, then completion.
    281         auto xs = sc.make_cold_observable({
    282             i_on.next(100, 4),
    283             i_on.next(200, 2),
    284             i_on.completed(500)
    285         });
    286 
                // Inner sequence returned for every int: four strings, then completion.
    287         auto ys = sc.make_cold_observable({
    288             s_on.next(50, "foo"),
    289             s_on.next(100, "bar"),
    290             s_on.next(150, "baz"),
    291             s_on.next(200, "qux"),
    292             s_on.completed(250)
    293         });
    294 
    295         WHEN("each int is mapped to the strings"){
    296 
    297             auto res = w.start(
    298                 [&]() {
    299                     return xs
    300                         | rxo::concat_transform(
                                    // collection selector: ignores the int, always yields ys
    301                             [&](int){
    302                                 return ys;},
                                    // result selector: drops the int, forwards the string
    303                             [](int, std::string s){
    304                                 return s;})
    305                         // forget type to workaround lambda deduction bug on msvc 2013
    306                         | rxo::as_dynamic();
    307                 }
    308             );
    309 
    310             THEN("the output contains strings repeated for each int"){
                        // Two back-to-back runs of ys: 350-500 and 600-750, one completion at 800.
    311                 auto required = rxu::to_vector({
    312                     s_on.next(350, "foo"),
    313                     s_on.next(400, "bar"),
    314                     s_on.next(450, "baz"),
    315                     s_on.next(500, "qux"),
    316                     s_on.next(600, "foo"),
    317                     s_on.next(650, "bar"),
    318                     s_on.next(700, "baz"),
    319                     s_on.next(750, "qux"),
    320                     s_on.completed(800)
    321                 });
    322                 auto actual = res.get_observer().messages();
    323                 REQUIRE(required == actual);
    324             }
    325 
    326             THEN("there was one subscription and one unsubscription to the ints"){
                        // 700 is the subscribe time 200 plus the completion offset 500 of xs.
    327                 auto required = rxu::to_vector({
    328                     i_on.subscribe(200, 700)
    329                 });
    330                 auto actual = xs.subscriptions();
    331                 REQUIRE(required == actual);
    332             }
    333 
    334             THEN("there were 2 subscription and unsubscription to the strings"){
                        // The second subscription to ys begins at 550, exactly where the first ends.
    335                 auto required = rxu::to_vector({
    336                     s_on.subscribe(300, 550),
    337                     s_on.subscribe(550, 800)
    338                 });
    339                 auto actual = ys.subscriptions();
    340                 REQUIRE(required == actual);
    341             }
    342         }
    343 
    344         WHEN("each int is mapped to the strings with coordinator"){
    345 
                    // Member-call form with an explicit coordination as the third argument.
    346             auto res = w.start(
    347                 [&]() {
    348                     return xs
    349                         .concat_transform(
    350                             [&](int){
    351                                 return ys;},
    352                             [](int, std::string s){
    353                                 return s;},
    354                             rx::identity_current_thread())
    355                         // forget type to workaround lambda deduction bug on msvc 2013
    356                         .as_dynamic();
    357                 }
    358             );
    359 
    360             THEN("the output contains strings repeated for each int"){
                        // Same expected timeline as the section without a coordination.
    361                 auto required = rxu::to_vector({
    362                     s_on.next(350, "foo"),
    363                     s_on.next(400, "bar"),
    364                     s_on.next(450, "baz"),
    365                     s_on.next(500, "qux"),
    366                     s_on.next(600, "foo"),
    367                     s_on.next(650, "bar"),
    368                     s_on.next(700, "baz"),
    369                     s_on.next(750, "qux"),
    370                     s_on.completed(800)
    371                 });
    372                 auto actual = res.get_observer().messages();
    373                 REQUIRE(required == actual);
    374             }
    375 
    376             THEN("there was one subscription and one unsubscription to the ints"){
    377                 auto required = rxu::to_vector({
    378                     i_on.subscribe(200, 700)
    379                 });
    380                 auto actual = xs.subscriptions();
    381                 REQUIRE(required == actual);
    382             }
    383 
    384             THEN("there were 2 subscription and unsubscription to the strings"){
    385                 auto required = rxu::to_vector({
    386                     s_on.subscribe(300, 550),
    387                     s_on.subscribe(550, 800)
    388                 });
    389                 auto actual = ys.subscriptions();
    390                 REQUIRE(required == actual);
    391             }
    392         }
    393     }
    394 }
    395 
    396 SCENARIO("concat_transform, no result selector, no coordination", "[concat_transform][transform][map][operators]"){
            // Exercises the single-argument member form of concat_transform on the test
            // scheduler: only a collection selector is passed, and the asserted output
            // consists of the strings produced by ys.
    397     GIVEN("two cold observables. one of ints. one of strings."){
    398         auto sc = rxsc::make_test();
    399         auto w = sc.create_worker();
    400         const rxsc::test::messages<int> i_on;
    401         const rxsc::test::messages<std::string> s_on;
    402 
                // Source of ints: two values, then completion.
    403         auto xs = sc.make_cold_observable({
    404             i_on.next(100, 4),
    405             i_on.next(200, 2),
    406             i_on.completed(500)
    407         });
    408 
                // Inner sequence returned for every int: four strings, then completion.
    409         auto ys = sc.make_cold_observable({
    410             s_on.next(50, "foo"),
    411             s_on.next(100, "bar"),
    412             s_on.next(150, "baz"),
    413             s_on.next(200, "qux"),
    414             s_on.completed(250)
    415         });
    416 
    417         WHEN("each int is mapped to the strings"){
    418 
    419             auto res = w.start(
    420                 [&]() {
    421                     return xs
    422                         .concat_transform(
                                    // collection selector only: ignores the int, always yields ys
    423                             [&](int){
    424                                 return ys;})
    425                         // forget type to workaround lambda deduction bug on msvc 2013
    426                         .as_dynamic();
    427                 }
    428             );
    429 
    430             THEN("the output contains strings repeated for each int"){
                        // Two back-to-back runs of ys: 350-500 and 600-750, one completion at 800.
    431                 auto required = rxu::to_vector({
    432                     s_on.next(350, "foo"),
    433                     s_on.next(400, "bar"),
    434                     s_on.next(450, "baz"),
    435                     s_on.next(500, "qux"),
    436                     s_on.next(600, "foo"),
    437                     s_on.next(650, "bar"),
    438                     s_on.next(700, "baz"),
    439                     s_on.next(750, "qux"),
    440                     s_on.completed(800)
    441                 });
    442                 auto actual = res.get_observer().messages();
    443                 REQUIRE(required == actual);
    444             }
    445 
    446             THEN("there was one subscription and one unsubscription to the ints"){
                        // 700 is the subscribe time 200 plus the completion offset 500 of xs.
    447                 auto required = rxu::to_vector({
    448                     i_on.subscribe(200, 700)
    449                 });
    450                 auto actual = xs.subscriptions();
    451                 REQUIRE(required == actual);
    452             }
    453 
    454             THEN("there were 2 subscription and unsubscription to the strings"){
                        // The second subscription to ys begins at 550, exactly where the first ends.
    455                 auto required = rxu::to_vector({
    456                     s_on.subscribe(300, 550),
    457                     s_on.subscribe(550, 800)
    458                 });
    459                 auto actual = ys.subscriptions();
    460                 REQUIRE(required == actual);
    461             }
    462         }
    463     }
    464 }
    465 
    466 SCENARIO("concat_transform, no result selector, with coordination", "[concat_transform][transform][map][operators]"){
            // Exercises the member form of concat_transform that takes a collection
            // selector followed directly by a coordination
            // (rx::identity_current_thread()), with no result selector, on the test
            // scheduler.
    467     GIVEN("two cold observables. one of ints. one of strings."){
    468         auto sc = rxsc::make_test();
    469         auto w = sc.create_worker();
    470         const rxsc::test::messages<int> i_on;
    471         const rxsc::test::messages<std::string> s_on;
    472 
                // Source of ints: two values, then completion.
    473         auto xs = sc.make_cold_observable({
    474             i_on.next(100, 4),
    475             i_on.next(200, 2),
    476             i_on.completed(500)
    477         });
    478 
                // Inner sequence returned for every int: four strings, then completion.
    479         auto ys = sc.make_cold_observable({
    480             s_on.next(50, "foo"),
    481             s_on.next(100, "bar"),
    482             s_on.next(150, "baz"),
    483             s_on.next(200, "qux"),
    484             s_on.completed(250)
    485         });
    486 
    487         WHEN("each int is mapped to the strings"){
    488 
    489             auto res = w.start(
    490                 [&]() {
    491                     return xs
    492                         .concat_transform(
                                    // collection selector: ignores the int, always yields ys
    493                             [&](int){
    494                                 return ys;},
                                    // second argument is the coordination, not a result selector
    495                             rx::identity_current_thread())
    496                         // forget type to workaround lambda deduction bug on msvc 2013
    497                         .as_dynamic();
    498                 }
    499             );
    500 
    501             THEN("the output contains strings repeated for each int"){
                        // Two back-to-back runs of ys: 350-500 and 600-750, one completion at 800.
    502                 auto required = rxu::to_vector({
    503                     s_on.next(350, "foo"),
    504                     s_on.next(400, "bar"),
    505                     s_on.next(450, "baz"),
    506                     s_on.next(500, "qux"),
    507                     s_on.next(600, "foo"),
    508                     s_on.next(650, "bar"),
    509                     s_on.next(700, "baz"),
    510                     s_on.next(750, "qux"),
    511                     s_on.completed(800)
    512                 });
    513                 auto actual = res.get_observer().messages();
    514                 REQUIRE(required == actual);
    515             }
    516 
    517             THEN("there was one subscription and one unsubscription to the ints"){
                        // 700 is the subscribe time 200 plus the completion offset 500 of xs.
    518                 auto required = rxu::to_vector({
    519                     i_on.subscribe(200, 700)
    520                 });
    521                 auto actual = xs.subscriptions();
    522                 REQUIRE(required == actual);
    523             }
    524 
    525             THEN("there were 2 subscription and unsubscription to the strings"){
                        // The second subscription to ys begins at 550, exactly where the first ends.
    526                 auto required = rxu::to_vector({
    527                     s_on.subscribe(300, 550),
    528                     s_on.subscribe(550, 800)
    529                 });
    530                 auto actual = ys.subscriptions();
    531                 REQUIRE(required == actual);
    532             }
    533         }
    534     }
    535 }
    536