1 #include "../test.h" 2 #include <rxcpp/operators/rx-skip_until.hpp> 3 4 SCENARIO("skip_until, some data next", "[skip_until][skip][operators]"){ 5 GIVEN("2 sources"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto l = sc.make_hot_observable({ 11 on.next(150, 1), 12 on.next(210, 2), 13 on.next(220, 3), 14 on.next(230, 4), 15 on.next(240, 5), 16 on.completed(250) 17 }); 18 19 auto r = sc.make_hot_observable({ 20 on.next(150, 1), 21 on.next(225, 99), 22 on.completed(230) 23 }); 24 25 WHEN("one is taken until the other emits a marble"){ 26 27 auto res = w.start( 28 [&]() { 29 return l 30 | rxo::skip_until(r) 31 // forget type to workaround lambda deduction bug on msvc 2013 32 | rxo::as_dynamic(); 33 } 34 ); 35 36 THEN("the output only contains items sent while subscribed"){ 37 auto required = rxu::to_vector({ 38 on.next(230, 4), 39 on.next(240, 5), 40 on.completed(250) 41 }); 42 auto actual = res.get_observer().messages(); 43 REQUIRE(required == actual); 44 } 45 46 THEN("there was 1 subscription/unsubscription to the source"){ 47 auto required = rxu::to_vector({ 48 on.subscribe(200, 250) 49 }); 50 auto actual = l.subscriptions(); 51 REQUIRE(required == actual); 52 } 53 54 THEN("there was 1 subscription/unsubscription to the trigger"){ 55 auto required = rxu::to_vector({ 56 on.subscribe(200, 225) 57 }); 58 auto actual = r.subscriptions(); 59 REQUIRE(required == actual); 60 } 61 } 62 } 63 } 64 65 SCENARIO("skip_until, some data error", "[skip_until][skip][operators]"){ 66 GIVEN("2 sources"){ 67 auto sc = rxsc::make_test(); 68 auto w = sc.create_worker(); 69 const rxsc::test::messages<int> on; 70 71 std::runtime_error ex("skip_until on_error from source"); 72 73 auto l = sc.make_hot_observable({ 74 on.next(150, 1), 75 on.next(210, 2), 76 on.next(220, 3), 77 on.next(230, 4), 78 on.next(240, 5), 79 on.completed(250) 80 }); 81 82 auto r = sc.make_hot_observable({ 83 on.next(150, 1), 84 on.error(225, ex) 85 }); 86 87 WHEN("one is taken until the other emits a marble"){ 88 89 auto res = w.start( 90 [&]() { 91 return l 92 .skip_until(r) 93 // forget type to workaround lambda deduction bug on msvc 2013 94 .as_dynamic(); 95 } 96 ); 97 98 THEN("the output only contains error message"){ 99 auto required = rxu::to_vector({ 100 on.error(225, ex) 101 }); 102 auto actual = res.get_observer().messages(); 103 REQUIRE(required == actual); 104 } 105 106 THEN("there was 1 subscription/unsubscription to the source"){ 107 auto required = rxu::to_vector({ 108 on.subscribe(200, 225) 109 }); 110 auto actual = l.subscriptions(); 111 REQUIRE(required == actual); 112 } 113 114 THEN("there was 1 subscription/unsubscription to the trigger"){ 115 auto required = rxu::to_vector({ 116 on.subscribe(200, 225) 117 }); 118 auto actual = r.subscriptions(); 119 REQUIRE(required == actual); 120 } 121 } 122 } 123 } 124 125 SCENARIO("skip_until, error some data", "[skip_until][skip][operators]"){ 126 GIVEN("2 sources"){ 127 auto sc = rxsc::make_test(); 128 auto w = sc.create_worker(); 129 const rxsc::test::messages<int> on; 130 131 std::runtime_error ex("skip_until on_error from source"); 132 133 auto l = sc.make_hot_observable({ 134 on.next(150, 1), 135 on.next(210, 2), 136 on.error(220, ex) 137 }); 138 139 auto r = sc.make_hot_observable({ 140 on.next(150, 1), 141 on.next(230, 3), 142 on.completed(250) 143 }); 144 145 WHEN("one is taken until the other emits a marble"){ 146 147 auto res = w.start( 148 [&]() { 149 return l 150 .skip_until(r) 151 // forget type to workaround lambda deduction bug on msvc 2013 152 .as_dynamic(); 153 } 154 ); 155 156 THEN("the output only contains error message"){ 157 auto required = rxu::to_vector({ 158 on.error(220, ex) 159 }); 160 auto actual = res.get_observer().messages(); 161 REQUIRE(required == actual); 162 } 163 164 THEN("there was 1 subscription/unsubscription to the source"){ 165 auto required = rxu::to_vector({ 166 on.subscribe(200, 220) 167 }); 168 auto actual = l.subscriptions(); 169 REQUIRE(required == actual); 170 } 171 172 THEN("there was 1 subscription/unsubscription to the trigger"){ 173 auto required = rxu::to_vector({ 174 on.subscribe(200, 220) 175 }); 176 auto actual = r.subscriptions(); 177 REQUIRE(required == actual); 178 } 179 } 180 } 181 } 182 183 SCENARIO("skip_until, some data empty", "[skip_until][skip][operators]"){ 184 GIVEN("2 sources"){ 185 auto sc = rxsc::make_test(); 186 auto w = sc.create_worker(); 187 const rxsc::test::messages<int> on; 188 189 auto l = sc.make_hot_observable({ 190 on.next(150, 1), 191 on.next(210, 2), 192 on.next(220, 3), 193 on.next(230, 4), 194 on.next(240, 5), 195 on.completed(250) 196 }); 197 198 auto r = sc.make_hot_observable({ 199 on.next(150, 1), 200 on.completed(225) 201 }); 202 203 WHEN("one is taken until the other emits a marble"){ 204 205 auto res = w.start( 206 [&]() { 207 return l 208 .skip_until(r) 209 // forget type to workaround lambda deduction bug on msvc 2013 210 .as_dynamic(); 211 } 212 ); 213 214 THEN("the output is empty"){ 215 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 216 auto actual = res.get_observer().messages(); 217 REQUIRE(required == actual); 218 } 219 220 THEN("there was 1 subscription/unsubscription to the source"){ 221 auto required = rxu::to_vector({ 222 on.subscribe(200, 250) 223 }); 224 auto actual = l.subscriptions(); 225 REQUIRE(required == actual); 226 } 227 228 THEN("there was 1 subscription/unsubscription to the trigger"){ 229 auto required = rxu::to_vector({ 230 on.subscribe(200, 225) 231 }); 232 auto actual = r.subscriptions(); 233 REQUIRE(required == actual); 234 } 235 } 236 } 237 } 238 239 SCENARIO("skip_until, never next", "[skip_until][skip][operators]"){ 240 GIVEN("2 sources"){ 241 auto sc = rxsc::make_test(); 242 auto w = sc.create_worker(); 243 const rxsc::test::messages<int> on; 244 245 auto l = sc.make_hot_observable({ 246 on.next(150, 1) 247 }); 248 249 auto r = sc.make_hot_observable({ 250 on.next(150, 1), 251 on.next(225, 2), 252 on.completed(250) 253 }); 254 255 WHEN("one is taken until the other emits a marble"){ 256 257 auto res = w.start( 258 [&]() { 259 return l 260 .skip_until(r) 261 // forget type to workaround lambda deduction bug on msvc 2013 262 .as_dynamic(); 263 } 264 ); 265 266 THEN("the output is empty"){ 267 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 268 auto actual = res.get_observer().messages(); 269 REQUIRE(required == actual); 270 } 271 272 THEN("there was 1 subscription/unsubscription to the source"){ 273 auto required = rxu::to_vector({ 274 on.subscribe(200, 1000) 275 }); 276 auto actual = l.subscriptions(); 277 REQUIRE(required == actual); 278 } 279 280 THEN("there was 1 subscription/unsubscription to the trigger"){ 281 auto required = rxu::to_vector({ 282 on.subscribe(200, 225) 283 }); 284 auto actual = r.subscriptions(); 285 REQUIRE(required == actual); 286 } 287 } 288 } 289 } 290 291 SCENARIO("skip_until, never error", "[skip_until][skip][operators]"){ 292 GIVEN("2 sources"){ 293 auto sc = rxsc::make_test(); 294 auto w = sc.create_worker(); 295 const rxsc::test::messages<int> on; 296 297 std::runtime_error ex("skip_until on_error from source"); 298 299 auto l = sc.make_hot_observable({ 300 on.next(150, 1) 301 }); 302 303 auto r = sc.make_hot_observable({ 304 on.next(150, 1), 305 on.error(225, ex) 306 }); 307 308 WHEN("one is taken until the other emits a marble"){ 309 310 auto res = w.start( 311 [&]() { 312 return l 313 .skip_until(r) 314 // forget type to workaround lambda deduction bug on msvc 2013 315 .as_dynamic(); 316 } 317 ); 318 319 THEN("the output only contains error message"){ 320 auto required = rxu::to_vector({ 321 on.error(225, ex) 322 }); 323 auto actual = res.get_observer().messages(); 324 REQUIRE(required == actual); 325 } 326 327 THEN("there was 1 subscription/unsubscription to the source"){ 328 auto required = rxu::to_vector({ 329 on.subscribe(200, 225) 330 }); 331 auto actual = l.subscriptions(); 332 REQUIRE(required == actual); 333 } 334 335 THEN("there was 1 subscription/unsubscription to the trigger"){ 336 auto required = rxu::to_vector({ 337 on.subscribe(200, 225) 338 }); 339 auto actual = r.subscriptions(); 340 REQUIRE(required == actual); 341 } 342 } 343 } 344 } 345 346 SCENARIO("skip_until, some data error after completed", "[skip_until][skip][operators]"){ 347 GIVEN("2 sources"){ 348 auto sc = rxsc::make_test(); 349 auto w = sc.create_worker(); 350 const rxsc::test::messages<int> on; 351 352 std::runtime_error ex("skip_until on_error from source"); 353 354 auto l = sc.make_hot_observable({ 355 on.next(150, 1), 356 on.next(210, 2), 357 on.next(220, 3), 358 on.next(230, 4), 359 on.next(240, 5), 360 on.completed(250) 361 }); 362 363 auto r = sc.make_hot_observable({ 364 on.next(150, 1), 365 on.error(300, ex) 366 }); 367 368 WHEN("one is taken until the other emits a marble"){ 369 370 auto res = w.start( 371 [&]() { 372 return l 373 .skip_until(r) 374 // forget type to workaround lambda deduction bug on msvc 2013 375 .as_dynamic(); 376 } 377 ); 378 379 THEN("the output only contains error message"){ 380 auto required = rxu::to_vector({ 381 on.error(300, ex) 382 }); 383 auto actual = res.get_observer().messages(); 384 REQUIRE(required == actual); 385 } 386 387 THEN("there was 1 subscription/unsubscription to the source"){ 388 auto required = rxu::to_vector({ 389 on.subscribe(200, 250) 390 }); 391 auto actual = l.subscriptions(); 392 REQUIRE(required == actual); 393 } 394 395 THEN("there was 1 subscription/unsubscription to the trigger"){ 396 auto required = rxu::to_vector({ 397 on.subscribe(200, 300) 398 }); 399 auto actual = r.subscriptions(); 400 REQUIRE(required == actual); 401 } 402 } 403 } 404 } 405 406 SCENARIO("skip_until, some data never", "[skip_until][skip][operators]"){ 407 GIVEN("2 sources"){ 408 auto sc = rxsc::make_test(); 409 auto w = sc.create_worker(); 410 const rxsc::test::messages<int> on; 411 412 auto l = sc.make_hot_observable({ 413 on.next(150, 1), 414 on.next(210, 2), 415 on.next(220, 3), 416 on.next(230, 4), 417 on.next(240, 5), 418 on.completed(250) 419 }); 420 421 auto r = sc.make_hot_observable({ 422 on.next(150, 1) 423 }); 424 425 WHEN("one is taken until the other emits a marble"){ 426 427 auto res = w.start( 428 [&]() { 429 return l 430 .skip_until(r) 431 // forget type to workaround lambda deduction bug on msvc 2013 432 .as_dynamic(); 433 } 434 ); 435 436 THEN("the output is empty"){ 437 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 438 auto actual = res.get_observer().messages(); 439 REQUIRE(required == actual); 440 } 441 442 THEN("there was 1 subscription/unsubscription to the source"){ 443 auto required = rxu::to_vector({ 444 on.subscribe(200, 250) 445 }); 446 auto actual = l.subscriptions(); 447 REQUIRE(required == actual); 448 } 449 450 THEN("there was 1 subscription/unsubscription to the trigger"){ 451 auto required = rxu::to_vector({ 452 on.subscribe(200, 1000) 453 }); 454 auto actual = r.subscriptions(); 455 REQUIRE(required == actual); 456 } 457 } 458 } 459 } 460 461 SCENARIO("skip_until, never empty", "[skip_until][skip][operators]"){ 462 GIVEN("2 sources"){ 463 auto sc = rxsc::make_test(); 464 auto w = sc.create_worker(); 465 const rxsc::test::messages<int> on; 466 467 auto l = sc.make_hot_observable({ 468 on.next(150, 1) 469 }); 470 471 auto r = sc.make_hot_observable({ 472 on.next(150, 1), 473 on.completed(225) 474 }); 475 476 WHEN("one is taken until the other emits a marble"){ 477 478 auto res = w.start( 479 [&]() { 480 return l 481 .skip_until(r) 482 // forget type to workaround lambda deduction bug on msvc 2013 483 .as_dynamic(); 484 } 485 ); 486 487 THEN("the output is empty"){ 488 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 489 auto actual = res.get_observer().messages(); 490 REQUIRE(required == actual); 491 } 492 493 THEN("there was 1 subscription/unsubscription to the source"){ 494 auto required = rxu::to_vector({ 495 on.subscribe(200, 1000) 496 }); 497 auto actual = l.subscriptions(); 498 REQUIRE(required == actual); 499 } 500 501 THEN("there was 1 subscription/unsubscription to the trigger"){ 502 auto required = rxu::to_vector({ 503 on.subscribe(200, 225) 504 }); 505 auto actual = r.subscriptions(); 506 REQUIRE(required == actual); 507 } 508 } 509 } 510 } 511 512 SCENARIO("skip_until, never never", "[skip_until][skip][operators]"){ 513 GIVEN("2 sources"){ 514 auto sc = rxsc::make_test(); 515 auto w = sc.create_worker(); 516 const rxsc::test::messages<int> on; 517 518 auto l = sc.make_hot_observable({ 519 on.next(150, 1) 520 }); 521 522 auto r = sc.make_hot_observable({ 523 on.next(150, 1) 524 }); 525 526 WHEN("one is taken until the other emits a marble"){ 527 528 auto res = w.start( 529 [&]() { 530 return l 531 .skip_until(r) 532 // forget type to workaround lambda deduction bug on msvc 2013 533 .as_dynamic(); 534 } 535 ); 536 537 THEN("the output is empty"){ 538 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 539 auto actual = res.get_observer().messages(); 540 REQUIRE(required == actual); 541 } 542 543 THEN("there was 1 subscription/unsubscription to the source"){ 544 auto required = rxu::to_vector({ 545 on.subscribe(200, 1000) 546 }); 547 auto actual = l.subscriptions(); 548 REQUIRE(required == actual); 549 } 550 551 THEN("there was 1 subscription/unsubscription to the trigger"){ 552 auto required = rxu::to_vector({ 553 on.subscribe(200, 1000) 554 }); 555 auto actual = r.subscriptions(); 556 REQUIRE(required == actual); 557 } 558 } 559 } 560 } 561 562 SCENARIO("skip_until time point, some data next", "[skip_until][skip][operators]"){ 563 GIVEN("2 sources"){ 564 auto sc = rxsc::make_test(); 565 auto so = rx::synchronize_in_one_worker(sc); 566 auto w = sc.create_worker(); 567 const rxsc::test::messages<int> on; 568 569 auto l = sc.make_hot_observable({ 570 on.next(150, 1), 571 on.next(210, 2), 572 on.next(220, 3), 573 on.next(230, 4), 574 on.next(240, 5), 575 on.completed(250) 576 }); 577 578 auto t = sc.to_time_point(225); 579 580 WHEN("invoked with a time point"){ 581 582 auto res = w.start( 583 [&]() { 584 return l 585 | rxo::skip_until(t, so) 586 // forget type to workaround lambda deduction bug on msvc 2013 587 | rxo::as_dynamic(); 588 } 589 ); 590 591 THEN("the output only contains items sent while subscribed"){ 592 auto required = rxu::to_vector({ 593 on.next(231, 4), 594 on.next(241, 5), 595 on.completed(251) 596 }); 597 auto actual = res.get_observer().messages(); 598 REQUIRE(required == actual); 599 } 600 601 THEN("there was 1 subscription/unsubscription to the source"){ 602 auto required = rxu::to_vector({ 603 on.subscribe(200, 250) 604 }); 605 auto actual = l.subscriptions(); 606 REQUIRE(required == actual); 607 } 608 } 609 } 610 }