1 #pragma once 2 3 /*! \file rx-retry-repeat-common.hpp 4 5 \brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp 6 7 */ 8 9 #include "../rx-includes.hpp" 10 11 namespace rxcpp { 12 namespace operators { 13 namespace detail { 14 15 namespace retry_repeat_common { 16 // Structure to perform general retry/repeat operations on state 17 template <class Values, class Subscriber, class EventHandlers, class T> 18 struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>, 19 public Values { 20 21 typedef Subscriber output_type; 22 state_type(const Values& i, const output_type& oarg) 23 : Values(i), 24 source_lifetime(composite_subscription::empty()), 25 out(oarg) { 26 } 27 28 void do_subscribe() { 29 auto state = this->shared_from_this(); 30 31 state->out.remove(state->lifetime_token); 32 state->source_lifetime.unsubscribe(); 33 34 state->source_lifetime = composite_subscription(); 35 state->lifetime_token = state->out.add(state->source_lifetime); 36 37 state->source.subscribe( 38 state->out, 39 state->source_lifetime, 40 // on_next 41 [state](T t) { 42 state->out.on_next(t); 43 }, 44 // on_error 45 [state](rxu::error_ptr e) { 46 EventHandlers::on_error(state, e); 47 }, 48 // on_completed 49 [state]() { 50 EventHandlers::on_completed(state); 51 } 52 ); 53 } 54 55 composite_subscription source_lifetime; 56 output_type out; 57 composite_subscription::weak_subscription lifetime_token; 58 }; 59 60 // Finite case (explicitely limited with the number of times) 61 template <class EventHandlers, class T, class Observable, class Count> 62 struct finite : public operator_base<T> { 63 typedef rxu::decay_t<Observable> source_type; 64 typedef rxu::decay_t<Count> count_type; 65 66 struct values { 67 values(source_type s, count_type t) 68 : source(std::move(s)), 69 remaining_(std::move(t)) { 70 } 71 72 inline bool completed_predicate() const { 73 // Return true if we are completed 74 return remaining_ <= 0; 75 } 76 77 inline void update() { 78 // Decrement counter 79 --remaining_; 80 } 81 82 source_type source; 83 84 private: 85 // Counter to hold number of times remaining to complete 86 count_type remaining_; 87 }; 88 89 finite(source_type s, count_type t) 90 : initial_(std::move(s), std::move(t)) { 91 } 92 93 template<class Subscriber> 94 void on_subscribe(const Subscriber& s) const { 95 typedef state_type<values, Subscriber, EventHandlers, T> state_t; 96 // take a copy of the values for each subscription 97 auto state = std::make_shared<state_t>(initial_, s); 98 if (initial_.completed_predicate()) { 99 // return completed 100 state->out.on_completed(); 101 } else { 102 // start the first iteration 103 state->do_subscribe(); 104 } 105 } 106 107 private: 108 values initial_; 109 }; 110 111 // Infinite case 112 template <class EventHandlers, class T, class Observable> 113 struct infinite : public operator_base<T> { 114 typedef rxu::decay_t<Observable> source_type; 115 116 struct values { 117 values(source_type s) 118 : source(std::move(s)) { 119 } 120 121 static inline bool completed_predicate() { 122 // Infinite never completes 123 return false; 124 } 125 126 static inline void update() { 127 // Infinite does not need to update state 128 } 129 130 source_type source; 131 }; 132 133 infinite(source_type s) : initial_(std::move(s)) { 134 } 135 136 template<class Subscriber> 137 void on_subscribe(const Subscriber& s) const { 138 typedef state_type<values, Subscriber, EventHandlers, T> state_t; 139 // take a copy of the values for each subscription 140 auto state = std::make_shared<state_t>(initial_, s); 141 // start the first iteration 142 state->do_subscribe(); 143 } 144 145 private: 146 values initial_; 147 }; 148 149 150 } 151 } 152 } 153 } 154