Home | History | Annotate | Download | only in rxcpp
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
      6 #define RXCPP_RX_SCHEDULER_HPP
      7 
      8 #include "rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 class worker_interface;
     15 class scheduler_interface;
     16 
     17 namespace detail {
     18 
     19 class action_type;
     20 typedef std::shared_ptr<action_type> action_ptr;
     21 
     22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
     23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
     24 
     25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
     26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
     27 
     28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
     29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
     30 
     31 inline action_ptr shared_empty() {
     32     static action_ptr shared_empty = std::make_shared<detail::action_type>();
     33     return shared_empty;
     34 }
     35 
     36 }
     37 
     38 // It is essential to keep virtual function calls out of an inner loop.
     39 // To make tail-recursion work efficiently the recursion objects create
     40 // a space on the stack inside the virtual function call in the actor that
     41 // allows the callback and the scheduler to share stack space that records
     42 // the request and the allowance without any virtual calls in the loop.
     43 
     44 /// recursed is set on a schedulable by the action to allow the called
     45 /// function to request to be rescheduled.
     46 class recursed
     47 {
     48     bool& isrequested;
     49     recursed operator=(const recursed&);
     50 public:
     51     explicit recursed(bool& r)
     52         : isrequested(r)
     53     {
     54     }
     55     /// request to be rescheduled
     56     inline void operator()() const {
     57         isrequested = true;
     58     }
     59 };
     60 
     61 /// recurse is passed to the action by the scheduler.
     62 /// the action uses recurse to coordinate the scheduler and the function.
     63 class recurse
     64 {
     65     bool& isallowed;
     66     mutable bool isrequested;
     67     recursed requestor;
     68     recurse operator=(const recurse&);
     69 public:
     70     explicit recurse(bool& a)
     71         : isallowed(a)
     72         , isrequested(true)
     73         , requestor(isrequested)
     74     {
     75     }
     76     /// does the scheduler allow tail-recursion now?
     77     inline bool is_allowed() const {
     78         return isallowed;
     79     }
     80     /// did the function request to be recursed?
     81     inline bool is_requested() const {
     82         return isrequested;
     83     }
     84     /// reset the function request. call before each call to the function.
     85     inline void reset() const {
     86         isrequested = false;
     87     }
     88     /// get the recursed to set into the schedulable for the function to use to request recursion
     89     inline const recursed& get_recursed() const {
     90         return requestor;
     91     }
     92 };
     93 
     94 /// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
     95 class recursion
     96 {
     97     mutable bool isallowed;
     98     recurse recursor;
     99     recursion operator=(const recursion&);
    100 public:
    101     recursion()
    102         : isallowed(true)
    103         , recursor(isallowed)
    104     {
    105     }
    106     explicit recursion(bool b)
    107         : isallowed(b)
    108         , recursor(isallowed)
    109     {
    110     }
    111     /// set whether tail-recursion is allowed
    112     inline void reset(bool b = true) const {
    113         isallowed = b;
    114     }
    115     /// get the recurse to pass into each action being called
    116     inline const recurse& get_recurse() const {
    117         return recursor;
    118     }
    119 };
    120 
    121 
    122 struct action_base
    123 {
    124     typedef tag_action action_tag;
    125 };
    126 
    127 class schedulable;
    128 
    129 /// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
    130 class action : public action_base
    131 {
    132     typedef action this_type;
    133     detail::action_ptr inner;
    134 public:
    135     action()
    136     {
    137     }
    138     explicit action(detail::action_ptr i)
    139     : inner(std::move(i))
    140     {
    141     }
    142 
    143     /// return the empty action
    144     inline static action empty() {
    145         return action(detail::shared_empty());
    146     }
    147 
    148     /// call the function
    149     inline void operator()(const schedulable& s, const recurse& r) const;
    150 };
    151 
    152 struct scheduler_base
    153 {
    154     typedef std::chrono::steady_clock clock_type;
    155     typedef tag_scheduler scheduler_tag;
    156 };
    157 
    158 struct worker_base : public subscription_base
    159 {
    160     typedef tag_worker worker_tag;
    161 };
    162 
    163 class worker_interface
    164     : public std::enable_shared_from_this<worker_interface>
    165 {
    166     typedef worker_interface this_type;
    167 
    168 public:
    169     typedef scheduler_base::clock_type clock_type;
    170 
    171     virtual ~worker_interface() {}
    172 
    173     virtual clock_type::time_point now() const = 0;
    174 
    175     virtual void schedule(const schedulable& scbl) const = 0;
    176     virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
    177 };
    178 
    179 namespace detail {
    180 
    181 template<class F>
    182 struct is_action_function
    183 {
    184     struct not_void {};
    185     template<class CF>
    186     static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
    187     template<class CF>
    188     static not_void check(...);
    189 
    190     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
    191 };
    192 
    193 }
    194 
    195 class weak_worker;
    196 
    197 /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
    198 /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
    199 /// some inner implementations will impose additional constraints on the execution of items.
    200 class worker : public worker_base
    201 {
    202     typedef worker this_type;
    203     detail::worker_interface_ptr inner;
    204     composite_subscription lifetime;
    205     friend bool operator==(const worker&, const worker&);
    206     friend class weak_worker;
    207 public:
    208     typedef scheduler_base::clock_type clock_type;
    209     typedef composite_subscription::weak_subscription weak_subscription;
    210 
    211     worker()
    212     {
    213     }
    214     worker(composite_subscription cs, detail::const_worker_interface_ptr i)
    215         : inner(std::const_pointer_cast<worker_interface>(i))
    216         , lifetime(std::move(cs))
    217     {
    218     }
    219     worker(composite_subscription cs, worker o)
    220         : inner(o.inner)
    221         , lifetime(std::move(cs))
    222     {
    223     }
    224 
    225     inline const composite_subscription& get_subscription() const {
    226         return lifetime;
    227     }
    228     inline composite_subscription& get_subscription() {
    229         return lifetime;
    230     }
    231 
    232     // composite_subscription
    233     //
    234     inline bool is_subscribed() const {
    235         return lifetime.is_subscribed();
    236     }
    237     inline weak_subscription add(subscription s) const {
    238         return lifetime.add(std::move(s));
    239     }
    240     inline void remove(weak_subscription w) const {
    241         return lifetime.remove(std::move(w));
    242     }
    243     inline void clear() const {
    244         return lifetime.clear();
    245     }
    246     inline void unsubscribe() const {
    247         return lifetime.unsubscribe();
    248     }
    249 
    250     // worker_interface
    251     //
    252     /// return the current time for this worker
    253     inline clock_type::time_point now() const {
    254         return inner->now();
    255     }
    256 
    257     /// insert the supplied schedulable to be run as soon as possible
    258     inline void schedule(const schedulable& scbl) const {
    259         // force rebinding scbl to this worker
    260         schedule_rebind(scbl);
    261     }
    262 
    263     /// insert the supplied schedulable to be run at the time specified
    264     inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
    265         // force rebinding scbl to this worker
    266         schedule_rebind(when, scbl);
    267     }
    268 
    269     // helpers
    270     //
    271 
    272     /// insert the supplied schedulable to be run at now() + the delay specified
    273     inline void schedule(clock_type::duration when, const schedulable& scbl) const {
    274         // force rebinding scbl to this worker
    275         schedule_rebind(now() + when, scbl);
    276     }
    277 
    278     /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
    279     /// this will continue until the worker or schedulable is unsubscribed.
    280     inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
    281         // force rebinding scbl to this worker
    282         schedule_periodically_rebind(initial, period, scbl);
    283     }
    284 
    285     /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
    286     /// this will continue until the worker or schedulable is unsubscribed.
    287     inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
    288         // force rebinding scbl to this worker
    289         schedule_periodically_rebind(now() + initial, period, scbl);
    290     }
    291 
    292     /// use the supplied arguments to make a schedulable and then insert it to be run
    293     template<class Arg0, class... ArgN>
    294     auto schedule(Arg0&& a0, ArgN&&... an) const
    295         -> typename std::enable_if<
    296             (detail::is_action_function<Arg0>::value ||
    297             is_subscription<Arg0>::value) &&
    298             !is_schedulable<Arg0>::value>::type;
    299     template<class... ArgN>
    300     /// use the supplied arguments to make a schedulable and then insert it to be run
    301     void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
    302 
    303     /// use the supplied arguments to make a schedulable and then insert it to be run
    304     template<class Arg0, class... ArgN>
    305     auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
    306         -> typename std::enable_if<
    307             (detail::is_action_function<Arg0>::value ||
    308             is_subscription<Arg0>::value) &&
    309             !is_schedulable<Arg0>::value>::type;
    310     /// use the supplied arguments to make a schedulable and then insert it to be run
    311     template<class... ArgN>
    312     void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
    313 
    314     /// use the supplied arguments to make a schedulable and then insert it to be run
    315     template<class Arg0, class... ArgN>
    316     auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
    317         -> typename std::enable_if<
    318             (detail::is_action_function<Arg0>::value ||
    319             is_subscription<Arg0>::value) &&
    320             !is_schedulable<Arg0>::value>::type;
    321     /// use the supplied arguments to make a schedulable and then insert it to be run
    322     template<class... ArgN>
    323     void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
    324 };
    325 
    326 inline bool operator==(const worker& lhs, const worker& rhs) {
    327     return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
    328 }
    329 inline bool operator!=(const worker& lhs, const worker& rhs) {
    330     return !(lhs == rhs);
    331 }
    332 
    333 class weak_worker
    334 {
    335     detail::worker_interface_weak_ptr inner;
    336     composite_subscription lifetime;
    337 
    338 public:
    339     weak_worker()
    340     {
    341     }
    342     explicit weak_worker(worker& owner)
    343         : inner(owner.inner)
    344         , lifetime(owner.lifetime)
    345     {
    346     }
    347 
    348     worker lock() const {
    349         return worker(lifetime, inner.lock());
    350     }
    351 };
    352 
    353 class scheduler_interface
    354     : public std::enable_shared_from_this<scheduler_interface>
    355 {
    356     typedef scheduler_interface this_type;
    357 
    358 public:
    359     typedef scheduler_base::clock_type clock_type;
    360 
    361     virtual ~scheduler_interface() {}
    362 
    363     virtual clock_type::time_point now() const = 0;
    364 
    365     virtual worker create_worker(composite_subscription cs) const = 0;
    366 };
    367 
    368 
    369 struct schedulable_base :
    370     // public subscription_base, <- already in worker base
    371     public worker_base,
    372     public action_base
    373 {
    374     typedef tag_schedulable schedulable_tag;
    375 };
    376 
    377 /*!
    378     \brief allows functions to be called at specified times and possibly in other contexts.
    379 
    380     \ingroup group-core
    381 
    382 */
    383 class scheduler : public scheduler_base
    384 {
    385     typedef scheduler this_type;
    386     detail::scheduler_interface_ptr inner;
    387     friend bool operator==(const scheduler&, const scheduler&);
    388 public:
    389     typedef scheduler_base::clock_type clock_type;
    390 
    391     scheduler()
    392     {
    393     }
    394     explicit scheduler(detail::scheduler_interface_ptr i)
    395         : inner(std::move(i))
    396     {
    397     }
    398     explicit scheduler(detail::const_scheduler_interface_ptr i)
    399         : inner(std::const_pointer_cast<scheduler_interface>(i))
    400     {
    401     }
    402 
    403     /// return the current time for this scheduler
    404     inline clock_type::time_point now() const {
    405         return inner->now();
    406     }
    407     /// create a worker with a lifetime.
    408     /// when the worker is unsubscribed all scheduled items will be unsubscribed.
    409     /// items scheduled to a worker will be run one at a time.
    410     /// scheduling order is preserved: when more than one item is scheduled for
    411     /// time T then at time T they will be run in the order that they were scheduled.
    412     inline worker create_worker(composite_subscription cs = composite_subscription()) const {
    413         return inner->create_worker(cs);
    414     }
    415 };
    416 
    417 template<class Scheduler, class... ArgN>
    418 inline scheduler make_scheduler(ArgN&&... an) {
    419     return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
    420 }
    421 
    422 inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
    423     return scheduler(si);
    424 }
    425 
    426 class schedulable : public schedulable_base
    427 {
    428     typedef schedulable this_type;
    429 
    430     composite_subscription lifetime;
    431     weak_worker controller;
    432     action activity;
    433     bool scoped;
    434     composite_subscription::weak_subscription action_scope;
    435 
    436     struct detacher
    437     {
    438         ~detacher()
    439         {
    440             if (that) {
    441                 that->unsubscribe();
    442             }
    443         }
    444         detacher(const this_type* that)
    445             : that(that)
    446         {
    447         }
    448         const this_type* that;
    449     };
    450 
    451     class recursed_scope_type
    452     {
    453         mutable const recursed* requestor;
    454 
    455         class exit_recursed_scope_type
    456         {
    457             const recursed_scope_type* that;
    458         public:
    459             ~exit_recursed_scope_type()
    460             {
    461                 if (that != nullptr) {
    462                     that->requestor = nullptr;
    463                 }
    464             }
    465             exit_recursed_scope_type(const recursed_scope_type* that)
    466                 : that(that)
    467             {
    468             }
    469             exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
    470                 : that(other.that)
    471             {
    472                 other.that = nullptr;
    473             }
    474         };
    475     public:
    476         recursed_scope_type()
    477             : requestor(nullptr)
    478         {
    479         }
    480         recursed_scope_type(const recursed_scope_type&)
    481             : requestor(nullptr)
    482         {
    483             // does not aquire recursion scope
    484         }
    485         recursed_scope_type& operator=(const recursed_scope_type& )
    486         {
    487             // no change in recursion scope
    488             return *this;
    489         }
    490         exit_recursed_scope_type reset(const recurse& r) const {
    491             requestor = std::addressof(r.get_recursed());
    492             return exit_recursed_scope_type(this);
    493         }
    494         bool is_recursed() const {
    495             return !!requestor;
    496         }
    497         void operator()() const {
    498             (*requestor)();
    499         }
    500     };
    501     recursed_scope_type recursed_scope;
    502 
    503 public:
    504     typedef composite_subscription::weak_subscription weak_subscription;
    505     typedef scheduler_base::clock_type clock_type;
    506 
    507     ~schedulable()
    508     {
    509         if (scoped) {
    510             controller.lock().remove(action_scope);
    511         }
    512     }
    513     schedulable()
    514         : scoped(false)
    515     {
    516     }
    517 
    518     /// action and worker share lifetime
    519     schedulable(worker q, action a)
    520         : lifetime(q.get_subscription())
    521         , controller(q)
    522         , activity(std::move(a))
    523         , scoped(false)
    524     {
    525     }
    526     /// action and worker have independent lifetimes
    527     schedulable(composite_subscription cs, worker q, action a)
    528         : lifetime(std::move(cs))
    529         , controller(q)
    530         , activity(std::move(a))
    531         , scoped(true)
    532         , action_scope(controller.lock().add(lifetime))
    533     {
    534     }
    535     /// inherit lifetimes
    536     schedulable(schedulable scbl, worker q, action a)
    537         : lifetime(scbl.get_subscription())
    538         , controller(q)
    539         , activity(std::move(a))
    540         , scoped(scbl.scoped)
    541         , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
    542     {
    543     }
    544 
    545     inline const composite_subscription& get_subscription() const {
    546         return lifetime;
    547     }
    548     inline composite_subscription& get_subscription() {
    549         return lifetime;
    550     }
    551     inline const worker get_worker() const {
    552         return controller.lock();
    553     }
    554     inline worker get_worker() {
    555         return controller.lock();
    556     }
    557     inline const action& get_action() const {
    558         return activity;
    559     }
    560     inline action& get_action() {
    561         return activity;
    562     }
    563 
    564     inline static schedulable empty(worker sc) {
    565         return schedulable(composite_subscription::empty(), sc, action::empty());
    566     }
    567 
    568     inline auto set_recursed(const recurse& r) const
    569         -> decltype(recursed_scope.reset(r)) {
    570         return      recursed_scope.reset(r);
    571     }
    572 
    573     // recursed
    574     //
    575     bool is_recursed() const {
    576         return recursed_scope.is_recursed();
    577     }
    578     /// requests tail-recursion of the same action
    579     /// this will exit the process if called when
    580     /// is_recursed() is false.
    581     /// Note: to improve perf it is not required
    582     /// to call is_recursed() before calling this
    583     /// operator. Context is sufficient. The schedulable
    584     /// passed to the action by the scheduler will return
    585     /// true from is_recursed()
    586     inline void operator()() const {
    587         recursed_scope();
    588     }
    589 
    590     // composite_subscription
    591     //
    592     inline bool is_subscribed() const {
    593         return lifetime.is_subscribed();
    594     }
    595     inline weak_subscription add(subscription s) const {
    596         return lifetime.add(std::move(s));
    597     }
    598     template<class F>
    599     auto add(F f) const
    600     -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
    601         return lifetime.add(make_subscription(std::move(f)));
    602     }
    603     inline void remove(weak_subscription w) const {
    604         return lifetime.remove(std::move(w));
    605     }
    606     inline void clear() const {
    607         return lifetime.clear();
    608     }
    609     inline void unsubscribe() const {
    610         return lifetime.unsubscribe();
    611     }
    612 
    613     // scheduler
    614     //
    615     inline clock_type::time_point now() const {
    616         return controller.lock().now();
    617     }
    618     /// put this on the queue of the stored scheduler to run asap
    619     inline void schedule() const {
    620         if (is_subscribed()) {
    621             get_worker().schedule(*this);
    622         }
    623     }
    624     /// put this on the queue of the stored scheduler to run at the specified time
    625     inline void schedule(clock_type::time_point when) const {
    626         if (is_subscribed()) {
    627             get_worker().schedule(when, *this);
    628         }
    629     }
    630     /// put this on the queue of the stored scheduler to run after a delay from now
    631     inline void schedule(clock_type::duration when) const {
    632         if (is_subscribed()) {
    633             get_worker().schedule(when, *this);
    634         }
    635     }
    636 
    637     // action
    638     //
    639     /// invokes the action
    640     inline void operator()(const recurse& r) const {
    641         if (!is_subscribed()) {
    642             return;
    643         }
    644         detacher protect(this);
    645         activity(*this, r);
    646         protect.that = nullptr;
    647     }
    648 };
    649 
    650 struct current_thread;
    651 
    652 namespace detail {
    653 
    654 class action_type
    655     : public std::enable_shared_from_this<action_type>
    656 {
    657     typedef action_type this_type;
    658 
    659 public:
    660     typedef std::function<void(const schedulable&, const recurse&)> function_type;
    661 
    662 private:
    663     function_type f;
    664 
    665 public:
    666     action_type()
    667     {
    668     }
    669 
    670     action_type(function_type f)
    671         : f(std::move(f))
    672     {
    673     }
    674 
    675     inline void operator()(const schedulable& s, const recurse& r) {
    676         if (!f) {
    677             std::terminate();
    678         }
    679         f(s, r);
    680     }
    681 };
    682 
    683 class action_tailrecurser
    684     : public std::enable_shared_from_this<action_type>
    685 {
    686     typedef action_type this_type;
    687 
    688 public:
    689     typedef std::function<void(const schedulable&)> function_type;
    690 
    691 private:
    692     function_type f;
    693 
    694 public:
    695     action_tailrecurser()
    696     {
    697     }
    698 
    699     action_tailrecurser(function_type f)
    700         : f(std::move(f))
    701     {
    702     }
    703 
    704     inline void operator()(const schedulable& s, const recurse& r) {
    705         if (!f) {
    706             std::terminate();
    707         }
    708         trace_activity().action_enter(s);
    709         auto scope = s.set_recursed(r);
    710         while (s.is_subscribed()) {
    711             r.reset();
    712             f(s);
    713             if (!r.is_allowed() || !r.is_requested()) {
    714                 if (r.is_requested()) {
    715                     s.schedule();
    716                 }
    717                 break;
    718             }
    719             trace_activity().action_recurse(s);
    720         }
    721         trace_activity().action_return(s);
    722     }
    723 };
    724 }
    725 
    726 inline void action::operator()(const schedulable& s, const recurse& r) const {
    727     (*inner)(s, r);
    728 }
    729 
    730 inline action make_action_empty() {
    731     return action::empty();
    732 }
    733 
    734 template<class F>
    735 inline action make_action(F&& f) {
    736     static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
    737     auto fn = std::forward<F>(f);
    738     return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
    739 }
    740 
    741 // copy
    742 inline auto make_schedulable(
    743     const   schedulable& scbl)
    744     ->      schedulable {
    745     return  schedulable(scbl);
    746 }
    747 // move
    748 inline auto make_schedulable(
    749             schedulable&& scbl)
    750     ->      schedulable {
    751     return  schedulable(std::move(scbl));
    752 }
    753 
    754 inline schedulable make_schedulable(worker sc, action a) {
    755     return schedulable(sc, a);
    756 }
    757 inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
    758     return schedulable(cs, sc, a);
    759 }
    760 
    761 template<class F>
    762 auto make_schedulable(worker sc, F&& f)
    763     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    764     return schedulable(sc, make_action(std::forward<F>(f)));
    765 }
    766 template<class F>
    767 auto make_schedulable(worker sc, composite_subscription cs, F&& f)
    768     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    769     return schedulable(cs, sc, make_action(std::forward<F>(f)));
    770 }
    771 template<class F>
    772 auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
    773     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    774     return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
    775 }
    776 template<class F>
    777 auto make_schedulable(schedulable scbl, worker sc, F&& f)
    778     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    779     return schedulable(scbl, sc, make_action(std::forward<F>(f)));
    780 }
    781 template<class F>
    782 auto make_schedulable(schedulable scbl, F&& f)
    783     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    784     return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
    785 }
    786 
    787 inline auto make_schedulable(schedulable scbl, composite_subscription cs)
    788     -> schedulable {
    789     return schedulable(cs, scbl.get_worker(), scbl.get_action());
    790 }
    791 inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
    792     -> schedulable {
    793     return schedulable(cs, sc, scbl.get_action());
    794 }
    795 inline auto make_schedulable(schedulable scbl, worker sc)
    796     -> schedulable {
    797     return schedulable(scbl, sc, scbl.get_action());
    798 }
    799 
    800 template<class Arg0, class... ArgN>
    801 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
    802     -> typename std::enable_if<
    803         (detail::is_action_function<Arg0>::value ||
    804         is_subscription<Arg0>::value) &&
    805         !is_schedulable<Arg0>::value>::type {
    806     auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
    807     trace_activity().schedule_enter(*inner.get(), scbl);
    808     inner->schedule(std::move(scbl));
    809     trace_activity().schedule_return(*inner.get());
    810 }
    811 template<class... ArgN>
    812 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
    813     auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
    814     trace_activity().schedule_enter(*inner.get(), rescbl);
    815     inner->schedule(std::move(rescbl));
    816     trace_activity().schedule_return(*inner.get());
    817 }
    818 
    819 template<class Arg0, class... ArgN>
    820 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
    821     -> typename std::enable_if<
    822         (detail::is_action_function<Arg0>::value ||
    823         is_subscription<Arg0>::value) &&
    824         !is_schedulable<Arg0>::value>::type {
    825     auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
    826     trace_activity().schedule_when_enter(*inner.get(), when, scbl);
    827     inner->schedule(when, std::move(scbl));
    828     trace_activity().schedule_when_return(*inner.get());
    829 }
    830 template<class... ArgN>
    831 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
    832     auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
    833     trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
    834     inner->schedule(when, std::move(rescbl));
    835     trace_activity().schedule_when_return(*inner.get());
    836 }
    837 
    838 template<class Arg0, class... ArgN>
    839 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
    840     -> typename std::enable_if<
    841         (detail::is_action_function<Arg0>::value ||
    842         is_subscription<Arg0>::value) &&
    843         !is_schedulable<Arg0>::value>::type {
    844     schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
    845 }
    846 template<class... ArgN>
    847 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
    848     auto keepAlive = *this;
    849     auto target = std::make_shared<clock_type::time_point>(initial);
    850     auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
    851     auto periodic = make_schedulable(
    852         activity,
    853         [keepAlive, target, period, activity](schedulable self) {
    854             // any recursion requests will be pushed to the scheduler queue
    855             recursion r(false);
    856             // call action
    857             activity(r.get_recurse());
    858 
    859             // schedule next occurance (if the action took longer than 'period' target will be in the past)
    860             *target += period;
    861             self.schedule(*target);
    862         });
    863     trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
    864     inner->schedule(*target, periodic);
    865     trace_activity().schedule_when_return(*inner.get());
    866 }
    867 
    868 namespace detail {
    869 
    870 template<class TimePoint>
    871 struct time_schedulable
    872 {
    873     typedef TimePoint time_point_type;
    874 
    875     time_schedulable(TimePoint when, schedulable a)
    876         : when(when)
    877         , what(std::move(a))
    878     {
    879     }
    880     TimePoint when;
    881     schedulable what;
    882 };
    883 
    884 
    885 // Sorts time_schedulable items in priority order sorted
    886 // on value of time_schedulable.when. Items with equal
    887 // values for when are sorted in fifo order.
    888 template<class TimePoint>
    889 class schedulable_queue {
    890 public:
    891     typedef time_schedulable<TimePoint> item_type;
    892     typedef std::pair<item_type, int64_t> elem_type;
    893     typedef std::vector<elem_type> container_type;
    894     typedef const item_type& const_reference;
    895 
    896 private:
    897     struct compare_elem
    898     {
    899         bool operator()(const elem_type& lhs, const elem_type& rhs) const {
    900             if (lhs.first.when == rhs.first.when) {
    901                 return lhs.second > rhs.second;
    902             }
    903             else {
    904                 return lhs.first.when > rhs.first.when;
    905             }
    906         }
    907     };
    908 
    909     typedef std::priority_queue<
    910         elem_type,
    911         container_type,
    912         compare_elem
    913     > queue_type;
    914 
    915     queue_type q;
    916 
    917     int64_t ordinal;
    918 public:
    919 
    920     schedulable_queue()
    921         : ordinal(0)
    922     {
    923     }
    924 
    925     const_reference top() const {
    926         return q.top().first;
    927     }
    928 
    929     void pop() {
    930         q.pop();
    931     }
    932 
    933     bool empty() const {
    934         return q.empty();
    935     }
    936 
    937     void push(const item_type& value) {
    938         q.push(elem_type(value, ordinal++));
    939     }
    940 
    941     void push(item_type&& value) {
    942         q.push(elem_type(std::move(value), ordinal++));
    943     }
    944 };
    945 
    946 }
    947 
    948 }
    949 namespace rxsc=schedulers;
    950 
    951 }
    952 
    953 #include "schedulers/rx-currentthread.hpp"
    954 #include "schedulers/rx-runloop.hpp"
    955 #include "schedulers/rx-newthread.hpp"
    956 #include "schedulers/rx-eventloop.hpp"
    957 #include "schedulers/rx-immediate.hpp"
    958 #include "schedulers/rx-virtualtime.hpp"
    959 #include "schedulers/rx-sameworker.hpp"
    960 
    961 #endif
    962