Home | History | Annotate | Download | only in schedulers
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP)
      6 #define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 namespace detail {
     15 
     16 struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
     17 {
     18     typedef scheduler::clock_type clock_type;
     19 
     20     typedef detail::schedulable_queue<
     21         clock_type::time_point> queue_item_time;
     22 
     23     typedef queue_item_time::item_type item_type;
     24     typedef queue_item_time::const_reference const_reference_item_type;
     25 
     26     virtual ~run_loop_state()
     27     {
     28     }
     29 
     30     run_loop_state()
     31     {
     32     }
     33 
     34     composite_subscription lifetime;
     35     mutable std::mutex lock;
     36     mutable queue_item_time q;
     37     recursion r;
     38     std::function<void(clock_type::time_point)> notify_earlier_wakeup;
     39 };
     40 
     41 }
     42 
     43 
     44 struct run_loop_scheduler : public scheduler_interface
     45 {
     46 private:
     47     typedef run_loop_scheduler this_type;
     48     run_loop_scheduler(const this_type&);
     49 
     50     struct run_loop_worker : public worker_interface
     51     {
     52     private:
     53         typedef run_loop_worker this_type;
     54 
     55         run_loop_worker(const this_type&);
     56 
     57     public:
     58         std::weak_ptr<detail::run_loop_state> state;
     59 
     60         virtual ~run_loop_worker()
     61         {
     62         }
     63 
     64         explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
     65             : state(ws)
     66         {
     67         }
     68 
     69         virtual clock_type::time_point now() const {
     70             return clock_type::now();
     71         }
     72 
     73         virtual void schedule(const schedulable& scbl) const {
     74             schedule(now(), scbl);
     75         }
     76 
     77         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
     78             if (scbl.is_subscribed()) {
     79                 auto st = state.lock();
     80                 std::unique_lock<std::mutex> guard(st->lock);
     81                 const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
     82                                                               (st->q.empty() || when < st->q.top().when);
     83                 st->q.push(detail::run_loop_state::item_type(when, scbl));
     84                 st->r.reset(false);
     85                 if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
     86                 guard.unlock(); // So we can't get attempt to recursively lock the state
     87             }
     88         }
     89     };
     90 
     91     std::weak_ptr<detail::run_loop_state> state;
     92 
     93 public:
     94     explicit run_loop_scheduler(std::weak_ptr<detail::run_loop_state> ws)
     95         : state(ws)
     96     {
     97     }
     98     virtual ~run_loop_scheduler()
     99     {
    100     }
    101 
    102     virtual clock_type::time_point now() const {
    103         return clock_type::now();
    104     }
    105 
    106     virtual worker create_worker(composite_subscription cs) const {
    107         auto lifetime = state.lock()->lifetime;
    108         auto token = lifetime.add(cs);
    109         cs.add([=](){lifetime.remove(token);});
    110         return worker(cs, create_worker_interface());
    111     }
    112 
    113     std::shared_ptr<worker_interface> create_worker_interface() const {
    114         return std::make_shared<run_loop_worker>(state);
    115     }
    116 };
    117 
    118 class run_loop
    119 {
    120 private:
    121     typedef run_loop this_type;
    122     // don't allow this instance to copy/move since it owns current_thread queue
    123     // for the thread it is constructed on.
    124     run_loop(const this_type&);
    125     run_loop(this_type&&);
    126 
    127     typedef detail::action_queue queue_type;
    128 
    129     typedef detail::run_loop_state::item_type item_type;
    130     typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
    131 
    132     std::shared_ptr<detail::run_loop_state> state;
    133     std::shared_ptr<run_loop_scheduler> sc;
    134 
    135 public:
    136     typedef scheduler::clock_type clock_type;
    137     run_loop()
    138         : state(std::make_shared<detail::run_loop_state>())
    139         , sc(std::make_shared<run_loop_scheduler>(state))
    140     {
    141         // take ownership so that the current_thread scheduler
    142         // uses the same queue on this thread
    143         queue_type::ensure(sc->create_worker_interface());
    144     }
    145     ~run_loop()
    146     {
    147         state->lifetime.unsubscribe();
    148 
    149         std::unique_lock<std::mutex> guard(state->lock);
    150 
    151         // release ownership
    152         queue_type::destroy();
    153 
    154         auto expired = std::move(state->q);
    155         if (!state->q.empty()) std::terminate();
    156     }
    157 
    158     clock_type::time_point now() const {
    159         return clock_type::now();
    160     }
    161 
    162     composite_subscription get_subscription() const {
    163         return state->lifetime;
    164     }
    165 
    166     bool empty() const {
    167         return state->q.empty();
    168     }
    169 
    170     const_reference_item_type peek() const {
    171         return state->q.top();
    172     }
    173 
    174     void dispatch() const {
    175         std::unique_lock<std::mutex> guard(state->lock);
    176         if (state->q.empty()) {
    177             return;
    178         }
    179         auto& peek = state->q.top();
    180         if (!peek.what.is_subscribed()) {
    181             state->q.pop();
    182             return;
    183         }
    184         if (clock_type::now() < peek.when) {
    185             return;
    186         }
    187         auto what = peek.what;
    188         state->q.pop();
    189         state->r.reset(state->q.empty());
    190         guard.unlock();
    191         what(state->r.get_recurse());
    192     }
    193 
    194     scheduler get_scheduler() const {
    195         return make_scheduler(sc);
    196     }
    197 
    198     void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) {
    199         std::unique_lock<std::mutex> guard(state->lock);
    200         state->notify_earlier_wakeup = f;
    201     }
    202 };
    203 
    204 inline scheduler make_run_loop(const run_loop& r) {
    205     return r.get_scheduler();
    206 }
    207 
    208 }
    209 
    210 }
    211 
    212 #endif
    213