Home | History | Annotate | Download | only in schedulers
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
      6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 struct event_loop : public scheduler_interface
     15 {
     16 private:
     17     typedef event_loop this_type;
     18     event_loop(const this_type&);
     19 
     20     struct loop_worker : public worker_interface
     21     {
     22     private:
     23         typedef loop_worker this_type;
     24         loop_worker(const this_type&);
     25 
     26         typedef detail::schedulable_queue<
     27             typename clock_type::time_point> queue_item_time;
     28 
     29         typedef queue_item_time::item_type item_type;
     30 
     31         composite_subscription lifetime;
     32         worker controller;
     33         std::shared_ptr<const scheduler_interface> alive;
     34 
     35     public:
     36         virtual ~loop_worker()
     37         {
     38         }
     39         loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
     40             : lifetime(cs)
     41             , controller(w)
     42             , alive(alive)
     43         {
     44             auto token = controller.add(cs);
     45             cs.add([token, w](){
     46                 w.remove(token);
     47             });
     48         }
     49 
     50         virtual clock_type::time_point now() const {
     51             return clock_type::now();
     52         }
     53 
     54         virtual void schedule(const schedulable& scbl) const {
     55             controller.schedule(lifetime, scbl.get_action());
     56         }
     57 
     58         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
     59             controller.schedule(when, lifetime, scbl.get_action());
     60         }
     61     };
     62 
     63     mutable thread_factory factory;
     64     scheduler newthread;
     65     mutable std::atomic<std::size_t> count;
     66     composite_subscription loops_lifetime;
     67     std::vector<worker> loops;
     68 
     69 public:
     70     event_loop()
     71         : factory([](std::function<void()> start){
     72             return std::thread(std::move(start));
     73         })
     74         , newthread(make_new_thread())
     75         , count(0)
     76     {
     77         auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
     78         while (remaining--) {
     79             loops.push_back(newthread.create_worker(loops_lifetime));
     80         }
     81     }
     82     explicit event_loop(thread_factory tf)
     83         : factory(tf)
     84         , newthread(make_new_thread(tf))
     85         , count(0)
     86     {
     87         auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
     88         while (remaining--) {
     89             loops.push_back(newthread.create_worker(loops_lifetime));
     90         }
     91     }
     92     virtual ~event_loop()
     93     {
     94         loops_lifetime.unsubscribe();
     95     }
     96 
     97     virtual clock_type::time_point now() const {
     98         return clock_type::now();
     99     }
    100 
    101     virtual worker create_worker(composite_subscription cs) const {
    102         return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
    103     }
    104 };
    105 
    106 inline scheduler make_event_loop() {
    107     static scheduler instance = make_scheduler<event_loop>();
    108     return instance;
    109 }
    110 inline scheduler make_event_loop(thread_factory tf) {
    111     return make_scheduler<event_loop>(tf);
    112 }
    113 
    114 }
    115 
    116 }
    117 
    118 #endif
    119