1 #include "../test.h" 2 #include <rxcpp/operators/rx-switch_on_next.hpp> 3 4 SCENARIO("switch_on_next - some changes", "[switch_on_next][operators]"){ 5 GIVEN("a source"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 const rxsc::test::messages<rx::observable<int>> o_on; 10 11 auto ys1 = sc.make_cold_observable({ 12 on.next(10, 101), 13 on.next(20, 102), 14 on.next(110, 103), 15 on.next(120, 104), 16 on.next(210, 105), 17 on.next(220, 106), 18 on.completed(230) 19 }); 20 21 auto ys2 = sc.make_cold_observable({ 22 on.next(10, 201), 23 on.next(20, 202), 24 on.next(30, 203), 25 on.next(40, 204), 26 on.completed(50) 27 }); 28 29 auto ys3 = sc.make_cold_observable({ 30 on.next(10, 301), 31 on.next(20, 302), 32 on.next(30, 303), 33 on.next(40, 304), 34 on.completed(150) 35 }); 36 37 auto xs = sc.make_hot_observable({ 38 o_on.next(300, ys1), 39 o_on.next(400, ys2), 40 o_on.next(500, ys3), 41 o_on.completed(600) 42 }); 43 44 WHEN("distinct values are taken"){ 45 46 auto res = w.start( 47 [xs]() { 48 return xs 49 | rxo::switch_on_next(); 50 } 51 ); 52 53 THEN("the output only contains distinct items sent while subscribed"){ 54 auto required = rxu::to_vector({ 55 on.next(310, 101), 56 on.next(320, 102), 57 on.next(410, 201), 58 on.next(420, 202), 59 on.next(430, 203), 60 on.next(440, 204), 61 on.next(510, 301), 62 on.next(520, 302), 63 on.next(530, 303), 64 on.next(540, 304), 65 on.completed(650) 66 }); 67 auto actual = res.get_observer().messages(); 68 REQUIRE(required == actual); 69 } 70 71 THEN("there was 1 subscription/unsubscription to the source"){ 72 auto required = rxu::to_vector({ 73 o_on.subscribe(200, 600) 74 }); 75 auto actual = xs.subscriptions(); 76 REQUIRE(required == actual); 77 } 78 79 THEN("there was 1 subscription/unsubscription to ys1"){ 80 auto required = rxu::to_vector({ 81 on.subscribe(300, 400) 82 }); 83 auto actual = ys1.subscriptions(); 84 REQUIRE(required == actual); 85 } 86 87 THEN("there was 1 subscription/unsubscription to ys2"){ 88 auto required = rxu::to_vector({ 89 on.subscribe(400, 450) 90 }); 91 auto actual = ys2.subscriptions(); 92 REQUIRE(required == actual); 93 } 94 95 THEN("there was 1 subscription/unsubscription to ys3"){ 96 auto required = rxu::to_vector({ 97 on.subscribe(500, 650) 98 }); 99 auto actual = ys3.subscriptions(); 100 REQUIRE(required == actual); 101 } 102 } 103 } 104 } 105 106 SCENARIO("switch_on_next - inner throws", "[switch_on_next][operators]"){ 107 GIVEN("a source"){ 108 auto sc = rxsc::make_test(); 109 auto w = sc.create_worker(); 110 const rxsc::test::messages<int> on; 111 const rxsc::test::messages<rx::observable<int>> o_on; 112 113 std::runtime_error ex("switch_on_next on_error from source"); 114 115 auto ys1 = sc.make_cold_observable({ 116 on.next(10, 101), 117 on.next(20, 102), 118 on.next(110, 103), 119 on.next(120, 104), 120 on.next(210, 105), 121 on.next(220, 106), 122 on.completed(230) 123 }); 124 125 auto ys2 = sc.make_cold_observable({ 126 on.next(10, 201), 127 on.next(20, 202), 128 on.next(30, 203), 129 on.next(40, 204), 130 on.error(50, ex) 131 }); 132 133 auto ys3 = sc.make_cold_observable({ 134 on.next(10, 301), 135 on.next(20, 302), 136 on.next(30, 303), 137 on.next(40, 304), 138 on.completed(150) 139 }); 140 141 auto xs = sc.make_hot_observable({ 142 o_on.next(300, ys1), 143 o_on.next(400, ys2), 144 o_on.next(500, ys3), 145 o_on.completed(600) 146 }); 147 148 WHEN("distinct values are taken"){ 149 150 auto res = w.start( 151 [xs]() { 152 return xs.switch_on_next(); 153 } 154 ); 155 156 THEN("the output only contains distinct items sent while subscribed"){ 157 auto required = rxu::to_vector({ 158 on.next(310, 101), 159 on.next(320, 102), 160 on.next(410, 201), 161 on.next(420, 202), 162 on.next(430, 203), 163 on.next(440, 204), 164 on.error(450, ex) 165 }); 166 auto actual = res.get_observer().messages(); 167 REQUIRE(required == actual); 168 } 169 170 THEN("there was 1 subscription/unsubscription to the source"){ 171 auto required = rxu::to_vector({ 172 o_on.subscribe(200, 450) 173 }); 174 auto actual = xs.subscriptions(); 175 REQUIRE(required == actual); 176 } 177 178 THEN("there was 1 subscription/unsubscription to ys1"){ 179 auto required = rxu::to_vector({ 180 on.subscribe(300, 400) 181 }); 182 auto actual = ys1.subscriptions(); 183 REQUIRE(required == actual); 184 } 185 186 THEN("there was 1 subscription/unsubscription to ys2"){ 187 auto required = rxu::to_vector({ 188 on.subscribe(400, 450) 189 }); 190 auto actual = ys2.subscriptions(); 191 REQUIRE(required == actual); 192 } 193 194 THEN("there was 1 subscription/unsubscription to ys3"){ 195 auto required = std::vector<rxn::subscription>(); 196 auto actual = ys3.subscriptions(); 197 REQUIRE(required == actual); 198 } 199 } 200 } 201 } 202 203 SCENARIO("switch_on_next - outer throws", "[switch_on_next][operators]"){ 204 GIVEN("a source"){ 205 auto sc = rxsc::make_test(); 206 auto w = sc.create_worker(); 207 const rxsc::test::messages<int> on; 208 const rxsc::test::messages<rx::observable<int>> o_on; 209 210 std::runtime_error ex("switch_on_next on_error from source"); 211 212 auto ys1 = sc.make_cold_observable({ 213 on.next(10, 101), 214 on.next(20, 102), 215 on.next(110, 103), 216 on.next(120, 104), 217 on.next(210, 105), 218 on.next(220, 106), 219 on.completed(230) 220 }); 221 222 auto ys2 = sc.make_cold_observable({ 223 on.next(10, 201), 224 on.next(20, 202), 225 on.next(30, 203), 226 on.next(40, 204), 227 on.completed(50) 228 }); 229 230 auto xs = sc.make_hot_observable({ 231 o_on.next(300, ys1), 232 o_on.next(400, ys2), 233 o_on.error(500, ex) 234 }); 235 236 WHEN("distinct values are taken"){ 237 238 auto res = w.start( 239 [xs]() { 240 return xs.switch_on_next(); 241 } 242 ); 243 244 THEN("the output only contains distinct items sent while subscribed"){ 245 auto required = rxu::to_vector({ 246 on.next(310, 101), 247 on.next(320, 102), 248 on.next(410, 201), 249 on.next(420, 202), 250 on.next(430, 203), 251 on.next(440, 204), 252 on.error(500, ex) 253 }); 254 auto actual = res.get_observer().messages(); 255 REQUIRE(required == actual); 256 } 257 258 THEN("there was 1 subscription/unsubscription to the source"){ 259 auto required = rxu::to_vector({ 260 o_on.subscribe(200, 500) 261 }); 262 auto actual = xs.subscriptions(); 263 REQUIRE(required == actual); 264 } 265 266 THEN("there was 1 subscription/unsubscription to ys1"){ 267 auto required = rxu::to_vector({ 268 on.subscribe(300, 400) 269 }); 270 auto actual = ys1.subscriptions(); 271 REQUIRE(required == actual); 272 } 273 274 THEN("there was 1 subscription/unsubscription to ys2"){ 275 auto required = rxu::to_vector({ 276 on.subscribe(400, 450) 277 }); 278 auto actual = ys2.subscriptions(); 279 REQUIRE(required == actual); 280 } 281 } 282 } 283 } 284 285 SCENARIO("switch_on_next - no inner", "[switch_on_next][operators]"){ 286 GIVEN("a source"){ 287 auto sc = rxsc::make_test(); 288 auto w = sc.create_worker(); 289 const rxsc::test::messages<int> on; 290 const rxsc::test::messages<rx::observable<int>> o_on; 291 292 auto xs = sc.make_hot_observable({ 293 o_on.completed(500) 294 }); 295 296 WHEN("distinct values are taken"){ 297 298 auto res = w.start( 299 [xs]() { 300 return xs.switch_on_next(); 301 } 302 ); 303 304 THEN("the output only contains distinct items sent while subscribed"){ 305 auto required = rxu::to_vector({ 306 on.completed(500) 307 }); 308 auto actual = res.get_observer().messages(); 309 REQUIRE(required == actual); 310 } 311 312 THEN("there was 1 subscription/unsubscription to the source"){ 313 auto required = rxu::to_vector({ 314 o_on.subscribe(200, 500) 315 }); 316 auto actual = xs.subscriptions(); 317 REQUIRE(required == actual); 318 } 319 } 320 } 321 } 322 323 SCENARIO("switch_on_next - inner completes", "[switch_on_next][operators]"){ 324 GIVEN("a source"){ 325 auto sc = rxsc::make_test(); 326 auto w = sc.create_worker(); 327 const rxsc::test::messages<int> on; 328 const rxsc::test::messages<rx::observable<int>> o_on; 329 330 auto ys1 = sc.make_cold_observable({ 331 on.next(10, 101), 332 on.next(20, 102), 333 on.next(110, 103), 334 on.next(120, 104), 335 on.next(210, 105), 336 on.next(220, 106), 337 on.completed(230) 338 }); 339 340 auto xs = sc.make_hot_observable({ 341 o_on.next(300, ys1), 342 o_on.completed(540) 343 }); 344 345 WHEN("distinct values are taken"){ 346 347 auto res = w.start( 348 [xs]() { 349 return xs.switch_on_next(); 350 } 351 ); 352 353 THEN("the output only contains distinct items sent while subscribed"){ 354 auto required = rxu::to_vector({ 355 on.next(310, 101), 356 on.next(320, 102), 357 on.next(410, 103), 358 on.next(420, 104), 359 on.next(510, 105), 360 on.next(520, 106), 361 on.completed(540) 362 }); 363 auto actual = res.get_observer().messages(); 364 REQUIRE(required == actual); 365 } 366 367 THEN("there was 1 subscription/unsubscription to the source"){ 368 auto required = rxu::to_vector({ 369 o_on.subscribe(200, 540) 370 }); 371 auto actual = xs.subscriptions(); 372 REQUIRE(required == actual); 373 } 374 375 THEN("there was 1 subscription/unsubscription to ys1"){ 376 auto required = rxu::to_vector({ 377 on.subscribe(300, 530) 378 }); 379 auto actual = ys1.subscriptions(); 380 REQUIRE(required == actual); 381 } 382 } 383 } 384 } 385