1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP) 6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 typedef std::function<std::thread(std::function<void()>)> thread_factory; 15 16 struct new_thread : public scheduler_interface 17 { 18 private: 19 typedef new_thread this_type; 20 new_thread(const this_type&); 21 22 struct new_worker : public worker_interface 23 { 24 private: 25 typedef new_worker this_type; 26 27 typedef detail::action_queue queue_type; 28 29 new_worker(const this_type&); 30 31 struct new_worker_state : public std::enable_shared_from_this<new_worker_state> 32 { 33 typedef detail::schedulable_queue< 34 typename clock_type::time_point> queue_item_time; 35 36 typedef queue_item_time::item_type item_type; 37 38 virtual ~new_worker_state() 39 { 40 } 41 42 explicit new_worker_state(composite_subscription cs) 43 : lifetime(cs) 44 { 45 } 46 47 composite_subscription lifetime; 48 mutable std::mutex lock; 49 mutable std::condition_variable wake; 50 mutable queue_item_time q; 51 std::thread worker; 52 recursion r; 53 }; 54 55 std::shared_ptr<new_worker_state> state; 56 57 public: 58 virtual ~new_worker() 59 { 60 } 61 62 explicit new_worker(std::shared_ptr<new_worker_state> ws) 63 : state(ws) 64 { 65 } 66 67 new_worker(composite_subscription cs, thread_factory& tf) 68 : state(std::make_shared<new_worker_state>(cs)) 69 { 70 auto keepAlive = state; 71 72 state->lifetime.add([keepAlive](){ 73 std::unique_lock<std::mutex> guard(keepAlive->lock); 74 auto expired = std::move(keepAlive->q); 75 keepAlive->q = new_worker_state::queue_item_time{}; 76 if (!keepAlive->q.empty()) std::terminate(); 77 keepAlive->wake.notify_one(); 78 79 if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) { 80 guard.unlock(); 81 keepAlive->worker.join(); 82 } 83 else { 84 keepAlive->worker.detach(); 85 } 86 }); 87 88 state->worker = tf([keepAlive](){ 89 90 // take ownership 91 queue_type::ensure(std::make_shared<new_worker>(keepAlive)); 92 // release ownership 93 RXCPP_UNWIND_AUTO([]{ 94 queue_type::destroy(); 95 }); 96 97 for(;;) { 98 std::unique_lock<std::mutex> guard(keepAlive->lock); 99 if (keepAlive->q.empty()) { 100 keepAlive->wake.wait(guard, [keepAlive](){ 101 return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty(); 102 }); 103 } 104 if (!keepAlive->lifetime.is_subscribed()) { 105 break; 106 } 107 auto& peek = keepAlive->q.top(); 108 if (!peek.what.is_subscribed()) { 109 keepAlive->q.pop(); 110 continue; 111 } 112 if (clock_type::now() < peek.when) { 113 keepAlive->wake.wait_until(guard, peek.when); 114 continue; 115 } 116 auto what = peek.what; 117 keepAlive->q.pop(); 118 keepAlive->r.reset(keepAlive->q.empty()); 119 guard.unlock(); 120 what(keepAlive->r.get_recurse()); 121 } 122 }); 123 } 124 125 virtual clock_type::time_point now() const { 126 return clock_type::now(); 127 } 128 129 virtual void schedule(const schedulable& scbl) const { 130 schedule(now(), scbl); 131 } 132 133 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { 134 if (scbl.is_subscribed()) { 135 std::unique_lock<std::mutex> guard(state->lock); 136 state->q.push(new_worker_state::item_type(when, scbl)); 137 state->r.reset(false); 138 } 139 state->wake.notify_one(); 140 } 141 }; 142 143 mutable thread_factory factory; 144 145 public: 146 new_thread() 147 : factory([](std::function<void()> start){ 148 return std::thread(std::move(start)); 149 }) 150 { 151 } 152 explicit new_thread(thread_factory tf) 153 : factory(tf) 154 { 155 } 156 virtual ~new_thread() 157 { 158 } 159 160 virtual clock_type::time_point now() const { 161 return clock_type::now(); 162 } 163 164 virtual worker create_worker(composite_subscription cs) const { 165 return worker(cs, std::make_shared<new_worker>(cs, factory)); 166 } 167 }; 168 169 inline scheduler make_new_thread() { 170 static scheduler instance = make_scheduler<new_thread>(); 171 return instance; 172 } 173 inline scheduler make_new_thread(thread_factory tf) { 174 return make_scheduler<new_thread>(tf); 175 } 176 177 } 178 179 } 180 181 #endif 182