1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-observe_on.hpp 6 7 \brief All values are queued and delivered using the scheduler from the supplied coordination. 8 9 \tparam Coordination the type of the scheduler. 10 11 \param cn the scheduler to notify observers on. 12 13 \return The source observable modified so that its observers are notified on the specified scheduler. 14 15 \sample 16 \snippet observe_on.cpp observe_on sample 17 \snippet output.txt observe_on sample 18 19 Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: 20 \snippet output.txt subscribe_on sample 21 */ 22 23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP) 24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP 25 26 #include "../rx-includes.hpp" 27 28 namespace rxcpp { 29 30 namespace operators { 31 32 namespace detail { 33 34 template<class... AN> 35 struct observe_on_invalid_arguments {}; 36 37 template<class... AN> 38 struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> { 39 using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>; 40 }; 41 template<class... AN> 42 using observe_on_invalid_t = typename observe_on_invalid<AN...>::type; 43 44 template<class T, class Coordination> 45 struct observe_on 46 { 47 typedef rxu::decay_t<T> source_value_type; 48 49 typedef rxu::decay_t<Coordination> coordination_type; 50 typedef typename coordination_type::coordinator_type coordinator_type; 51 52 coordination_type coordination; 53 54 observe_on(coordination_type cn) 55 : coordination(std::move(cn)) 56 { 57 } 58 59 template<class Subscriber> 60 struct observe_on_observer 61 { 62 typedef observe_on_observer<Subscriber> this_type; 63 typedef source_value_type value_type; 64 typedef rxu::decay_t<Subscriber> dest_type; 65 typedef observer<value_type, this_type> observer_type; 66 67 typedef rxn::notification<T> notification_type; 68 typedef typename notification_type::type base_notification_type; 69 typedef std::deque<base_notification_type> queue_type; 70 71 struct mode 72 { 73 enum type { 74 Invalid = 0, 75 Processing, 76 Empty, 77 Disposed, 78 Errored 79 }; 80 }; 81 struct observe_on_state : std::enable_shared_from_this<observe_on_state> 82 { 83 mutable std::mutex lock; 84 mutable queue_type fill_queue; 85 mutable queue_type drain_queue; 86 composite_subscription lifetime; 87 mutable typename mode::type current; 88 coordinator_type coordinator; 89 dest_type destination; 90 91 observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs) 92 : lifetime(std::move(cs)) 93 , current(mode::Empty) 94 , coordinator(std::move(coor)) 95 , destination(std::move(d)) 96 { 97 } 98 99 void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const { 100 if (!guard.owns_lock()) { 101 std::terminate(); 102 } 103 if (current == mode::Errored || current == mode::Disposed) {return;} 104 current = end; 105 queue_type fill_expired; 106 swap(fill_expired, fill_queue); 107 queue_type drain_expired; 108 swap(drain_expired, drain_queue); 109 RXCPP_UNWIND_AUTO([&](){guard.lock();}); 110 guard.unlock(); 111 lifetime.unsubscribe(); 112 destination.unsubscribe(); 113 } 114 115 void ensure_processing(std::unique_lock<std::mutex>& guard) const { 116 if (!guard.owns_lock()) { 117 std::terminate(); 118 } 119 if (current == mode::Empty) { 120 current = mode::Processing; 121 122 if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) { 123 finish(guard, mode::Disposed); 124 } 125 126 auto keepAlive = this->shared_from_this(); 127 128 auto drain = [keepAlive, this](const rxsc::schedulable& self){ 129 using std::swap; 130 RXCPP_TRY { 131 for (;;) { 132 if (drain_queue.empty() || !destination.is_subscribed()) { 133 std::unique_lock<std::mutex> guard(lock); 134 if (!destination.is_subscribed() || 135 (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) { 136 finish(guard, mode::Disposed); 137 return; 138 } 139 if (drain_queue.empty()) { 140 if (fill_queue.empty()) { 141 current = mode::Empty; 142 return; 143 } 144 swap(fill_queue, drain_queue); 145 } 146 } 147 auto notification = std::move(drain_queue.front()); 148 drain_queue.pop_front(); 149 notification->accept(destination); 150 std::unique_lock<std::mutex> guard(lock); 151 self(); 152 if (lifetime.is_subscribed()) break; 153 } 154 } 155 RXCPP_CATCH(...) { 156 destination.on_error(rxu::current_exception()); 157 std::unique_lock<std::mutex> guard(lock); 158 finish(guard, mode::Errored); 159 } 160 }; 161 162 auto selectedDrain = on_exception( 163 [&](){return coordinator.act(drain);}, 164 destination); 165 if (selectedDrain.empty()) { 166 finish(guard, mode::Errored); 167 return; 168 } 169 170 auto processor = coordinator.get_worker(); 171 172 RXCPP_UNWIND_AUTO([&](){guard.lock();}); 173 guard.unlock(); 174 175 processor.schedule(selectedDrain.get()); 176 } 177 } 178 }; 179 std::shared_ptr<observe_on_state> state; 180 181 observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs) 182 : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs))) 183 { 184 } 185 186 void on_next(source_value_type v) const { 187 std::unique_lock<std::mutex> guard(state->lock); 188 if (state->current == mode::Errored || state->current == mode::Disposed) { return; } 189 state->fill_queue.push_back(notification_type::on_next(std::move(v))); 190 state->ensure_processing(guard); 191 } 192 void on_error(rxu::error_ptr e) const { 193 std::unique_lock<std::mutex> guard(state->lock); 194 if (state->current == mode::Errored || state->current == mode::Disposed) { return; } 195 state->fill_queue.push_back(notification_type::on_error(e)); 196 state->ensure_processing(guard); 197 } 198 void on_completed() const { 199 std::unique_lock<std::mutex> guard(state->lock); 200 if (state->current == mode::Errored || state->current == mode::Disposed) { return; } 201 state->fill_queue.push_back(notification_type::on_completed()); 202 state->ensure_processing(guard); 203 } 204 205 static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) { 206 auto coor = cn.create_coordinator(d.get_subscription()); 207 d.add(cs); 208 209 this_type o(d, std::move(coor), cs); 210 auto keepAlive = o.state; 211 cs.add([=](){ 212 std::unique_lock<std::mutex> guard(keepAlive->lock); 213 keepAlive->ensure_processing(guard); 214 }); 215 216 return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o))); 217 } 218 }; 219 220 template<class Subscriber> 221 auto operator()(Subscriber dest) const 222 -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) { 223 return observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination); 224 } 225 }; 226 227 } 228 229 /*! @copydoc rx-observe_on.hpp 230 */ 231 template<class... AN> 232 auto observe_on(AN&&... an) 233 -> operator_factory<observe_on_tag, AN...> { 234 return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 235 } 236 237 } 238 239 template<> 240 struct member_overload<observe_on_tag> 241 { 242 template<class Observable, class Coordination, 243 class Enabled = rxu::enable_if_all_true_type_t< 244 is_observable<Observable>, 245 is_coordination<Coordination>>, 246 class SourceValue = rxu::value_type_t<Observable>, 247 class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>> 248 static auto member(Observable&& o, Coordination&& cn) 249 -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) { 250 return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn))); 251 } 252 253 template<class... AN> 254 static operators::detail::observe_on_invalid_t<AN...> member(AN...) { 255 std::terminate(); 256 return {}; 257 static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)"); 258 } 259 }; 260 261 class observe_on_one_worker : public coordination_base 262 { 263 rxsc::scheduler factory; 264 265 class input_type 266 { 267 rxsc::worker controller; 268 rxsc::scheduler factory; 269 identity_one_worker coordination; 270 public: 271 explicit input_type(rxsc::worker w) 272 : controller(w) 273 , factory(rxsc::make_same_worker(w)) 274 , coordination(factory) 275 { 276 } 277 inline rxsc::worker get_worker() const { 278 return controller; 279 } 280 inline rxsc::scheduler get_scheduler() const { 281 return factory; 282 } 283 inline rxsc::scheduler::clock_type::time_point now() const { 284 return factory.now(); 285 } 286 template<class Observable> 287 auto in(Observable o) const 288 -> decltype(o.observe_on(coordination)) { 289 return o.observe_on(coordination); 290 } 291 template<class Subscriber> 292 auto out(Subscriber s) const 293 -> Subscriber { 294 return s; 295 } 296 template<class F> 297 auto act(F f) const 298 -> F { 299 return f; 300 } 301 }; 302 303 public: 304 305 explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {} 306 307 typedef coordinator<input_type> coordinator_type; 308 309 inline rxsc::scheduler::clock_type::time_point now() const { 310 return factory.now(); 311 } 312 313 inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const { 314 auto w = factory.create_worker(std::move(cs)); 315 return coordinator_type(input_type(std::move(w))); 316 } 317 }; 318 319 inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) { 320 return observe_on_one_worker(rxsc::make_run_loop(rl)); 321 } 322 323 inline observe_on_one_worker observe_on_event_loop() { 324 static observe_on_one_worker r(rxsc::make_event_loop()); 325 return r; 326 } 327 328 inline observe_on_one_worker observe_on_new_thread() { 329 static observe_on_one_worker r(rxsc::make_new_thread()); 330 return r; 331 } 332 333 } 334 335 #endif 336