Home | History | Annotate | Download | only in schedulers
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
      6 #define RXCPP_RX_SCHEDULER_TEST_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 namespace detail {
     15 
     16 class test_type : public scheduler_interface
     17 {
     18 public:
     19 
     20     typedef scheduler_interface::clock_type clock_type;
     21 
     22     struct test_type_state : public virtual_time<long, long>
     23     {
     24         typedef virtual_time<long, long> base;
     25 
     26         using base::schedule_absolute;
     27         using base::schedule_relative;
     28 
     29         clock_type::time_point now() const {
     30             return to_time_point(clock_now);
     31         }
     32 
     33         virtual void schedule_absolute(long when, const schedulable& a) const
     34         {
     35             if (when <= base::clock_now)
     36                 when = base::clock_now + 1;
     37 
     38             return base::schedule_absolute(when, a);
     39         }
     40 
     41         virtual long add(long absolute, long relative) const
     42         {
     43             return absolute + relative;
     44         }
     45 
     46         virtual clock_type::time_point to_time_point(long absolute) const
     47         {
     48             return clock_type::time_point(std::chrono::milliseconds(absolute));
     49         }
     50 
     51         virtual long to_relative(clock_type::duration d) const
     52         {
     53             return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
     54         }
     55     };
     56 
     57 private:
     58     mutable std::shared_ptr<test_type_state> state;
     59 
     60 public:
     61     struct test_type_worker : public worker_interface
     62     {
     63         mutable std::shared_ptr<test_type_state> state;
     64 
     65         typedef test_type_state::absolute absolute;
     66         typedef test_type_state::relative relative;
     67 
     68         test_type_worker(std::shared_ptr<test_type_state> st)
     69             : state(std::move(st))
     70         {
     71         }
     72 
     73         virtual clock_type::time_point now() const {
     74             return state->now();
     75         }
     76 
     77         virtual void schedule(const schedulable& scbl) const {
     78             state->schedule_absolute(state->clock(), scbl);
     79         }
     80 
     81         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
     82             state->schedule_relative(state->to_relative(when - now()), scbl);
     83         }
     84 
     85         void schedule_absolute(absolute when, const schedulable& scbl) const {
     86             state->schedule_absolute(when, scbl);
     87         }
     88 
     89         void schedule_relative(relative when, const schedulable& scbl) const {
     90             state->schedule_relative(when, scbl);
     91         }
     92 
     93         bool is_enabled() const {return state->is_enabled();}
     94         absolute clock() const {return state->clock();}
     95 
     96         void start() const
     97         {
     98             state->start();
     99         }
    100 
    101         void stop() const
    102         {
    103             state->stop();
    104         }
    105 
    106         void advance_to(absolute time) const
    107         {
    108             state->advance_to(time);
    109         }
    110 
    111         void advance_by(relative time) const
    112         {
    113             state->advance_by(time);
    114         }
    115 
    116         void sleep(relative time) const
    117         {
    118             state->sleep(time);
    119         }
    120 
    121         template<class T>
    122         subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
    123     };
    124 
    125 public:
    126     test_type()
    127         : state(std::make_shared<test_type_state>())
    128     {
    129     }
    130 
    131     virtual clock_type::time_point now() const {
    132         return state->now();
    133     }
    134 
    135     virtual worker create_worker(composite_subscription cs) const {
    136         return worker(cs, std::make_shared<test_type_worker>(state));
    137     }
    138 
    139     bool is_enabled() const {return state->is_enabled();}
    140     long clock() {
    141         return state->clock();
    142     }
    143 
    144     clock_type::time_point to_time_point(long absolute) const {
    145         return state->to_time_point(absolute);
    146     }
    147 
    148     std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
    149         return std::make_shared<test_type_worker>(state);
    150     }
    151 
    152     template<class T>
    153     rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
    154 
    155     template<class T>
    156     rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
    157 };
    158 
    159 template<class T>
    160 class mock_observer
    161     : public rxt::detail::test_subject_base<T>
    162 {
    163     typedef typename rxn::notification<T> notification_type;
    164     typedef rxn::recorded<typename notification_type::type> recorded_type;
    165 
    166 public:
    167     explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
    168         : sc(sc)
    169     {
    170     }
    171 
    172     std::shared_ptr<test_type::test_type_state> sc;
    173     std::vector<recorded_type> m;
    174 
    175     virtual void on_subscribe(subscriber<T>) const {
    176         std::terminate();
    177     }
    178     virtual std::vector<rxn::subscription> subscriptions() const {
    179         std::terminate();
    180     }
    181 
    182     virtual std::vector<recorded_type> messages() const {
    183         return m;
    184     }
    185 };
    186 
    187 template<class T>
    188 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
    189 {
    190     typedef typename rxn::notification<T> notification_type;
    191     typedef rxn::recorded<typename notification_type::type> recorded_type;
    192 
    193     auto ts = std::make_shared<mock_observer<T>>(state);
    194 
    195     return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
    196           // on_next
    197           [ts](T value)
    198           {
    199               ts->m.push_back(
    200                               recorded_type(ts->sc->clock(), notification_type::on_next(value)));
    201           },
    202           // on_error
    203           [ts](rxu::error_ptr e)
    204           {
    205               ts->m.push_back(
    206                               recorded_type(ts->sc->clock(), notification_type::on_error(e)));
    207           },
    208           // on_completed
    209           [ts]()
    210           {
    211               ts->m.push_back(
    212                               recorded_type(ts->sc->clock(), notification_type::on_completed()));
    213           })));
    214 }
    215 
    216 template<class T>
    217 class cold_observable
    218     : public rxt::detail::test_subject_base<T>
    219 {
    220     typedef cold_observable<T> this_type;
    221     std::shared_ptr<test_type::test_type_state> sc;
    222     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
    223     mutable std::vector<recorded_type> mv;
    224     mutable std::vector<rxn::subscription> sv;
    225     mutable worker controller;
    226 
    227 public:
    228 
    229     cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
    230         : sc(sc)
    231         , mv(std::move(mv))
    232         , controller(w)
    233     {
    234     }
    235 
    236     template<class Iterator>
    237     cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
    238         : sc(sc)
    239         , mv(begin, end)
    240         , controller(w)
    241     {
    242     }
    243 
    244     virtual void on_subscribe(subscriber<T> o) const {
    245         sv.push_back(rxn::subscription(sc->clock()));
    246         auto index = sv.size() - 1;
    247 
    248         for (auto& message : mv) {
    249             auto n = message.value();
    250             sc->schedule_relative(message.time(), make_schedulable(
    251                 controller,
    252                 [n, o](const schedulable&) {
    253                     if (o.is_subscribed()) {
    254                         n->accept(o);
    255                     }
    256                 }));
    257         }
    258 
    259         auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
    260         o.add([sharedThis, index]() {
    261             sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
    262         });
    263     }
    264 
    265     virtual std::vector<rxn::subscription> subscriptions() const {
    266         return sv;
    267     }
    268 
    269     virtual std::vector<recorded_type> messages() const {
    270         return mv;
    271     }
    272 };
    273 
    274 template<class T>
    275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
    276 {
    277     auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
    278     return rxt::testable_observable<T>(co);
    279 }
    280 
    281 template<class T>
    282 class hot_observable
    283     : public rxt::detail::test_subject_base<T>
    284 {
    285     typedef hot_observable<T> this_type;
    286     std::shared_ptr<test_type::test_type_state> sc;
    287     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
    288     typedef subscriber<T> observer_type;
    289     mutable std::vector<recorded_type> mv;
    290     mutable std::vector<rxn::subscription> sv;
    291     mutable std::list<observer_type> observers;
    292     mutable worker controller;
    293 
    294 public:
    295 
    296     hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
    297         : sc(sc)
    298         , mv(mv)
    299         , controller(w)
    300     {
    301         for (auto& message : mv) {
    302             auto n = message.value();
    303             sc->schedule_absolute(message.time(), make_schedulable(
    304                 controller,
    305                 [this, n](const schedulable&) {
    306                     auto local = this->observers;
    307                     for (auto& o : local) {
    308                         if (o.is_subscribed()) {
    309                             n->accept(o);
    310                         }
    311                     }
    312                 }));
    313         }
    314     }
    315 
    316     virtual ~hot_observable() {}
    317 
    318     virtual void on_subscribe(observer_type o) const {
    319         auto olocation = observers.insert(observers.end(), o);
    320 
    321         sv.push_back(rxn::subscription(sc->clock()));
    322         auto index = sv.size() - 1;
    323 
    324         auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
    325         o.add([sharedThis, index, olocation]() {
    326             sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
    327             sharedThis->observers.erase(olocation);
    328         });
    329     }
    330 
    331     virtual std::vector<rxn::subscription> subscriptions() const {
    332         return sv;
    333     }
    334 
    335     virtual std::vector<recorded_type> messages() const {
    336         return mv;
    337     }
    338 };
    339 
    340 template<class T>
    341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
    342 {
    343     auto worker = create_worker(composite_subscription());
    344     auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages));
    345     return rxt::testable_observable<T>(shared);
    346 }
    347 
    348 template<class F>
    349 struct is_create_source_function
    350 {
    351     struct not_void {};
    352     template<class CF>
    353     static auto check(int) -> decltype((*(CF*)nullptr)());
    354     template<class CF>
    355     static not_void check(...);
    356 
    357     static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
    358 };
    359 
    360 }
    361 
    362 class test : public scheduler
    363 {
    364     std::shared_ptr<detail::test_type> tester;
    365 public:
    366 
    367     explicit test(std::shared_ptr<detail::test_type> t)
    368         : scheduler(std::static_pointer_cast<scheduler_interface>(t))
    369         , tester(t)
    370     {
    371     }
    372 
    373     typedef detail::test_type::clock_type clock_type;
    374 
    375     static const long created_time = 100;
    376     static const long subscribed_time = 200;
    377     static const long unsubscribed_time = 1000;
    378 
    379     template<class T>
    380     struct messages
    381     {
    382         typedef typename rxn::notification<T> notification_type;
    383         typedef rxn::recorded<typename notification_type::type> recorded_type;
    384         typedef rxn::subscription subscription_type;
    385 
    386         messages() {}
    387 
    388         template<typename U>
    389         static recorded_type next(long ticks, U value) {
    390             return recorded_type(ticks, notification_type::on_next(std::move(value)));
    391         }
    392 
    393         static recorded_type completed(long ticks) {
    394             return recorded_type(ticks, notification_type::on_completed());
    395         }
    396 
    397         template<typename Exception>
    398         static recorded_type error(long ticks, Exception&& e) {
    399             return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
    400         }
    401 
    402         static rxn::subscription subscribe(long subscribe, long unsubscribe) {
    403             return rxn::subscription(subscribe, unsubscribe);
    404         }
    405     };
    406 
    407     class test_worker : public worker
    408     {
    409         std::shared_ptr<detail::test_type::test_type_worker> tester;
    410     public:
    411 
    412         ~test_worker() {
    413         }
    414 
    415         explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
    416             : worker(cs, std::static_pointer_cast<worker_interface>(t))
    417             , tester(t)
    418         {
    419         }
    420 
    421         bool is_enabled() const {return tester->is_enabled();}
    422         long clock() const {return tester->clock();}
    423 
    424         void schedule_absolute(long when, const schedulable& a) const {
    425             tester->schedule_absolute(when, a);
    426         }
    427 
    428         void schedule_relative(long when, const schedulable& a) const {
    429             tester->schedule_relative(when, a);
    430         }
    431 
    432         template<class Arg0, class... ArgN>
    433         auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
    434             -> typename std::enable_if<
    435                 (detail::is_action_function<Arg0>::value ||
    436                 is_subscription<Arg0>::value) &&
    437                 !is_schedulable<Arg0>::value>::type {
    438             tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
    439         }
    440 
    441         template<class Arg0, class... ArgN>
    442         auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
    443             -> typename std::enable_if<
    444                 (detail::is_action_function<Arg0>::value ||
    445                 is_subscription<Arg0>::value) &&
    446                 !is_schedulable<Arg0>::value>::type {
    447             tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
    448         }
    449 
    450         void advance_to(long time) const
    451         {
    452             tester->advance_to(time);
    453         }
    454 
    455         void advance_by(long time) const
    456         {
    457             tester->advance_by(time);
    458         }
    459 
    460         void sleep(long time) const
    461         {
    462             tester->sleep(time);
    463         }
    464 
    465         template<class T, class F>
    466         auto start(F createSource, long created, long subscribed, long unsubscribed) const
    467             -> subscriber<T, rxt::testable_observer<T>>
    468         {
    469             struct state_type
    470             : public std::enable_shared_from_this<state_type>
    471             {
    472                 typedef decltype(createSource()) source_type;
    473 
    474                 std::unique_ptr<source_type> source;
    475                 subscriber<T, rxt::testable_observer<T>> o;
    476 
    477                 explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
    478                 : source()
    479                 , o(o)
    480                 {
    481                 }
    482             };
    483             auto state = std::make_shared<state_type>(this->make_subscriber<T>());
    484 
    485             schedule_absolute(created, [createSource, state](const schedulable&) {
    486                 state->source.reset(new typename state_type::source_type(createSource()));
    487             });
    488             schedule_absolute(subscribed, [state](const schedulable&) {
    489                 state->source->subscribe(state->o);
    490             });
    491             schedule_absolute(unsubscribed, [state](const schedulable&) {
    492                 state->o.unsubscribe();
    493             });
    494 
    495             tester->start();
    496 
    497             return state->o;
    498         }
    499 
    500         template<class T, class F>
    501         auto start(F&& createSource, long unsubscribed) const
    502             -> subscriber<T, rxt::testable_observer<T>>
    503         {
    504             return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
    505         }
    506 
    507         template<class T, class F>
    508         auto start(F&& createSource) const
    509             -> subscriber<T, rxt::testable_observer<T>>
    510         {
    511             return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
    512         }
    513 
    514         template<class F>
    515         struct start_traits
    516         {
    517             typedef decltype((*(F*)nullptr)()) source_type;
    518             typedef typename source_type::value_type value_type;
    519             typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
    520         };
    521 
    522         template<class F>
    523         auto start(F createSource, long created, long subscribed, long unsubscribed) const
    524             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
    525         {
    526             return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
    527         }
    528 
    529         template<class F>
    530         auto start(F createSource, long unsubscribed) const
    531             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
    532         {
    533             return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed);
    534         }
    535 
    536         template<class F>
    537         auto start(F createSource) const
    538             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
    539         {
    540             return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
    541         }
    542 
    543         void start() const {
    544             tester->start();
    545         }
    546 
    547         template<class T>
    548         subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
    549             return tester->make_subscriber<T>();
    550         }
    551     };
    552 
    553     clock_type::time_point now() const {
    554         return tester->now();
    555     }
    556 
    557     test_worker create_worker(composite_subscription cs = composite_subscription()) const {
    558         return test_worker(cs, tester->create_test_type_worker_interface());
    559     }
    560 
    561     bool is_enabled() const {return tester->is_enabled();}
    562     long clock() const {return tester->clock();}
    563 
    564     clock_type::time_point to_time_point(long absolute) const {
    565         return tester->to_time_point(absolute);
    566     }
    567 
    568     template<class T>
    569     rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
    570         return tester->make_hot_observable(std::move(messages));
    571     }
    572 
    573     template<class T, std::size_t size>
    574     auto make_hot_observable(const T (&arr) [size]) const
    575         -> decltype(tester->make_hot_observable(std::vector<T>())) {
    576         return      tester->make_hot_observable(rxu::to_vector(arr));
    577     }
    578 
    579     template<class T>
    580     auto make_hot_observable(std::initializer_list<T> il) const
    581         -> decltype(tester->make_hot_observable(std::vector<T>())) {
    582         return      tester->make_hot_observable(std::vector<T>(il));
    583     }
    584 
    585     template<class T>
    586     rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
    587         return tester->make_cold_observable(std::move(messages));
    588     }
    589 
    590     template<class T, std::size_t size>
    591     auto make_cold_observable(const T (&arr) [size]) const
    592         -> decltype(tester->make_cold_observable(std::vector<T>())) {
    593         return      tester->make_cold_observable(rxu::to_vector(arr));
    594     }
    595 
    596     template<class T>
    597     auto make_cold_observable(std::initializer_list<T> il) const
    598         -> decltype(tester->make_cold_observable(std::vector<T>())) {
    599         return      tester->make_cold_observable(std::vector<T>(il));
    600     }
    601 };
    602 
    603 
    604 inline test make_test() {
    605     return test(std::make_shared<detail::test_type>());
    606 }
    607 
    608 }
    609 
    610 inline identity_one_worker identity_test() {
    611     static identity_one_worker r(rxsc::make_test());
    612     return r;
    613 }
    614 
    615 }
    616 
    617 #endif
    618