1 #include "../test.h" 2 #include <rxcpp/operators/rx-distinct_until_changed.hpp> 3 4 SCENARIO("distinct_until_changed - never", "[distinct_until_changed][operators]"){ 5 GIVEN("a source"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto xs = sc.make_hot_observable({ 11 on.next(150, 1) 12 }); 13 14 WHEN("distinct values are taken"){ 15 16 auto res = w.start( 17 [xs]() { 18 return xs | rxo::distinct_until_changed(); 19 } 20 ); 21 22 THEN("the output is empty"){ 23 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 24 auto actual = res.get_observer().messages(); 25 REQUIRE(required == actual); 26 } 27 28 THEN("there was 1 subscription/unsubscription to the source"){ 29 auto required = rxu::to_vector({ 30 on.subscribe(200, 1000) 31 }); 32 auto actual = xs.subscriptions(); 33 REQUIRE(required == actual); 34 } 35 } 36 } 37 } 38 39 SCENARIO("distinct_until_changed - empty", "[distinct_until_changed][operators]"){ 40 GIVEN("a source"){ 41 auto sc = rxsc::make_test(); 42 auto w = sc.create_worker(); 43 const rxsc::test::messages<int> on; 44 45 auto xs = sc.make_hot_observable({ 46 on.next(150, 1), 47 on.completed(250) 48 }); 49 50 WHEN("distinct values are taken"){ 51 52 auto res = w.start( 53 [xs]() { 54 return xs.distinct_until_changed(); 55 } 56 ); 57 58 THEN("the output only contains complete message"){ 59 auto required = rxu::to_vector({ 60 on.completed(250) 61 }); 62 auto actual = res.get_observer().messages(); 63 REQUIRE(required == actual); 64 } 65 66 THEN("there was 1 subscription/unsubscription to the source"){ 67 auto required = rxu::to_vector({ 68 on.subscribe(200, 250) 69 }); 70 auto actual = xs.subscriptions(); 71 REQUIRE(required == actual); 72 } 73 74 } 75 } 76 } 77 78 SCENARIO("distinct_until_changed - return", "[distinct_until_changed][operators]"){ 79 GIVEN("a source"){ 80 auto sc = rxsc::make_test(); 81 auto w = sc.create_worker(); 82 const rxsc::test::messages<int> on; 83 84 auto xs = sc.make_hot_observable({ 85 on.next(150, 1), 86 on.next(210, 2), 87 on.completed(250) 88 }); 89 90 WHEN("distinct values are taken"){ 91 92 auto res = w.start( 93 [xs]() { 94 return xs.distinct_until_changed(); 95 } 96 ); 97 98 THEN("the output only contains distinct items sent while subscribed"){ 99 auto required = rxu::to_vector({ 100 on.next(210, 2), 101 on.completed(250) 102 }); 103 auto actual = res.get_observer().messages(); 104 REQUIRE(required == actual); 105 } 106 107 THEN("there was 1 subscription/unsubscription to the source"){ 108 auto required = rxu::to_vector({ 109 on.subscribe(200, 250) 110 }); 111 auto actual = xs.subscriptions(); 112 REQUIRE(required == actual); 113 } 114 115 } 116 } 117 } 118 119 SCENARIO("distinct_until_changed - throw", "[distinct_until_changed][operators]"){ 120 GIVEN("a source"){ 121 auto sc = rxsc::make_test(); 122 auto w = sc.create_worker(); 123 const rxsc::test::messages<int> on; 124 125 std::runtime_error ex("distinct_until_changed on_error from source"); 126 127 auto xs = sc.make_hot_observable({ 128 on.next(150, 1), 129 on.error(250, ex) 130 }); 131 132 WHEN("distinct values are taken"){ 133 134 auto res = w.start( 135 [xs]() { 136 return xs.distinct_until_changed(); 137 } 138 ); 139 140 THEN("the output only contains only error"){ 141 auto required = rxu::to_vector({ 142 on.error(250, ex) 143 }); 144 auto actual = res.get_observer().messages(); 145 REQUIRE(required == actual); 146 } 147 148 THEN("there was 1 subscription/unsubscription to the source"){ 149 auto required = rxu::to_vector({ 150 on.subscribe(200, 250) 151 }); 152 auto actual = xs.subscriptions(); 153 REQUIRE(required == actual); 154 } 155 156 } 157 } 158 } 159 160 SCENARIO("distinct_until_changed - all changes", "[distinct_until_changed][operators]"){ 161 GIVEN("a source"){ 162 auto sc = rxsc::make_test(); 163 auto w = sc.create_worker(); 164 const rxsc::test::messages<int> on; 165 166 auto xs = sc.make_hot_observable({ 167 on.next(150, 1), 168 on.next(210, 2), 169 on.next(220, 3), 170 on.next(230, 4), 171 on.next(240, 5), 172 on.completed(250) 173 }); 174 175 WHEN("distinct values are taken"){ 176 177 auto res = w.start( 178 [xs]() { 179 return xs.distinct_until_changed(); 180 } 181 ); 182 183 THEN("the output only contains distinct items sent while subscribed"){ 184 auto required = rxu::to_vector({ 185 on.next(210, 2), 186 on.next(220, 3), 187 on.next(230, 4), 188 on.next(240, 5), 189 on.completed(250) 190 }); 191 auto actual = res.get_observer().messages(); 192 REQUIRE(required == actual); 193 } 194 195 THEN("there was 1 subscription/unsubscription to the source"){ 196 auto required = rxu::to_vector({ 197 on.subscribe(200, 250) 198 }); 199 auto actual = xs.subscriptions(); 200 REQUIRE(required == actual); 201 } 202 203 } 204 } 205 } 206 207 SCENARIO("distinct_until_changed - all same", "[distinct_until_changed][operators]"){ 208 GIVEN("a source"){ 209 auto sc = rxsc::make_test(); 210 auto w = sc.create_worker(); 211 const rxsc::test::messages<int> on; 212 213 auto xs = sc.make_hot_observable({ 214 on.next(150, 1), 215 on.next(210, 2), 216 on.next(220, 2), 217 on.next(230, 2), 218 on.next(240, 2), 219 on.completed(250) 220 }); 221 222 WHEN("distinct values are taken"){ 223 224 auto res = w.start( 225 [xs]() { 226 return xs.distinct_until_changed(); 227 } 228 ); 229 230 THEN("the output only contains distinct items sent while subscribed"){ 231 auto required = rxu::to_vector({ 232 on.next(210, 2), 233 on.completed(250) 234 }); 235 auto actual = res.get_observer().messages(); 236 REQUIRE(required == actual); 237 } 238 239 THEN("there was 1 subscription/unsubscription to the source"){ 240 auto required = rxu::to_vector({ 241 on.subscribe(200, 250) 242 }); 243 auto actual = xs.subscriptions(); 244 REQUIRE(required == actual); 245 } 246 247 } 248 } 249 } 250 251 SCENARIO("distinct_until_changed - some changes", "[distinct_until_changed][operators]"){ 252 GIVEN("a source"){ 253 auto sc = rxsc::make_test(); 254 auto w = sc.create_worker(); 255 const rxsc::test::messages<int> on; 256 257 auto xs = sc.make_hot_observable({ 258 on.next(150, 1), 259 on.next(210, 2), //* 260 on.next(215, 3), //* 261 on.next(220, 3), 262 on.next(225, 2), //* 263 on.next(230, 2), 264 on.next(230, 1), //* 265 on.next(240, 2), //* 266 on.completed(250) 267 }); 268 269 WHEN("distinct values are taken"){ 270 271 auto res = w.start( 272 [xs]() { 273 return xs.distinct_until_changed(); 274 } 275 ); 276 277 THEN("the output only contains distinct items sent while subscribed"){ 278 auto required = rxu::to_vector({ 279 on.next(210, 2), //* 280 on.next(215, 3), //* 281 on.next(225, 2), //* 282 on.next(230, 1), //* 283 on.next(240, 2), //* 284 on.completed(250) 285 }); 286 auto actual = res.get_observer().messages(); 287 REQUIRE(required == actual); 288 } 289 290 THEN("there was 1 subscription/unsubscription to the source"){ 291 auto required = rxu::to_vector({ 292 on.subscribe(200, 250) 293 }); 294 auto actual = xs.subscriptions(); 295 REQUIRE(required == actual); 296 } 297 298 } 299 } 300 } 301 302 struct A { 303 int i; 304 305 bool operator!=(const A& a) const { 306 return i != a.i; 307 } 308 309 bool operator==(const A& a) const { 310 return i == a.i; 311 } 312 }; 313 314 SCENARIO("distinct_until_changed - custom type", "[distinct_until_changed][operators]"){ 315 GIVEN("a source"){ 316 auto sc = rxsc::make_test(); 317 auto w = sc.create_worker(); 318 const rxsc::test::messages<A> on; 319 320 auto xs = sc.make_hot_observable({ 321 on.next(150, A{1}), 322 on.next(210, A{2}), //* 323 on.next(215, A{3}), //* 324 on.next(220, A{3}), 325 on.next(225, A{2}), //* 326 on.next(230, A{2}), 327 on.next(230, A{1}), //* 328 on.next(240, A{2}), //* 329 on.completed(250) 330 }); 331 332 WHEN("distinct values are taken"){ 333 334 auto res = w.start( 335 [xs]() { 336 return xs.distinct_until_changed(); 337 } 338 ); 339 340 THEN("the output only contains distinct items sent while subscribed"){ 341 auto required = rxu::to_vector({ 342 on.next(210, A{2}), //* 343 on.next(215, A{3}), //* 344 on.next(225, A{2}), //* 345 on.next(230, A{1}), //* 346 on.next(240, A{2}), //* 347 on.completed(250) 348 }); 349 auto actual = res.get_observer().messages(); 350 REQUIRE(required == actual); 351 } 352 353 THEN("there was 1 subscription/unsubscription to the source"){ 354 auto required = rxu::to_vector({ 355 on.subscribe(200, 250) 356 }); 357 auto actual = xs.subscriptions(); 358 REQUIRE(required == actual); 359 } 360 361 } 362 } 363 } 364 365 SCENARIO("distinct_until_changed - custom predicate", "[distinct_until_changed][operators]"){ 366 GIVEN("a source"){ 367 auto sc = rxsc::make_test(); 368 auto w = sc.create_worker(); 369 const rxsc::test::messages<int> on; 370 371 auto xs = sc.make_hot_observable({ 372 on.next(150, 1), 373 on.next(210, 2), //* 374 on.next(215, 3), //* 375 on.next(220, 3), 376 on.next(225, 2), //* 377 on.next(230, 2), 378 on.next(230, 1), //* 379 on.next(240, 2), //* 380 on.completed(250) 381 }); 382 383 WHEN("distinct values are taken"){ 384 385 auto res = w.start( 386 [xs]() { 387 return xs 388 .distinct_until_changed([](int x, int y) { return x == y; }) 389 // forget type to workaround lambda deduction bug on msvc 2013 390 .as_dynamic(); 391 } 392 ); 393 394 THEN("the output only contains distinct items sent while subscribed"){ 395 auto required = rxu::to_vector({ 396 on.next(210, 2), //* 397 on.next(215, 3), //* 398 on.next(225, 2), //* 399 on.next(230, 1), //* 400 on.next(240, 2), //* 401 on.completed(250) 402 }); 403 auto actual = res.get_observer().messages(); 404 REQUIRE(required == actual); 405 } 406 407 THEN("there was 1 subscription/unsubscription to the source"){ 408 auto required = rxu::to_vector({ 409 on.subscribe(200, 250) 410 }); 411 auto actual = xs.subscriptions(); 412 REQUIRE(required == actual); 413 } 414 415 } 416 } 417 } 418