1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP) 6 #define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP 7 8 #include "rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace detail { 13 14 template<class T> 15 struct has_on_connect 16 { 17 struct not_void {}; 18 template<class CT> 19 static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription())); 20 template<class CT> 21 static not_void check(...); 22 23 typedef decltype(check<T>(0)) detail_result; 24 static const bool value = std::is_same<detail_result, void>::value; 25 }; 26 27 } 28 29 template<class T> 30 class dynamic_connectable_observable 31 : public dynamic_observable<T> 32 { 33 struct state_type 34 : public std::enable_shared_from_this<state_type> 35 { 36 typedef std::function<void(composite_subscription)> onconnect_type; 37 38 onconnect_type on_connect; 39 }; 40 std::shared_ptr<state_type> state; 41 42 template<class U> 43 void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) { 44 state = o.state; 45 } 46 47 template<class U> 48 void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) { 49 state = std::move(o.state); 50 } 51 52 template<class SO> 53 void construct(SO&& source, rxs::tag_source&&) { 54 auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source)); 55 state->on_connect = [so](composite_subscription cs) mutable { 56 so->on_connect(std::move(cs)); 57 }; 58 } 59 60 public: 61 62 typedef tag_dynamic_observable dynamic_observable_tag; 63 64 dynamic_connectable_observable() 65 { 66 } 67 68 template<class SOF> 69 explicit dynamic_connectable_observable(SOF sof) 70 : dynamic_observable<T>(sof) 71 , state(std::make_shared<state_type>()) 72 { 73 construct(std::move(sof), 74 typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type()); 75 } 76 77 template<class SF, class CF> 78 dynamic_connectable_observable(SF&& sf, CF&& cf) 79 : dynamic_observable<T>(std::forward<SF>(sf)) 80 , state(std::make_shared<state_type>()) 81 { 82 state->on_connect = std::forward<CF>(cf); 83 } 84 85 using dynamic_observable<T>::on_subscribe; 86 87 void on_connect(composite_subscription cs) const { 88 state->on_connect(std::move(cs)); 89 } 90 }; 91 92 template<class T, class Source> 93 connectable_observable<T> make_dynamic_connectable_observable(Source&& s) { 94 return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s))); 95 } 96 97 98 /*! 99 \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called. 100 101 \ingroup group-observable 102 103 */ 104 template<class T, class SourceOperator> 105 class connectable_observable 106 : public observable<T, SourceOperator> 107 { 108 typedef connectable_observable<T, SourceOperator> this_type; 109 typedef observable<T, SourceOperator> base_type; 110 typedef rxu::decay_t<SourceOperator> source_operator_type; 111 112 static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)"); 113 114 public: 115 typedef tag_connectable_observable observable_tag; 116 117 connectable_observable() 118 { 119 } 120 121 explicit connectable_observable(const SourceOperator& o) 122 : base_type(o) 123 { 124 } 125 explicit connectable_observable(SourceOperator&& o) 126 : base_type(std::move(o)) 127 { 128 } 129 130 // implicit conversion between observables of the same value_type 131 template<class SO> 132 connectable_observable(const connectable_observable<T, SO>& o) 133 : base_type(o) 134 {} 135 // implicit conversion between observables of the same value_type 136 template<class SO> 137 connectable_observable(connectable_observable<T, SO>&& o) 138 : base_type(std::move(o)) 139 {} 140 141 /// 142 /// takes any function that will take this observable and produce a result value. 143 /// this is intended to allow externally defined operators, that use subscribe, 144 /// to be connected into the expression. 145 /// 146 template<class OperatorFactory> 147 auto op(OperatorFactory&& of) const 148 -> decltype(of(*(const this_type*)nullptr)) { 149 return of(*this); 150 static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); 151 } 152 153 /// 154 /// performs type-forgetting conversion to a new composite_observable 155 /// 156 connectable_observable<T> as_dynamic() { 157 return *this; 158 } 159 160 composite_subscription connect(composite_subscription cs = composite_subscription()) { 161 base_type::source_operator.on_connect(cs); 162 return cs; 163 } 164 165 /*! @copydoc rx-ref_count.hpp 166 */ 167 template<class... AN> 168 auto ref_count(AN... an) const 169 /// \cond SHOW_SERVICE_MEMBERS 170 -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) 171 /// \endcond 172 { 173 return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...); 174 } 175 176 /*! @copydoc rx-connect_forever.hpp 177 */ 178 template<class... AN> 179 auto connect_forever(AN... an) const 180 /// \cond SHOW_SERVICE_MEMBERS 181 -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) 182 /// \endcond 183 { 184 return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...); 185 } 186 }; 187 188 189 } 190 191 // 192 // support range() >> filter() >> subscribe() syntax 193 // '>>' is spelled 'stream' 194 // 195 template<class T, class SourceOperator, class OperatorFactory> 196 auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) 197 -> decltype(source.op(std::forward<OperatorFactory>(of))) { 198 return source.op(std::forward<OperatorFactory>(of)); 199 } 200 201 // 202 // support range() | filter() | subscribe() syntax 203 // '|' is spelled 'pipe' 204 // 205 template<class T, class SourceOperator, class OperatorFactory> 206 auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) 207 -> decltype(source.op(std::forward<OperatorFactory>(of))) { 208 return source.op(std::forward<OperatorFactory>(of)); 209 } 210 211 #endif 212