// (source-browser navigation residue, not part of the test code) Home | History | Annotate | Download | only in subscriptions
      1 #include "../test.h"
      2 #include "rxcpp/operators/rx-combine_latest.hpp"
      3 #include "rxcpp/operators/rx-map.hpp"
      4 #include "rxcpp/operators/rx-take.hpp"
      5 #include "rxcpp/operators/rx-observe_on.hpp"
      6 #include "rxcpp/operators/rx-publish.hpp"
      7 #include "rxcpp/operators/rx-ref_count.hpp"
      8 
      9 #include <sstream>
     10 
     11 SCENARIO("observe subscription", "[!hide]"){
     12     GIVEN("observable of ints"){
     13         WHEN("subscribe"){
     14             auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();
     15 
     16             auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
     17                 auto it = observers->insert(observers->end(), out);
     18                 it->add([=](){
     19                     observers->erase(it);
     20                 });
     21             });
     22 
     23         }
     24     }
     25 }
     26 
     27 static const int static_subscriptions = 10000;
     28 
     29 SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){
     30     const int& subscriptions = static_subscriptions;
     31     GIVEN("a for loop"){
     32         WHEN("subscribe 100K times"){
     33             using namespace std::chrono;
     34             typedef steady_clock clock;
     35 
     36             auto sc = rxsc::make_current_thread();
     37             auto w = sc.create_worker();
     38             int runs = 10;
     39 
     40             auto loop = [&](const rxsc::schedulable& self) {
     41                 int c = 0;
     42                 int n = 1;
     43                 auto start = clock::now();
     44                 for (int i = 0; i < subscriptions; i++) {
     45                     rx::observable<>::just(1)
     46                         .map([](int i) {
     47                             std::stringstream serializer;
     48                             serializer << i;
     49                             return serializer.str();
     50                         })
     51                         .map([](const std::string& s) {
     52                             int i;
     53                             std::stringstream(s) >> i;
     54                             return i;
     55                         })
     56                         .subscribe([&](int){
     57                             ++c;
     58                         });
     59                 }
     60                 auto finish = clock::now();
     61                 auto msElapsed = duration_cast<milliseconds>(finish-start);
     62                 std::cout << "loop subscribe map             : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     63 
     64                 if (--runs > 0) {
     65                     self();
     66                 }
     67             };
     68 
     69             w.schedule(loop);
     70         }
     71     }
     72 }
     73 
     74 SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){
     75     const int& subscriptions = static_subscriptions;
     76     GIVEN("a for loop"){
     77         WHEN("subscribe 100K times"){
     78             using namespace std::chrono;
     79             typedef steady_clock clock;
     80 
     81             auto sc = rxsc::make_current_thread();
     82             auto w = sc.create_worker();
     83             int runs = 10;
     84 
     85             auto loop = [&](const rxsc::schedulable& self) {
     86                 int c = 0;
     87                 int n = 1;
     88                 auto start = clock::now();
     89                 for (int i = 0; i < subscriptions; i++) {
     90                     rx::observable<>::just(1)
     91                         .combine_latest([](int i, int j) {
     92                             return i + j;
     93                         }, rx::observable<>::just(2))
     94                         .subscribe([&](int){
     95                             ++c;
     96                         });
     97                 }
     98                 auto finish = clock::now();
     99                 auto msElapsed = duration_cast<milliseconds>(finish-start);
    100                 std::cout << "loop subscribe combine_latest  : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    101 
    102                 if (--runs > 0) {
    103                     self();
    104                 }
    105             };
    106 
    107             w.schedule(loop);
    108         }
    109     }
    110 }
    111 
    112 SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
    113     GIVEN("range"){
    114         WHEN("synchronized"){
    115             using namespace std::chrono;
    116             typedef steady_clock clock;
    117 
    118             auto sc = rxsc::make_current_thread();
    119             auto w = sc.create_worker();
    120 
    121             auto es = rx::synchronize_event_loop();
    122 
    123             const int values = 10000;
    124 
    125             int runs = 10;
    126 
    127             auto loop = [&](const rxsc::schedulable& self) {
    128                 std::atomic<int> c(0);
    129                 int n = 1;
    130                 auto liftrequirecompletion = [&](rx::subscriber<int> dest){
    131                     auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
    132                     std::get<2>(*completionstate).add([=](){
    133                         if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
    134                             abort();
    135                         }
    136                     });
    137                     // VS2013 deduction issue requires dynamic (type-forgetting)
    138                     return rx::make_subscriber<int>(
    139                         std::get<2>(*completionstate),
    140                         [=](int n){
    141                             ++std::get<1>(*completionstate);
    142                             std::get<2>(*completionstate).on_next(n);
    143                         },
    144                         [=](rxu::error_ptr){
    145                             abort();
    146                         },
    147                         [=](){
    148                             if (std::get<1>(*completionstate) != values) {
    149                                 abort();
    150                             }
    151                             std::get<0>(*completionstate) = true;
    152                             std::get<2>(*completionstate).on_completed();
    153                         }).as_dynamic();
    154                 };
    155                 auto start = clock::now();
    156                 auto ew = es.create_coordinator().get_worker();
    157                 std::atomic<int> v(0);
    158                 auto s0 = rxs::range(1, es)
    159                     .take(values)
    160                     .lift<int>(liftrequirecompletion)
    161                     .as_dynamic()
    162                     .publish_synchronized(es)
    163                     .ref_count()
    164                     .lift<int>(liftrequirecompletion)
    165                     .subscribe(
    166                         rx::make_observer_dynamic<int>(
    167                         [&](int){
    168                             ++v;
    169                         },
    170                         [&](){
    171                             ++c;
    172                         }));
    173                 auto s1 = rxs::range(values + 1, es)
    174                     .take(values)
    175                     .lift<int>(liftrequirecompletion)
    176                     .as_dynamic()
    177                     .publish_synchronized(es)
    178                     .ref_count()
    179                     .lift<int>(liftrequirecompletion)
    180                     .subscribe(
    181                         rx::make_observer_dynamic<int>(
    182                         [&](int){
    183                             ++v;
    184                         },
    185                         [&](){
    186                             ++c;
    187                         }));
    188                 auto s2 = rxs::range((values * 2) + 1, es)
    189                     .take(values)
    190                     .lift<int>(liftrequirecompletion)
    191                     .as_dynamic()
    192                     .publish_synchronized(es)
    193                     .ref_count()
    194                     .lift<int>(liftrequirecompletion)
    195                     .subscribe(
    196                         rx::make_observer_dynamic<int>(
    197                         [&](int){
    198                             ++v;
    199                         },
    200                         [&](){
    201                             ++c;
    202                         }));
    203                 while(v != values * 3 || c != 3);
    204                 s0.unsubscribe();
    205                 s1.unsubscribe();
    206                 s2.unsubscribe();
    207                 auto finish = clock::now();
    208                 auto msElapsed = duration_cast<milliseconds>(finish-start);
    209                 std::cout << "range synchronized : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    210 
    211                 if (--runs > 0) {
    212                     self();
    213                 }
    214             };
    215 
    216             w.schedule(loop);
    217         }
    218     }
    219 }
    220 
    221 SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
    222     GIVEN("range"){
    223         WHEN("observed on"){
    224             using namespace std::chrono;
    225             typedef steady_clock clock;
    226 
    227             auto sc = rxsc::make_current_thread();
    228             auto w = sc.create_worker();
    229 
    230             auto es = rx::observe_on_event_loop();
    231 
    232             const int values = 10000;
    233 
    234             int runs = 10;
    235 
    236             auto loop = [&](const rxsc::schedulable& self) {
    237                 std::atomic<int> c(0);
    238                 int n = 1;
    239                 auto liftrequirecompletion = [&](rx::subscriber<int> dest){
    240                     auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
    241                     std::get<2>(*completionstate).add([=](){
    242                         if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
    243                             abort();
    244                         }
    245                     });
    246                     // VS2013 deduction issue requires dynamic (type-forgetting)
    247                     return rx::make_subscriber<int>(
    248                         std::get<2>(*completionstate),
    249                         [=](int n){
    250                             ++std::get<1>(*completionstate);
    251                             std::get<2>(*completionstate).on_next(n);
    252                         },
    253                         [=](rxu::error_ptr){
    254                             abort();
    255                         },
    256                         [=](){
    257                             if (std::get<1>(*completionstate) != values) {
    258                                 abort();
    259                             }
    260                             std::get<0>(*completionstate) = true;
    261                             std::get<2>(*completionstate).on_completed();
    262                         }).as_dynamic();
    263                 };
    264                 auto start = clock::now();
    265                 auto ew = es.create_coordinator().get_worker();
    266                 std::atomic<int> v(0);
    267                 auto s0 = rxs::range(1, es)
    268                     .take(values)
    269                     .lift<int>(liftrequirecompletion)
    270                     .as_dynamic()
    271                     .observe_on(es)
    272                     .lift<int>(liftrequirecompletion)
    273                     .subscribe(
    274                         rx::make_observer_dynamic<int>(
    275                         [&](int){
    276                             ++v;
    277                         },
    278                         [&](){
    279                             ++c;
    280                         }));
    281                 auto s1 = rxs::range(values + 1, es)
    282                     .take(values)
    283                     .lift<int>(liftrequirecompletion)
    284                     .as_dynamic()
    285                     .observe_on(es)
    286                     .lift<int>(liftrequirecompletion)
    287                     .subscribe(
    288                         rx::make_observer_dynamic<int>(
    289                         [&](int){
    290                             ++v;
    291                         },
    292                         [&](){
    293                             ++c;
    294                         }));
    295                 auto s2 = rxs::range((values * 2) + 1, es)
    296                     .take(values)
    297                     .lift<int>(liftrequirecompletion)
    298                     .as_dynamic()
    299                     .observe_on(es)
    300                     .lift<int>(liftrequirecompletion)
    301                     .subscribe(
    302                         rx::make_observer_dynamic<int>(
    303                         [&](int){
    304                             ++v;
    305                         },
    306                         [&](){
    307                             ++c;
    308                         }));
    309                 while(v != values * 3 || c != 3);
    310                 s0.unsubscribe();
    311                 s1.unsubscribe();
    312                 s2.unsubscribe();
    313                 auto finish = clock::now();
    314                 auto msElapsed = duration_cast<milliseconds>(finish-start);
    315                 std::cout << "range observe_on : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
    316 
    317                 if (--runs > 0) {
    318                     self();
    319                 }
    320             };
    321 
    322             w.schedule(loop);
    323         }
    324     }
    325 }
    326 
    327 SCENARIO("subscription traits", "[subscription][traits]"){
    328     GIVEN("given some subscription types"){
    329         auto es = rx::make_subscription();
    330         rx::composite_subscription cs;
    331         WHEN("tested"){
    332             THEN("is_subscription value is true for empty subscription"){
    333                 REQUIRE(rx::is_subscription<decltype(es)>::value);
    334             }
    335             THEN("is_subscription value is true for composite_subscription"){
    336                 REQUIRE(rx::is_subscription<decltype(cs)>::value);
    337             }
    338         }
    339     }
    340 }
    341 
    342 SCENARIO("non-subscription traits", "[subscription][traits]"){
    343     GIVEN("given some non-subscription types"){
    344         auto l = [](){};
    345         int i = 0;
    346         void* v = nullptr;
    347         WHEN("tested"){
    348             THEN("is_subscription value is false for lambda"){
    349                 l();
    350                 REQUIRE(!rx::is_subscription<decltype(l)>::value);
    351             }
    352             THEN("is_subscription value is false for int"){
    353                 i = 0;
    354                 REQUIRE(!rx::is_subscription<decltype(i)>::value);
    355             }
    356             THEN("is_subscription value is false for void*"){
    357                 v = nullptr;
    358                 REQUIRE(!rx::is_subscription<decltype(v)>::value);
    359             }
    360             THEN("is_subscription value is false for void"){
    361                 REQUIRE(!rx::is_subscription<void>::value);
    362             }
    363         }
    364     }
    365 }
    366 
    367 SCENARIO("subscription static", "[subscription]"){
    368     GIVEN("given a subscription"){
    369         int i=0;
    370         auto s = rx::make_subscription([&i](){++i;});
    371         WHEN("not used"){
    372             THEN("is subscribed"){
    373                 REQUIRE(s.is_subscribed());
    374             }
    375             THEN("i is 0"){
    376                 REQUIRE(i == 0);
    377             }
    378         }
    379         WHEN("used"){
    380             THEN("is not subscribed when unsubscribed once"){
    381                 s.unsubscribe();
    382                 REQUIRE(!s.is_subscribed());
    383             }
    384             THEN("is not subscribed when unsubscribed twice"){
    385                 s.unsubscribe();
    386                 s.unsubscribe();
    387                 REQUIRE(!s.is_subscribed());
    388             }
    389             THEN("i is 1 when unsubscribed once"){
    390                 s.unsubscribe();
    391                 REQUIRE(i == 1);
    392             }
    393             THEN("i is 1 when unsubscribed twice"){
    394                 s.unsubscribe();
    395                 s.unsubscribe();
    396                 REQUIRE(i == 1);
    397             }
    398         }
    399     }
    400 }
    401 
    402 SCENARIO("subscription empty", "[subscription]"){
    403     GIVEN("given an empty subscription"){
    404         auto s = rx::make_subscription();
    405         WHEN("not used"){
    406             THEN("is not subscribed"){
    407                 REQUIRE(!s.is_subscribed());
    408             }
    409         }
    410         WHEN("used"){
    411             THEN("is not subscribed when unsubscribed once"){
    412                 s.unsubscribe();
    413                 REQUIRE(!s.is_subscribed());
    414             }
    415             THEN("is not subscribed when unsubscribed twice"){
    416                 s.unsubscribe();
    417                 s.unsubscribe();
    418                 REQUIRE(!s.is_subscribed());
    419             }
    420         }
    421     }
    422 }
    423 
    424 SCENARIO("subscription composite", "[subscription]"){
    425     GIVEN("given a subscription"){
    426         int i=0;
    427         rx::composite_subscription s;
    428         s.add(rx::make_subscription());
    429         s.add(rx::make_subscription([&i](){++i;}));
    430         s.add([&i](){++i;});
    431         WHEN("not used"){
    432             THEN("is subscribed"){
    433                 REQUIRE(s.is_subscribed());
    434             }
    435             THEN("i is 0"){
    436                 REQUIRE(i == 0);
    437             }
    438         }
    439         WHEN("used"){
    440             THEN("is not subscribed when unsubscribed once"){
    441                 s.unsubscribe();
    442                 REQUIRE(!s.is_subscribed());
    443             }
    444             THEN("is not subscribed when unsubscribed twice"){
    445                 s.unsubscribe();
    446                 s.unsubscribe();
    447                 REQUIRE(!s.is_subscribed());
    448             }
    449             THEN("i is 2 when unsubscribed once"){
    450                 s.unsubscribe();
    451                 REQUIRE(i == 2);
    452             }
    453             THEN("i is 2 when unsubscribed twice"){
    454                 s.unsubscribe();
    455                 s.unsubscribe();
    456                 REQUIRE(i == 2);
    457             }
    458         }
    459     }
    460 }
    461 
    462