1 #include "../test.h" 2 #include <rxcpp/operators/rx-concat.hpp> 3 #include <rxcpp/operators/rx-buffer_count.hpp> 4 #include <rxcpp/operators/rx-buffer_time.hpp> 5 #include <rxcpp/operators/rx-buffer_time_count.hpp> 6 #include <rxcpp/operators/rx-take.hpp> 7 8 SCENARIO("buffer count partial window", "[buffer][operators]"){ 9 GIVEN("1 hot observable of ints."){ 10 auto sc = rxsc::make_test(); 11 auto w = sc.create_worker(); 12 const rxsc::test::messages<int> on; 13 const rxsc::test::messages<std::vector<int>> v_on; 14 15 auto xs = sc.make_hot_observable({ 16 on.next(150, 1), 17 on.next(210, 2), 18 on.next(220, 3), 19 on.next(230, 4), 20 on.next(240, 5), 21 on.completed(250) 22 }); 23 24 WHEN("group each int with the next 4 ints"){ 25 26 auto res = w.start( 27 [&]() { 28 return xs 29 | rxo::buffer(5) 30 // forget type to workaround lambda deduction bug on msvc 2013 31 | rxo::as_dynamic(); 32 } 33 ); 34 35 THEN("the output contains groups of ints"){ 36 auto required = rxu::to_vector({ 37 v_on.next(250, rxu::to_vector({ 2, 3, 4, 5 })), 38 v_on.completed(250) 39 }); 40 auto actual = res.get_observer().messages(); 41 REQUIRE(required == actual); 42 } 43 44 THEN("there was one subscription and one unsubscription to the xs"){ 45 auto required = rxu::to_vector({ 46 on.subscribe(200, 250) 47 }); 48 auto actual = xs.subscriptions(); 49 REQUIRE(required == actual); 50 } 51 } 52 } 53 } 54 55 SCENARIO("buffer count full windows", "[buffer][operators]"){ 56 GIVEN("1 hot observable of ints."){ 57 auto sc = rxsc::make_test(); 58 auto w = sc.create_worker(); 59 const rxsc::test::messages<int> on; 60 const rxsc::test::messages<std::vector<int>> v_on; 61 62 auto xs = sc.make_hot_observable({ 63 on.next(150, 1), 64 on.next(210, 2), 65 on.next(220, 3), 66 on.next(230, 4), 67 on.next(240, 5), 68 on.completed(250) 69 }); 70 71 WHEN("group each int with the next int"){ 72 73 auto res = w.start( 74 [&]() { 75 return xs 76 .buffer(2) 77 // forget type to workaround lambda deduction bug on msvc 2013 78 .as_dynamic(); 79 } 80 ); 81 82 THEN("the output contains groups of ints"){ 83 auto required = rxu::to_vector({ 84 v_on.next(220, rxu::to_vector({ 2, 3 })), 85 v_on.next(240, rxu::to_vector({ 4, 5 })), 86 v_on.completed(250) 87 }); 88 auto actual = res.get_observer().messages(); 89 REQUIRE(required == actual); 90 } 91 92 THEN("there was one subscription and one unsubscription to the xs"){ 93 auto required = rxu::to_vector({ 94 on.subscribe(200, 250) 95 }); 96 auto actual = xs.subscriptions(); 97 REQUIRE(required == actual); 98 } 99 } 100 } 101 } 102 103 SCENARIO("buffer count full and partial windows", "[buffer][operators]"){ 104 GIVEN("1 hot observable of ints."){ 105 auto sc = rxsc::make_test(); 106 auto w = sc.create_worker(); 107 const rxsc::test::messages<int> on; 108 const rxsc::test::messages<std::vector<int>> v_on; 109 110 auto xs = sc.make_hot_observable({ 111 on.next(150, 1), 112 on.next(210, 2), 113 on.next(220, 3), 114 on.next(230, 4), 115 on.next(240, 5), 116 on.completed(250) 117 }); 118 119 WHEN("group each int with the next 2 ints"){ 120 121 auto res = w.start( 122 [&]() { 123 return xs 124 .buffer(3) 125 // forget type to workaround lambda deduction bug on msvc 2013 126 .as_dynamic(); 127 } 128 ); 129 130 THEN("the output contains groups of ints"){ 131 auto required = rxu::to_vector({ 132 v_on.next(230, rxu::to_vector({ 2, 3, 4 })), 133 v_on.next(250, rxu::to_vector({ 5 })), 134 v_on.completed(250) 135 }); 136 auto actual = res.get_observer().messages(); 137 REQUIRE(required == actual); 138 } 139 140 THEN("there was one subscription and one unsubscription to the xs"){ 141 auto required = rxu::to_vector({ 142 on.subscribe(200, 250) 143 }); 144 auto actual = xs.subscriptions(); 145 REQUIRE(required == actual); 146 } 147 } 148 } 149 } 150 151 SCENARIO("buffer count error", "[buffer][operators]"){ 152 GIVEN("1 hot observable of ints."){ 153 auto sc = rxsc::make_test(); 154 auto w = sc.create_worker(); 155 const rxsc::test::messages<int> on; 156 const rxsc::test::messages<std::vector<int>> v_on; 157 158 std::runtime_error ex("buffer on_error from source"); 159 160 auto xs = sc.make_hot_observable({ 161 on.next(150, 1), 162 on.next(210, 2), 163 on.next(220, 3), 164 on.next(230, 4), 165 on.next(240, 5), 166 on.error(250, ex) 167 }); 168 169 WHEN("group each int with the next 4 ints"){ 170 171 auto res = w.start( 172 [&]() { 173 return xs 174 .buffer(5) 175 // forget type to workaround lambda deduction bug on msvc 2013 176 .as_dynamic(); 177 } 178 ); 179 180 THEN("the output contains groups of ints"){ 181 auto required = rxu::to_vector({ 182 v_on.error(250, ex) 183 }); 184 auto actual = res.get_observer().messages(); 185 REQUIRE(required == actual); 186 } 187 188 THEN("there was one subscription and one unsubscription to the xs"){ 189 auto required = rxu::to_vector({ 190 on.subscribe(200, 250) 191 }); 192 auto actual = xs.subscriptions(); 193 REQUIRE(required == actual); 194 } 195 } 196 } 197 } 198 199 SCENARIO("buffer count skip less", "[buffer][operators]"){ 200 GIVEN("1 hot observable of ints."){ 201 auto sc = rxsc::make_test(); 202 auto w = sc.create_worker(); 203 const rxsc::test::messages<int> on; 204 const rxsc::test::messages<std::vector<int>> v_on; 205 206 auto xs = sc.make_hot_observable({ 207 on.next(150, 1), 208 on.next(210, 2), 209 on.next(220, 3), 210 on.next(230, 4), 211 on.next(240, 5), 212 on.completed(250) 213 }); 214 215 WHEN("group each int with the next 2 ints"){ 216 217 auto res = w.start( 218 [&]() { 219 return xs 220 .buffer(3, 1) 221 // forget type to workaround lambda deduction bug on msvc 2013 222 .as_dynamic(); 223 } 224 ); 225 226 THEN("the output contains groups of ints"){ 227 auto required = rxu::to_vector({ 228 v_on.next(230, rxu::to_vector({ 2, 3, 4 })), 229 v_on.next(240, rxu::to_vector({ 3, 4, 5 })), 230 v_on.next(250, rxu::to_vector({ 4, 5 })), 231 v_on.next(250, rxu::to_vector({ 5 })), 232 v_on.completed(250) 233 }); 234 auto actual = res.get_observer().messages(); 235 REQUIRE(required == actual); 236 } 237 238 THEN("there was one subscription and one unsubscription to the xs"){ 239 auto required = rxu::to_vector({ 240 on.subscribe(200, 250) 241 }); 242 auto actual = xs.subscriptions(); 243 REQUIRE(required == actual); 244 } 245 } 246 } 247 } 248 249 SCENARIO("buffer count skip more", "[buffer][operators]"){ 250 GIVEN("1 hot observable of ints."){ 251 auto sc = rxsc::make_test(); 252 auto w = sc.create_worker(); 253 const rxsc::test::messages<int> on; 254 const rxsc::test::messages<std::vector<int>> v_on; 255 256 auto xs = sc.make_hot_observable({ 257 on.next(150, 1), 258 on.next(210, 2), 259 on.next(220, 3), 260 on.next(230, 4), 261 on.next(240, 5), 262 on.completed(250) 263 }); 264 265 WHEN("group each int with the next int skipping the third one"){ 266 267 auto res = w.start( 268 [&]() { 269 return xs 270 .buffer(2, 3) 271 // forget type to workaround lambda deduction bug on msvc 2013 272 .as_dynamic(); 273 } 274 ); 275 276 THEN("the output contains groups of ints"){ 277 auto required = rxu::to_vector({ 278 v_on.next(220, rxu::to_vector({ 2, 3 })), 279 v_on.next(250, rxu::to_vector({ 5 })), 280 v_on.completed(250) 281 }); 282 auto actual = res.get_observer().messages(); 283 REQUIRE(required == actual); 284 } 285 286 THEN("there was one subscription and one unsubscription to the xs"){ 287 auto required = rxu::to_vector({ 288 on.subscribe(200, 250) 289 }); 290 auto actual = xs.subscriptions(); 291 REQUIRE(required == actual); 292 } 293 } 294 } 295 } 296 297 SCENARIO("buffer count basic", "[buffer][operators]"){ 298 GIVEN("1 hot observable of ints."){ 299 auto sc = rxsc::make_test(); 300 auto w = sc.create_worker(); 301 const rxsc::test::messages<int> on; 302 const rxsc::test::messages<std::vector<int>> v_on; 303 304 auto xs = sc.make_hot_observable({ 305 on.next(100, 1), 306 on.next(210, 2), 307 on.next(240, 3), 308 on.next(280, 4), 309 on.next(320, 5), 310 on.next(350, 6), 311 on.next(380, 7), 312 on.next(420, 8), 313 on.next(470, 9), 314 on.completed(600) 315 }); 316 317 WHEN("group each int with the next 2 ints"){ 318 319 auto res = w.start( 320 [&]() { 321 return xs 322 .buffer(3, 2) 323 // forget type to workaround lambda deduction bug on msvc 2013 324 .as_dynamic(); 325 } 326 ); 327 328 THEN("the output contains groups of ints"){ 329 auto required = rxu::to_vector({ 330 v_on.next(280, rxu::to_vector({ 2, 3, 4 })), 331 v_on.next(350, rxu::to_vector({ 4, 5, 6 })), 332 v_on.next(420, rxu::to_vector({ 6, 7, 8 })), 333 v_on.next(600, rxu::to_vector({ 8, 9 })), 334 v_on.completed(600) 335 }); 336 auto actual = res.get_observer().messages(); 337 REQUIRE(required == actual); 338 } 339 340 THEN("there was one subscription and one unsubscription to the xs"){ 341 auto required = rxu::to_vector({ 342 on.subscribe(200, 600) 343 }); 344 auto actual = xs.subscriptions(); 345 REQUIRE(required == actual); 346 } 347 } 348 } 349 } 350 351 SCENARIO("buffer count disposed", "[buffer][operators]"){ 352 GIVEN("1 hot observable of ints."){ 353 auto sc = rxsc::make_test(); 354 auto w = sc.create_worker(); 355 const rxsc::test::messages<int> on; 356 const rxsc::test::messages<std::vector<int>> v_on; 357 358 auto xs = sc.make_hot_observable({ 359 on.next(100, 1), 360 on.next(210, 2), 361 on.next(240, 3), 362 on.next(280, 4), 363 on.next(320, 5), 364 on.next(350, 6), 365 on.next(380, 7), 366 on.next(420, 8), 367 on.next(470, 9), 368 on.completed(600) 369 }); 370 371 WHEN("group each int with the next 2 ints"){ 372 373 auto res = w.start( 374 [&]() { 375 return xs 376 .buffer(3, 2) 377 // forget type to workaround lambda deduction bug on msvc 2013 378 .as_dynamic(); 379 }, 380 370 381 ); 382 383 THEN("the output contains groups of ints"){ 384 auto required = rxu::to_vector({ 385 v_on.next(280, rxu::to_vector({ 2, 3, 4 })), 386 v_on.next(350, rxu::to_vector({ 4, 5, 6 })), 387 }); 388 auto actual = res.get_observer().messages(); 389 REQUIRE(required == actual); 390 } 391 392 THEN("there was one subscription and one unsubscription to the xs"){ 393 auto required = rxu::to_vector({ 394 on.subscribe(200, 370) 395 }); 396 auto actual = xs.subscriptions(); 397 REQUIRE(required == actual); 398 } 399 } 400 } 401 } 402 403 SCENARIO("buffer count error 2", "[buffer][operators]"){ 404 GIVEN("1 hot observable of ints."){ 405 auto sc = rxsc::make_test(); 406 auto w = sc.create_worker(); 407 const rxsc::test::messages<int> on; 408 const rxsc::test::messages<std::vector<int>> v_on; 409 410 std::runtime_error ex("buffer on_error from source"); 411 412 auto xs = sc.make_hot_observable({ 413 on.next(100, 1), 414 on.next(210, 2), 415 on.next(240, 3), 416 on.next(280, 4), 417 on.next(320, 5), 418 on.next(350, 6), 419 on.next(380, 7), 420 on.next(420, 8), 421 on.next(470, 9), 422 on.error(600, ex) 423 }); 424 425 WHEN("group each int with the next 2 ints"){ 426 427 auto res = w.start( 428 [&]() { 429 return xs 430 .buffer(3, 2) 431 // forget type to workaround lambda deduction bug on msvc 2013 432 .as_dynamic(); 433 } 434 ); 435 436 THEN("the output contains groups of ints"){ 437 auto required = rxu::to_vector({ 438 v_on.next(280, rxu::to_vector({ 2, 3, 4 })), 439 v_on.next(350, rxu::to_vector({ 4, 5, 6 })), 440 v_on.next(420, rxu::to_vector({ 6, 7, 8 })), 441 v_on.error(600, ex) 442 }); 443 auto actual = res.get_observer().messages(); 444 REQUIRE(required == actual); 445 } 446 447 THEN("there was one subscription and one unsubscription to the xs"){ 448 auto required = rxu::to_vector({ 449 on.subscribe(200, 600) 450 }); 451 auto actual = xs.subscriptions(); 452 REQUIRE(required == actual); 453 } 454 } 455 } 456 } 457 458 SCENARIO("buffer with time on intervals", "[buffer_with_time][operators][long][!hide]"){ 459 GIVEN("7 intervals of 2 seconds"){ 460 WHEN("the period is 2sec and the initial is 5sec"){ 461 // time: |-----------------| 462 // events: 1 2 3 4 5 6 7 463 // buffers: --- 464 // -1- 465 // 2-3 466 // -4- 467 // 5-6 468 // -7 469 using namespace std::chrono; 470 471 #define TIME milliseconds 472 #define UNIT *15 473 474 auto sc = rxsc::make_current_thread(); 475 auto so = rx::synchronize_in_one_worker(sc); 476 auto start = sc.now() + TIME(5 UNIT); 477 auto period = TIME(2 UNIT); 478 479 auto bufSource = rxs::interval(start, period, so) 480 | rxo::take(7) 481 | rxo::buffer_with_time(TIME(3 UNIT), so); 482 483 bufSource 484 .subscribe( 485 [](std::vector<long> counter){ 486 printf("on_next: "); 487 std::for_each(counter.begin(), counter.end(), [](long c){ 488 printf("%ld ", c); 489 }); 490 printf("\n"); 491 }, 492 [](rxu::error_ptr){ 493 printf("on_error\n"); 494 }, 495 [](){ 496 printf("on_completed\n"); 497 } 498 ); 499 } 500 } 501 } 502 503 SCENARIO("buffer with time on intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){ 504 GIVEN("7 intervals of 2 seconds"){ 505 WHEN("the period is 2sec and the initial is 5sec"){ 506 // time: |-----------------| 507 // events: 1 2 3 4 5 6 7 508 // buffers: --- 509 // -1- 510 // 2-3 511 // -4- 512 // 5-6 513 // -7 514 using namespace std::chrono; 515 516 #define TIME milliseconds 517 #define UNIT *15 518 519 auto sc = rxsc::make_current_thread(); 520 auto so = rx::synchronize_in_one_worker(sc); 521 auto start = sc.now() + TIME(5 UNIT); 522 auto period = TIME(2 UNIT); 523 524 rx::observable<>::interval(start, period, so) 525 .take(7) 526 .buffer_with_time(TIME(3 UNIT)) 527 .subscribe( 528 [](std::vector<long> counter){ 529 printf("on_next: "); 530 std::for_each(counter.begin(), counter.end(), [](long c){ 531 printf("%ld ", c); 532 }); 533 printf("\n"); 534 }, 535 [](rxu::error_ptr){ 536 printf("on_error\n"); 537 }, 538 [](){ 539 printf("on_completed\n"); 540 } 541 ); 542 } 543 } 544 } 545 546 SCENARIO("buffer with time on overlapping intervals", "[buffer_with_time][operators][long][!hide]"){ 547 GIVEN("5 intervals of 2 seconds"){ 548 WHEN("the period is 2sec and the initial is 5sec"){ 549 // time: |-------------| 550 // events: 1 2 3 4 5 551 // buffers: ---- 552 // --1- 553 // 1-2- 554 // 2-3- 555 // 3-4- 556 // 4-5 557 // 5 558 using namespace std::chrono; 559 560 #define TIME milliseconds 561 #define UNIT *15 562 563 auto sc = rxsc::make_current_thread(); 564 auto so = rx::synchronize_in_one_worker(sc); 565 auto start = sc.now() + TIME(5 UNIT); 566 auto period = TIME(2 UNIT); 567 568 rx::observable<>::interval(start, period, so) 569 .take(5) 570 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT), so) 571 .subscribe( 572 [](std::vector<long> counter){ 573 printf("on_next: "); 574 std::for_each(counter.begin(), counter.end(), [](long c){ 575 printf("%ld ", c); 576 }); 577 printf("\n"); 578 }, 579 [](rxu::error_ptr){ 580 printf("on_error\n"); 581 }, 582 [](){ 583 printf("on_completed\n"); 584 } 585 ); 586 } 587 } 588 } 589 590 SCENARIO("buffer with time on overlapping intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){ 591 GIVEN("5 intervals of 2 seconds"){ 592 WHEN("the period is 2sec and the initial is 5sec"){ 593 // time: |-------------| 594 // events: 1 2 3 4 5 595 // buffers: ---- 596 // --1- 597 // 1-2- 598 // 2-3- 599 // 3-4- 600 // 4-5 601 // 5 602 using namespace std::chrono; 603 604 #define TIME milliseconds 605 #define UNIT *15 606 607 auto sc = rxsc::make_current_thread(); 608 auto so = rx::synchronize_in_one_worker(sc); 609 auto start = sc.now() + TIME(5 UNIT); 610 auto period = TIME(2 UNIT); 611 612 rx::observable<>::interval(start, period, so) 613 .take(5) 614 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT)) 615 .subscribe( 616 [](std::vector<long> counter){ 617 printf("on_next: "); 618 std::for_each(counter.begin(), counter.end(), [](long c){ 619 printf("%ld ", c); 620 }); 621 printf("\n"); 622 }, 623 [](rxu::error_ptr){ 624 printf("on_error\n"); 625 }, 626 [](){ 627 printf("on_completed\n"); 628 } 629 ); 630 } 631 } 632 } 633 634 SCENARIO("buffer with time on intervals, error", "[buffer_with_time][operators][long][!hide]"){ 635 GIVEN("5 intervals of 2 seconds"){ 636 WHEN("the period is 2sec and the initial is 5sec"){ 637 // time: |-------------| 638 // events: 1 2 3 4 5 639 // buffers: ---- 640 // --1- 641 // 1-2- 642 // 2-3- 643 // 3-4- 644 // 4-5 645 // 5 646 using namespace std::chrono; 647 648 #define TIME milliseconds 649 #define UNIT *15 650 651 auto sc = rxsc::make_current_thread(); 652 auto so = rx::synchronize_in_one_worker(sc); 653 auto start = sc.now() + TIME(5 UNIT); 654 auto period = TIME(2 UNIT); 655 656 std::runtime_error ex("buffer_with_time on_error from source"); 657 658 auto ys1 = rx::observable<>::interval(start, period, so).take(5); 659 auto ys2 = rx::observable<>::error<long, std::runtime_error>(std::runtime_error("buffer_with_time on_error from source"), so); 660 ys1.concat(so, ys2) 661 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT), so) 662 .subscribe( 663 [](std::vector<long> counter){ 664 printf("on_next: "); 665 std::for_each(counter.begin(), counter.end(), [](long c){ 666 printf("%ld ", c); 667 }); 668 printf("\n"); 669 }, 670 [](rxu::error_ptr){ 671 printf("on_error\n"); 672 }, 673 [](){ 674 printf("on_completed\n"); 675 } 676 ); 677 } 678 } 679 } 680 681 SCENARIO("buffer with time, overlapping intervals", "[buffer_with_time][operators]"){ 682 GIVEN("1 hot observable of ints."){ 683 auto sc = rxsc::make_test(); 684 auto so = rx::synchronize_in_one_worker(sc); 685 auto w = sc.create_worker(); 686 const rxsc::test::messages<int> on; 687 const rxsc::test::messages<std::vector<int>> v_on; 688 689 auto xs = sc.make_hot_observable({ 690 on.next(100, 1), 691 on.next(210, 2), 692 on.next(240, 3), 693 on.next(280, 4), 694 on.next(320, 5), 695 on.next(350, 6), 696 on.next(380, 7), 697 on.next(420, 8), 698 on.next(470, 9), 699 on.completed(600) 700 }); 701 WHEN("group ints on intersecting intervals"){ 702 using namespace std::chrono; 703 704 auto res = w.start( 705 [&]() { 706 return xs 707 .buffer_with_time(milliseconds(100), milliseconds(70), so) 708 // forget type to workaround lambda deduction bug on msvc 2013 709 .as_dynamic(); 710 } 711 ); 712 713 THEN("the output contains groups of ints"){ 714 auto required = rxu::to_vector({ 715 v_on.next(301, rxu::to_vector({ 2, 3, 4 })), 716 v_on.next(371, rxu::to_vector({ 4, 5, 6 })), 717 v_on.next(441, rxu::to_vector({ 6, 7, 8 })), 718 v_on.next(511, rxu::to_vector({ 8, 9 })), 719 v_on.next(581, std::vector<int>()), 720 v_on.next(601, std::vector<int>()), 721 v_on.completed(601) 722 }); 723 auto actual = res.get_observer().messages(); 724 REQUIRE(required == actual); 725 } 726 727 THEN("there was one subscription and one unsubscription to the xs"){ 728 auto required = rxu::to_vector({ 729 on.subscribe(200, 600) 730 }); 731 auto actual = xs.subscriptions(); 732 REQUIRE(required == actual); 733 } 734 } 735 } 736 } 737 738 SCENARIO("buffer with time, intervals with skips", "[buffer_with_time][operators]"){ 739 GIVEN("1 hot observable of ints."){ 740 auto sc = rxsc::make_test(); 741 auto so = rx::synchronize_in_one_worker(sc); 742 auto w = sc.create_worker(); 743 const rxsc::test::messages<int> on; 744 const rxsc::test::messages<std::vector<int>> v_on; 745 746 auto xs = sc.make_hot_observable({ 747 on.next(100, 1), 748 on.next(210, 2), 749 on.next(240, 3), 750 on.next(280, 4), 751 on.next(320, 5), 752 on.next(350, 6), 753 on.next(380, 7), 754 on.next(420, 8), 755 on.next(470, 9), 756 on.completed(600) 757 }); 758 WHEN("group ints on intervals with skips"){ 759 using namespace std::chrono; 760 761 auto res = w.start( 762 [&]() { 763 return xs 764 .buffer_with_time(milliseconds(70), milliseconds(100), so) 765 // forget type to workaround lambda deduction bug on msvc 2013 766 .as_dynamic(); 767 } 768 ); 769 770 THEN("the output contains groups of ints"){ 771 auto required = rxu::to_vector({ 772 v_on.next(271, rxu::to_vector({ 2, 3 })), 773 v_on.next(371, rxu::to_vector({ 5, 6 })), 774 v_on.next(471, rxu::to_vector({ 8, 9 })), 775 v_on.next(571, std::vector<int>()), 776 v_on.completed(601) 777 }); 778 auto actual = res.get_observer().messages(); 779 REQUIRE(required == actual); 780 } 781 782 THEN("there was one subscription and one unsubscription to the xs"){ 783 auto required = rxu::to_vector({ 784 on.subscribe(200, 600) 785 }); 786 auto actual = xs.subscriptions(); 787 REQUIRE(required == actual); 788 } 789 } 790 } 791 } 792 793 SCENARIO("buffer with time, error", "[buffer_with_time][operators]"){ 794 GIVEN("1 hot observable of ints."){ 795 auto sc = rxsc::make_test(); 796 auto so = rx::synchronize_in_one_worker(sc); 797 auto w = sc.create_worker(); 798 const rxsc::test::messages<int> on; 799 const rxsc::test::messages<std::vector<int>> v_on; 800 801 std::runtime_error ex("buffer_with_time on_error from source"); 802 803 auto xs = sc.make_hot_observable({ 804 on.next(100, 1), 805 on.next(210, 2), 806 on.next(240, 3), 807 on.next(280, 4), 808 on.next(320, 5), 809 on.next(350, 6), 810 on.next(380, 7), 811 on.next(420, 8), 812 on.next(470, 9), 813 on.error(600, ex) 814 }); 815 WHEN("group ints on intersecting intervals"){ 816 using namespace std::chrono; 817 818 auto res = w.start( 819 [&]() { 820 return xs 821 .buffer_with_time(milliseconds(100), milliseconds(70), so) 822 // forget type to workaround lambda deduction bug on msvc 2013 823 .as_dynamic(); 824 } 825 ); 826 827 THEN("the output contains groups of ints"){ 828 auto required = rxu::to_vector({ 829 v_on.next(301, rxu::to_vector({ 2, 3, 4 })), 830 v_on.next(371, rxu::to_vector({ 4, 5, 6 })), 831 v_on.next(441, rxu::to_vector({ 6, 7, 8 })), 832 v_on.next(511, rxu::to_vector({ 8, 9 })), 833 v_on.next(581, std::vector<int>()), 834 v_on.error(601, ex) 835 }); 836 auto actual = res.get_observer().messages(); 837 REQUIRE(required == actual); 838 } 839 840 THEN("there was one subscription and one unsubscription to the xs"){ 841 auto required = rxu::to_vector({ 842 on.subscribe(200, 600) 843 }); 844 auto actual = xs.subscriptions(); 845 REQUIRE(required == actual); 846 } 847 } 848 } 849 } 850 851 SCENARIO("buffer with time, disposed", "[buffer_with_time][operators]"){ 852 GIVEN("1 hot observable of ints."){ 853 auto sc = rxsc::make_test(); 854 auto so = rx::synchronize_in_one_worker(sc); 855 auto w = sc.create_worker(); 856 const rxsc::test::messages<int> on; 857 const rxsc::test::messages<std::vector<int>> v_on; 858 859 auto xs = sc.make_hot_observable({ 860 on.next(100, 1), 861 on.next(210, 2), 862 on.next(240, 3), 863 on.next(280, 4), 864 on.next(320, 5), 865 on.next(350, 6), 866 on.next(380, 7), 867 on.next(420, 8), 868 on.next(470, 9), 869 on.completed(600) 870 }); 871 WHEN("group ints on intersecting intervals"){ 872 using namespace std::chrono; 873 874 auto res = w.start( 875 [&]() { 876 return xs 877 .buffer_with_time(milliseconds(100), milliseconds(70), so) 878 // forget type to workaround lambda deduction bug on msvc 2013 879 .as_dynamic(); 880 }, 881 370 882 ); 883 884 THEN("the output contains groups of ints"){ 885 auto required = rxu::to_vector({ 886 v_on.next(301, rxu::to_vector({ 2, 3, 4 })), 887 }); 888 auto actual = res.get_observer().messages(); 889 REQUIRE(required == actual); 890 } 891 892 THEN("there was one subscription and one unsubscription to the xs"){ 893 auto required = rxu::to_vector({ 894 on.subscribe(200, 371) 895 }); 896 auto actual = xs.subscriptions(); 897 REQUIRE(required == actual); 898 } 899 } 900 } 901 } 902 903 SCENARIO("buffer with time, same", "[buffer_with_time][operators]"){ 904 GIVEN("1 hot observable of ints."){ 905 auto sc = rxsc::make_test(); 906 auto so = rx::synchronize_in_one_worker(sc); 907 auto w = sc.create_worker(); 908 const rxsc::test::messages<int> on; 909 const rxsc::test::messages<std::vector<int>> v_on; 910 911 auto xs = sc.make_hot_observable({ 912 on.next(100, 1), 913 on.next(210, 2), 914 on.next(240, 3), 915 on.next(280, 4), 916 on.next(320, 5), 917 on.next(350, 6), 918 on.next(380, 7), 919 on.next(420, 8), 920 on.next(470, 9), 921 on.completed(600) 922 }); 923 WHEN("group ints on intervals"){ 924 using namespace std::chrono; 925 926 auto res = w.start( 927 [&]() { 928 return xs 929 .buffer_with_time(milliseconds(100), so) 930 // forget type to workaround lambda deduction bug on msvc 2013 931 .as_dynamic(); 932 } 933 ); 934 935 THEN("the output contains groups of ints"){ 936 auto required = rxu::to_vector({ 937 v_on.next(301, rxu::to_vector({ 2, 3, 4 })), 938 v_on.next(401, rxu::to_vector({ 5, 6, 7 })), 939 v_on.next(501, rxu::to_vector({ 8, 9 })), 940 v_on.next(601, std::vector<int>()), 941 v_on.completed(601) 942 }); 943 auto actual = res.get_observer().messages(); 944 REQUIRE(required == actual); 945 } 946 947 THEN("there was one subscription and one unsubscription to the xs"){ 948 auto required = rxu::to_vector({ 949 on.subscribe(200, 600) 950 }); 951 auto actual = xs.subscriptions(); 952 REQUIRE(required == actual); 953 } 954 } 955 } 956 } 957 958 SCENARIO("buffer with time or count, basic", "[buffer_with_time_or_count][operators]"){ 959 GIVEN("1 hot observable of ints."){ 960 auto sc = rxsc::make_test(); 961 auto so = rx::synchronize_in_one_worker(sc); 962 auto w = sc.create_worker(); 963 const rxsc::test::messages<int> on; 964 const rxsc::test::messages<std::vector<int>> v_on; 965 966 auto xs = sc.make_hot_observable({ 967 on.next(205, 1), 968 on.next(210, 2), 969 on.next(240, 3), 970 on.next(280, 4), 971 on.next(320, 5), 972 on.next(350, 6), 973 on.next(370, 7), 974 on.next(420, 8), 975 on.next(470, 9), 976 on.completed(600) 977 }); 978 WHEN("group ints on intervals"){ 979 using namespace std::chrono; 980 981 auto res = w.start( 982 [&]() { 983 return xs 984 | rxo::buffer_with_time_or_count(milliseconds(70), 3, so) 985 // forget type to workaround lambda deduction bug on msvc 2013 986 | rxo::as_dynamic(); 987 } 988 ); 989 990 THEN("the output contains groups of ints"){ 991 auto required = rxu::to_vector({ 992 v_on.next(241, rxu::to_vector({ 1, 2, 3 })), 993 v_on.next(312, rxu::to_vector({ 4 })), 994 v_on.next(371, rxu::to_vector({ 5, 6, 7 })), 995 v_on.next(442, rxu::to_vector({ 8 })), 996 v_on.next(512, rxu::to_vector({ 9 })), 997 v_on.next(582, std::vector<int>()), 998 v_on.next(601, std::vector<int>()), 999 v_on.completed(601) 1000 }); 1001 auto actual = res.get_observer().messages(); 1002 REQUIRE(required == actual); 1003 } 1004 1005 THEN("there was one subscription and one unsubscription to the xs"){ 1006 auto required = rxu::to_vector({ 1007 on.subscribe(200, 600) 1008 }); 1009 auto actual = xs.subscriptions(); 1010 REQUIRE(required == actual); 1011 } 1012 } 1013 } 1014 } 1015 1016 SCENARIO("buffer with time or count, error", "[buffer_with_time_or_count][operators]"){ 1017 GIVEN("1 hot observable of ints."){ 1018 auto sc = rxsc::make_test(); 1019 auto so = rx::synchronize_in_one_worker(sc); 1020 auto w = sc.create_worker(); 1021 const rxsc::test::messages<int> on; 1022 const rxsc::test::messages<std::vector<int>> v_on; 1023 1024 std::runtime_error ex("buffer_with_time on_error from source"); 1025 1026 auto xs = sc.make_hot_observable({ 1027 on.next(205, 1), 1028 on.next(210, 2), 1029 on.next(240, 3), 1030 on.next(280, 4), 1031 on.next(320, 5), 1032 on.next(350, 6), 1033 on.next(370, 7), 1034 on.next(420, 8), 1035 on.next(470, 9), 1036 on.error(600, ex) 1037 }); 1038 WHEN("group ints on intervals"){ 1039 using namespace std::chrono; 1040 1041 auto res = w.start( 1042 [&]() { 1043 return xs 1044 .buffer_with_time_or_count(milliseconds(70), 3, so) 1045 // forget type to workaround lambda deduction bug on msvc 2013 1046 .as_dynamic(); 1047 } 1048 ); 1049 1050 THEN("the output contains groups of ints"){ 1051 auto required = rxu::to_vector({ 1052 v_on.next(241, rxu::to_vector({ 1, 2, 3 })), 1053 v_on.next(312, rxu::to_vector({ 4 })), 1054 v_on.next(371, rxu::to_vector({ 5, 6, 7 })), 1055 v_on.next(442, rxu::to_vector({ 8 })), 1056 v_on.next(512, rxu::to_vector({ 9 })), 1057 v_on.next(582, std::vector<int>()), 1058 v_on.error(601, ex) 1059 }); 1060 auto actual = res.get_observer().messages(); 1061 REQUIRE(required == actual); 1062 } 1063 1064 THEN("there was one subscription and one unsubscription to the xs"){ 1065 auto required = rxu::to_vector({ 1066 on.subscribe(200, 600) 1067 }); 1068 auto actual = xs.subscriptions(); 1069 REQUIRE(required == actual); 1070 } 1071 } 1072 } 1073 } 1074 1075 SCENARIO("buffer with time or count, dispose", "[buffer_with_time_or_count][operators]"){ 1076 GIVEN("1 hot observable of ints."){ 1077 auto sc = rxsc::make_test(); 1078 auto so = rx::synchronize_in_one_worker(sc); 1079 auto w = sc.create_worker(); 1080 const rxsc::test::messages<int> on; 1081 const rxsc::test::messages<std::vector<int>> v_on; 1082 1083 auto xs = sc.make_hot_observable({ 1084 on.next(205, 1), 1085 on.next(210, 2), 1086 on.next(240, 3), 1087 on.next(280, 4), 1088 on.next(320, 5), 1089 on.next(350, 6), 1090 on.next(370, 7), 1091 on.next(420, 8), 1092 on.next(470, 9), 1093 on.completed(600) 1094 }); 1095 WHEN("group ints on intervals"){ 1096 using namespace std::chrono; 1097 1098 auto res = w.start( 1099 [&]() { 1100 return xs 1101 .buffer_with_time_or_count(milliseconds(70), 3, so) 1102 // forget type to workaround lambda deduction bug on msvc 2013 1103 .as_dynamic(); 1104 }, 1105 372 1106 ); 1107 1108 THEN("the output contains groups of ints"){ 1109 auto required = rxu::to_vector({ 1110 v_on.next(241, rxu::to_vector({ 1, 2, 3 })), 1111 v_on.next(312, rxu::to_vector({ 4 })), 1112 v_on.next(371, rxu::to_vector({ 5, 6, 7 })), 1113 }); 1114 auto actual = res.get_observer().messages(); 1115 REQUIRE(required == actual); 1116 } 1117 1118 THEN("there was one subscription and one unsubscription to the xs"){ 1119 auto required = rxu::to_vector({ 1120 on.subscribe(200, 373) 1121 }); 1122 auto actual = xs.subscriptions(); 1123 REQUIRE(required == actual); 1124 } 1125 } 1126 } 1127 } 1128 1129 SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or_count][operators]"){ 1130 GIVEN("1 hot observable of ints."){ 1131 auto sc = rxsc::make_test(); 1132 auto so = rx::synchronize_in_one_worker(sc); 1133 auto w = sc.create_worker(); 1134 const rxsc::test::messages<int> on; 1135 const rxsc::test::messages<std::vector<int>> v_on; 1136 1137 auto xs = sc.make_hot_observable({ 1138 on.next(205, 1), 1139 on.next(305, 2), 1140 on.next(505, 3), 1141 on.next(605, 4), 1142 on.next(610, 5), 1143 on.completed(850) 1144 }); 1145 WHEN("group ints on intervals"){ 1146 using namespace std::chrono; 1147 1148 auto res = w.start( 1149 [&]() { 1150 return xs 1151 .buffer_with_time_or_count(milliseconds(100), 3, so) 1152 // forget type to workaround lambda deduction bug on msvc 2013 1153 .as_dynamic(); 1154 } 1155 ); 1156 1157 THEN("the output contains groups of ints"){ 1158 auto required = rxu::to_vector({ 1159 v_on.next(301, rxu::to_vector({ 1 })), 1160 v_on.next(401, rxu::to_vector({ 2 })), 1161 v_on.next(501, std::vector<int>()), 1162 v_on.next(601, rxu::to_vector({ 3 })), 1163 v_on.next(701, rxu::to_vector({ 4, 5 })), 1164 v_on.next(801, std::vector<int>()), 1165 v_on.next(851, std::vector<int>()), 1166 v_on.completed(851) 1167 }); 1168 auto actual = res.get_observer().messages(); 1169 REQUIRE(required == actual); 1170 } 1171 1172 THEN("there was one subscription and one unsubscription to the xs"){ 1173 auto required = rxu::to_vector({ 1174 on.subscribe(200, 850) 1175 }); 1176 auto actual = xs.subscriptions(); 1177 REQUIRE(required == actual); 1178 } 1179 } 1180 } 1181 } 1182 1183 SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){ 1184 GIVEN("1 hot observable of ints."){ 1185 auto sc = rxsc::make_test(); 1186 auto so = rx::synchronize_in_one_worker(sc); 1187 auto w = sc.create_worker(); 1188 const rxsc::test::messages<int> on; 1189 const rxsc::test::messages<std::vector<int>> v_on; 1190 1191 auto xs = sc.make_hot_observable({ 1192 on.next(205, 1), 1193 on.next(305, 2), 1194 on.next(505, 3), 1195 on.next(605, 4), 1196 on.next(610, 5), 1197 on.completed(850) 1198 }); 1199 WHEN("group ints on intervals"){ 1200 using namespace std::chrono; 1201 1202 auto res = w.start( 1203 [&]() { 1204 return xs 1205 .buffer_with_time_or_count(milliseconds(370), 2, so) 1206 // forget type to workaround lambda deduction bug on msvc 2013 1207 .as_dynamic(); 1208 } 1209 ); 1210 1211 THEN("the output contains groups of ints"){ 1212 auto required = rxu::to_vector({ 1213 v_on.next(306, rxu::to_vector({ 1, 2 })), 1214 v_on.next(606, rxu::to_vector({ 3, 4 })), 1215 v_on.next(851, rxu::to_vector({ 5 })), 1216 v_on.completed(851) 1217 }); 1218 auto actual = res.get_observer().messages(); 1219 REQUIRE(required == actual); 1220 } 1221 1222 THEN("there was one subscription and one unsubscription to the xs"){ 1223 auto required = rxu::to_vector({ 1224 on.subscribe(200, 850) 1225 }); 1226 auto actual = xs.subscriptions(); 1227 REQUIRE(required == actual); 1228 } 1229 } 1230 } 1231 } 1232