1 #include "../test.h" 2 #include "rxcpp/operators/rx-amb.hpp" 3 4 SCENARIO("variadic amb never 3", "[amb][join][operators]"){ 5 GIVEN("3 hot observables of ints."){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto ys1 = sc.make_hot_observable({ 11 on.next(100, 1) 12 }); 13 14 auto ys2 = sc.make_hot_observable({ 15 on.next(110, 2) 16 }); 17 18 auto ys3 = sc.make_hot_observable({ 19 on.next(120, 3) 20 }); 21 22 WHEN("the first observable is selected to produce ints"){ 23 24 auto res = w.start( 25 [&]() { 26 return ys1 27 | rxo::amb(ys2, ys3) 28 // forget type to workaround lambda deduction bug on msvc 2013 29 | rxo::as_dynamic(); 30 } 31 ); 32 33 THEN("the output is empty"){ 34 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 35 auto actual = res.get_observer().messages(); 36 REQUIRE(required == actual); 37 } 38 39 THEN("there was one subscription and one unsubscription to the ys1"){ 40 auto required = rxu::to_vector({ 41 on.subscribe(200, 1000) 42 }); 43 auto actual = ys1.subscriptions(); 44 REQUIRE(required == actual); 45 } 46 47 THEN("there was one subscription and one unsubscription to the ys2"){ 48 auto required = rxu::to_vector({ 49 on.subscribe(200, 1000) 50 }); 51 auto actual = ys2.subscriptions(); 52 REQUIRE(required == actual); 53 } 54 55 THEN("there was one subscription and one unsubscription to the ys3"){ 56 auto required = rxu::to_vector({ 57 on.subscribe(200, 1000) 58 }); 59 auto actual = ys3.subscriptions(); 60 REQUIRE(required == actual); 61 } 62 } 63 } 64 } 65 66 SCENARIO("variadic amb never empty", "[amb][join][operators]"){ 67 GIVEN("2 hot observables of ints."){ 68 auto sc = rxsc::make_test(); 69 auto w = sc.create_worker(); 70 const rxsc::test::messages<int> on; 71 72 auto ys1 = sc.make_hot_observable({ 73 on.next(100, 1) 74 }); 75 76 auto ys2 = sc.make_hot_observable({ 77 on.next(110, 2), 78 on.completed(400) 79 }); 80 81 WHEN("the first observable is selected to produce ints"){ 82 83 auto res = w.start( 84 [&]() { 85 return ys1 86 .amb(ys2) 87 // forget type to workaround lambda deduction bug on msvc 2013 88 .as_dynamic(); 89 } 90 ); 91 92 THEN("the output contains only complete message"){ 93 auto required = rxu::to_vector({ 94 on.completed(400) 95 }); 96 auto actual = res.get_observer().messages(); 97 REQUIRE(required == actual); 98 } 99 100 THEN("there was one subscription and one unsubscription to the ys1"){ 101 auto required = rxu::to_vector({ 102 on.subscribe(200, 400) 103 }); 104 auto actual = ys1.subscriptions(); 105 REQUIRE(required == actual); 106 } 107 108 THEN("there was one subscription and one unsubscription to the ys2"){ 109 auto required = rxu::to_vector({ 110 on.subscribe(200, 400) 111 }); 112 auto actual = ys2.subscriptions(); 113 REQUIRE(required == actual); 114 } 115 } 116 } 117 } 118 119 SCENARIO("variadic amb empty never", "[amb][join][operators]"){ 120 GIVEN("2 hot observables of ints."){ 121 auto sc = rxsc::make_test(); 122 auto w = sc.create_worker(); 123 const rxsc::test::messages<int> on; 124 125 auto ys1 = sc.make_hot_observable({ 126 on.next(100, 1), 127 on.completed(400) 128 }); 129 130 auto ys2 = sc.make_hot_observable({ 131 on.next(110, 2) 132 }); 133 134 WHEN("the first observable is selected to produce ints"){ 135 136 auto res = w.start( 137 [&]() { 138 return ys1 139 .amb(ys2) 140 // forget type to workaround lambda deduction bug on msvc 2013 141 .as_dynamic(); 142 } 143 ); 144 145 THEN("the output contains only complete message"){ 146 auto required = rxu::to_vector({ 147 on.completed(400) 148 }); 149 auto actual = res.get_observer().messages(); 150 REQUIRE(required == actual); 151 } 152 153 THEN("there was one subscription and one unsubscription to the ys1"){ 154 auto required = rxu::to_vector({ 155 on.subscribe(200, 400) 156 }); 157 auto actual = ys1.subscriptions(); 158 REQUIRE(required == actual); 159 } 160 161 THEN("there was one subscription and one unsubscription to the ys2"){ 162 auto required = rxu::to_vector({ 163 on.subscribe(200, 400) 164 }); 165 auto actual = ys2.subscriptions(); 166 REQUIRE(required == actual); 167 } 168 } 169 } 170 } 171 172 SCENARIO("variadic amb completes", "[amb][join][operators]"){ 173 GIVEN("3 cold observables of ints."){ 174 auto sc = rxsc::make_test(); 175 auto w = sc.create_worker(); 176 const rxsc::test::messages<int> on; 177 178 auto ys1 = sc.make_cold_observable({ 179 on.next(10, 101), 180 on.next(110, 102), 181 on.next(210, 103), 182 on.completed(310) 183 }); 184 185 auto ys2 = sc.make_cold_observable({ 186 on.next(20, 201), 187 on.next(120, 202), 188 on.next(220, 203), 189 on.completed(320) 190 }); 191 192 auto ys3 = sc.make_cold_observable({ 193 on.next(30, 301), 194 on.next(130, 302), 195 on.next(230, 303), 196 on.completed(330) 197 }); 198 199 WHEN("the first observable is selected to produce ints"){ 200 201 auto res = w.start( 202 [&]() { 203 return ys1 204 .amb(ys2, ys3) 205 // forget type to workaround lambda deduction bug on msvc 2013 206 .as_dynamic(); 207 } 208 ); 209 210 THEN("the output contains ints from the first observable"){ 211 auto required = rxu::to_vector({ 212 on.next(210, 101), 213 on.next(310, 102), 214 on.next(410, 103), 215 on.completed(510) 216 }); 217 auto actual = res.get_observer().messages(); 218 REQUIRE(required == actual); 219 } 220 221 THEN("there was one subscription and one unsubscription to the ys1"){ 222 auto required = rxu::to_vector({ 223 on.subscribe(200, 510) 224 }); 225 auto actual = ys1.subscriptions(); 226 REQUIRE(required == actual); 227 } 228 229 THEN("there was one subscription and one unsubscription to the ys2"){ 230 auto required = rxu::to_vector({ 231 on.subscribe(200, 210) 232 }); 233 auto actual = ys2.subscriptions(); 234 REQUIRE(required == actual); 235 } 236 237 THEN("there was one subscription and one unsubscription to the ys3"){ 238 auto required = rxu::to_vector({ 239 on.subscribe(200, 210) 240 }); 241 auto actual = ys3.subscriptions(); 242 REQUIRE(required == actual); 243 } 244 } 245 } 246 } 247 248 SCENARIO("variadic amb winner&owner throws", "[amb][join][operators]"){ 249 GIVEN("3 cold observables of ints."){ 250 auto sc = rxsc::make_test(); 251 auto w = sc.create_worker(); 252 const rxsc::test::messages<int> on; 253 254 std::runtime_error ex("amb on_error from source"); 255 256 auto ys1 = sc.make_cold_observable({ 257 on.next(10, 101), 258 on.next(110, 102), 259 on.next(210, 103), 260 on.error(310, ex) 261 }); 262 263 auto ys2 = sc.make_cold_observable({ 264 on.next(20, 201), 265 on.next(120, 202), 266 on.next(220, 203), 267 on.completed(320) 268 }); 269 270 auto ys3 = sc.make_cold_observable({ 271 on.next(30, 301), 272 on.next(130, 302), 273 on.next(230, 303), 274 on.completed(330) 275 }); 276 277 WHEN("the first observable is selected to produce ints"){ 278 279 auto res = w.start( 280 [&]() { 281 return ys1 282 .amb(ys2, ys3) 283 // forget type to workaround lambda deduction bug on msvc 2013 284 .as_dynamic(); 285 } 286 ); 287 288 THEN("the output contains ints from the first observable"){ 289 auto required = rxu::to_vector({ 290 on.next(210, 101), 291 on.next(310, 102), 292 on.next(410, 103), 293 on.error(510, ex) 294 }); 295 auto actual = res.get_observer().messages(); 296 REQUIRE(required == actual); 297 } 298 299 THEN("there was one subscription and one unsubscription to the ys1"){ 300 auto required = rxu::to_vector({ 301 on.subscribe(200, 510) 302 }); 303 auto actual = ys1.subscriptions(); 304 REQUIRE(required == actual); 305 } 306 307 THEN("there was one subscription and one unsubscription to the ys2"){ 308 auto required = rxu::to_vector({ 309 on.subscribe(200, 210) 310 }); 311 auto actual = ys2.subscriptions(); 312 REQUIRE(required == actual); 313 } 314 315 THEN("there was one subscription and one unsubscription to the ys3"){ 316 auto required = rxu::to_vector({ 317 on.subscribe(200, 210) 318 }); 319 auto actual = ys3.subscriptions(); 320 REQUIRE(required == actual); 321 } 322 } 323 } 324 } 325 326 SCENARIO("variadic amb winner&non-owner throws", "[amb][join][operators]"){ 327 GIVEN("3 cold observables of ints."){ 328 auto sc = rxsc::make_test(); 329 auto w = sc.create_worker(); 330 const rxsc::test::messages<int> on; 331 332 std::runtime_error ex("amb on_error from source"); 333 334 auto ys1 = sc.make_cold_observable({ 335 on.next(10, 101), 336 on.next(110, 102), 337 on.next(210, 103), 338 on.error(310, ex) 339 }); 340 341 auto ys2 = sc.make_cold_observable({ 342 on.next(20, 201), 343 on.next(120, 202), 344 on.next(220, 203), 345 on.completed(320) 346 }); 347 348 auto ys3 = sc.make_cold_observable({ 349 on.next(30, 301), 350 on.next(130, 302), 351 on.next(230, 303), 352 on.completed(330) 353 }); 354 355 WHEN("the first observable is selected to produce ints"){ 356 357 auto res = w.start( 358 [&]() { 359 return ys2 360 .amb(ys1, ys3) 361 // forget type to workaround lambda deduction bug on msvc 2013 362 .as_dynamic(); 363 } 364 ); 365 366 THEN("the output contains ints from the first observable"){ 367 auto required = rxu::to_vector({ 368 on.next(210, 101), 369 on.next(310, 102), 370 on.next(410, 103), 371 on.error(510, ex) 372 }); 373 auto actual = res.get_observer().messages(); 374 REQUIRE(required == actual); 375 } 376 377 THEN("there was one subscription and one unsubscription to the ys1"){ 378 auto required = rxu::to_vector({ 379 on.subscribe(200, 510) 380 }); 381 auto actual = ys1.subscriptions(); 382 REQUIRE(required == actual); 383 } 384 385 THEN("there was one subscription and one unsubscription to the ys2"){ 386 auto required = rxu::to_vector({ 387 on.subscribe(200, 210) 388 }); 389 auto actual = ys2.subscriptions(); 390 REQUIRE(required == actual); 391 } 392 393 THEN("there was one subscription and one unsubscription to the ys3"){ 394 auto required = rxu::to_vector({ 395 on.subscribe(200, 210) 396 }); 397 auto actual = ys3.subscriptions(); 398 REQUIRE(required == actual); 399 } 400 } 401 } 402 } 403 404 SCENARIO("variadic amb loser&non-owner throws", "[amb][join][operators]"){ 405 GIVEN("3 cold observables of ints."){ 406 auto sc = rxsc::make_test(); 407 auto w = sc.create_worker(); 408 const rxsc::test::messages<int> on; 409 410 std::runtime_error ex("amb on_error from source"); 411 412 auto ys1 = sc.make_cold_observable({ 413 on.next(10, 101), 414 on.next(110, 102), 415 on.next(210, 103), 416 on.completed(310) 417 }); 418 419 auto ys2 = sc.make_cold_observable({ 420 on.error(20, ex) 421 }); 422 423 auto ys3 = sc.make_cold_observable({ 424 on.next(30, 301), 425 on.next(130, 302), 426 on.next(230, 303), 427 on.completed(330) 428 }); 429 430 WHEN("the first observable is selected to produce ints"){ 431 432 auto res = w.start( 433 [&]() { 434 return ys2 435 .amb(ys1, ys3) 436 // forget type to workaround lambda deduction bug on msvc 2013 437 .as_dynamic(); 438 } 439 ); 440 441 THEN("the output contains ints from the first observable"){ 442 auto required = rxu::to_vector({ 443 on.next(210, 101), 444 on.next(310, 102), 445 on.next(410, 103), 446 on.completed(510) 447 }); 448 auto actual = res.get_observer().messages(); 449 REQUIRE(required == actual); 450 } 451 452 THEN("there was one subscription and one unsubscription to the ys1"){ 453 auto required = rxu::to_vector({ 454 on.subscribe(200, 510) 455 }); 456 auto actual = ys1.subscriptions(); 457 REQUIRE(required == actual); 458 } 459 460 THEN("there was one subscription and one unsubscription to the ys2"){ 461 auto required = rxu::to_vector({ 462 on.subscribe(200, 210) 463 }); 464 auto actual = ys2.subscriptions(); 465 REQUIRE(required == actual); 466 } 467 468 THEN("there was one subscription and one unsubscription to the ys3"){ 469 auto required = rxu::to_vector({ 470 on.subscribe(200, 210) 471 }); 472 auto actual = ys3.subscriptions(); 473 REQUIRE(required == actual); 474 } 475 } 476 } 477 } 478 479 SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){ 480 GIVEN("3 cold observables of ints."){ 481 auto sc = rxsc::make_test(); 482 auto w = sc.create_worker(); 483 const rxsc::test::messages<int> on; 484 485 std::runtime_error ex("amb on_error from source"); 486 487 auto ys1 = sc.make_cold_observable({ 488 on.next(10, 101), 489 on.next(110, 102), 490 on.next(210, 103), 491 on.completed(310) 492 }); 493 494 auto ys2 = sc.make_cold_observable({ 495 on.error(20, ex) 496 }); 497 498 auto ys3 = sc.make_cold_observable({ 499 on.next(30, 301), 500 on.next(130, 302), 501 on.next(230, 303), 502 on.completed(330) 503 }); 504 505 WHEN("the first observable is selected to produce ints"){ 506 507 auto res = w.start( 508 [&]() { 509 return ys1 510 .amb(ys2, ys3) 511 // forget type to workaround lambda deduction bug on msvc 2013 512 .as_dynamic(); 513 } 514 ); 515 516 THEN("the output contains ints from the first observable"){ 517 auto required = rxu::to_vector({ 518 on.next(210, 101), 519 on.next(310, 102), 520 on.next(410, 103), 521 on.completed(510) 522 }); 523 auto actual = res.get_observer().messages(); 524 REQUIRE(required == actual); 525 } 526 527 THEN("there was one subscription and one unsubscription to the ys1"){ 528 auto required = rxu::to_vector({ 529 on.subscribe(200, 510) 530 }); 531 auto actual = ys1.subscriptions(); 532 REQUIRE(required == actual); 533 } 534 535 THEN("there was one subscription and one unsubscription to the ys2"){ 536 auto required = rxu::to_vector({ 537 on.subscribe(200, 210) 538 }); 539 auto actual = ys2.subscriptions(); 540 REQUIRE(required == actual); 541 } 542 543 THEN("there was one subscription and one unsubscription to the ys3"){ 544 auto required = rxu::to_vector({ 545 on.subscribe(200, 210) 546 }); 547 auto actual = ys3.subscriptions(); 548 REQUIRE(required == actual); 549 } 550 } 551 } 552 } 553 554 SCENARIO("variadic amb never empty, custom coordination", "[amb][join][operators]"){ 555 GIVEN("2 hot observables of ints."){ 556 auto sc = rxsc::make_test(); 557 auto so = rx::synchronize_in_one_worker(sc); 558 auto w = sc.create_worker(); 559 const rxsc::test::messages<int> on; 560 561 auto ys1 = sc.make_hot_observable({ 562 on.next(100, 1) 563 }); 564 565 auto ys2 = sc.make_hot_observable({ 566 on.next(110, 2), 567 on.completed(400) 568 }); 569 570 WHEN("the first observable is selected to produce ints"){ 571 572 auto res = w.start( 573 [&]() { 574 return ys1 575 .amb(so, ys2) 576 // forget type to workaround lambda deduction bug on msvc 2013 577 .as_dynamic(); 578 } 579 ); 580 581 THEN("the output contains only complete message"){ 582 auto required = rxu::to_vector({ 583 on.completed(401) 584 }); 585 auto actual = res.get_observer().messages(); 586 REQUIRE(required == actual); 587 } 588 } 589 } 590 } 591