Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-reduce.hpp>
      3 #include <rxcpp/operators/rx-filter.hpp>
      4 #include <rxcpp/operators/rx-map.hpp>
      5 #include <rxcpp/operators/rx-take.hpp>
      6 #include <rxcpp/operators/rx-flat_map.hpp>
      7 #include <rxcpp/operators/rx-observe_on.hpp>
      8 
      9 static const int static_tripletCount = 100;
     10 
     11 SCENARIO("pythagorian for loops", "[!hide][for][pythagorian][perf]"){
     12     const int& tripletCount = static_tripletCount;
     13     GIVEN("a for loop"){
     14         WHEN("generating pythagorian triplets"){
     15             using namespace std::chrono;
     16             typedef steady_clock clock;
     17 
     18             int c = 0;
     19             int ct = 0;
     20             int n = 1;
     21             auto start = clock::now();
     22             for(int z = 1;; ++z)
     23             {
     24                 for(int x = 1; x <= z; ++x)
     25                 {
     26                     for(int y = x; y <= z; ++y)
     27                     {
     28                         ++c;
     29                         if(x*x + y*y == z*z)
     30                         {
     31                             ++ct;
     32                             if(ct == tripletCount)
     33                                 goto done;
     34                         }
     35                     }
     36                 }
     37             }
     38             done:
     39             auto finish = clock::now();
     40             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
     41                    duration_cast<milliseconds>(start.time_since_epoch());
     42             std::cout << "pythagorian for   : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     43 
     44         }
     45     }
     46 }
     47 
     48 SCENARIO("merge_transform pythagorian ranges", "[!hide][range][merge_transform][pythagorian][perf]"){
     49     const int& tripletCount = static_tripletCount;
     50     GIVEN("some ranges"){
     51         WHEN("generating pythagorian triplets"){
     52             using namespace std::chrono;
     53             typedef steady_clock clock;
     54 
     55             auto so = rx::identity_immediate();
     56 
     57             int c = 0;
     58             int ct = 0;
     59             int n = 1;
     60             auto start = clock::now();
     61             auto triples =
     62                 rxs::range(1, so)
     63                     .merge_transform(
     64                         [&c, so](int z){
     65                             return rxs::range(1, z, 1, so)
     66                                 .flat_map(
     67                                     [&c, so, z](int x){
     68                                         return rxs::range(x, z, 1, so)
     69                                             .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
     70                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
     71                                             // forget type to workaround lambda deduction bug on msvc 2013
     72                                             .as_dynamic();},
     73                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
     74                                 // forget type to workaround lambda deduction bug on msvc 2013
     75                                 .as_dynamic();},
     76                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;});
     77             triples
     78                 .take(tripletCount)
     79                 .subscribe(
     80                     rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){
     81                         ++ct;
     82                     }));
     83             auto finish = clock::now();
     84             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
     85                    duration_cast<milliseconds>(start.time_since_epoch());
     86             std::cout << "merge pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     87 
     88         }
     89     }
     90 }
     91 
     92 SCENARIO("synchronize merge_transform pythagorian ranges", "[!hide][range][merge_transform][synchronize][pythagorian][perf]"){
     93     const int& tripletCount = static_tripletCount;
     94     GIVEN("some ranges"){
     95         WHEN("generating pythagorian triplets"){
     96             using namespace std::chrono;
     97             typedef steady_clock clock;
     98 
     99             auto so = rx::synchronize_event_loop();
    100 
    101             int c = 0;
    102             int n = 1;
    103             auto start = clock::now();
    104             auto triples =
    105                 rxs::range(1, so)
    106                     .merge_transform(
    107                         [&c, so](int z){
    108                             return rxs::range(1, z, 1, so)
    109                                 .merge_transform(
    110                                     [&c, so, z](int x){
    111                                         return rxs::range(x, z, 1, so)
    112                                             .filter([&c, z, x](int y){
    113                                                 ++c;
    114                                                 if (x*x + y*y == z*z) {
    115                                                     return true;}
    116                                                 else {
    117                                                     return false;}})
    118                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
    119                                             // forget type to workaround lambda deduction bug on msvc 2013
    120                                             .as_dynamic();},
    121                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
    122                                     so)
    123                                 // forget type to workaround lambda deduction bug on msvc 2013
    124                                 .as_dynamic();},
    125                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
    126                         so);
    127             int ct = triples
    128                 .take(tripletCount)
    129                 .as_blocking()
    130                 .count();
    131 
    132             auto finish = clock::now();
    133             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
    134                    duration_cast<milliseconds>(start.time_since_epoch());
    135             std::cout << "merge sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    136         }
    137     }
    138 }
    139 
    140 SCENARIO("observe_on merge_transform pythagorian ranges", "[!hide][range][merge_transform][observe_on][pythagorian][perf]"){
    141     const int& tripletCount = static_tripletCount;
    142     GIVEN("some ranges"){
    143         WHEN("generating pythagorian triplets"){
    144             using namespace std::chrono;
    145             typedef steady_clock clock;
    146 
    147             auto so = rx::observe_on_event_loop();
    148 
    149             int c = 0;
    150             int n = 1;
    151             auto start = clock::now();
    152             auto triples =
    153                 rxs::range(1, so)
    154                     .merge_transform(
    155                         [&c, so](int z){
    156                             return rxs::range(1, z, 1, so)
    157                                 .merge_transform(
    158                                     [&c, so, z](int x){
    159                                         return rxs::range(x, z, 1, so)
    160                                             .filter([&c, z, x](int y){
    161                                                 ++c;
    162                                                 if (x*x + y*y == z*z) {
    163                                                     return true;}
    164                                                 else {
    165                                                     return false;}})
    166                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
    167                                             // forget type to workaround lambda deduction bug on msvc 2013
    168                                             .as_dynamic();},
    169                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
    170                                     so)
    171                                 // forget type to workaround lambda deduction bug on msvc 2013
    172                                 .as_dynamic();},
    173                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
    174                         so);
    175             int ct = triples
    176                 .take(tripletCount)
    177                 .as_blocking()
    178                 .count();
    179 
    180             auto finish = clock::now();
    181             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
    182                    duration_cast<milliseconds>(start.time_since_epoch());
    183             std::cout << "merge observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    184         }
    185     }
    186 }
    187 
    188 SCENARIO("serialize merge_transform pythagorian ranges", "[!hide][range][merge_transform][serialize][pythagorian][perf]"){
    189     const int& tripletCount = static_tripletCount;
    190     GIVEN("some ranges"){
    191         WHEN("generating pythagorian triplets"){
    192             using namespace std::chrono;
    193             typedef steady_clock clock;
    194 
    195             auto so = rx::serialize_event_loop();
    196 
    197             int c = 0;
    198             int n = 1;
    199             auto start = clock::now();
    200             auto triples =
    201                 rxs::range(1, so)
    202                     .merge_transform(
    203                         [&c, so](int z){
    204                             return rxs::range(1, z, 1, so)
    205                                 .merge_transform(
    206                                     [&c, so, z](int x){
    207                                         return rxs::range(x, z, 1, so)
    208                                             .filter([&c, z, x](int y){
    209                                                 ++c;
    210                                                 if (x*x + y*y == z*z) {
    211                                                     return true;}
    212                                                 else {
    213                                                     return false;}})
    214                                             .transform([z, x](int y){return std::make_tuple(x, y, z);})
    215                                             // forget type to workaround lambda deduction bug on msvc 2013
    216                                             .as_dynamic();},
    217                                     [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
    218                                     so)
    219                                 // forget type to workaround lambda deduction bug on msvc 2013
    220                                 .as_dynamic();},
    221                         [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
    222                         so);
    223             int ct = triples
    224                 .take(tripletCount)
    225                 .as_blocking()
    226                 .count();
    227 
    228             auto finish = clock::now();
    229             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
    230                    duration_cast<milliseconds>(start.time_since_epoch());
    231             std::cout << "merge serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    232         }
    233     }
    234 }
    235 
    236 SCENARIO("flat_map completes", "[flat_map][map][operators]"){
    237     GIVEN("two cold observables. one of ints. one of strings."){
    238         auto sc = rxsc::make_test();
    239         auto w = sc.create_worker();
    240         const rxsc::test::messages<int> i_on;
    241         const rxsc::test::messages<std::string> s_on;
    242 
    243         auto xs = sc.make_cold_observable({
    244             i_on.next(100, 4),
    245             i_on.next(200, 2),
    246             i_on.next(300, 3),
    247             i_on.next(400, 1),
    248             i_on.completed(500)
    249         });
    250 
    251         auto ys = sc.make_cold_observable({
    252             s_on.next(50, "foo"),
    253             s_on.next(100, "bar"),
    254             s_on.next(150, "baz"),
    255             s_on.next(200, "qux"),
    256             s_on.completed(250)
    257         });
    258 
    259         WHEN("each int is mapped to the strings"){
    260 
    261             auto res = w.start(
    262                 [&]() {
    263                     return xs
    264                         | rxo::flat_map(
    265                             [&](int){
    266                                 return ys;},
    267                             [](int, std::string s){
    268                                 return s;})
    269                         // forget type to workaround lambda deduction bug on msvc 2013
    270                         | rxo::as_dynamic();
    271                 }
    272             );
    273 
    274             THEN("the output contains strings repeated for each int"){
    275                 auto required = rxu::to_vector({
    276                     s_on.next(350, "foo"),
    277                     s_on.next(400, "bar"),
    278                     s_on.next(450, "baz"),
    279                     s_on.next(450, "foo"),
    280                     s_on.next(500, "qux"),
    281                     s_on.next(500, "bar"),
    282                     s_on.next(550, "baz"),
    283                     s_on.next(550, "foo"),
    284                     s_on.next(600, "qux"),
    285                     s_on.next(600, "bar"),
    286                     s_on.next(650, "baz"),
    287                     s_on.next(650, "foo"),
    288                     s_on.next(700, "qux"),
    289                     s_on.next(700, "bar"),
    290                     s_on.next(750, "baz"),
    291                     s_on.next(800, "qux"),
    292                     s_on.completed(850)
    293                 });
    294                 auto actual = res.get_observer().messages();
    295                 REQUIRE(required == actual);
    296             }
    297 
    298             THEN("there was one subscription and one unsubscription to the ints"){
    299                 auto required = rxu::to_vector({
    300                     i_on.subscribe(200, 700)
    301                 });
    302                 auto actual = xs.subscriptions();
    303                 REQUIRE(required == actual);
    304             }
    305 
    306             THEN("there were four subscription and unsubscription to the strings"){
    307                 auto required = rxu::to_vector({
    308                     s_on.subscribe(300, 550),
    309                     s_on.subscribe(400, 650),
    310                     s_on.subscribe(500, 750),
    311                     s_on.subscribe(600, 850)
    312                 });
    313                 auto actual = ys.subscriptions();
    314                 REQUIRE(required == actual);
    315             }
    316         }
    317     }
    318 }
    319 
    320 SCENARIO("merge_transform completes", "[merge_transform][transform][map][operators]"){
    321     GIVEN("two cold observables. one of ints. one of strings."){
    322         auto sc = rxsc::make_test();
    323         auto w = sc.create_worker();
    324         const rxsc::test::messages<int> i_on;
    325         const rxsc::test::messages<std::string> s_on;
    326 
    327         auto xs = sc.make_cold_observable({
    328             i_on.next(100, 4),
    329             i_on.next(200, 2),
    330             i_on.next(300, 3),
    331             i_on.next(400, 1),
    332             i_on.completed(500)
    333         });
    334 
    335         auto ys = sc.make_cold_observable({
    336             s_on.next(50, "foo"),
    337             s_on.next(100, "bar"),
    338             s_on.next(150, "baz"),
    339             s_on.next(200, "qux"),
    340             s_on.completed(250)
    341         });
    342 
    343         WHEN("each int is mapped to the strings"){
    344 
    345             auto res = w.start(
    346                 [&]() {
    347                     return xs
    348                         | rxo::merge_transform(
    349                             [&](int){
    350                                 return ys;},
    351                             [](int, std::string s){
    352                                 return s;})
    353                         // forget type to workaround lambda deduction bug on msvc 2013
    354                         | rxo::as_dynamic();
    355                 }
    356             );
    357 
    358             THEN("the output contains strings repeated for each int"){
    359                 auto required = rxu::to_vector({
    360                     s_on.next(350, "foo"),
    361                     s_on.next(400, "bar"),
    362                     s_on.next(450, "baz"),
    363                     s_on.next(450, "foo"),
    364                     s_on.next(500, "qux"),
    365                     s_on.next(500, "bar"),
    366                     s_on.next(550, "baz"),
    367                     s_on.next(550, "foo"),
    368                     s_on.next(600, "qux"),
    369                     s_on.next(600, "bar"),
    370                     s_on.next(650, "baz"),
    371                     s_on.next(650, "foo"),
    372                     s_on.next(700, "qux"),
    373                     s_on.next(700, "bar"),
    374                     s_on.next(750, "baz"),
    375                     s_on.next(800, "qux"),
    376                     s_on.completed(850)
    377                 });
    378                 auto actual = res.get_observer().messages();
    379                 REQUIRE(required == actual);
    380             }
    381 
    382             THEN("there was one subscription and one unsubscription to the ints"){
    383                 auto required = rxu::to_vector({
    384                     i_on.subscribe(200, 700)
    385                 });
    386                 auto actual = xs.subscriptions();
    387                 REQUIRE(required == actual);
    388             }
    389 
    390             THEN("there were four subscription and unsubscription to the strings"){
    391                 auto required = rxu::to_vector({
    392                     s_on.subscribe(300, 550),
    393                     s_on.subscribe(400, 650),
    394                     s_on.subscribe(500, 750),
    395                     s_on.subscribe(600, 850)
    396                 });
    397                 auto actual = ys.subscriptions();
    398                 REQUIRE(required == actual);
    399             }
    400         }
    401 
    402         WHEN("each int is mapped to the strings with coordinator"){
    403 
    404             auto res = w.start(
    405                 [&]() {
    406                     return xs
    407                         .merge_transform(
    408                             [&](int){
    409                                 return ys;},
    410                             [](int, std::string s){
    411                                 return s;},
    412                             rx::identity_current_thread())
    413                         // forget type to workaround lambda deduction bug on msvc 2013
    414                         .as_dynamic();
    415                 }
    416             );
    417 
    418             THEN("the output contains strings repeated for each int"){
    419                 auto required = rxu::to_vector({
    420                     s_on.next(350, "foo"),
    421                     s_on.next(400, "bar"),
    422                     s_on.next(450, "baz"),
    423                     s_on.next(450, "foo"),
    424                     s_on.next(500, "qux"),
    425                     s_on.next(500, "bar"),
    426                     s_on.next(550, "baz"),
    427                     s_on.next(550, "foo"),
    428                     s_on.next(600, "qux"),
    429                     s_on.next(600, "bar"),
    430                     s_on.next(650, "baz"),
    431                     s_on.next(650, "foo"),
    432                     s_on.next(700, "qux"),
    433                     s_on.next(700, "bar"),
    434                     s_on.next(750, "baz"),
    435                     s_on.next(800, "qux"),
    436                     s_on.completed(850)
    437                 });
    438                 auto actual = res.get_observer().messages();
    439                 REQUIRE(required == actual);
    440             }
    441 
    442             THEN("there was one subscription and one unsubscription to the ints"){
    443                 auto required = rxu::to_vector({
    444                     i_on.subscribe(200, 700)
    445                 });
    446                 auto actual = xs.subscriptions();
    447                 REQUIRE(required == actual);
    448             }
    449 
    450             THEN("there were four subscription and unsubscription to the strings"){
    451                 auto required = rxu::to_vector({
    452                     s_on.subscribe(300, 550),
    453                     s_on.subscribe(400, 650),
    454                     s_on.subscribe(500, 750),
    455                     s_on.subscribe(600, 850)
    456                 });
    457                 auto actual = ys.subscriptions();
    458                 REQUIRE(required == actual);
    459             }
    460         }
    461     }
    462 }
    463 
    464 SCENARIO("merge_transform source never ends", "[merge_transform][transform][map][operators]"){
    465     GIVEN("two cold observables. one of ints. one of strings."){
    466         auto sc = rxsc::make_test();
    467         auto w = sc.create_worker();
    468         const rxsc::test::messages<int> i_on;
    469         const rxsc::test::messages<std::string> s_on;
    470 
    471         auto xs = sc.make_cold_observable({
    472             i_on.next(100, 4),
    473             i_on.next(200, 2),
    474             i_on.next(300, 3),
    475             i_on.next(400, 1),
    476             i_on.next(500, 5),
    477             i_on.next(700, 0)
    478         });
    479 
    480         auto ys = sc.make_cold_observable({
    481             s_on.next(50, "foo"),
    482             s_on.next(100, "bar"),
    483             s_on.next(150, "baz"),
    484             s_on.next(200, "qux"),
    485             s_on.completed(250)
    486         });
    487 
    488         WHEN("each int is mapped to the strings"){
    489 
    490             auto res = w.start(
    491                 [&]() {
    492                     return xs
    493                         .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
    494                         // forget type to workaround lambda deduction bug on msvc 2013
    495                         .as_dynamic();
    496                 }
    497             );
    498 
    499             THEN("the output contains strings repeated for each int"){
    500                 auto required = rxu::to_vector({
    501                     s_on.next(350, "foo"),
    502                     s_on.next(400, "bar"),
    503                     s_on.next(450, "baz"),
    504                     s_on.next(450, "foo"),
    505                     s_on.next(500, "qux"),
    506                     s_on.next(500, "bar"),
    507                     s_on.next(550, "baz"),
    508                     s_on.next(550, "foo"),
    509                     s_on.next(600, "qux"),
    510                     s_on.next(600, "bar"),
    511                     s_on.next(650, "baz"),
    512                     s_on.next(650, "foo"),
    513                     s_on.next(700, "qux"),
    514                     s_on.next(700, "bar"),
    515                     s_on.next(750, "baz"),
    516                     s_on.next(750, "foo"),
    517                     s_on.next(800, "qux"),
    518                     s_on.next(800, "bar"),
    519                     s_on.next(850, "baz"),
    520                     s_on.next(900, "qux"),
    521                     s_on.next(950, "foo")
    522                 });
    523                 auto actual = res.get_observer().messages();
    524                 REQUIRE(required == actual);
    525             }
    526 
    527             THEN("there was one subscription and one unsubscription to the ints"){
    528                 auto required = rxu::to_vector({
    529                     i_on.subscribe(200, 1000)
    530                 });
    531                 auto actual = xs.subscriptions();
    532                 REQUIRE(required == actual);
    533             }
    534 
    535             THEN("there were four subscription and unsubscription to the strings"){
    536                 auto required = rxu::to_vector({
    537                     s_on.subscribe(300, 550),
    538                     s_on.subscribe(400, 650),
    539                     s_on.subscribe(500, 750),
    540                     s_on.subscribe(600, 850),
    541                     s_on.subscribe(700, 950),
    542                     s_on.subscribe(900, 1000)
    543                 });
    544                 auto actual = ys.subscriptions();
    545                 REQUIRE(required == actual);
    546             }
    547         }
    548     }
    549 }
    550 
    551 SCENARIO("merge_transform inner error", "[merge_transform][transform][map][operators]"){
    552     GIVEN("two cold observables. one of ints. one of strings."){
    553         auto sc = rxsc::make_test();
    554         auto w = sc.create_worker();
    555         const rxsc::test::messages<int> i_on;
    556         const rxsc::test::messages<std::string> s_on;
    557 
    558         auto xs = sc.make_cold_observable({
    559             i_on.next(100, 4),
    560             i_on.next(200, 2),
    561             i_on.next(300, 3),
    562             i_on.next(400, 1),
    563             i_on.completed(500)
    564         });
    565 
    566         std::runtime_error ex("filter on_error from inner source");
    567 
    568         auto ys = sc.make_cold_observable({
    569             s_on.next(55, "foo"),
    570             s_on.next(104, "bar"),
    571             s_on.next(153, "baz"),
    572             s_on.next(202, "qux"),
    573             s_on.error(301, ex)
    574         });
    575 
    576         WHEN("each int is mapped to the strings"){
    577 
    578             auto res = w.start(
    579                 [&]() {
    580                     return xs
    581                         .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
    582                         // forget type to workaround lambda deduction bug on msvc 2013
    583                         .as_dynamic();
    584                 }
    585             );
    586 
    587             THEN("the output contains strings repeated for each int"){
    588                 auto required = rxu::to_vector({
    589                     s_on.next(355, "foo"),
    590                     s_on.next(404, "bar"),
    591                     s_on.next(453, "baz"),
    592                     s_on.next(455, "foo"),
    593                     s_on.next(502, "qux"),
    594                     s_on.next(504, "bar"),
    595                     s_on.next(553, "baz"),
    596                     s_on.next(555, "foo"),
    597                     s_on.error(601, ex)
    598                 });
    599                 auto actual = res.get_observer().messages();
    600                 REQUIRE(required == actual);
    601             }
    602 
    603             THEN("there was one subscription and one unsubscription to the ints"){
    604                 auto required = rxu::to_vector({
    605                     i_on.subscribe(200, 601)
    606                 });
    607                 auto actual = xs.subscriptions();
    608                 REQUIRE(required == actual);
    609             }
    610 
    611             THEN("there were four subscription and unsubscription to the strings"){
    612                 auto required = rxu::to_vector({
    613                     s_on.subscribe(300, 601),
    614                     s_on.subscribe(400, 601),
    615                     s_on.subscribe(500, 601),
    616                     s_on.subscribe(600, 601)
    617                 });
    618                 auto actual = ys.subscriptions();
    619                 REQUIRE(required == actual);
    620             }
    621         }
    622     }
    623 }
    624 
    625 SCENARIO("merge_transform, no result selector, no coordination", "[merge_transform][transform][map][operators]"){
    626     GIVEN("two cold observables. one of ints. one of strings."){
    627         auto sc = rxsc::make_test();
    628         auto w = sc.create_worker();
    629         const rxsc::test::messages<int> i_on;
    630         const rxsc::test::messages<std::string> s_on;
    631 
    632         auto xs = sc.make_cold_observable({
    633             i_on.next(100, 4),
    634             i_on.next(200, 2),
    635             i_on.next(300, 3),
    636             i_on.next(400, 1),
    637             i_on.completed(500)
    638         });
    639 
    640         auto ys = sc.make_cold_observable({
    641             s_on.next(50, "foo"),
    642             s_on.next(100, "bar"),
    643             s_on.next(150, "baz"),
    644             s_on.next(200, "qux"),
    645             s_on.completed(250)
    646         });
    647 
    648         WHEN("each int is mapped to the strings"){
    649 
    650             auto res = w.start(
    651                 [&]() {
    652                     return xs
    653                         .merge_transform(
    654                             [&](int){
    655                                 return ys;})
    656                         // forget type to workaround lambda deduction bug on msvc 2013
    657                         .as_dynamic();
    658                 }
    659             );
    660 
    661             THEN("the output contains strings repeated for each int"){
    662                 auto required = rxu::to_vector({
    663                     s_on.next(350, "foo"),
    664                     s_on.next(400, "bar"),
    665                     s_on.next(450, "baz"),
    666                     s_on.next(450, "foo"),
    667                     s_on.next(500, "qux"),
    668                     s_on.next(500, "bar"),
    669                     s_on.next(550, "baz"),
    670                     s_on.next(550, "foo"),
    671                     s_on.next(600, "qux"),
    672                     s_on.next(600, "bar"),
    673                     s_on.next(650, "baz"),
    674                     s_on.next(650, "foo"),
    675                     s_on.next(700, "qux"),
    676                     s_on.next(700, "bar"),
    677                     s_on.next(750, "baz"),
    678                     s_on.next(800, "qux"),
    679                     s_on.completed(850)
    680                 });
    681                 auto actual = res.get_observer().messages();
    682                 REQUIRE(required == actual);
    683             }
    684 
    685             THEN("there was one subscription and one unsubscription to the ints"){
    686                 auto required = rxu::to_vector({
    687                     i_on.subscribe(200, 700)
    688                 });
    689                 auto actual = xs.subscriptions();
    690                 REQUIRE(required == actual);
    691             }
    692 
    693             THEN("there were four subscription and unsubscription to the strings"){
    694                 auto required = rxu::to_vector({
    695                     s_on.subscribe(300, 550),
    696                     s_on.subscribe(400, 650),
    697                     s_on.subscribe(500, 750),
    698                     s_on.subscribe(600, 850)
    699                 });
    700                 auto actual = ys.subscriptions();
    701                 REQUIRE(required == actual);
    702             }
    703         }
    704     }
    705 }
    706 
    707 SCENARIO("merge_transform, no result selector, with coordination", "[merge_transform][transform][map][operators]"){
    708     GIVEN("two cold observables. one of ints. one of strings."){
    709         auto sc = rxsc::make_test();
    710         auto w = sc.create_worker();
    711         const rxsc::test::messages<int> i_on;
    712         const rxsc::test::messages<std::string> s_on;
    713 
    714         auto xs = sc.make_cold_observable({
    715             i_on.next(100, 4),
    716             i_on.next(200, 2),
    717             i_on.next(300, 3),
    718             i_on.next(400, 1),
    719             i_on.completed(500)
    720         });
    721 
    722         auto ys = sc.make_cold_observable({
    723             s_on.next(50, "foo"),
    724             s_on.next(100, "bar"),
    725             s_on.next(150, "baz"),
    726             s_on.next(200, "qux"),
    727             s_on.completed(250)
    728         });
    729 
    730         WHEN("each int is mapped to the strings"){
    731 
    732             auto res = w.start(
    733                 [&]() {
    734                     return xs
    735                         .merge_transform(
    736                             [&](int){
    737                                 return ys;},
    738                             rx::identity_current_thread())
    739                         // forget type to workaround lambda deduction bug on msvc 2013
    740                         .as_dynamic();
    741                 }
    742             );
    743 
    744             THEN("the output contains strings repeated for each int"){
    745                 auto required = rxu::to_vector({
    746                     s_on.next(350, "foo"),
    747                     s_on.next(400, "bar"),
    748                     s_on.next(450, "baz"),
    749                     s_on.next(450, "foo"),
    750                     s_on.next(500, "qux"),
    751                     s_on.next(500, "bar"),
    752                     s_on.next(550, "baz"),
    753                     s_on.next(550, "foo"),
    754                     s_on.next(600, "qux"),
    755                     s_on.next(600, "bar"),
    756                     s_on.next(650, "baz"),
    757                     s_on.next(650, "foo"),
    758                     s_on.next(700, "qux"),
    759                     s_on.next(700, "bar"),
    760                     s_on.next(750, "baz"),
    761                     s_on.next(800, "qux"),
    762                     s_on.completed(850)
    763                 });
    764                 auto actual = res.get_observer().messages();
    765                 REQUIRE(required == actual);
    766             }
    767 
    768             THEN("there was one subscription and one unsubscription to the ints"){
    769                 auto required = rxu::to_vector({
    770                     i_on.subscribe(200, 700)
    771                 });
    772                 auto actual = xs.subscriptions();
    773                 REQUIRE(required == actual);
    774             }
    775 
    776             THEN("there were four subscription and unsubscription to the strings"){
    777                 auto required = rxu::to_vector({
    778                     s_on.subscribe(300, 550),
    779                     s_on.subscribe(400, 650),
    780                     s_on.subscribe(500, 750),
    781                     s_on.subscribe(600, 850)
    782                 });
    783                 auto actual = ys.subscriptions();
    784                 REQUIRE(required == actual);
    785             }
    786         }
    787     }
    788 }
    789