Home | History | Annotate | Download | only in rxcpp
      1 #pragma once
      2 
      3 #if !defined(RXCPP_RX_TEST_HPP)
      4 #define RXCPP_RX_TEST_HPP
      5 
      6 #include "rx-includes.hpp"
      7 
      8 namespace rxcpp {
      9 
     10 namespace test {
     11 
     12 namespace detail {
     13 
     14 template<class T>
     15 struct test_subject_base
     16     : public std::enable_shared_from_this<test_subject_base<T>>
     17 {
     18     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
     19     typedef std::shared_ptr<test_subject_base<T>> type;
     20 
     21     virtual ~test_subject_base() {}
     22     virtual void on_subscribe(subscriber<T>) const =0;
     23     virtual std::vector<recorded_type> messages() const =0;
     24     virtual std::vector<rxn::subscription> subscriptions() const =0;
     25 };
     26 
     27 template<class T>
     28 struct test_source
     29     : public rxs::source_base<T>
     30 {
     31     explicit test_source(typename test_subject_base<T>::type ts)
     32         : ts(std::move(ts))
     33     {
     34         if (!this->ts) std::terminate();
     35     }
     36     typename test_subject_base<T>::type ts;
     37     void on_subscribe(subscriber<T> o) const {
     38         ts->on_subscribe(std::move(o));
     39     }
     40     template<class Subscriber>
     41     typename std::enable_if<!std::is_same<Subscriber, subscriber<T>>::value, void>::type
     42     on_subscribe(Subscriber o) const {
     43 
     44         static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber.");
     45 
     46         ts->on_subscribe(o.as_dynamic());
     47     }
     48 };
     49 
     50 }
     51 
     52 template<class T>
     53 class testable_observer
     54     : public observer<T>
     55 {
     56     typedef observer<T> observer_base;
     57     typedef typename detail::test_subject_base<T>::type test_subject;
     58     test_subject ts;
     59 
     60 public:
     61     typedef typename detail::test_subject_base<T>::recorded_type recorded_type;
     62 
     63     testable_observer(test_subject ts, observer_base ob)
     64         : observer_base(std::move(ob))
     65         , ts(std::move(ts))
     66     {
     67     }
     68 
     69     std::vector<recorded_type> messages() const {
     70         return ts->messages();
     71     }
     72 };
     73 
     74 //struct tag_test_observable : public tag_observable {};
     75 
     76 /*!
     77     \brief a source of values that records the time of each subscription/unsubscription and all the values and the time they were emitted.
     78 
     79     \ingroup group-observable
     80 
     81 */
     82 template<class T>
     83 class testable_observable
     84     : public observable<T, typename detail::test_source<T>>
     85 {
     86     typedef observable<T, typename detail::test_source<T>> observable_base;
     87     typedef typename detail::test_subject_base<T>::type test_subject;
     88     test_subject ts;
     89 
     90     //typedef tag_test_observable observable_tag;
     91 
     92 public:
     93     typedef typename detail::test_subject_base<T>::recorded_type recorded_type;
     94 
     95     explicit testable_observable(test_subject ts)
     96         : observable_base(detail::test_source<T>(ts))
     97         , ts(ts)
     98     {
     99     }
    100 
    101     std::vector<rxn::subscription> subscriptions() const {
    102         return ts->subscriptions();
    103     }
    104 
    105     std::vector<recorded_type> messages() const {
    106         return ts->messages();
    107     }
    108 };
    109 
    110 }
    111 namespace rxt=test;
    112 
    113 }
    114 
    115 //
    116 // support range() >> filter() >> subscribe() syntax
    117 // '>>' is spelled 'stream'
    118 //
    119 template<class T, class OperatorFactory>
    120 auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
    121     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
    122     return      source.op(std::forward<OperatorFactory>(of));
    123 }
    124 
    125 //
    126 // support range() | filter() | subscribe() syntax
    127 // '|' is spelled 'pipe'
    128 //
    129 template<class T, class OperatorFactory>
    130 auto operator | (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
    131     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
    132     return      source.op(std::forward<OperatorFactory>(of));
    133 }
    134 
    135 #include "schedulers/rx-test.hpp"
    136 
    137 #endif
    138