1 #pragma once 2 3 #if !defined(RXCPP_RX_TEST_HPP) 4 #define RXCPP_RX_TEST_HPP 5 6 #include "rx-includes.hpp" 7 8 namespace rxcpp { 9 10 namespace test { 11 12 namespace detail { 13 14 template<class T> 15 struct test_subject_base 16 : public std::enable_shared_from_this<test_subject_base<T>> 17 { 18 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type; 19 typedef std::shared_ptr<test_subject_base<T>> type; 20 21 virtual ~test_subject_base() {} 22 virtual void on_subscribe(subscriber<T>) const =0; 23 virtual std::vector<recorded_type> messages() const =0; 24 virtual std::vector<rxn::subscription> subscriptions() const =0; 25 }; 26 27 template<class T> 28 struct test_source 29 : public rxs::source_base<T> 30 { 31 explicit test_source(typename test_subject_base<T>::type ts) 32 : ts(std::move(ts)) 33 { 34 if (!this->ts) std::terminate(); 35 } 36 typename test_subject_base<T>::type ts; 37 void on_subscribe(subscriber<T> o) const { 38 ts->on_subscribe(std::move(o)); 39 } 40 template<class Subscriber> 41 typename std::enable_if<!std::is_same<Subscriber, subscriber<T>>::value, void>::type 42 on_subscribe(Subscriber o) const { 43 44 static_assert(is_subscriber<Subscriber>::value, "on_subscribe must be passed a subscriber."); 45 46 ts->on_subscribe(o.as_dynamic()); 47 } 48 }; 49 50 } 51 52 template<class T> 53 class testable_observer 54 : public observer<T> 55 { 56 typedef observer<T> observer_base; 57 typedef typename detail::test_subject_base<T>::type test_subject; 58 test_subject ts; 59 60 public: 61 typedef typename detail::test_subject_base<T>::recorded_type recorded_type; 62 63 testable_observer(test_subject ts, observer_base ob) 64 : observer_base(std::move(ob)) 65 , ts(std::move(ts)) 66 { 67 } 68 69 std::vector<recorded_type> messages() const { 70 return ts->messages(); 71 } 72 }; 73 74 //struct tag_test_observable : public tag_observable {}; 75 76 /*! 77 \brief a source of values that records the time of each subscription/unsubscription and all the values and the time they were emitted. 78 79 \ingroup group-observable 80 81 */ 82 template<class T> 83 class testable_observable 84 : public observable<T, typename detail::test_source<T>> 85 { 86 typedef observable<T, typename detail::test_source<T>> observable_base; 87 typedef typename detail::test_subject_base<T>::type test_subject; 88 test_subject ts; 89 90 //typedef tag_test_observable observable_tag; 91 92 public: 93 typedef typename detail::test_subject_base<T>::recorded_type recorded_type; 94 95 explicit testable_observable(test_subject ts) 96 : observable_base(detail::test_source<T>(ts)) 97 , ts(ts) 98 { 99 } 100 101 std::vector<rxn::subscription> subscriptions() const { 102 return ts->subscriptions(); 103 } 104 105 std::vector<recorded_type> messages() const { 106 return ts->messages(); 107 } 108 }; 109 110 } 111 namespace rxt=test; 112 113 } 114 115 // 116 // support range() >> filter() >> subscribe() syntax 117 // '>>' is spelled 'stream' 118 // 119 template<class T, class OperatorFactory> 120 auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of) 121 -> decltype(source.op(std::forward<OperatorFactory>(of))) { 122 return source.op(std::forward<OperatorFactory>(of)); 123 } 124 125 // 126 // support range() | filter() | subscribe() syntax 127 // '|' is spelled 'pipe' 128 // 129 template<class T, class OperatorFactory> 130 auto operator | (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of) 131 -> decltype(source.op(std::forward<OperatorFactory>(of))) { 132 return source.op(std::forward<OperatorFactory>(of)); 133 } 134 135 #include "schedulers/rx-test.hpp" 136 137 #endif 138