Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-observe_on.hpp
      6 
      7     \brief All values are queued and delivered using the scheduler from the supplied coordination.
      8 
      9     \tparam Coordination  the type of the scheduler.
     10 
     11     \param  cn  the scheduler to notify observers on.
     12 
     13     \return  The source observable modified so that its observers are notified on the specified scheduler.
     14 
     15     \sample
     16     \snippet observe_on.cpp observe_on sample
     17     \snippet output.txt observe_on sample
     18 
    Invoking the rxcpp::observable::subscribe_on operator instead of observe_on gives the following results:
     20     \snippet output.txt subscribe_on sample
     21 */
     22 
     23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
     24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
     25 
     26 #include "../rx-includes.hpp"
     27 
     28 namespace rxcpp {
     29 
     30 namespace operators {
     31 
     32 namespace detail {
     33 
     34 template<class... AN>
     35 struct observe_on_invalid_arguments {};
     36 
     37 template<class... AN>
     38 struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
     39     using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
     40 };
     41 template<class... AN>
     42 using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
     43 
// Operator functor behind observe_on. It stores a coordination and, when
// applied to a subscriber, returns a new subscriber whose observer queues every
// notification and delivers it to the destination from a drain function that
// is scheduled on the coordinator's worker.
template<class T, class Coordination>
struct observe_on
{
    typedef rxu::decay_t<T> source_value_type;

    typedef rxu::decay_t<Coordination> coordination_type;
    typedef typename coordination_type::coordinator_type coordinator_type;

    // Passed by value to observe_on_observer::make for every subscriber.
    coordination_type coordination;

    observe_on(coordination_type cn)
        : coordination(std::move(cn))
    {
    }

    // Observer handed to the source. All three callbacks enqueue a
    // notification under the state mutex and then make sure a drain is
    // scheduled; the drain is what actually calls the destination.
    template<class Subscriber>
    struct observe_on_observer
    {
        typedef observe_on_observer<Subscriber> this_type;
        typedef source_value_type value_type;
        typedef rxu::decay_t<Subscriber> dest_type;
        typedef observer<value_type, this_type> observer_type;

        typedef rxn::notification<T> notification_type;
        typedef typename notification_type::type base_notification_type;
        typedef std::deque<base_notification_type> queue_type;

        // Values of observe_on_state::current.
        struct mode
        {
            enum type {
                Invalid = 0, // not assigned anywhere in this struct
                Processing,  // set by ensure_processing before it schedules the drain
                Empty,       // initial value; restored by the drain when both queues are empty
                Disposed,    // terminal; only set through finish()
                Errored      // terminal; only set through finish()
            };
        };

        // State shared between the observer copies, the drain lambda and the
        // unsubscribe callback registered in make(). Members touched from the
        // const member functions are declared mutable.
        struct observe_on_state : std::enable_shared_from_this<observe_on_state>
        {
            mutable std::mutex lock;
            // Producers (on_next/on_error/on_completed) push here.
            mutable queue_type fill_queue;
            // The drain pops from here and swaps in fill_queue when this runs dry.
            mutable queue_type drain_queue;
            composite_subscription lifetime;
            mutable typename mode::type current;
            coordinator_type coordinator;
            dest_type destination;

            observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
                : lifetime(std::move(cs))
                , current(mode::Empty)
                , coordinator(std::move(coor))
                , destination(std::move(d))
            {
            }

            // Moves the state into the terminal mode `end`, discards queued
            // notifications and unsubscribes both `lifetime` and `destination`.
            //
            // \param guard  must own `lock` on entry (otherwise std::terminate);
            //               it is released around the unsubscribe calls.
            // \param end    mode to record (callers pass Disposed or Errored).
            //
            // Does nothing when the state is already Errored or Disposed.
            void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
                if (!guard.owns_lock()) {
                    std::terminate();
                }
                if (current == mode::Errored || current == mode::Disposed) {return;}
                current = end;
                // Swap pending notifications into locals so the member queues
                // are left empty; the locals are destroyed when finish returns.
                queue_type fill_expired;
                swap(fill_expired, fill_queue);
                queue_type drain_expired;
                swap(drain_expired, drain_queue);
                // Presumably runs the lambda at scope exit so the caller gets
                // the guard back locked -- confirm against RXCPP_UNWIND_AUTO.
                RXCPP_UNWIND_AUTO([&](){guard.lock();});
                // The unsubscribe calls below run without holding `lock`.
                guard.unlock();
                lifetime.unsubscribe();
                destination.unsubscribe();
            }

            // If no drain is pending (current == Empty), switches to
            // Processing and schedules the drain lambda on the coordinator's
            // worker. In any other mode this is a no-op.
            //
            // \param guard  must own `lock` on entry (otherwise std::terminate);
            //               it is released while the drain is being scheduled.
            void ensure_processing(std::unique_lock<std::mutex>& guard) const {
                if (!guard.owns_lock()) {
                    std::terminate();
                }
                if (current == mode::Empty) {
                    current = mode::Processing;

                    // NOTE(review): there is no `return` after this finish();
                    // execution continues and still builds and schedules the
                    // drain below even though the state is now Disposed.
                    // Confirm whether that is intended.
                    if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
                        finish(guard, mode::Disposed);
                    }

                    // Captured by the drain so the state outlives the scheduled work.
                    auto keepAlive = this->shared_from_this();

                    // Delivers queued notifications to `destination`.
                    auto drain = [keepAlive, this](const rxsc::schedulable& self){
                        using std::swap;
                        RXCPP_TRY {
                            for (;;) {
                                // Only take the lock when there is nothing left to
                                // deliver or the destination has gone away.
                                if (drain_queue.empty() || !destination.is_subscribed()) {
                                    std::unique_lock<std::mutex> guard(lock);
                                    // Destination gone, or source lifetime ended with
                                    // nothing left to deliver: tear down.
                                    if (!destination.is_subscribed() ||
                                        (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
                                        finish(guard, mode::Disposed);
                                        return;
                                    }
                                    if (drain_queue.empty()) {
                                        if (fill_queue.empty()) {
                                            // Nothing queued: go idle so the next
                                            // ensure_processing schedules a new drain.
                                            current = mode::Empty;
                                            return;
                                        }
                                        // Take everything produced so far in one swap.
                                        swap(fill_queue, drain_queue);
                                    }
                                }
                                // The lock taken above is out of scope here, so the
                                // destination is called without holding it.
                                auto notification = std::move(drain_queue.front());
                                drain_queue.pop_front();
                                notification->accept(destination);
                                std::unique_lock<std::mutex> guard(lock);
                                // Presumably re-schedules this drain -- confirm
                                // against rxsc::schedulable::operator().
                                self();
                                // Leave the loop while the source lifetime is still
                                // subscribed; otherwise keep looping in this call.
                                if (lifetime.is_subscribed()) break;
                            }
                        }
                        RXCPP_CATCH(...) {
                            // Forward the failure to the destination, then mark the
                            // state Errored and release subscriptions.
                            destination.on_error(rxu::current_exception());
                            std::unique_lock<std::mutex> guard(lock);
                            finish(guard, mode::Errored);
                        }
                    };

                    // Lets the coordinator wrap the drain. on_exception presumably
                    // reports a throw to `destination` and yields an empty result
                    // -- confirm against its definition.
                    auto selectedDrain = on_exception(
                        [&](){return coordinator.act(drain);},
                        destination);
                    if (selectedDrain.empty()) {
                        finish(guard, mode::Errored);
                        return;
                    }

                    auto processor = coordinator.get_worker();

                    // Schedule without holding `lock`; the unwind object is
                    // presumably what re-locks the guard for the caller on exit.
                    RXCPP_UNWIND_AUTO([&](){guard.lock();});
                    guard.unlock();

                    processor.schedule(selectedDrain.get());
                }
            }
        };
        std::shared_ptr<observe_on_state> state;

        observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
            : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
        {
        }

        // Queues an on_next notification for `v` unless the state is already
        // Errored or Disposed, then makes sure a drain is scheduled.
        void on_next(source_value_type v) const {
            std::unique_lock<std::mutex> guard(state->lock);
            if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
            state->fill_queue.push_back(notification_type::on_next(std::move(v)));
            state->ensure_processing(guard);
        }
        // Queues an on_error notification for `e`; same terminal-state check
        // and scheduling as on_next.
        void on_error(rxu::error_ptr e) const {
            std::unique_lock<std::mutex> guard(state->lock);
            if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
            state->fill_queue.push_back(notification_type::on_error(e));
            state->ensure_processing(guard);
        }
        // Queues an on_completed notification; same terminal-state check and
        // scheduling as on_next.
        void on_completed() const {
            std::unique_lock<std::mutex> guard(state->lock);
            if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
            state->fill_queue.push_back(notification_type::on_completed());
            state->ensure_processing(guard);
        }

        // Builds the subscriber that is returned to the source.
        //
        // \param d   destination subscriber that receives the drained notifications.
        // \param cn  coordination used to create the coordinator for this subscription.
        // \param cs  subscription used as the state's `lifetime` and as the
        //            subscription of the returned subscriber.
        //
        // \return a subscriber built from `d`, `cs` and an observer wrapping
        //         the new observe_on_observer.
        static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
            auto coor = cn.create_coordinator(d.get_subscription());
            d.add(cs);

            this_type o(d, std::move(coor), cs);
            auto keepAlive = o.state;
            // Callback registered on `cs` (presumably invoked when cs is
            // unsubscribed -- confirm): it takes the state lock and calls
            // ensure_processing, which handles the unsubscribed-and-empty case.
            cs.add([=](){
                std::unique_lock<std::mutex> guard(keepAlive->lock);
                keepAlive->ensure_processing(guard);
            });

            return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
        }
    };

    // Wraps `dest` (through as_dynamic()) in an observe_on_observer that uses
    // this operator's coordination.
    template<class Subscriber>
    auto operator()(Subscriber dest) const
        -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
        return      observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
    }
};
    226 
    227 }
    228 
    229 /*! @copydoc rx-observe_on.hpp
    230 */
    231 template<class... AN>
    232 auto observe_on(AN&&... an)
    233     ->      operator_factory<observe_on_tag, AN...> {
    234      return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    235 }
    236 
    237 }
    238 
// member_overload specialization for observe_on_tag: provides the `member`
// overloads that implement the operator for a given argument list.
template<>
struct member_overload<observe_on_tag>
{
    // Overload for (Observable, Coordination).
    //
    // `Enabled` is formed from is_observable<Observable> and
    // is_coordination<Coordination>; presumably it removes this overload from
    // resolution when either trait is false -- confirm against
    // rxu::enable_if_all_true_type_t.
    // `SourceValue` is the observable's value type and `ObserveOn` is the
    // detail operator instantiated for that value type and the decayed
    // coordination type.
    //
    // Returns the result of lifting `o` with an ObserveOn constructed from
    // the forwarded coordination.
    template<class Observable, class Coordination,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>,
            is_coordination<Coordination>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
    static auto member(Observable&& o, Coordination&& cn)
        -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
        return      o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
    }

    // Catch-all overload for any other argument list. The static_assert
    // condition depends on AN, so it is only evaluated when this overload is
    // instantiated, where it fails the build with the message below. The
    // std::terminate() and `return {}` statements precede it in the body.
    template<class... AN>
    static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
    }
};
    260 
    261 class observe_on_one_worker : public coordination_base
    262 {
    263     rxsc::scheduler factory;
    264 
    265     class input_type
    266     {
    267         rxsc::worker controller;
    268         rxsc::scheduler factory;
    269         identity_one_worker coordination;
    270     public:
    271         explicit input_type(rxsc::worker w)
    272             : controller(w)
    273             , factory(rxsc::make_same_worker(w))
    274             , coordination(factory)
    275         {
    276         }
    277         inline rxsc::worker get_worker() const {
    278             return controller;
    279         }
    280         inline rxsc::scheduler get_scheduler() const {
    281             return factory;
    282         }
    283         inline rxsc::scheduler::clock_type::time_point now() const {
    284             return factory.now();
    285         }
    286         template<class Observable>
    287         auto in(Observable o) const
    288             -> decltype(o.observe_on(coordination)) {
    289             return      o.observe_on(coordination);
    290         }
    291         template<class Subscriber>
    292         auto out(Subscriber s) const
    293             -> Subscriber {
    294             return s;
    295         }
    296         template<class F>
    297         auto act(F f) const
    298             -> F {
    299             return f;
    300         }
    301     };
    302 
    303 public:
    304 
    305     explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {}
    306 
    307     typedef coordinator<input_type> coordinator_type;
    308 
    309     inline rxsc::scheduler::clock_type::time_point now() const {
    310         return factory.now();
    311     }
    312 
    313     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
    314         auto w = factory.create_worker(std::move(cs));
    315         return coordinator_type(input_type(std::move(w)));
    316     }
    317 };
    318 
    319 inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
    320     return observe_on_one_worker(rxsc::make_run_loop(rl));
    321 }
    322 
    323 inline observe_on_one_worker observe_on_event_loop() {
    324     static observe_on_one_worker r(rxsc::make_event_loop());
    325     return r;
    326 }
    327 
    328 inline observe_on_one_worker observe_on_new_thread() {
    329     static observe_on_one_worker r(rxsc::make_new_thread());
    330     return r;
    331 }
    332 
    333 }
    334 
    335 #endif
    336