1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP) 6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 struct event_loop : public scheduler_interface 15 { 16 private: 17 typedef event_loop this_type; 18 event_loop(const this_type&); 19 20 struct loop_worker : public worker_interface 21 { 22 private: 23 typedef loop_worker this_type; 24 loop_worker(const this_type&); 25 26 typedef detail::schedulable_queue< 27 typename clock_type::time_point> queue_item_time; 28 29 typedef queue_item_time::item_type item_type; 30 31 composite_subscription lifetime; 32 worker controller; 33 std::shared_ptr<const scheduler_interface> alive; 34 35 public: 36 virtual ~loop_worker() 37 { 38 } 39 loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive) 40 : lifetime(cs) 41 , controller(w) 42 , alive(alive) 43 { 44 auto token = controller.add(cs); 45 cs.add([token, w](){ 46 w.remove(token); 47 }); 48 } 49 50 virtual clock_type::time_point now() const { 51 return clock_type::now(); 52 } 53 54 virtual void schedule(const schedulable& scbl) const { 55 controller.schedule(lifetime, scbl.get_action()); 56 } 57 58 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { 59 controller.schedule(when, lifetime, scbl.get_action()); 60 } 61 }; 62 63 mutable thread_factory factory; 64 scheduler newthread; 65 mutable std::atomic<std::size_t> count; 66 composite_subscription loops_lifetime; 67 std::vector<worker> loops; 68 69 public: 70 event_loop() 71 : factory([](std::function<void()> start){ 72 return std::thread(std::move(start)); 73 }) 74 , newthread(make_new_thread()) 75 , count(0) 76 { 77 auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); 78 while (remaining--) { 79 loops.push_back(newthread.create_worker(loops_lifetime)); 80 } 81 } 82 explicit event_loop(thread_factory tf) 83 : factory(tf) 84 , newthread(make_new_thread(tf)) 85 , count(0) 86 { 87 auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); 88 while (remaining--) { 89 loops.push_back(newthread.create_worker(loops_lifetime)); 90 } 91 } 92 virtual ~event_loop() 93 { 94 loops_lifetime.unsubscribe(); 95 } 96 97 virtual clock_type::time_point now() const { 98 return clock_type::now(); 99 } 100 101 virtual worker create_worker(composite_subscription cs) const { 102 return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this())); 103 } 104 }; 105 106 inline scheduler make_event_loop() { 107 static scheduler instance = make_scheduler<event_loop>(); 108 return instance; 109 } 110 inline scheduler make_event_loop(thread_factory tf) { 111 return make_scheduler<event_loop>(tf); 112 } 113 114 } 115 116 } 117 118 #endif 119