Home | History | Annotate | Download | only in schedulers
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP)
      6 #define RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace schedulers {
     13 
     14 namespace detail {
     15 
     16 struct action_queue
     17 {
     18     typedef action_queue this_type;
     19 
     20     typedef scheduler_base::clock_type clock;
     21     typedef time_schedulable<clock::time_point> item_type;
     22 
     23 private:
     24     typedef schedulable_queue<item_type::time_point_type> queue_item_time;
     25 
     26 public:
     27     struct current_thread_queue_type {
     28         std::shared_ptr<worker_interface> w;
     29         recursion r;
     30         queue_item_time q;
     31     };
     32 
     33 private:
     34 #if defined(RXCPP_THREAD_LOCAL)
     35      static current_thread_queue_type*& current_thread_queue() {
     36          static RXCPP_THREAD_LOCAL current_thread_queue_type* q;
     37          return q;
     38      }
     39 #else
     40     static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() {
     41         static rxu::thread_local_storage<current_thread_queue_type> q;
     42         return q;
     43     }
     44 #endif
     45 
     46 public:
     47 
     48     static bool owned() {
     49         return !!current_thread_queue();
     50     }
     51     static const std::shared_ptr<worker_interface>& get_worker_interface() {
     52         return current_thread_queue()->w;
     53     }
     54     static recursion& get_recursion() {
     55         return current_thread_queue()->r;
     56     }
     57     static bool empty() {
     58         if (!current_thread_queue()) {
     59             std::terminate();
     60         }
     61         return current_thread_queue()->q.empty();
     62     }
     63     static queue_item_time::const_reference top() {
     64         if (!current_thread_queue()) {
     65             std::terminate();
     66         }
     67         return current_thread_queue()->q.top();
     68     }
     69     static void pop() {
     70         auto& state = current_thread_queue();
     71         if (!state) {
     72             std::terminate();
     73         }
     74         state->q.pop();
     75         if (state->q.empty()) {
     76             // allow recursion
     77             state->r.reset(true);
     78         }
     79     }
     80     static void push(item_type item) {
     81         auto& state = current_thread_queue();
     82         if (!state) {
     83             std::terminate();
     84         }
     85         if (!item.what.is_subscribed()) {
     86             return;
     87         }
     88         state->q.push(std::move(item));
     89         // disallow recursion
     90         state->r.reset(false);
     91     }
     92     static std::shared_ptr<worker_interface> ensure(std::shared_ptr<worker_interface> w) {
     93         if (!!current_thread_queue()) {
     94             std::terminate();
     95         }
     96         // create and publish new queue
     97         current_thread_queue() = new current_thread_queue_type();
     98         current_thread_queue()->w = w;
     99         return w;
    100     }
    101     static std::unique_ptr<current_thread_queue_type> create(std::shared_ptr<worker_interface> w) {
    102         std::unique_ptr<current_thread_queue_type> result(new current_thread_queue_type());
    103         result->w = std::move(w);
    104         return result;
    105     }
    106     static void set(current_thread_queue_type* q) {
    107         if (!!current_thread_queue()) {
    108             std::terminate();
    109         }
    110         // publish new queue
    111         current_thread_queue() = q;
    112     }
    113     static void destroy(current_thread_queue_type* q) {
    114         delete q;
    115     }
    116     static void destroy() {
    117         if (!current_thread_queue()) {
    118             std::terminate();
    119         }
    120 #if defined(RXCPP_THREAD_LOCAL)
    121          destroy(current_thread_queue());
    122 #else
    123         destroy(current_thread_queue().get());
    124 #endif
    125         current_thread_queue() = nullptr;
    126     }
    127 };
    128 
    129 }
    130 
    131 struct current_thread : public scheduler_interface
    132 {
    133 private:
    134     typedef current_thread this_type;
    135     current_thread(const this_type&);
    136 
    137     typedef detail::action_queue queue_type;
    138 
    139     struct derecurser : public worker_interface
    140     {
    141     private:
    142         typedef current_thread this_type;
    143         derecurser(const this_type&);
    144     public:
    145         derecurser()
    146         {
    147         }
    148         virtual ~derecurser()
    149         {
    150         }
    151 
    152         virtual clock_type::time_point now() const {
    153             return clock_type::now();
    154         }
    155 
    156         virtual void schedule(const schedulable& scbl) const {
    157             queue_type::push(queue_type::item_type(now(), scbl));
    158         }
    159 
    160         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
    161             queue_type::push(queue_type::item_type(when, scbl));
    162         }
    163     };
    164 
    165     struct current_worker : public worker_interface
    166     {
    167     private:
    168         typedef current_thread this_type;
    169         current_worker(const this_type&);
    170     public:
    171         current_worker()
    172         {
    173         }
    174         virtual ~current_worker()
    175         {
    176         }
    177 
    178         virtual clock_type::time_point now() const {
    179             return clock_type::now();
    180         }
    181 
    182         virtual void schedule(const schedulable& scbl) const {
    183             schedule(now(), scbl);
    184         }
    185 
    186         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
    187             if (!scbl.is_subscribed()) {
    188                 return;
    189             }
    190 
    191             {
    192                 // check ownership
    193                 if (queue_type::owned()) {
    194                     // already has an owner - delegate
    195                     queue_type::get_worker_interface()->schedule(when, scbl);
    196                     return;
    197                 }
    198 
    199                 // take ownership
    200                 queue_type::ensure(std::make_shared<derecurser>());
    201             }
    202             // release ownership
    203             RXCPP_UNWIND_AUTO([]{
    204                 queue_type::destroy();
    205             });
    206 
    207             const auto& recursor = queue_type::get_recursion().get_recurse();
    208             std::this_thread::sleep_until(when);
    209             if (scbl.is_subscribed()) {
    210                 scbl(recursor);
    211             }
    212             if (queue_type::empty()) {
    213                 return;
    214             }
    215 
    216             // loop until queue is empty
    217             for (
    218                 auto next = queue_type::top().when;
    219                 (std::this_thread::sleep_until(next), true);
    220                 next = queue_type::top().when
    221             ) {
    222                 auto what = queue_type::top().what;
    223 
    224                 queue_type::pop();
    225 
    226                 if (what.is_subscribed()) {
    227                     what(recursor);
    228                 }
    229 
    230                 if (queue_type::empty()) {
    231                     break;
    232                 }
    233             }
    234         }
    235     };
    236 
    237     std::shared_ptr<current_worker> wi;
    238 
    239 public:
    240     current_thread()
    241         : wi(std::make_shared<current_worker>())
    242     {
    243     }
    244     virtual ~current_thread()
    245     {
    246     }
    247 
    248     static bool is_schedule_required() { return !queue_type::owned(); }
    249 
    250     inline bool is_tail_recursion_allowed() const {
    251         return queue_type::empty();
    252     }
    253 
    254     virtual clock_type::time_point now() const {
    255         return clock_type::now();
    256     }
    257 
    258     virtual worker create_worker(composite_subscription cs) const {
    259         return worker(std::move(cs), wi);
    260     }
    261 };
    262 
    263 inline const scheduler& make_current_thread() {
    264     static scheduler instance = make_scheduler<current_thread>();
    265     return instance;
    266 }
    267 
    268 }
    269 
    270 }
    271 
    272 #endif
    273