1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP) 6 #define RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 namespace detail { 15 16 template<class Absolute, class Relative> 17 struct virtual_time_base : std::enable_shared_from_this<virtual_time_base<Absolute, Relative>> 18 { 19 private: 20 typedef virtual_time_base<Absolute, Relative> this_type; 21 virtual_time_base(const virtual_time_base&); 22 23 mutable bool isenabled; 24 25 public: 26 typedef Absolute absolute; 27 typedef Relative relative; 28 29 virtual ~virtual_time_base() 30 { 31 } 32 33 protected: 34 virtual_time_base() 35 : isenabled(false) 36 , clock_now(0) 37 { 38 } 39 explicit virtual_time_base(absolute initialClock) 40 : isenabled(false) 41 , clock_now(initialClock) 42 { 43 } 44 45 mutable absolute clock_now; 46 47 typedef time_schedulable<long> item_type; 48 49 virtual absolute add(absolute, relative) const =0; 50 51 virtual typename scheduler_base::clock_type::time_point to_time_point(absolute) const =0; 52 virtual relative to_relative(typename scheduler_base::clock_type::duration) const =0; 53 54 virtual item_type top() const =0; 55 virtual void pop() const =0; 56 virtual bool empty() const =0; 57 58 public: 59 60 virtual void schedule_absolute(absolute, const schedulable&) const =0; 61 62 virtual void schedule_relative(relative when, const schedulable& a) const { 63 auto at = add(clock_now, when); 64 return schedule_absolute(at, a); 65 } 66 67 bool is_enabled() const {return isenabled;} 68 absolute clock() const {return clock_now;} 69 70 void start() const 71 { 72 if (!isenabled) { 73 isenabled = true; 74 rxsc::recursion r; 75 r.reset(false); 76 while (!empty() && isenabled) { 77 auto next = top(); 78 pop(); 79 if (next.what.is_subscribed()) { 80 if (next.when > clock_now) { 81 clock_now = next.when; 82 } 83 next.what(r.get_recurse()); 84 } 85 } 86 isenabled = false; 87 } 88 } 89 90 void stop() const 91 { 92 isenabled = false; 93 } 94 95 void advance_to(absolute time) const 96 { 97 if (time < clock_now) { 98 std::terminate(); 99 } 100 101 if (time == clock_now) { 102 return; 103 } 104 105 if (!isenabled) { 106 isenabled = true; 107 rxsc::recursion r; 108 while (!empty() && isenabled) { 109 auto next = top(); 110 if (next.when <= time) { 111 pop(); 112 if (!next.what.is_subscribed()) { 113 continue; 114 } 115 if (next.when > clock_now) { 116 clock_now = next.when; 117 } 118 next.what(r.get_recurse()); 119 } 120 else { 121 break; 122 } 123 } 124 isenabled = false; 125 clock_now = time; 126 } 127 else { 128 std::terminate(); 129 } 130 } 131 132 void advance_by(relative time) const 133 { 134 auto dt = add(clock_now, time); 135 136 if (dt < clock_now) { 137 std::terminate(); 138 } 139 140 if (dt == clock_now) { 141 return; 142 } 143 144 if (!isenabled) { 145 advance_to(dt); 146 } 147 else { 148 std::terminate(); 149 } 150 } 151 152 void sleep(relative time) const 153 { 154 auto dt = add(clock_now, time); 155 156 if (dt < clock_now) { 157 std::terminate(); 158 } 159 160 clock_now = dt; 161 } 162 163 }; 164 165 } 166 167 template<class Absolute, class Relative> 168 struct virtual_time : public detail::virtual_time_base<Absolute, Relative> 169 { 170 typedef detail::virtual_time_base<Absolute, Relative> base; 171 172 typedef typename base::item_type item_type; 173 174 typedef detail::schedulable_queue< 175 typename item_type::time_point_type> queue_item_time; 176 177 mutable queue_item_time q; 178 179 public: 180 virtual ~virtual_time() 181 { 182 } 183 184 protected: 185 virtual_time() 186 { 187 } 188 explicit virtual_time(typename base::absolute initialClock) 189 : base(initialClock) 190 { 191 } 192 193 virtual item_type top() const { 194 return q.top(); 195 } 196 virtual void pop() const { 197 q.pop(); 198 } 199 virtual bool empty() const { 200 return q.empty(); 201 } 202 203 using base::schedule_absolute; 204 using base::schedule_relative; 205 206 virtual void schedule_absolute(typename base::absolute when, const schedulable& a) const 207 { 208 // use a separate subscription here so that a's subscription is not affected 209 auto run = make_schedulable( 210 a.get_worker(), 211 composite_subscription(), 212 [a](const schedulable& scbl) { 213 rxsc::recursion r; 214 r.reset(false); 215 if (scbl.is_subscribed()) { 216 scbl.unsubscribe(); // unsubscribe() run, not a; 217 a(r.get_recurse()); 218 } 219 }); 220 q.push(item_type(when, run)); 221 } 222 }; 223 224 225 226 } 227 228 } 229 230 #endif 231