Home | History | Annotate | Download | only in operators
      1 #include "../test.h"
      2 #include <rxcpp/operators/rx-concat.hpp>
      3 #include <rxcpp/operators/rx-buffer_count.hpp>
      4 #include <rxcpp/operators/rx-buffer_time.hpp>
      5 #include <rxcpp/operators/rx-buffer_time_count.hpp>
      6 #include <rxcpp/operators/rx-take.hpp>
      7 
      8 SCENARIO("buffer count partial window", "[buffer][operators]"){
      9     GIVEN("1 hot observable of ints."){
     10         auto sc = rxsc::make_test();
     11         auto w = sc.create_worker();
     12         const rxsc::test::messages<int> on;
     13         const rxsc::test::messages<std::vector<int>> v_on;
     14 
     15         auto xs = sc.make_hot_observable({
     16             on.next(150, 1),
     17             on.next(210, 2),
     18             on.next(220, 3),
     19             on.next(230, 4),
     20             on.next(240, 5),
     21             on.completed(250)
     22         });
     23 
     24         WHEN("group each int with the next 4 ints"){
     25 
     26             auto res = w.start(
     27                 [&]() {
     28                     return xs
     29                         | rxo::buffer(5)
     30                         // forget type to workaround lambda deduction bug on msvc 2013
     31                         | rxo::as_dynamic();
     32                 }
     33             );
     34 
     35             THEN("the output contains groups of ints"){
     36                 auto required = rxu::to_vector({
     37                     v_on.next(250, rxu::to_vector({ 2, 3, 4, 5 })),
     38                     v_on.completed(250)
     39                 });
     40                 auto actual = res.get_observer().messages();
     41                 REQUIRE(required == actual);
     42             }
     43 
     44             THEN("there was one subscription and one unsubscription to the xs"){
     45                 auto required = rxu::to_vector({
     46                     on.subscribe(200, 250)
     47                 });
     48                 auto actual = xs.subscriptions();
     49                 REQUIRE(required == actual);
     50             }
     51         }
     52     }
     53 }
     54 
     55 SCENARIO("buffer count full windows", "[buffer][operators]"){
     56     GIVEN("1 hot observable of ints."){
     57         auto sc = rxsc::make_test();
     58         auto w = sc.create_worker();
     59         const rxsc::test::messages<int> on;
     60         const rxsc::test::messages<std::vector<int>> v_on;
     61 
     62         auto xs = sc.make_hot_observable({
     63             on.next(150, 1),
     64             on.next(210, 2),
     65             on.next(220, 3),
     66             on.next(230, 4),
     67             on.next(240, 5),
     68             on.completed(250)
     69         });
     70 
     71         WHEN("group each int with the next int"){
     72 
     73             auto res = w.start(
     74                 [&]() {
     75                 return xs
     76                     .buffer(2)
     77                     // forget type to workaround lambda deduction bug on msvc 2013
     78                     .as_dynamic();
     79             }
     80             );
     81 
     82             THEN("the output contains groups of ints"){
     83                 auto required = rxu::to_vector({
     84                     v_on.next(220, rxu::to_vector({ 2, 3 })),
     85                     v_on.next(240, rxu::to_vector({ 4, 5 })),
     86                     v_on.completed(250)
     87                 });
     88                 auto actual = res.get_observer().messages();
     89                 REQUIRE(required == actual);
     90             }
     91 
     92             THEN("there was one subscription and one unsubscription to the xs"){
     93                 auto required = rxu::to_vector({
     94                     on.subscribe(200, 250)
     95                 });
     96                 auto actual = xs.subscriptions();
     97                 REQUIRE(required == actual);
     98             }
     99         }
    100     }
    101 }
    102 
    103 SCENARIO("buffer count full and partial windows", "[buffer][operators]"){
    104     GIVEN("1 hot observable of ints."){
    105         auto sc = rxsc::make_test();
    106         auto w = sc.create_worker();
    107         const rxsc::test::messages<int> on;
    108         const rxsc::test::messages<std::vector<int>> v_on;
    109 
    110         auto xs = sc.make_hot_observable({
    111             on.next(150, 1),
    112             on.next(210, 2),
    113             on.next(220, 3),
    114             on.next(230, 4),
    115             on.next(240, 5),
    116             on.completed(250)
    117         });
    118 
    119         WHEN("group each int with the next 2 ints"){
    120 
    121             auto res = w.start(
    122                 [&]() {
    123                     return xs
    124                         .buffer(3)
    125                         // forget type to workaround lambda deduction bug on msvc 2013
    126                         .as_dynamic();
    127                 }
    128             );
    129 
    130             THEN("the output contains groups of ints"){
    131                 auto required = rxu::to_vector({
    132                     v_on.next(230, rxu::to_vector({ 2, 3, 4 })),
    133                     v_on.next(250, rxu::to_vector({ 5 })),
    134                     v_on.completed(250)
    135                 });
    136                 auto actual = res.get_observer().messages();
    137                 REQUIRE(required == actual);
    138             }
    139 
    140             THEN("there was one subscription and one unsubscription to the xs"){
    141                 auto required = rxu::to_vector({
    142                     on.subscribe(200, 250)
    143                 });
    144                 auto actual = xs.subscriptions();
    145                 REQUIRE(required == actual);
    146             }
    147         }
    148     }
    149 }
    150 
    151 SCENARIO("buffer count error", "[buffer][operators]"){
    152     GIVEN("1 hot observable of ints."){
    153         auto sc = rxsc::make_test();
    154         auto w = sc.create_worker();
    155         const rxsc::test::messages<int> on;
    156         const rxsc::test::messages<std::vector<int>> v_on;
    157 
    158         std::runtime_error ex("buffer on_error from source");
    159 
    160         auto xs = sc.make_hot_observable({
    161             on.next(150, 1),
    162             on.next(210, 2),
    163             on.next(220, 3),
    164             on.next(230, 4),
    165             on.next(240, 5),
    166             on.error(250, ex)
    167         });
    168 
    169         WHEN("group each int with the next 4 ints"){
    170 
    171             auto res = w.start(
    172                 [&]() {
    173                     return xs
    174                         .buffer(5)
    175                         // forget type to workaround lambda deduction bug on msvc 2013
    176                         .as_dynamic();
    177                 }
    178             );
    179 
    180             THEN("the output contains groups of ints"){
    181                 auto required = rxu::to_vector({
    182                     v_on.error(250, ex)
    183                 });
    184                 auto actual = res.get_observer().messages();
    185                 REQUIRE(required == actual);
    186             }
    187 
    188             THEN("there was one subscription and one unsubscription to the xs"){
    189                 auto required = rxu::to_vector({
    190                     on.subscribe(200, 250)
    191                 });
    192                 auto actual = xs.subscriptions();
    193                 REQUIRE(required == actual);
    194             }
    195         }
    196     }
    197 }
    198 
    199 SCENARIO("buffer count skip less", "[buffer][operators]"){
    200     GIVEN("1 hot observable of ints."){
    201         auto sc = rxsc::make_test();
    202         auto w = sc.create_worker();
    203         const rxsc::test::messages<int> on;
    204         const rxsc::test::messages<std::vector<int>> v_on;
    205 
    206         auto xs = sc.make_hot_observable({
    207             on.next(150, 1),
    208             on.next(210, 2),
    209             on.next(220, 3),
    210             on.next(230, 4),
    211             on.next(240, 5),
    212             on.completed(250)
    213         });
    214 
    215         WHEN("group each int with the next 2 ints"){
    216 
    217             auto res = w.start(
    218                 [&]() {
    219                     return xs
    220                         .buffer(3, 1)
    221                         // forget type to workaround lambda deduction bug on msvc 2013
    222                         .as_dynamic();
    223                 }
    224             );
    225 
    226             THEN("the output contains groups of ints"){
    227                 auto required = rxu::to_vector({
    228                     v_on.next(230, rxu::to_vector({ 2, 3, 4 })),
    229                     v_on.next(240, rxu::to_vector({ 3, 4, 5 })),
    230                     v_on.next(250, rxu::to_vector({ 4, 5 })),
    231                     v_on.next(250, rxu::to_vector({ 5 })),
    232                     v_on.completed(250)
    233                 });
    234                 auto actual = res.get_observer().messages();
    235                 REQUIRE(required == actual);
    236             }
    237 
    238             THEN("there was one subscription and one unsubscription to the xs"){
    239                 auto required = rxu::to_vector({
    240                     on.subscribe(200, 250)
    241                 });
    242                 auto actual = xs.subscriptions();
    243                 REQUIRE(required == actual);
    244             }
    245         }
    246     }
    247 }
    248 
    249 SCENARIO("buffer count skip more", "[buffer][operators]"){
    250     GIVEN("1 hot observable of ints."){
    251         auto sc = rxsc::make_test();
    252         auto w = sc.create_worker();
    253         const rxsc::test::messages<int> on;
    254         const rxsc::test::messages<std::vector<int>> v_on;
    255 
    256         auto xs = sc.make_hot_observable({
    257             on.next(150, 1),
    258             on.next(210, 2),
    259             on.next(220, 3),
    260             on.next(230, 4),
    261             on.next(240, 5),
    262             on.completed(250)
    263         });
    264 
    265         WHEN("group each int with the next int skipping the third one"){
    266 
    267             auto res = w.start(
    268                 [&]() {
    269                 return xs
    270                     .buffer(2, 3)
    271                     // forget type to workaround lambda deduction bug on msvc 2013
    272                     .as_dynamic();
    273             }
    274             );
    275 
    276             THEN("the output contains groups of ints"){
    277                 auto required = rxu::to_vector({
    278                     v_on.next(220, rxu::to_vector({ 2, 3 })),
    279                     v_on.next(250, rxu::to_vector({ 5 })),
    280                     v_on.completed(250)
    281                 });
    282                 auto actual = res.get_observer().messages();
    283                 REQUIRE(required == actual);
    284             }
    285 
    286             THEN("there was one subscription and one unsubscription to the xs"){
    287                 auto required = rxu::to_vector({
    288                     on.subscribe(200, 250)
    289                 });
    290                 auto actual = xs.subscriptions();
    291                 REQUIRE(required == actual);
    292             }
    293         }
    294     }
    295 }
    296 
    297 SCENARIO("buffer count basic", "[buffer][operators]"){
    298     GIVEN("1 hot observable of ints."){
    299         auto sc = rxsc::make_test();
    300         auto w = sc.create_worker();
    301         const rxsc::test::messages<int> on;
    302         const rxsc::test::messages<std::vector<int>> v_on;
    303 
    304         auto xs = sc.make_hot_observable({
    305             on.next(100, 1),
    306             on.next(210, 2),
    307             on.next(240, 3),
    308             on.next(280, 4),
    309             on.next(320, 5),
    310             on.next(350, 6),
    311             on.next(380, 7),
    312             on.next(420, 8),
    313             on.next(470, 9),
    314             on.completed(600)
    315         });
    316 
    317         WHEN("group each int with the next 2 ints"){
    318 
    319             auto res = w.start(
    320                 [&]() {
    321                     return xs
    322                         .buffer(3, 2)
    323                         // forget type to workaround lambda deduction bug on msvc 2013
    324                         .as_dynamic();
    325                 }
    326             );
    327 
    328             THEN("the output contains groups of ints"){
    329                 auto required = rxu::to_vector({
    330                     v_on.next(280, rxu::to_vector({ 2, 3, 4 })),
    331                     v_on.next(350, rxu::to_vector({ 4, 5, 6 })),
    332                     v_on.next(420, rxu::to_vector({ 6, 7, 8 })),
    333                     v_on.next(600, rxu::to_vector({ 8, 9 })),
    334                     v_on.completed(600)
    335                 });
    336                 auto actual = res.get_observer().messages();
    337                 REQUIRE(required == actual);
    338             }
    339 
    340             THEN("there was one subscription and one unsubscription to the xs"){
    341                 auto required = rxu::to_vector({
    342                     on.subscribe(200, 600)
    343                 });
    344                 auto actual = xs.subscriptions();
    345                 REQUIRE(required == actual);
    346             }
    347         }
    348     }
    349 }
    350 
    351 SCENARIO("buffer count disposed", "[buffer][operators]"){
    352     GIVEN("1 hot observable of ints."){
    353         auto sc = rxsc::make_test();
    354         auto w = sc.create_worker();
    355         const rxsc::test::messages<int> on;
    356         const rxsc::test::messages<std::vector<int>> v_on;
    357 
    358         auto xs = sc.make_hot_observable({
    359             on.next(100, 1),
    360             on.next(210, 2),
    361             on.next(240, 3),
    362             on.next(280, 4),
    363             on.next(320, 5),
    364             on.next(350, 6),
    365             on.next(380, 7),
    366             on.next(420, 8),
    367             on.next(470, 9),
    368             on.completed(600)
    369         });
    370 
    371         WHEN("group each int with the next 2 ints"){
    372 
    373             auto res = w.start(
    374                 [&]() {
    375                     return xs
    376                         .buffer(3, 2)
    377                         // forget type to workaround lambda deduction bug on msvc 2013
    378                         .as_dynamic();
    379                 },
    380                 370
    381             );
    382 
    383             THEN("the output contains groups of ints"){
    384                 auto required = rxu::to_vector({
    385                     v_on.next(280, rxu::to_vector({ 2, 3, 4 })),
    386                     v_on.next(350, rxu::to_vector({ 4, 5, 6 })),
    387                 });
    388                 auto actual = res.get_observer().messages();
    389                 REQUIRE(required == actual);
    390             }
    391 
    392             THEN("there was one subscription and one unsubscription to the xs"){
    393                 auto required = rxu::to_vector({
    394                     on.subscribe(200, 370)
    395                 });
    396                 auto actual = xs.subscriptions();
    397                 REQUIRE(required == actual);
    398             }
    399         }
    400     }
    401 }
    402 
    403 SCENARIO("buffer count error 2", "[buffer][operators]"){
    404     GIVEN("1 hot observable of ints."){
    405         auto sc = rxsc::make_test();
    406         auto w = sc.create_worker();
    407         const rxsc::test::messages<int> on;
    408         const rxsc::test::messages<std::vector<int>> v_on;
    409 
    410         std::runtime_error ex("buffer on_error from source");
    411 
    412         auto xs = sc.make_hot_observable({
    413             on.next(100, 1),
    414             on.next(210, 2),
    415             on.next(240, 3),
    416             on.next(280, 4),
    417             on.next(320, 5),
    418             on.next(350, 6),
    419             on.next(380, 7),
    420             on.next(420, 8),
    421             on.next(470, 9),
    422             on.error(600, ex)
    423         });
    424 
    425         WHEN("group each int with the next 2 ints"){
    426 
    427             auto res = w.start(
    428                 [&]() {
    429                     return xs
    430                         .buffer(3, 2)
    431                         // forget type to workaround lambda deduction bug on msvc 2013
    432                         .as_dynamic();
    433                 }
    434             );
    435 
    436             THEN("the output contains groups of ints"){
    437                 auto required = rxu::to_vector({
    438                     v_on.next(280, rxu::to_vector({ 2, 3, 4 })),
    439                     v_on.next(350, rxu::to_vector({ 4, 5, 6 })),
    440                     v_on.next(420, rxu::to_vector({ 6, 7, 8 })),
    441                     v_on.error(600, ex)
    442                 });
    443                 auto actual = res.get_observer().messages();
    444                 REQUIRE(required == actual);
    445             }
    446 
    447             THEN("there was one subscription and one unsubscription to the xs"){
    448                 auto required = rxu::to_vector({
    449                     on.subscribe(200, 600)
    450                 });
    451                 auto actual = xs.subscriptions();
    452                 REQUIRE(required == actual);
    453             }
    454         }
    455     }
    456 }
    457 
    458 SCENARIO("buffer with time on intervals", "[buffer_with_time][operators][long][!hide]"){
    459     GIVEN("7 intervals of 2 seconds"){
    460         WHEN("the period is 2sec and the initial is 5sec"){
    461             // time:   |-----------------|
    462             // events:      1 2 3 4 5 6 7
    463             // buffers: ---
    464             //             -1-
    465             //                2-3
    466             //                   -4-
    467             //                      5-6
    468             //                         -7
    469             using namespace std::chrono;
    470 
    471             #define TIME milliseconds
    472             #define UNIT *15
    473 
    474             auto sc = rxsc::make_current_thread();
    475             auto so = rx::synchronize_in_one_worker(sc);
    476             auto start = sc.now() + TIME(5 UNIT);
    477             auto period = TIME(2 UNIT);
    478 
    479             auto bufSource = rxs::interval(start, period, so)
    480                 | rxo::take(7)
    481                 | rxo::buffer_with_time(TIME(3 UNIT), so);
    482 
    483             bufSource
    484                 .subscribe(
    485                     [](std::vector<long> counter){
    486                         printf("on_next: ");
    487                         std::for_each(counter.begin(), counter.end(), [](long c){
    488                             printf("%ld ", c);
    489                         });
    490                         printf("\n");
    491                     },
    492                     [](rxu::error_ptr){
    493                         printf("on_error\n");
    494                     },
    495                     [](){
    496                         printf("on_completed\n");
    497                     }
    498                 );
    499         }
    500     }
    501 }
    502 
    503 SCENARIO("buffer with time on intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){
    504     GIVEN("7 intervals of 2 seconds"){
    505         WHEN("the period is 2sec and the initial is 5sec"){
    506             // time:   |-----------------|
    507             // events:      1 2 3 4 5 6 7
    508             // buffers: ---
    509             //             -1-
    510             //                2-3
    511             //                   -4-
    512             //                      5-6
    513             //                         -7
    514             using namespace std::chrono;
    515 
    516             #define TIME milliseconds
    517             #define UNIT *15
    518 
    519             auto sc = rxsc::make_current_thread();
    520             auto so = rx::synchronize_in_one_worker(sc);
    521             auto start = sc.now() + TIME(5 UNIT);
    522             auto period = TIME(2 UNIT);
    523 
    524             rx::observable<>::interval(start, period, so)
    525                 .take(7)
    526                 .buffer_with_time(TIME(3 UNIT))
    527                 .subscribe(
    528                     [](std::vector<long> counter){
    529                         printf("on_next: ");
    530                         std::for_each(counter.begin(), counter.end(), [](long c){
    531                             printf("%ld ", c);
    532                         });
    533                         printf("\n");
    534                     },
    535                     [](rxu::error_ptr){
    536                         printf("on_error\n");
    537                     },
    538                     [](){
    539                         printf("on_completed\n");
    540                     }
    541                 );
    542         }
    543     }
    544 }
    545 
    546 SCENARIO("buffer with time on overlapping intervals", "[buffer_with_time][operators][long][!hide]"){
    547     GIVEN("5 intervals of 2 seconds"){
    548         WHEN("the period is 2sec and the initial is 5sec"){
    549             // time:   |-------------|
    550             // events:      1 2 3 4 5
    551             // buffers: ----
    552             //            --1-
    553             //              1-2-
    554             //                2-3-
    555             //                  3-4-
    556             //                    4-5
    557             //                      5
    558             using namespace std::chrono;
    559 
    560             #define TIME milliseconds
    561             #define UNIT *15
    562 
    563             auto sc = rxsc::make_current_thread();
    564             auto so = rx::synchronize_in_one_worker(sc);
    565             auto start = sc.now() + TIME(5 UNIT);
    566             auto period = TIME(2 UNIT);
    567 
    568             rx::observable<>::interval(start, period, so)
    569                 .take(5)
    570                 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT), so)
    571                 .subscribe(
    572                     [](std::vector<long> counter){
    573                         printf("on_next: ");
    574                         std::for_each(counter.begin(), counter.end(), [](long c){
    575                             printf("%ld ", c);
    576                         });
    577                         printf("\n");
    578                     },
    579                     [](rxu::error_ptr){
    580                         printf("on_error\n");
    581                     },
    582                     [](){
    583                         printf("on_completed\n");
    584                     }
    585                 );
    586         }
    587     }
    588 }
    589 
    590 SCENARIO("buffer with time on overlapping intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){
    591     GIVEN("5 intervals of 2 seconds"){
    592         WHEN("the period is 2sec and the initial is 5sec"){
    593             // time:   |-------------|
    594             // events:      1 2 3 4 5
    595             // buffers: ----
    596             //            --1-
    597             //              1-2-
    598             //                2-3-
    599             //                  3-4-
    600             //                    4-5
    601             //                      5
    602             using namespace std::chrono;
    603 
    604             #define TIME milliseconds
    605             #define UNIT *15
    606 
    607             auto sc = rxsc::make_current_thread();
    608             auto so = rx::synchronize_in_one_worker(sc);
    609             auto start = sc.now() + TIME(5 UNIT);
    610             auto period = TIME(2 UNIT);
    611 
    612             rx::observable<>::interval(start, period, so)
    613                 .take(5)
    614                 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT))
    615                 .subscribe(
    616                     [](std::vector<long> counter){
    617                         printf("on_next: ");
    618                         std::for_each(counter.begin(), counter.end(), [](long c){
    619                             printf("%ld ", c);
    620                         });
    621                         printf("\n");
    622                     },
    623                     [](rxu::error_ptr){
    624                         printf("on_error\n");
    625                     },
    626                     [](){
    627                         printf("on_completed\n");
    628                     }
    629                 );
    630         }
    631     }
    632 }
    633 
    634 SCENARIO("buffer with time on intervals, error", "[buffer_with_time][operators][long][!hide]"){
    635     GIVEN("5 intervals of 2 seconds"){
    636         WHEN("the period is 2sec and the initial is 5sec"){
    637             // time:   |-------------|
    638             // events:      1 2 3 4 5
    639             // buffers: ----
    640             //            --1-
    641             //              1-2-
    642             //                2-3-
    643             //                  3-4-
    644             //                    4-5
    645             //                      5
    646             using namespace std::chrono;
    647 
    648             #define TIME milliseconds
    649             #define UNIT *15
    650 
    651             auto sc = rxsc::make_current_thread();
    652             auto so = rx::synchronize_in_one_worker(sc);
    653             auto start = sc.now() + TIME(5 UNIT);
    654             auto period = TIME(2 UNIT);
    655 
    656             std::runtime_error ex("buffer_with_time on_error from source");
    657 
    658             auto ys1 = rx::observable<>::interval(start, period, so).take(5);
    659             auto ys2 = rx::observable<>::error<long, std::runtime_error>(std::runtime_error("buffer_with_time on_error from source"), so);
    660             ys1.concat(so, ys2)
    661                 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT), so)
    662                 .subscribe(
    663                     [](std::vector<long> counter){
    664                         printf("on_next: ");
    665                         std::for_each(counter.begin(), counter.end(), [](long c){
    666                             printf("%ld ", c);
    667                         });
    668                         printf("\n");
    669                     },
    670                     [](rxu::error_ptr){
    671                         printf("on_error\n");
    672                     },
    673                     [](){
    674                         printf("on_completed\n");
    675                     }
    676                 );
    677         }
    678     }
    679 }
    680 
    681 SCENARIO("buffer with time, overlapping intervals", "[buffer_with_time][operators]"){
    682     GIVEN("1 hot observable of ints."){
    683         auto sc = rxsc::make_test();
    684         auto so = rx::synchronize_in_one_worker(sc);
    685         auto w = sc.create_worker();
    686         const rxsc::test::messages<int> on;
    687         const rxsc::test::messages<std::vector<int>> v_on;
    688 
    689         auto xs = sc.make_hot_observable({
    690             on.next(100, 1),
    691             on.next(210, 2),
    692             on.next(240, 3),
    693             on.next(280, 4),
    694             on.next(320, 5),
    695             on.next(350, 6),
    696             on.next(380, 7),
    697             on.next(420, 8),
    698             on.next(470, 9),
    699             on.completed(600)
    700         });
    701         WHEN("group ints on intersecting intervals"){
    702             using namespace std::chrono;
    703 
    704             auto res = w.start(
    705                 [&]() {
    706                     return xs
    707                         .buffer_with_time(milliseconds(100), milliseconds(70), so)
    708                         // forget type to workaround lambda deduction bug on msvc 2013
    709                         .as_dynamic();
    710                 }
    711             );
    712 
    713             THEN("the output contains groups of ints"){
    714                 auto required = rxu::to_vector({
    715                     v_on.next(301, rxu::to_vector({ 2, 3, 4 })),
    716                     v_on.next(371, rxu::to_vector({ 4, 5, 6 })),
    717                     v_on.next(441, rxu::to_vector({ 6, 7, 8 })),
    718                     v_on.next(511, rxu::to_vector({ 8, 9 })),
    719                     v_on.next(581, std::vector<int>()),
    720                     v_on.next(601, std::vector<int>()),
    721                     v_on.completed(601)
    722                 });
    723                 auto actual = res.get_observer().messages();
    724                 REQUIRE(required == actual);
    725             }
    726 
    727             THEN("there was one subscription and one unsubscription to the xs"){
    728                 auto required = rxu::to_vector({
    729                     on.subscribe(200, 600)
    730                 });
    731                 auto actual = xs.subscriptions();
    732                 REQUIRE(required == actual);
    733             }
    734         }
    735     }
    736 }
    737 
    738 SCENARIO("buffer with time, intervals with skips", "[buffer_with_time][operators]"){
    739     // buffer_with_time(70ms, 100ms, ...) on a hot source: no expected buffer contains 4 or 7.
    740     GIVEN("1 hot observable of ints."){
    741         auto scheduler = rxsc::make_test();
    742         auto coordination = rx::synchronize_in_one_worker(scheduler);
    743         auto worker = scheduler.create_worker();
    744         const rxsc::test::messages<int> msg;
    745         const rxsc::test::messages<std::vector<int>> vec_msg;
    746         auto source = scheduler.make_hot_observable({
    747             msg.next(100, 1),
    748             msg.next(210, 2),
    749             msg.next(240, 3),
    750             msg.next(280, 4),
    751             msg.next(320, 5),
    752             msg.next(350, 6),
    753             msg.next(380, 7),
    754             msg.next(420, 8),
    755             msg.next(470, 9),
    756             msg.completed(600)
    757         });
    758         WHEN("group ints on intervals with skips"){
    759             using std::chrono::milliseconds;
    760 
    761             // as_dynamic() forgets the concrete observable type; it is kept as the
    762             // workaround for a lambda deduction bug on msvc 2013.
    763             auto make_buffered = [&]() {
    764                 return source
    765                     .buffer_with_time(milliseconds(70), milliseconds(100), coordination)
    766                     .as_dynamic();
    767             };
    768             auto result = worker.start(make_buffered);
    769 
    770             THEN("the output contains groups of ints"){
    771                 const auto required = rxu::to_vector({
    772                     vec_msg.next(271, rxu::to_vector({ 2, 3 })),
    773                     vec_msg.next(371, rxu::to_vector({ 5, 6 })),
    774                     vec_msg.next(471, rxu::to_vector({ 8, 9 })),
    775                     vec_msg.next(571, std::vector<int>{}),
    776                     vec_msg.completed(601)
    777                 });
    778                 const auto actual = result.get_observer().messages();
    779                 REQUIRE(required == actual);
    780             }
    781 
    782             THEN("there was one subscription and one unsubscription to the xs"){
    783                 const auto required = rxu::to_vector({
    784                     msg.subscribe(200, 600)
    785                 });
    786                 const auto actual = source.subscriptions();
    787                 REQUIRE(required == actual);
    788             }
    789         }
    790     }
    791 }
    792 
    793 SCENARIO("buffer with time, error", "[buffer_with_time][operators]"){
    794     // buffer_with_time(100ms, 70ms, ...) when the source fails at 600: buffers, then the error at 601.
    795     GIVEN("1 hot observable of ints."){
    796         auto scheduler = rxsc::make_test();
    797         auto coordination = rx::synchronize_in_one_worker(scheduler);
    798         auto worker = scheduler.create_worker();
    799         const rxsc::test::messages<int> msg;
    800         const rxsc::test::messages<std::vector<int>> vec_msg;
    801         // The same exception object is fed to the source and expected from the operator.
    802         std::runtime_error source_failure("buffer_with_time on_error from source");
    803         auto source = scheduler.make_hot_observable({
    804             msg.next(100, 1),
    805             msg.next(210, 2),
    806             msg.next(240, 3),
    807             msg.next(280, 4),
    808             msg.next(320, 5),
    809             msg.next(350, 6),
    810             msg.next(380, 7),
    811             msg.next(420, 8),
    812             msg.next(470, 9),
    813             msg.error(600, source_failure)
    814         });
    815         WHEN("group ints on intersecting intervals"){
    816             using std::chrono::milliseconds;
    817 
    818             // as_dynamic() forgets the concrete observable type; it is kept as the
    819             // workaround for a lambda deduction bug on msvc 2013.
    820             auto make_buffered = [&]() {
    821                 return source
    822                     .buffer_with_time(milliseconds(100), milliseconds(70), coordination)
    823                     .as_dynamic();
    824             };
    825             auto result = worker.start(make_buffered);
    826 
    827             THEN("the output contains groups of ints"){
    828                 const auto required = rxu::to_vector({
    829                     vec_msg.next(301, rxu::to_vector({ 2, 3, 4 })),
    830                     vec_msg.next(371, rxu::to_vector({ 4, 5, 6 })),
    831                     vec_msg.next(441, rxu::to_vector({ 6, 7, 8 })),
    832                     vec_msg.next(511, rxu::to_vector({ 8, 9 })),
    833                     vec_msg.next(581, std::vector<int>{}),
    834                     vec_msg.error(601, source_failure)
    835                 });
    836                 const auto actual = result.get_observer().messages();
    837                 REQUIRE(required == actual);
    838             }
    839 
    840             THEN("there was one subscription and one unsubscription to the xs"){
    841                 const auto required = rxu::to_vector({
    842                     msg.subscribe(200, 600)
    843                 });
    844                 const auto actual = source.subscriptions();
    845                 REQUIRE(required == actual);
    846             }
    847         }
    848     }
    849 }
    850 
    851 SCENARIO("buffer with time, disposed", "[buffer_with_time][operators]"){
    852     // buffer_with_time(100ms, 70ms, ...) with start() cut short: a single buffer is expected.
    853     GIVEN("1 hot observable of ints."){
    854         auto scheduler = rxsc::make_test();
    855         auto coordination = rx::synchronize_in_one_worker(scheduler);
    856         auto worker = scheduler.create_worker();
    857         const rxsc::test::messages<int> msg;
    858         const rxsc::test::messages<std::vector<int>> vec_msg;
    859         auto source = scheduler.make_hot_observable({
    860             msg.next(100, 1),
    861             msg.next(210, 2),
    862             msg.next(240, 3),
    863             msg.next(280, 4),
    864             msg.next(320, 5),
    865             msg.next(350, 6),
    866             msg.next(380, 7),
    867             msg.next(420, 8),
    868             msg.next(470, 9),
    869             msg.completed(600)
    870         });
    871         WHEN("group ints on intersecting intervals"){
    872             using std::chrono::milliseconds;
    873 
    874             // as_dynamic() forgets the concrete observable type; it is kept as the
    875             // workaround for a lambda deduction bug on msvc 2013.
    876             // 370 is start()'s extra argument; the xs subscription is expected to end at 371.
    877             auto make_buffered = [&]() {
    878                 return source
    879                     .buffer_with_time(milliseconds(100), milliseconds(70), coordination)
    880                     .as_dynamic();
    881             };
    882             auto result = worker.start(make_buffered, 370);
    883 
    884             THEN("the output contains groups of ints"){
    885                 const auto required = rxu::to_vector({
    886                     vec_msg.next(301, rxu::to_vector({ 2, 3, 4 }))
    887                 });
    888                 const auto actual = result.get_observer().messages();
    889                 REQUIRE(required == actual);
    890             }
    891 
    892             THEN("there was one subscription and one unsubscription to the xs"){
    893                 const auto required = rxu::to_vector({
    894                     msg.subscribe(200, 371)
    895                 });
    896                 const auto actual = source.subscriptions();
    897                 REQUIRE(required == actual);
    898             }
    899         }
    900     }
    901 }
    902 
    903 SCENARIO("buffer with time, same", "[buffer_with_time][operators]"){
    904     // buffer_with_time(100ms, ...) with a single duration: buffers expected at 301, 401, 501 and 601.
    905     GIVEN("1 hot observable of ints."){
    906         auto scheduler = rxsc::make_test();
    907         auto coordination = rx::synchronize_in_one_worker(scheduler);
    908         auto worker = scheduler.create_worker();
    909         const rxsc::test::messages<int> msg;
    910         const rxsc::test::messages<std::vector<int>> vec_msg;
    911         auto source = scheduler.make_hot_observable({
    912             msg.next(100, 1),
    913             msg.next(210, 2),
    914             msg.next(240, 3),
    915             msg.next(280, 4),
    916             msg.next(320, 5),
    917             msg.next(350, 6),
    918             msg.next(380, 7),
    919             msg.next(420, 8),
    920             msg.next(470, 9),
    921             msg.completed(600)
    922         });
    923         WHEN("group ints on intervals"){
    924             using std::chrono::milliseconds;
    925 
    926             // as_dynamic() forgets the concrete observable type; it is kept as the
    927             // workaround for a lambda deduction bug on msvc 2013.
    928             auto make_buffered = [&]() {
    929                 return source
    930                     .buffer_with_time(milliseconds(100), coordination)
    931                     .as_dynamic();
    932             };
    933             auto result = worker.start(make_buffered);
    934 
    935             THEN("the output contains groups of ints"){
    936                 const auto required = rxu::to_vector({
    937                     vec_msg.next(301, rxu::to_vector({ 2, 3, 4 })),
    938                     vec_msg.next(401, rxu::to_vector({ 5, 6, 7 })),
    939                     vec_msg.next(501, rxu::to_vector({ 8, 9 })),
    940                     vec_msg.next(601, std::vector<int>{}),
    941                     vec_msg.completed(601)
    942                 });
    943                 const auto actual = result.get_observer().messages();
    944                 REQUIRE(required == actual);
    945             }
    946 
    947             THEN("there was one subscription and one unsubscription to the xs"){
    948                 const auto required = rxu::to_vector({
    949                     msg.subscribe(200, 600)
    950                 });
    951                 const auto actual = source.subscriptions();
    952                 REQUIRE(required == actual);
    953             }
    954         }
    955     }
    956 }
    957 
    958 SCENARIO("buffer with time or count, basic", "[buffer_with_time_or_count][operators]"){
    959     // Pipe-operator form of buffer_with_time_or_count(70ms, 3, ...) on a source that completes at 600.
    960     GIVEN("1 hot observable of ints."){
    961         auto scheduler = rxsc::make_test();
    962         auto coordination = rx::synchronize_in_one_worker(scheduler);
    963         auto worker = scheduler.create_worker();
    964         const rxsc::test::messages<int> msg;
    965         const rxsc::test::messages<std::vector<int>> vec_msg;
    966         auto source = scheduler.make_hot_observable({
    967             msg.next(205, 1),
    968             msg.next(210, 2),
    969             msg.next(240, 3),
    970             msg.next(280, 4),
    971             msg.next(320, 5),
    972             msg.next(350, 6),
    973             msg.next(370, 7),
    974             msg.next(420, 8),
    975             msg.next(470, 9),
    976             msg.completed(600)
    977         });
    978         WHEN("group ints on intervals"){
    979             using std::chrono::milliseconds;
    980 
    981             // rxo::as_dynamic() forgets the concrete observable type; it is kept as the
    982             // workaround for a lambda deduction bug on msvc 2013.
    983             auto make_buffered = [&]() {
    984                 return source
    985                     | rxo::buffer_with_time_or_count(milliseconds(70), 3, coordination)
    986                     | rxo::as_dynamic();
    987             };
    988             auto result = worker.start(make_buffered);
    989 
    990             THEN("the output contains groups of ints"){
    991                 const auto required = rxu::to_vector({
    992                     vec_msg.next(241, rxu::to_vector({ 1, 2, 3 })),
    993                     vec_msg.next(312, rxu::to_vector({ 4 })),
    994                     vec_msg.next(371, rxu::to_vector({ 5, 6, 7 })),
    995                     vec_msg.next(442, rxu::to_vector({ 8 })),
    996                     vec_msg.next(512, rxu::to_vector({ 9 })),
    997                     vec_msg.next(582, std::vector<int>{}),
    998                     vec_msg.next(601, std::vector<int>{}),
    999                     vec_msg.completed(601)
   1000                 });
   1001                 const auto actual = result.get_observer().messages();
   1002                 REQUIRE(required == actual);
   1003             }
   1004 
   1005             THEN("there was one subscription and one unsubscription to the xs"){
   1006                 const auto required = rxu::to_vector({
   1007                     msg.subscribe(200, 600)
   1008                 });
   1009                 const auto actual = source.subscriptions();
   1010                 REQUIRE(required == actual);
   1011             }
   1012         }
   1013     }
   1014 }
   1015 
   1016 SCENARIO("buffer with time or count, error", "[buffer_with_time_or_count][operators]"){
   1017     // Member form of buffer_with_time_or_count(70ms, 3, ...) when the source fails at 600.
   1018     GIVEN("1 hot observable of ints."){
   1019         auto scheduler = rxsc::make_test();
   1020         auto coordination = rx::synchronize_in_one_worker(scheduler);
   1021         auto worker = scheduler.create_worker();
   1022         const rxsc::test::messages<int> msg;
   1023         const rxsc::test::messages<std::vector<int>> vec_msg;
   1024         // The same exception object is fed to the source and expected from the operator.
   1025         std::runtime_error source_failure("buffer_with_time on_error from source");
   1026         auto source = scheduler.make_hot_observable({
   1027             msg.next(205, 1),
   1028             msg.next(210, 2),
   1029             msg.next(240, 3),
   1030             msg.next(280, 4),
   1031             msg.next(320, 5),
   1032             msg.next(350, 6),
   1033             msg.next(370, 7),
   1034             msg.next(420, 8),
   1035             msg.next(470, 9),
   1036             msg.error(600, source_failure)
   1037         });
   1038         WHEN("group ints on intervals"){
   1039             using std::chrono::milliseconds;
   1040 
   1041             // as_dynamic() forgets the concrete observable type; it is kept as the
   1042             // workaround for a lambda deduction bug on msvc 2013.
   1043             auto make_buffered = [&]() {
   1044                 return source
   1045                     .buffer_with_time_or_count(milliseconds(70), 3, coordination)
   1046                     .as_dynamic();
   1047             };
   1048             auto result = worker.start(make_buffered);
   1049 
   1050             THEN("the output contains groups of ints"){
   1051                 const auto required = rxu::to_vector({
   1052                     vec_msg.next(241, rxu::to_vector({ 1, 2, 3 })),
   1053                     vec_msg.next(312, rxu::to_vector({ 4 })),
   1054                     vec_msg.next(371, rxu::to_vector({ 5, 6, 7 })),
   1055                     vec_msg.next(442, rxu::to_vector({ 8 })),
   1056                     vec_msg.next(512, rxu::to_vector({ 9 })),
   1057                     vec_msg.next(582, std::vector<int>{}),
   1058                     vec_msg.error(601, source_failure)
   1059                 });
   1060                 const auto actual = result.get_observer().messages();
   1061                 REQUIRE(required == actual);
   1062             }
   1063 
   1064             THEN("there was one subscription and one unsubscription to the xs"){
   1065                 const auto required = rxu::to_vector({
   1066                     msg.subscribe(200, 600)
   1067                 });
   1068                 const auto actual = source.subscriptions();
   1069                 REQUIRE(required == actual);
   1070             }
   1071         }
   1072     }
   1073 }
   1074 
   1075 SCENARIO("buffer with time or count, dispose", "[buffer_with_time_or_count][operators]"){
   1076     // buffer_with_time_or_count(70ms, 3, ...) with start() cut short: only three buffers are expected.
   1077     GIVEN("1 hot observable of ints."){
   1078         auto scheduler = rxsc::make_test();
   1079         auto coordination = rx::synchronize_in_one_worker(scheduler);
   1080         auto worker = scheduler.create_worker();
   1081         const rxsc::test::messages<int> msg;
   1082         const rxsc::test::messages<std::vector<int>> vec_msg;
   1083         auto source = scheduler.make_hot_observable({
   1084             msg.next(205, 1),
   1085             msg.next(210, 2),
   1086             msg.next(240, 3),
   1087             msg.next(280, 4),
   1088             msg.next(320, 5),
   1089             msg.next(350, 6),
   1090             msg.next(370, 7),
   1091             msg.next(420, 8),
   1092             msg.next(470, 9),
   1093             msg.completed(600)
   1094         });
   1095         WHEN("group ints on intervals"){
   1096             using std::chrono::milliseconds;
   1097 
   1098             // as_dynamic() forgets the concrete observable type; it is kept as the
   1099             // workaround for a lambda deduction bug on msvc 2013.
   1100             // 372 is start()'s extra argument; the xs subscription is expected to end at 373.
   1101             auto make_buffered = [&]() {
   1102                 return source
   1103                     .buffer_with_time_or_count(milliseconds(70), 3, coordination)
   1104                     .as_dynamic();
   1105             };
   1106             auto result = worker.start(make_buffered, 372);
   1107 
   1108             THEN("the output contains groups of ints"){
   1109                 const auto required = rxu::to_vector({
   1110                     vec_msg.next(241, rxu::to_vector({ 1, 2, 3 })),
   1111                     vec_msg.next(312, rxu::to_vector({ 4 })),
   1112                     vec_msg.next(371, rxu::to_vector({ 5, 6, 7 }))
   1113                 });
   1114                 const auto actual = result.get_observer().messages();
   1115                 REQUIRE(required == actual);
   1116             }
   1117 
   1118             THEN("there was one subscription and one unsubscription to the xs"){
   1119                 const auto required = rxu::to_vector({
   1120                     msg.subscribe(200, 373)
   1121                 });
   1122                 const auto actual = source.subscriptions();
   1123                 REQUIRE(required == actual);
   1124             }
   1125         }
   1126     }
   1127 }
   1128 
   1129 SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or_count][operators]"){
   1130     // buffer_with_time_or_count(100ms, 3, ...): no expected buffer ever holds three values.
   1131     GIVEN("1 hot observable of ints."){
   1132         auto scheduler = rxsc::make_test();
   1133         auto coordination = rx::synchronize_in_one_worker(scheduler);
   1134         auto worker = scheduler.create_worker();
   1135         const rxsc::test::messages<int> msg;
   1136         const rxsc::test::messages<std::vector<int>> vec_msg;
   1137         auto source = scheduler.make_hot_observable({
   1138             msg.next(205, 1),
   1139             msg.next(305, 2),
   1140             msg.next(505, 3),
   1141             msg.next(605, 4),
   1142             msg.next(610, 5),
   1143             msg.completed(850)
   1144         });
   1145         WHEN("group ints on intervals"){
   1146             using std::chrono::milliseconds;
   1147 
   1148             // as_dynamic() forgets the concrete observable type; it is kept as the
   1149             // workaround for a lambda deduction bug on msvc 2013.
   1150             auto make_buffered = [&]() {
   1151                 return source
   1152                     .buffer_with_time_or_count(milliseconds(100), 3, coordination)
   1153                     .as_dynamic();
   1154             };
   1155             auto result = worker.start(make_buffered);
   1156 
   1157             THEN("the output contains groups of ints"){
   1158                 const auto required = rxu::to_vector({
   1159                     vec_msg.next(301, rxu::to_vector({ 1 })),
   1160                     vec_msg.next(401, rxu::to_vector({ 2 })),
   1161                     vec_msg.next(501, std::vector<int>{}),
   1162                     vec_msg.next(601, rxu::to_vector({ 3 })),
   1163                     vec_msg.next(701, rxu::to_vector({ 4, 5 })),
   1164                     vec_msg.next(801, std::vector<int>{}),
   1165                     vec_msg.next(851, std::vector<int>{}),
   1166                     vec_msg.completed(851)
   1167                 });
   1168                 const auto actual = result.get_observer().messages();
   1169                 REQUIRE(required == actual);
   1170             }
   1171 
   1172             THEN("there was one subscription and one unsubscription to the xs"){
   1173                 const auto required = rxu::to_vector({
   1174                     msg.subscribe(200, 850)
   1175                 });
   1176                 const auto actual = source.subscriptions();
   1177                 REQUIRE(required == actual);
   1178             }
   1179         }
   1180     }
   1181 }
   1182 
   1183 SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){
   1184     // buffer_with_time_or_count(370ms, 2, ...): expected buffers hold two values, except the final { 5 }.
   1185     GIVEN("1 hot observable of ints."){
   1186         auto scheduler = rxsc::make_test();
   1187         auto coordination = rx::synchronize_in_one_worker(scheduler);
   1188         auto worker = scheduler.create_worker();
   1189         const rxsc::test::messages<int> msg;
   1190         const rxsc::test::messages<std::vector<int>> vec_msg;
   1191         auto source = scheduler.make_hot_observable({
   1192             msg.next(205, 1),
   1193             msg.next(305, 2),
   1194             msg.next(505, 3),
   1195             msg.next(605, 4),
   1196             msg.next(610, 5),
   1197             msg.completed(850)
   1198         });
   1199         WHEN("group ints on intervals"){
   1200             using std::chrono::milliseconds;
   1201 
   1202             // as_dynamic() forgets the concrete observable type; it is kept as the
   1203             // workaround for a lambda deduction bug on msvc 2013.
   1204             auto make_buffered = [&]() {
   1205                 return source
   1206                     .buffer_with_time_or_count(milliseconds(370), 2, coordination)
   1207                     .as_dynamic();
   1208             };
   1209             auto result = worker.start(make_buffered);
   1210 
   1211             THEN("the output contains groups of ints"){
   1212                 const auto required = rxu::to_vector({
   1213                     vec_msg.next(306, rxu::to_vector({ 1, 2 })),
   1214                     vec_msg.next(606, rxu::to_vector({ 3, 4 })),
   1215                     vec_msg.next(851, rxu::to_vector({ 5 })),
   1216                     vec_msg.completed(851)
   1217                 });
   1218                 const auto actual = result.get_observer().messages();
   1219                 REQUIRE(required == actual);
   1220             }
   1221 
   1222             THEN("there was one subscription and one unsubscription to the xs"){
   1223                 const auto required = rxu::to_vector({
   1224                     msg.subscribe(200, 850)
   1225                 });
   1226                 const auto actual = source.subscriptions();
   1227                 REQUIRE(required == actual);
   1228             }
   1229         }
   1230     }
   1231 }
   1232