1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_HPP) 6 #define RXCPP_RX_SCHEDULER_HPP 7 8 #include "rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 class worker_interface; 15 class scheduler_interface; 16 17 namespace detail { 18 19 class action_type; 20 typedef std::shared_ptr<action_type> action_ptr; 21 22 typedef std::shared_ptr<worker_interface> worker_interface_ptr; 23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr; 24 25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr; 26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr; 27 28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr; 29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr; 30 31 inline action_ptr shared_empty() { 32 static action_ptr shared_empty = std::make_shared<detail::action_type>(); 33 return shared_empty; 34 } 35 36 } 37 38 // It is essential to keep virtual function calls out of an inner loop. 39 // To make tail-recursion work efficiently the recursion objects create 40 // a space on the stack inside the virtual function call in the actor that 41 // allows the callback and the scheduler to share stack space that records 42 // the request and the allowance without any virtual calls in the loop. 43 44 /// recursed is set on a schedulable by the action to allow the called 45 /// function to request to be rescheduled. 46 class recursed 47 { 48 bool& isrequested; 49 recursed operator=(const recursed&); 50 public: 51 explicit recursed(bool& r) 52 : isrequested(r) 53 { 54 } 55 /// request to be rescheduled 56 inline void operator()() const { 57 isrequested = true; 58 } 59 }; 60 61 /// recurse is passed to the action by the scheduler. 62 /// the action uses recurse to coordinate the scheduler and the function. 63 class recurse 64 { 65 bool& isallowed; 66 mutable bool isrequested; 67 recursed requestor; 68 recurse operator=(const recurse&); 69 public: 70 explicit recurse(bool& a) 71 : isallowed(a) 72 , isrequested(true) 73 , requestor(isrequested) 74 { 75 } 76 /// does the scheduler allow tail-recursion now? 77 inline bool is_allowed() const { 78 return isallowed; 79 } 80 /// did the function request to be recursed? 81 inline bool is_requested() const { 82 return isrequested; 83 } 84 /// reset the function request. call before each call to the function. 85 inline void reset() const { 86 isrequested = false; 87 } 88 /// get the recursed to set into the schedulable for the function to use to request recursion 89 inline const recursed& get_recursed() const { 90 return requestor; 91 } 92 }; 93 94 /// recursion is used by the scheduler to signal to each action whether tail recursion is allowed. 95 class recursion 96 { 97 mutable bool isallowed; 98 recurse recursor; 99 recursion operator=(const recursion&); 100 public: 101 recursion() 102 : isallowed(true) 103 , recursor(isallowed) 104 { 105 } 106 explicit recursion(bool b) 107 : isallowed(b) 108 , recursor(isallowed) 109 { 110 } 111 /// set whether tail-recursion is allowed 112 inline void reset(bool b = true) const { 113 isallowed = b; 114 } 115 /// get the recurse to pass into each action being called 116 inline const recurse& get_recurse() const { 117 return recursor; 118 } 119 }; 120 121 122 struct action_base 123 { 124 typedef tag_action action_tag; 125 }; 126 127 class schedulable; 128 129 /// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable 130 class action : public action_base 131 { 132 typedef action this_type; 133 detail::action_ptr inner; 134 public: 135 action() 136 { 137 } 138 explicit action(detail::action_ptr i) 139 : inner(std::move(i)) 140 { 141 } 142 143 /// return the empty action 144 inline static action empty() { 145 return action(detail::shared_empty()); 146 } 147 148 /// call the function 149 inline void operator()(const schedulable& s, const recurse& r) const; 150 }; 151 152 struct scheduler_base 153 { 154 typedef std::chrono::steady_clock clock_type; 155 typedef tag_scheduler scheduler_tag; 156 }; 157 158 struct worker_base : public subscription_base 159 { 160 typedef tag_worker worker_tag; 161 }; 162 163 class worker_interface 164 : public std::enable_shared_from_this<worker_interface> 165 { 166 typedef worker_interface this_type; 167 168 public: 169 typedef scheduler_base::clock_type clock_type; 170 171 virtual ~worker_interface() {} 172 173 virtual clock_type::time_point now() const = 0; 174 175 virtual void schedule(const schedulable& scbl) const = 0; 176 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0; 177 }; 178 179 namespace detail { 180 181 template<class F> 182 struct is_action_function 183 { 184 struct not_void {}; 185 template<class CF> 186 static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr)); 187 template<class CF> 188 static not_void check(...); 189 190 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value; 191 }; 192 193 } 194 195 class weak_worker; 196 197 /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap 198 /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed 199 /// some inner implementations will impose additional constraints on the execution of items. 200 class worker : public worker_base 201 { 202 typedef worker this_type; 203 detail::worker_interface_ptr inner; 204 composite_subscription lifetime; 205 friend bool operator==(const worker&, const worker&); 206 friend class weak_worker; 207 public: 208 typedef scheduler_base::clock_type clock_type; 209 typedef composite_subscription::weak_subscription weak_subscription; 210 211 worker() 212 { 213 } 214 worker(composite_subscription cs, detail::const_worker_interface_ptr i) 215 : inner(std::const_pointer_cast<worker_interface>(i)) 216 , lifetime(std::move(cs)) 217 { 218 } 219 worker(composite_subscription cs, worker o) 220 : inner(o.inner) 221 , lifetime(std::move(cs)) 222 { 223 } 224 225 inline const composite_subscription& get_subscription() const { 226 return lifetime; 227 } 228 inline composite_subscription& get_subscription() { 229 return lifetime; 230 } 231 232 // composite_subscription 233 // 234 inline bool is_subscribed() const { 235 return lifetime.is_subscribed(); 236 } 237 inline weak_subscription add(subscription s) const { 238 return lifetime.add(std::move(s)); 239 } 240 inline void remove(weak_subscription w) const { 241 return lifetime.remove(std::move(w)); 242 } 243 inline void clear() const { 244 return lifetime.clear(); 245 } 246 inline void unsubscribe() const { 247 return lifetime.unsubscribe(); 248 } 249 250 // worker_interface 251 // 252 /// return the current time for this worker 253 inline clock_type::time_point now() const { 254 return inner->now(); 255 } 256 257 /// insert the supplied schedulable to be run as soon as possible 258 inline void schedule(const schedulable& scbl) const { 259 // force rebinding scbl to this worker 260 schedule_rebind(scbl); 261 } 262 263 /// insert the supplied schedulable to be run at the time specified 264 inline void schedule(clock_type::time_point when, const schedulable& scbl) const { 265 // force rebinding scbl to this worker 266 schedule_rebind(when, scbl); 267 } 268 269 // helpers 270 // 271 272 /// insert the supplied schedulable to be run at now() + the delay specified 273 inline void schedule(clock_type::duration when, const schedulable& scbl) const { 274 // force rebinding scbl to this worker 275 schedule_rebind(now() + when, scbl); 276 } 277 278 /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period) 279 /// this will continue until the worker or schedulable is unsubscribed. 280 inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const { 281 // force rebinding scbl to this worker 282 schedule_periodically_rebind(initial, period, scbl); 283 } 284 285 /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period) 286 /// this will continue until the worker or schedulable is unsubscribed. 287 inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const { 288 // force rebinding scbl to this worker 289 schedule_periodically_rebind(now() + initial, period, scbl); 290 } 291 292 /// use the supplied arguments to make a schedulable and then insert it to be run 293 template<class Arg0, class... ArgN> 294 auto schedule(Arg0&& a0, ArgN&&... an) const 295 -> typename std::enable_if< 296 (detail::is_action_function<Arg0>::value || 297 is_subscription<Arg0>::value) && 298 !is_schedulable<Arg0>::value>::type; 299 template<class... ArgN> 300 /// use the supplied arguments to make a schedulable and then insert it to be run 301 void schedule_rebind(const schedulable& scbl, ArgN&&... an) const; 302 303 /// use the supplied arguments to make a schedulable and then insert it to be run 304 template<class Arg0, class... ArgN> 305 auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const 306 -> typename std::enable_if< 307 (detail::is_action_function<Arg0>::value || 308 is_subscription<Arg0>::value) && 309 !is_schedulable<Arg0>::value>::type; 310 /// use the supplied arguments to make a schedulable and then insert it to be run 311 template<class... ArgN> 312 void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const; 313 314 /// use the supplied arguments to make a schedulable and then insert it to be run 315 template<class Arg0, class... ArgN> 316 auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const 317 -> typename std::enable_if< 318 (detail::is_action_function<Arg0>::value || 319 is_subscription<Arg0>::value) && 320 !is_schedulable<Arg0>::value>::type; 321 /// use the supplied arguments to make a schedulable and then insert it to be run 322 template<class... ArgN> 323 void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const; 324 }; 325 326 inline bool operator==(const worker& lhs, const worker& rhs) { 327 return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime; 328 } 329 inline bool operator!=(const worker& lhs, const worker& rhs) { 330 return !(lhs == rhs); 331 } 332 333 class weak_worker 334 { 335 detail::worker_interface_weak_ptr inner; 336 composite_subscription lifetime; 337 338 public: 339 weak_worker() 340 { 341 } 342 explicit weak_worker(worker& owner) 343 : inner(owner.inner) 344 , lifetime(owner.lifetime) 345 { 346 } 347 348 worker lock() const { 349 return worker(lifetime, inner.lock()); 350 } 351 }; 352 353 class scheduler_interface 354 : public std::enable_shared_from_this<scheduler_interface> 355 { 356 typedef scheduler_interface this_type; 357 358 public: 359 typedef scheduler_base::clock_type clock_type; 360 361 virtual ~scheduler_interface() {} 362 363 virtual clock_type::time_point now() const = 0; 364 365 virtual worker create_worker(composite_subscription cs) const = 0; 366 }; 367 368 369 struct schedulable_base : 370 // public subscription_base, <- already in worker base 371 public worker_base, 372 public action_base 373 { 374 typedef tag_schedulable schedulable_tag; 375 }; 376 377 /*! 378 \brief allows functions to be called at specified times and possibly in other contexts. 379 380 \ingroup group-core 381 382 */ 383 class scheduler : public scheduler_base 384 { 385 typedef scheduler this_type; 386 detail::scheduler_interface_ptr inner; 387 friend bool operator==(const scheduler&, const scheduler&); 388 public: 389 typedef scheduler_base::clock_type clock_type; 390 391 scheduler() 392 { 393 } 394 explicit scheduler(detail::scheduler_interface_ptr i) 395 : inner(std::move(i)) 396 { 397 } 398 explicit scheduler(detail::const_scheduler_interface_ptr i) 399 : inner(std::const_pointer_cast<scheduler_interface>(i)) 400 { 401 } 402 403 /// return the current time for this scheduler 404 inline clock_type::time_point now() const { 405 return inner->now(); 406 } 407 /// create a worker with a lifetime. 408 /// when the worker is unsubscribed all scheduled items will be unsubscribed. 409 /// items scheduled to a worker will be run one at a time. 410 /// scheduling order is preserved: when more than one item is scheduled for 411 /// time T then at time T they will be run in the order that they were scheduled. 412 inline worker create_worker(composite_subscription cs = composite_subscription()) const { 413 return inner->create_worker(cs); 414 } 415 }; 416 417 template<class Scheduler, class... ArgN> 418 inline scheduler make_scheduler(ArgN&&... an) { 419 return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...))); 420 } 421 422 inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) { 423 return scheduler(si); 424 } 425 426 class schedulable : public schedulable_base 427 { 428 typedef schedulable this_type; 429 430 composite_subscription lifetime; 431 weak_worker controller; 432 action activity; 433 bool scoped; 434 composite_subscription::weak_subscription action_scope; 435 436 struct detacher 437 { 438 ~detacher() 439 { 440 if (that) { 441 that->unsubscribe(); 442 } 443 } 444 detacher(const this_type* that) 445 : that(that) 446 { 447 } 448 const this_type* that; 449 }; 450 451 class recursed_scope_type 452 { 453 mutable const recursed* requestor; 454 455 class exit_recursed_scope_type 456 { 457 const recursed_scope_type* that; 458 public: 459 ~exit_recursed_scope_type() 460 { 461 if (that != nullptr) { 462 that->requestor = nullptr; 463 } 464 } 465 exit_recursed_scope_type(const recursed_scope_type* that) 466 : that(that) 467 { 468 } 469 exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT 470 : that(other.that) 471 { 472 other.that = nullptr; 473 } 474 }; 475 public: 476 recursed_scope_type() 477 : requestor(nullptr) 478 { 479 } 480 recursed_scope_type(const recursed_scope_type&) 481 : requestor(nullptr) 482 { 483 // does not aquire recursion scope 484 } 485 recursed_scope_type& operator=(const recursed_scope_type& ) 486 { 487 // no change in recursion scope 488 return *this; 489 } 490 exit_recursed_scope_type reset(const recurse& r) const { 491 requestor = std::addressof(r.get_recursed()); 492 return exit_recursed_scope_type(this); 493 } 494 bool is_recursed() const { 495 return !!requestor; 496 } 497 void operator()() const { 498 (*requestor)(); 499 } 500 }; 501 recursed_scope_type recursed_scope; 502 503 public: 504 typedef composite_subscription::weak_subscription weak_subscription; 505 typedef scheduler_base::clock_type clock_type; 506 507 ~schedulable() 508 { 509 if (scoped) { 510 controller.lock().remove(action_scope); 511 } 512 } 513 schedulable() 514 : scoped(false) 515 { 516 } 517 518 /// action and worker share lifetime 519 schedulable(worker q, action a) 520 : lifetime(q.get_subscription()) 521 , controller(q) 522 , activity(std::move(a)) 523 , scoped(false) 524 { 525 } 526 /// action and worker have independent lifetimes 527 schedulable(composite_subscription cs, worker q, action a) 528 : lifetime(std::move(cs)) 529 , controller(q) 530 , activity(std::move(a)) 531 , scoped(true) 532 , action_scope(controller.lock().add(lifetime)) 533 { 534 } 535 /// inherit lifetimes 536 schedulable(schedulable scbl, worker q, action a) 537 : lifetime(scbl.get_subscription()) 538 , controller(q) 539 , activity(std::move(a)) 540 , scoped(scbl.scoped) 541 , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription()) 542 { 543 } 544 545 inline const composite_subscription& get_subscription() const { 546 return lifetime; 547 } 548 inline composite_subscription& get_subscription() { 549 return lifetime; 550 } 551 inline const worker get_worker() const { 552 return controller.lock(); 553 } 554 inline worker get_worker() { 555 return controller.lock(); 556 } 557 inline const action& get_action() const { 558 return activity; 559 } 560 inline action& get_action() { 561 return activity; 562 } 563 564 inline static schedulable empty(worker sc) { 565 return schedulable(composite_subscription::empty(), sc, action::empty()); 566 } 567 568 inline auto set_recursed(const recurse& r) const 569 -> decltype(recursed_scope.reset(r)) { 570 return recursed_scope.reset(r); 571 } 572 573 // recursed 574 // 575 bool is_recursed() const { 576 return recursed_scope.is_recursed(); 577 } 578 /// requests tail-recursion of the same action 579 /// this will exit the process if called when 580 /// is_recursed() is false. 581 /// Note: to improve perf it is not required 582 /// to call is_recursed() before calling this 583 /// operator. Context is sufficient. The schedulable 584 /// passed to the action by the scheduler will return 585 /// true from is_recursed() 586 inline void operator()() const { 587 recursed_scope(); 588 } 589 590 // composite_subscription 591 // 592 inline bool is_subscribed() const { 593 return lifetime.is_subscribed(); 594 } 595 inline weak_subscription add(subscription s) const { 596 return lifetime.add(std::move(s)); 597 } 598 template<class F> 599 auto add(F f) const 600 -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type { 601 return lifetime.add(make_subscription(std::move(f))); 602 } 603 inline void remove(weak_subscription w) const { 604 return lifetime.remove(std::move(w)); 605 } 606 inline void clear() const { 607 return lifetime.clear(); 608 } 609 inline void unsubscribe() const { 610 return lifetime.unsubscribe(); 611 } 612 613 // scheduler 614 // 615 inline clock_type::time_point now() const { 616 return controller.lock().now(); 617 } 618 /// put this on the queue of the stored scheduler to run asap 619 inline void schedule() const { 620 if (is_subscribed()) { 621 get_worker().schedule(*this); 622 } 623 } 624 /// put this on the queue of the stored scheduler to run at the specified time 625 inline void schedule(clock_type::time_point when) const { 626 if (is_subscribed()) { 627 get_worker().schedule(when, *this); 628 } 629 } 630 /// put this on the queue of the stored scheduler to run after a delay from now 631 inline void schedule(clock_type::duration when) const { 632 if (is_subscribed()) { 633 get_worker().schedule(when, *this); 634 } 635 } 636 637 // action 638 // 639 /// invokes the action 640 inline void operator()(const recurse& r) const { 641 if (!is_subscribed()) { 642 return; 643 } 644 detacher protect(this); 645 activity(*this, r); 646 protect.that = nullptr; 647 } 648 }; 649 650 struct current_thread; 651 652 namespace detail { 653 654 class action_type 655 : public std::enable_shared_from_this<action_type> 656 { 657 typedef action_type this_type; 658 659 public: 660 typedef std::function<void(const schedulable&, const recurse&)> function_type; 661 662 private: 663 function_type f; 664 665 public: 666 action_type() 667 { 668 } 669 670 action_type(function_type f) 671 : f(std::move(f)) 672 { 673 } 674 675 inline void operator()(const schedulable& s, const recurse& r) { 676 if (!f) { 677 std::terminate(); 678 } 679 f(s, r); 680 } 681 }; 682 683 class action_tailrecurser 684 : public std::enable_shared_from_this<action_type> 685 { 686 typedef action_type this_type; 687 688 public: 689 typedef std::function<void(const schedulable&)> function_type; 690 691 private: 692 function_type f; 693 694 public: 695 action_tailrecurser() 696 { 697 } 698 699 action_tailrecurser(function_type f) 700 : f(std::move(f)) 701 { 702 } 703 704 inline void operator()(const schedulable& s, const recurse& r) { 705 if (!f) { 706 std::terminate(); 707 } 708 trace_activity().action_enter(s); 709 auto scope = s.set_recursed(r); 710 while (s.is_subscribed()) { 711 r.reset(); 712 f(s); 713 if (!r.is_allowed() || !r.is_requested()) { 714 if (r.is_requested()) { 715 s.schedule(); 716 } 717 break; 718 } 719 trace_activity().action_recurse(s); 720 } 721 trace_activity().action_return(s); 722 } 723 }; 724 } 725 726 inline void action::operator()(const schedulable& s, const recurse& r) const { 727 (*inner)(s, r); 728 } 729 730 inline action make_action_empty() { 731 return action::empty(); 732 } 733 734 template<class F> 735 inline action make_action(F&& f) { 736 static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)"); 737 auto fn = std::forward<F>(f); 738 return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn))); 739 } 740 741 // copy 742 inline auto make_schedulable( 743 const schedulable& scbl) 744 -> schedulable { 745 return schedulable(scbl); 746 } 747 // move 748 inline auto make_schedulable( 749 schedulable&& scbl) 750 -> schedulable { 751 return schedulable(std::move(scbl)); 752 } 753 754 inline schedulable make_schedulable(worker sc, action a) { 755 return schedulable(sc, a); 756 } 757 inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) { 758 return schedulable(cs, sc, a); 759 } 760 761 template<class F> 762 auto make_schedulable(worker sc, F&& f) 763 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type { 764 return schedulable(sc, make_action(std::forward<F>(f))); 765 } 766 template<class F> 767 auto make_schedulable(worker sc, composite_subscription cs, F&& f) 768 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type { 769 return schedulable(cs, sc, make_action(std::forward<F>(f))); 770 } 771 template<class F> 772 auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f) 773 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type { 774 return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f))); 775 } 776 template<class F> 777 auto make_schedulable(schedulable scbl, worker sc, F&& f) 778 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type { 779 return schedulable(scbl, sc, make_action(std::forward<F>(f))); 780 } 781 template<class F> 782 auto make_schedulable(schedulable scbl, F&& f) 783 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type { 784 return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f))); 785 } 786 787 inline auto make_schedulable(schedulable scbl, composite_subscription cs) 788 -> schedulable { 789 return schedulable(cs, scbl.get_worker(), scbl.get_action()); 790 } 791 inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs) 792 -> schedulable { 793 return schedulable(cs, sc, scbl.get_action()); 794 } 795 inline auto make_schedulable(schedulable scbl, worker sc) 796 -> schedulable { 797 return schedulable(scbl, sc, scbl.get_action()); 798 } 799 800 template<class Arg0, class... ArgN> 801 auto worker::schedule(Arg0&& a0, ArgN&&... an) const 802 -> typename std::enable_if< 803 (detail::is_action_function<Arg0>::value || 804 is_subscription<Arg0>::value) && 805 !is_schedulable<Arg0>::value>::type { 806 auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...); 807 trace_activity().schedule_enter(*inner.get(), scbl); 808 inner->schedule(std::move(scbl)); 809 trace_activity().schedule_return(*inner.get()); 810 } 811 template<class... ArgN> 812 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const { 813 auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...); 814 trace_activity().schedule_enter(*inner.get(), rescbl); 815 inner->schedule(std::move(rescbl)); 816 trace_activity().schedule_return(*inner.get()); 817 } 818 819 template<class Arg0, class... ArgN> 820 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const 821 -> typename std::enable_if< 822 (detail::is_action_function<Arg0>::value || 823 is_subscription<Arg0>::value) && 824 !is_schedulable<Arg0>::value>::type { 825 auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...); 826 trace_activity().schedule_when_enter(*inner.get(), when, scbl); 827 inner->schedule(when, std::move(scbl)); 828 trace_activity().schedule_when_return(*inner.get()); 829 } 830 template<class... ArgN> 831 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const { 832 auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...); 833 trace_activity().schedule_when_enter(*inner.get(), when, rescbl); 834 inner->schedule(when, std::move(rescbl)); 835 trace_activity().schedule_when_return(*inner.get()); 836 } 837 838 template<class Arg0, class... ArgN> 839 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const 840 -> typename std::enable_if< 841 (detail::is_action_function<Arg0>::value || 842 is_subscription<Arg0>::value) && 843 !is_schedulable<Arg0>::value>::type { 844 schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...)); 845 } 846 template<class... ArgN> 847 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const { 848 auto keepAlive = *this; 849 auto target = std::make_shared<clock_type::time_point>(initial); 850 auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...); 851 auto periodic = make_schedulable( 852 activity, 853 [keepAlive, target, period, activity](schedulable self) { 854 // any recursion requests will be pushed to the scheduler queue 855 recursion r(false); 856 // call action 857 activity(r.get_recurse()); 858 859 // schedule next occurance (if the action took longer than 'period' target will be in the past) 860 *target += period; 861 self.schedule(*target); 862 }); 863 trace_activity().schedule_when_enter(*inner.get(), *target, periodic); 864 inner->schedule(*target, periodic); 865 trace_activity().schedule_when_return(*inner.get()); 866 } 867 868 namespace detail { 869 870 template<class TimePoint> 871 struct time_schedulable 872 { 873 typedef TimePoint time_point_type; 874 875 time_schedulable(TimePoint when, schedulable a) 876 : when(when) 877 , what(std::move(a)) 878 { 879 } 880 TimePoint when; 881 schedulable what; 882 }; 883 884 885 // Sorts time_schedulable items in priority order sorted 886 // on value of time_schedulable.when. Items with equal 887 // values for when are sorted in fifo order. 888 template<class TimePoint> 889 class schedulable_queue { 890 public: 891 typedef time_schedulable<TimePoint> item_type; 892 typedef std::pair<item_type, int64_t> elem_type; 893 typedef std::vector<elem_type> container_type; 894 typedef const item_type& const_reference; 895 896 private: 897 struct compare_elem 898 { 899 bool operator()(const elem_type& lhs, const elem_type& rhs) const { 900 if (lhs.first.when == rhs.first.when) { 901 return lhs.second > rhs.second; 902 } 903 else { 904 return lhs.first.when > rhs.first.when; 905 } 906 } 907 }; 908 909 typedef std::priority_queue< 910 elem_type, 911 container_type, 912 compare_elem 913 > queue_type; 914 915 queue_type q; 916 917 int64_t ordinal; 918 public: 919 920 schedulable_queue() 921 : ordinal(0) 922 { 923 } 924 925 const_reference top() const { 926 return q.top().first; 927 } 928 929 void pop() { 930 q.pop(); 931 } 932 933 bool empty() const { 934 return q.empty(); 935 } 936 937 void push(const item_type& value) { 938 q.push(elem_type(value, ordinal++)); 939 } 940 941 void push(item_type&& value) { 942 q.push(elem_type(std::move(value), ordinal++)); 943 } 944 }; 945 946 } 947 948 } 949 namespace rxsc=schedulers; 950 951 } 952 953 #include "schedulers/rx-currentthread.hpp" 954 #include "schedulers/rx-runloop.hpp" 955 #include "schedulers/rx-newthread.hpp" 956 #include "schedulers/rx-eventloop.hpp" 957 #include "schedulers/rx-immediate.hpp" 958 #include "schedulers/rx-virtualtime.hpp" 959 #include "schedulers/rx-sameworker.hpp" 960 961 #endif 962