Home | History | Annotate | Download | only in schedulers
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
      6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 typedef std::function<std::thread(std::function<void()>)> thread_factory;
     15 
     16 struct new_thread : public scheduler_interface
     17 {
     18 private:
     19     typedef new_thread this_type;
     20     new_thread(const this_type&);
     21 
     22     struct new_worker : public worker_interface
     23     {
     24     private:
     25         typedef new_worker this_type;
     26 
     27         typedef detail::action_queue queue_type;
     28 
     29         new_worker(const this_type&);
     30 
     31         struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
     32         {
     33             typedef detail::schedulable_queue<
     34                 typename clock_type::time_point> queue_item_time;
     35 
     36             typedef queue_item_time::item_type item_type;
     37 
     38             virtual ~new_worker_state()
     39             {
     40             }
     41 
     42             explicit new_worker_state(composite_subscription cs)
     43                 : lifetime(cs)
     44             {
     45             }
     46 
     47             composite_subscription lifetime;
     48             mutable std::mutex lock;
     49             mutable std::condition_variable wake;
     50             mutable queue_item_time q;
     51             std::thread worker;
     52             recursion r;
     53         };
     54 
     55         std::shared_ptr<new_worker_state> state;
     56 
     57     public:
     58         virtual ~new_worker()
     59         {
     60         }
     61 
     62         explicit new_worker(std::shared_ptr<new_worker_state> ws)
     63             : state(ws)
     64         {
     65         }
     66 
     67         new_worker(composite_subscription cs, thread_factory& tf)
     68             : state(std::make_shared<new_worker_state>(cs))
     69         {
     70             auto keepAlive = state;
     71 
     72             state->lifetime.add([keepAlive](){
     73                 std::unique_lock<std::mutex> guard(keepAlive->lock);
     74                 auto expired = std::move(keepAlive->q);
     75                 keepAlive->q = new_worker_state::queue_item_time{};
     76                 if (!keepAlive->q.empty()) std::terminate();
     77                 keepAlive->wake.notify_one();
     78 
     79                 if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
     80                     guard.unlock();
     81                     keepAlive->worker.join();
     82                 }
     83                 else {
     84                     keepAlive->worker.detach();
     85                 }
     86             });
     87 
     88             state->worker = tf([keepAlive](){
     89 
     90                 // take ownership
     91                 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
     92                 // release ownership
     93                 RXCPP_UNWIND_AUTO([]{
     94                     queue_type::destroy();
     95                 });
     96 
     97                 for(;;) {
     98                     std::unique_lock<std::mutex> guard(keepAlive->lock);
     99                     if (keepAlive->q.empty()) {
    100                         keepAlive->wake.wait(guard, [keepAlive](){
    101                             return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
    102                         });
    103                     }
    104                     if (!keepAlive->lifetime.is_subscribed()) {
    105                         break;
    106                     }
    107                     auto& peek = keepAlive->q.top();
    108                     if (!peek.what.is_subscribed()) {
    109                         keepAlive->q.pop();
    110                         continue;
    111                     }
    112                     if (clock_type::now() < peek.when) {
    113                         keepAlive->wake.wait_until(guard, peek.when);
    114                         continue;
    115                     }
    116                     auto what = peek.what;
    117                     keepAlive->q.pop();
    118                     keepAlive->r.reset(keepAlive->q.empty());
    119                     guard.unlock();
    120                     what(keepAlive->r.get_recurse());
    121                 }
    122             });
    123         }
    124 
    125         virtual clock_type::time_point now() const {
    126             return clock_type::now();
    127         }
    128 
    129         virtual void schedule(const schedulable& scbl) const {
    130             schedule(now(), scbl);
    131         }
    132 
    133         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
    134             if (scbl.is_subscribed()) {
    135                 std::unique_lock<std::mutex> guard(state->lock);
    136                 state->q.push(new_worker_state::item_type(when, scbl));
    137                 state->r.reset(false);
    138             }
    139             state->wake.notify_one();
    140         }
    141     };
    142 
    143     mutable thread_factory factory;
    144 
    145 public:
    146     new_thread()
    147         : factory([](std::function<void()> start){
    148             return std::thread(std::move(start));
    149         })
    150     {
    151     }
    152     explicit new_thread(thread_factory tf)
    153         : factory(tf)
    154     {
    155     }
    156     virtual ~new_thread()
    157     {
    158     }
    159 
    160     virtual clock_type::time_point now() const {
    161         return clock_type::now();
    162     }
    163 
    164     virtual worker create_worker(composite_subscription cs) const {
    165         return worker(cs, std::make_shared<new_worker>(cs, factory));
    166     }
    167 };
    168 
    169 inline scheduler make_new_thread() {
    170     static scheduler instance = make_scheduler<new_thread>();
    171     return instance;
    172 }
    173 inline scheduler make_new_thread(thread_factory tf) {
    174     return make_scheduler<new_thread>(tf);
    175 }
    176 
    177 }
    178 
    179 }
    180 
    181 #endif
    182