1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP) 6 #define RXCPP_RX_SCHEDULER_TEST_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 namespace detail { 15 16 class test_type : public scheduler_interface 17 { 18 public: 19 20 typedef scheduler_interface::clock_type clock_type; 21 22 struct test_type_state : public virtual_time<long, long> 23 { 24 typedef virtual_time<long, long> base; 25 26 using base::schedule_absolute; 27 using base::schedule_relative; 28 29 clock_type::time_point now() const { 30 return to_time_point(clock_now); 31 } 32 33 virtual void schedule_absolute(long when, const schedulable& a) const 34 { 35 if (when <= base::clock_now) 36 when = base::clock_now + 1; 37 38 return base::schedule_absolute(when, a); 39 } 40 41 virtual long add(long absolute, long relative) const 42 { 43 return absolute + relative; 44 } 45 46 virtual clock_type::time_point to_time_point(long absolute) const 47 { 48 return clock_type::time_point(std::chrono::milliseconds(absolute)); 49 } 50 51 virtual long to_relative(clock_type::duration d) const 52 { 53 return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); 54 } 55 }; 56 57 private: 58 mutable std::shared_ptr<test_type_state> state; 59 60 public: 61 struct test_type_worker : public worker_interface 62 { 63 mutable std::shared_ptr<test_type_state> state; 64 65 typedef test_type_state::absolute absolute; 66 typedef test_type_state::relative relative; 67 68 test_type_worker(std::shared_ptr<test_type_state> st) 69 : state(std::move(st)) 70 { 71 } 72 73 virtual clock_type::time_point now() const { 74 return state->now(); 75 } 76 77 virtual void schedule(const schedulable& scbl) const { 78 state->schedule_absolute(state->clock(), scbl); 79 } 80 81 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { 82 state->schedule_relative(state->to_relative(when - now()), scbl); 83 } 84 85 void schedule_absolute(absolute when, const schedulable& scbl) const { 86 state->schedule_absolute(when, scbl); 87 } 88 89 void schedule_relative(relative when, const schedulable& scbl) const { 90 state->schedule_relative(when, scbl); 91 } 92 93 bool is_enabled() const {return state->is_enabled();} 94 absolute clock() const {return state->clock();} 95 96 void start() const 97 { 98 state->start(); 99 } 100 101 void stop() const 102 { 103 state->stop(); 104 } 105 106 void advance_to(absolute time) const 107 { 108 state->advance_to(time); 109 } 110 111 void advance_by(relative time) const 112 { 113 state->advance_by(time); 114 } 115 116 void sleep(relative time) const 117 { 118 state->sleep(time); 119 } 120 121 template<class T> 122 subscriber<T, rxt::testable_observer<T>> make_subscriber() const; 123 }; 124 125 public: 126 test_type() 127 : state(std::make_shared<test_type_state>()) 128 { 129 } 130 131 virtual clock_type::time_point now() const { 132 return state->now(); 133 } 134 135 virtual worker create_worker(composite_subscription cs) const { 136 return worker(cs, std::make_shared<test_type_worker>(state)); 137 } 138 139 bool is_enabled() const {return state->is_enabled();} 140 long clock() { 141 return state->clock(); 142 } 143 144 clock_type::time_point to_time_point(long absolute) const { 145 return state->to_time_point(absolute); 146 } 147 148 std::shared_ptr<test_type_worker> create_test_type_worker_interface() const { 149 return std::make_shared<test_type_worker>(state); 150 } 151 152 template<class T> 153 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const; 154 155 template<class T> 156 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const; 157 }; 158 159 template<class T> 160 class mock_observer 161 : public rxt::detail::test_subject_base<T> 162 { 163 typedef typename rxn::notification<T> notification_type; 164 typedef rxn::recorded<typename notification_type::type> recorded_type; 165 166 public: 167 explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc) 168 : sc(sc) 169 { 170 } 171 172 std::shared_ptr<test_type::test_type_state> sc; 173 std::vector<recorded_type> m; 174 175 virtual void on_subscribe(subscriber<T>) const { 176 std::terminate(); 177 } 178 virtual std::vector<rxn::subscription> subscriptions() const { 179 std::terminate(); 180 } 181 182 virtual std::vector<recorded_type> messages() const { 183 return m; 184 } 185 }; 186 187 template<class T> 188 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const 189 { 190 typedef typename rxn::notification<T> notification_type; 191 typedef rxn::recorded<typename notification_type::type> recorded_type; 192 193 auto ts = std::make_shared<mock_observer<T>>(state); 194 195 return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>( 196 // on_next 197 [ts](T value) 198 { 199 ts->m.push_back( 200 recorded_type(ts->sc->clock(), notification_type::on_next(value))); 201 }, 202 // on_error 203 [ts](rxu::error_ptr e) 204 { 205 ts->m.push_back( 206 recorded_type(ts->sc->clock(), notification_type::on_error(e))); 207 }, 208 // on_completed 209 [ts]() 210 { 211 ts->m.push_back( 212 recorded_type(ts->sc->clock(), notification_type::on_completed())); 213 }))); 214 } 215 216 template<class T> 217 class cold_observable 218 : public rxt::detail::test_subject_base<T> 219 { 220 typedef cold_observable<T> this_type; 221 std::shared_ptr<test_type::test_type_state> sc; 222 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; 223 mutable std::vector<recorded_type> mv; 224 mutable std::vector<rxn::subscription> sv; 225 mutable worker controller; 226 227 public: 228 229 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv) 230 : sc(sc) 231 , mv(std::move(mv)) 232 , controller(w) 233 { 234 } 235 236 template<class Iterator> 237 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end) 238 : sc(sc) 239 , mv(begin, end) 240 , controller(w) 241 { 242 } 243 244 virtual void on_subscribe(subscriber<T> o) const { 245 sv.push_back(rxn::subscription(sc->clock())); 246 auto index = sv.size() - 1; 247 248 for (auto& message : mv) { 249 auto n = message.value(); 250 sc->schedule_relative(message.time(), make_schedulable( 251 controller, 252 [n, o](const schedulable&) { 253 if (o.is_subscribed()) { 254 n->accept(o); 255 } 256 })); 257 } 258 259 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this()); 260 o.add([sharedThis, index]() { 261 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); 262 }); 263 } 264 265 virtual std::vector<rxn::subscription> subscriptions() const { 266 return sv; 267 } 268 269 virtual std::vector<recorded_type> messages() const { 270 return mv; 271 } 272 }; 273 274 template<class T> 275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const 276 { 277 auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages)); 278 return rxt::testable_observable<T>(co); 279 } 280 281 template<class T> 282 class hot_observable 283 : public rxt::detail::test_subject_base<T> 284 { 285 typedef hot_observable<T> this_type; 286 std::shared_ptr<test_type::test_type_state> sc; 287 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; 288 typedef subscriber<T> observer_type; 289 mutable std::vector<recorded_type> mv; 290 mutable std::vector<rxn::subscription> sv; 291 mutable std::list<observer_type> observers; 292 mutable worker controller; 293 294 public: 295 296 hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv) 297 : sc(sc) 298 , mv(mv) 299 , controller(w) 300 { 301 for (auto& message : mv) { 302 auto n = message.value(); 303 sc->schedule_absolute(message.time(), make_schedulable( 304 controller, 305 [this, n](const schedulable&) { 306 auto local = this->observers; 307 for (auto& o : local) { 308 if (o.is_subscribed()) { 309 n->accept(o); 310 } 311 } 312 })); 313 } 314 } 315 316 virtual ~hot_observable() {} 317 318 virtual void on_subscribe(observer_type o) const { 319 auto olocation = observers.insert(observers.end(), o); 320 321 sv.push_back(rxn::subscription(sc->clock())); 322 auto index = sv.size() - 1; 323 324 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this()); 325 o.add([sharedThis, index, olocation]() { 326 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); 327 sharedThis->observers.erase(olocation); 328 }); 329 } 330 331 virtual std::vector<rxn::subscription> subscriptions() const { 332 return sv; 333 } 334 335 virtual std::vector<recorded_type> messages() const { 336 return mv; 337 } 338 }; 339 340 template<class T> 341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const 342 { 343 auto worker = create_worker(composite_subscription()); 344 auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages)); 345 return rxt::testable_observable<T>(shared); 346 } 347 348 template<class F> 349 struct is_create_source_function 350 { 351 struct not_void {}; 352 template<class CF> 353 static auto check(int) -> decltype((*(CF*)nullptr)()); 354 template<class CF> 355 static not_void check(...); 356 357 static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value; 358 }; 359 360 } 361 362 class test : public scheduler 363 { 364 std::shared_ptr<detail::test_type> tester; 365 public: 366 367 explicit test(std::shared_ptr<detail::test_type> t) 368 : scheduler(std::static_pointer_cast<scheduler_interface>(t)) 369 , tester(t) 370 { 371 } 372 373 typedef detail::test_type::clock_type clock_type; 374 375 static const long created_time = 100; 376 static const long subscribed_time = 200; 377 static const long unsubscribed_time = 1000; 378 379 template<class T> 380 struct messages 381 { 382 typedef typename rxn::notification<T> notification_type; 383 typedef rxn::recorded<typename notification_type::type> recorded_type; 384 typedef rxn::subscription subscription_type; 385 386 messages() {} 387 388 template<typename U> 389 static recorded_type next(long ticks, U value) { 390 return recorded_type(ticks, notification_type::on_next(std::move(value))); 391 } 392 393 static recorded_type completed(long ticks) { 394 return recorded_type(ticks, notification_type::on_completed()); 395 } 396 397 template<typename Exception> 398 static recorded_type error(long ticks, Exception&& e) { 399 return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e))); 400 } 401 402 static rxn::subscription subscribe(long subscribe, long unsubscribe) { 403 return rxn::subscription(subscribe, unsubscribe); 404 } 405 }; 406 407 class test_worker : public worker 408 { 409 std::shared_ptr<detail::test_type::test_type_worker> tester; 410 public: 411 412 ~test_worker() { 413 } 414 415 explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t) 416 : worker(cs, std::static_pointer_cast<worker_interface>(t)) 417 , tester(t) 418 { 419 } 420 421 bool is_enabled() const {return tester->is_enabled();} 422 long clock() const {return tester->clock();} 423 424 void schedule_absolute(long when, const schedulable& a) const { 425 tester->schedule_absolute(when, a); 426 } 427 428 void schedule_relative(long when, const schedulable& a) const { 429 tester->schedule_relative(when, a); 430 } 431 432 template<class Arg0, class... ArgN> 433 auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const 434 -> typename std::enable_if< 435 (detail::is_action_function<Arg0>::value || 436 is_subscription<Arg0>::value) && 437 !is_schedulable<Arg0>::value>::type { 438 tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); 439 } 440 441 template<class Arg0, class... ArgN> 442 auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const 443 -> typename std::enable_if< 444 (detail::is_action_function<Arg0>::value || 445 is_subscription<Arg0>::value) && 446 !is_schedulable<Arg0>::value>::type { 447 tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); 448 } 449 450 void advance_to(long time) const 451 { 452 tester->advance_to(time); 453 } 454 455 void advance_by(long time) const 456 { 457 tester->advance_by(time); 458 } 459 460 void sleep(long time) const 461 { 462 tester->sleep(time); 463 } 464 465 template<class T, class F> 466 auto start(F createSource, long created, long subscribed, long unsubscribed) const 467 -> subscriber<T, rxt::testable_observer<T>> 468 { 469 struct state_type 470 : public std::enable_shared_from_this<state_type> 471 { 472 typedef decltype(createSource()) source_type; 473 474 std::unique_ptr<source_type> source; 475 subscriber<T, rxt::testable_observer<T>> o; 476 477 explicit state_type(subscriber<T, rxt::testable_observer<T>> o) 478 : source() 479 , o(o) 480 { 481 } 482 }; 483 auto state = std::make_shared<state_type>(this->make_subscriber<T>()); 484 485 schedule_absolute(created, [createSource, state](const schedulable&) { 486 state->source.reset(new typename state_type::source_type(createSource())); 487 }); 488 schedule_absolute(subscribed, [state](const schedulable&) { 489 state->source->subscribe(state->o); 490 }); 491 schedule_absolute(unsubscribed, [state](const schedulable&) { 492 state->o.unsubscribe(); 493 }); 494 495 tester->start(); 496 497 return state->o; 498 } 499 500 template<class T, class F> 501 auto start(F&& createSource, long unsubscribed) const 502 -> subscriber<T, rxt::testable_observer<T>> 503 { 504 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed); 505 } 506 507 template<class T, class F> 508 auto start(F&& createSource) const 509 -> subscriber<T, rxt::testable_observer<T>> 510 { 511 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time); 512 } 513 514 template<class F> 515 struct start_traits 516 { 517 typedef decltype((*(F*)nullptr)()) source_type; 518 typedef typename source_type::value_type value_type; 519 typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type; 520 }; 521 522 template<class F> 523 auto start(F createSource, long created, long subscribed, long unsubscribed) const 524 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type 525 { 526 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed); 527 } 528 529 template<class F> 530 auto start(F createSource, long unsubscribed) const 531 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type 532 { 533 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed); 534 } 535 536 template<class F> 537 auto start(F createSource) const 538 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type 539 { 540 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time); 541 } 542 543 void start() const { 544 tester->start(); 545 } 546 547 template<class T> 548 subscriber<T, rxt::testable_observer<T>> make_subscriber() const { 549 return tester->make_subscriber<T>(); 550 } 551 }; 552 553 clock_type::time_point now() const { 554 return tester->now(); 555 } 556 557 test_worker create_worker(composite_subscription cs = composite_subscription()) const { 558 return test_worker(cs, tester->create_test_type_worker_interface()); 559 } 560 561 bool is_enabled() const {return tester->is_enabled();} 562 long clock() const {return tester->clock();} 563 564 clock_type::time_point to_time_point(long absolute) const { 565 return tester->to_time_point(absolute); 566 } 567 568 template<class T> 569 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{ 570 return tester->make_hot_observable(std::move(messages)); 571 } 572 573 template<class T, std::size_t size> 574 auto make_hot_observable(const T (&arr) [size]) const 575 -> decltype(tester->make_hot_observable(std::vector<T>())) { 576 return tester->make_hot_observable(rxu::to_vector(arr)); 577 } 578 579 template<class T> 580 auto make_hot_observable(std::initializer_list<T> il) const 581 -> decltype(tester->make_hot_observable(std::vector<T>())) { 582 return tester->make_hot_observable(std::vector<T>(il)); 583 } 584 585 template<class T> 586 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const { 587 return tester->make_cold_observable(std::move(messages)); 588 } 589 590 template<class T, std::size_t size> 591 auto make_cold_observable(const T (&arr) [size]) const 592 -> decltype(tester->make_cold_observable(std::vector<T>())) { 593 return tester->make_cold_observable(rxu::to_vector(arr)); 594 } 595 596 template<class T> 597 auto make_cold_observable(std::initializer_list<T> il) const 598 -> decltype(tester->make_cold_observable(std::vector<T>())) { 599 return tester->make_cold_observable(std::vector<T>(il)); 600 } 601 }; 602 603 604 inline test make_test() { 605 return test(std::make_shared<detail::test_type>()); 606 } 607 608 } 609 610 inline identity_one_worker identity_test() { 611 static identity_one_worker r(rxsc::make_test()); 612 return r; 613 } 614 615 } 616 617 #endif 618