Home | History | Annotate | Download | only in schedulers
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP)
      6 #define RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 namespace detail {
     15 
     16 template<class Absolute, class Relative>
     17 struct virtual_time_base : std::enable_shared_from_this<virtual_time_base<Absolute, Relative>>
     18 {
     19 private:
     20     typedef virtual_time_base<Absolute, Relative> this_type;
     21     virtual_time_base(const virtual_time_base&);
     22 
     23     mutable bool isenabled;
     24 
     25 public:
     26     typedef Absolute absolute;
     27     typedef Relative relative;
     28 
     29     virtual ~virtual_time_base()
     30     {
     31     }
     32 
     33 protected:
     34     virtual_time_base()
     35         : isenabled(false)
     36         , clock_now(0)
     37     {
     38     }
     39     explicit virtual_time_base(absolute initialClock)
     40         : isenabled(false)
     41         , clock_now(initialClock)
     42     {
     43     }
     44 
     45     mutable absolute clock_now;
     46 
     47     typedef time_schedulable<long> item_type;
     48 
     49     virtual absolute add(absolute, relative) const =0;
     50 
     51     virtual typename scheduler_base::clock_type::time_point to_time_point(absolute) const =0;
     52     virtual relative to_relative(typename scheduler_base::clock_type::duration) const =0;
     53 
     54     virtual item_type top() const =0;
     55     virtual void pop() const =0;
     56     virtual bool empty() const =0;
     57 
     58 public:
     59 
     60     virtual void schedule_absolute(absolute, const schedulable&) const =0;
     61 
     62     virtual void schedule_relative(relative when, const schedulable& a) const {
     63         auto at = add(clock_now, when);
     64         return schedule_absolute(at, a);
     65     }
     66 
     67     bool is_enabled() const {return isenabled;}
     68     absolute clock() const {return clock_now;}
     69 
     70     void start() const
     71     {
     72         if (!isenabled) {
     73             isenabled = true;
     74             rxsc::recursion r;
     75             r.reset(false);
     76             while (!empty() && isenabled) {
     77                 auto next = top();
     78                 pop();
     79                 if (next.what.is_subscribed()) {
     80                     if (next.when > clock_now) {
     81                         clock_now = next.when;
     82                     }
     83                     next.what(r.get_recurse());
     84                 }
     85             }
     86             isenabled = false;
     87         }
     88     }
     89 
     90     void stop() const
     91     {
     92         isenabled = false;
     93     }
     94 
     95     void advance_to(absolute time) const
     96     {
     97         if (time < clock_now) {
     98             std::terminate();
     99         }
    100 
    101         if (time == clock_now) {
    102             return;
    103         }
    104 
    105         if (!isenabled) {
    106             isenabled = true;
    107             rxsc::recursion r;
    108             while (!empty() && isenabled) {
    109                 auto next = top();
    110                 if (next.when <= time) {
    111                     pop();
    112                     if (!next.what.is_subscribed()) {
    113                         continue;
    114                     }
    115                     if (next.when > clock_now) {
    116                         clock_now = next.when;
    117                     }
    118                     next.what(r.get_recurse());
    119                 }
    120                 else {
    121                     break;
    122                 }
    123             }
    124             isenabled = false;
    125             clock_now = time;
    126         }
    127         else {
    128             std::terminate();
    129         }
    130     }
    131 
    132     void advance_by(relative time) const
    133     {
    134         auto dt = add(clock_now, time);
    135 
    136         if (dt < clock_now) {
    137             std::terminate();
    138         }
    139 
    140         if (dt == clock_now) {
    141             return;
    142         }
    143 
    144         if (!isenabled) {
    145             advance_to(dt);
    146         }
    147         else {
    148             std::terminate();
    149         }
    150     }
    151 
    152     void sleep(relative time) const
    153     {
    154         auto dt = add(clock_now, time);
    155 
    156         if (dt < clock_now) {
    157             std::terminate();
    158         }
    159 
    160         clock_now = dt;
    161     }
    162 
    163 };
    164 
    165 }
    166 
    167 template<class Absolute, class Relative>
    168 struct virtual_time : public detail::virtual_time_base<Absolute, Relative>
    169 {
    170     typedef detail::virtual_time_base<Absolute, Relative> base;
    171 
    172     typedef typename base::item_type item_type;
    173 
    174     typedef detail::schedulable_queue<
    175         typename item_type::time_point_type> queue_item_time;
    176 
    177     mutable queue_item_time q;
    178 
    179 public:
    180     virtual ~virtual_time()
    181     {
    182     }
    183 
    184 protected:
    185     virtual_time()
    186     {
    187     }
    188     explicit virtual_time(typename base::absolute initialClock)
    189         : base(initialClock)
    190     {
    191     }
    192 
    193     virtual item_type top() const {
    194         return q.top();
    195     }
    196     virtual void pop() const {
    197         q.pop();
    198     }
    199     virtual bool empty() const {
    200         return q.empty();
    201     }
    202 
    203     using base::schedule_absolute;
    204     using base::schedule_relative;
    205 
    206     virtual void schedule_absolute(typename base::absolute when, const schedulable& a) const
    207     {
    208         // use a separate subscription here so that a's subscription is not affected
    209         auto run = make_schedulable(
    210             a.get_worker(),
    211             composite_subscription(),
    212             [a](const schedulable& scbl) {
    213                 rxsc::recursion r;
    214                 r.reset(false);
    215                 if (scbl.is_subscribed()) {
    216                     scbl.unsubscribe(); // unsubscribe() run, not a;
    217                     a(r.get_recurse());
    218                 }
    219             });
    220         q.push(item_type(when, run));
    221     }
    222 };
    223 
    224 
    225 
    226 }
    227 
    228 }
    229 
    230 #endif
    231