Home | History | Annotate | Download | only in operators
      1 #pragma once
      2 
      3 /*! \file rx-retry-repeat-common.hpp
      4 
    \brief Shared implementation of the retry and repeat operators, factored out of rx-retry.hpp and rx-repeat.hpp. This header should be included only from rx-retry.hpp and rx-repeat.hpp.
      6 
      7 */
      8 
      9 #include "../rx-includes.hpp"
     10 
     11 namespace rxcpp {
     12   namespace operators {
     13     namespace detail {
     14 
     15       namespace retry_repeat_common {
     16         // Structure to perform general retry/repeat operations on state
     17         template <class Values, class Subscriber, class EventHandlers, class T>
     18         struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
     19                             public Values {
     20 
     21           typedef Subscriber output_type;
     22           state_type(const Values& i, const output_type& oarg)
     23             : Values(i),
     24               source_lifetime(composite_subscription::empty()),
     25               out(oarg) {
     26           }
     27 
     28           void do_subscribe() {
     29             auto state = this->shared_from_this();
     30 
     31             state->out.remove(state->lifetime_token);
     32             state->source_lifetime.unsubscribe();
     33 
     34             state->source_lifetime = composite_subscription();
     35             state->lifetime_token = state->out.add(state->source_lifetime);
     36 
     37             state->source.subscribe(
     38                                     state->out,
     39                                     state->source_lifetime,
     40                                     // on_next
     41                                     [state](T t) {
     42                                       state->out.on_next(t);
     43                                     },
     44                                     // on_error
     45                                     [state](rxu::error_ptr e) {
     46                                       EventHandlers::on_error(state, e);
     47                                     },
     48                                     // on_completed
     49                                     [state]() {
     50                                       EventHandlers::on_completed(state);
     51                                     }
     52                                     );
     53           }
     54 
     55           composite_subscription source_lifetime;
     56           output_type out;
     57           composite_subscription::weak_subscription lifetime_token;
     58         };
     59 
     60         // Finite case (explicitely limited with the number of times)
     61         template <class EventHandlers, class T, class Observable, class Count>
     62         struct finite : public operator_base<T> {
     63           typedef rxu::decay_t<Observable> source_type;
     64           typedef rxu::decay_t<Count> count_type;
     65 
     66           struct values {
     67             values(source_type s, count_type t)
     68               : source(std::move(s)),
     69                 remaining_(std::move(t)) {
     70             }
     71 
     72             inline bool completed_predicate() const {
     73               // Return true if we are completed
     74               return remaining_ <= 0;
     75             }
     76 
     77             inline void update() {
     78               // Decrement counter
     79               --remaining_;
     80             }
     81 
     82             source_type source;
     83 
     84           private:
     85             // Counter to hold number of times remaining to complete
     86             count_type remaining_;
     87           };
     88 
     89           finite(source_type s, count_type t)
     90             : initial_(std::move(s), std::move(t)) {
     91           }
     92 
     93           template<class Subscriber>
     94           void on_subscribe(const Subscriber& s) const {
     95             typedef state_type<values, Subscriber, EventHandlers, T> state_t;
     96             // take a copy of the values for each subscription
     97             auto state = std::make_shared<state_t>(initial_, s);
     98             if (initial_.completed_predicate()) {
     99               // return completed
    100               state->out.on_completed();
    101             } else {
    102               // start the first iteration
    103               state->do_subscribe();
    104             }
    105           }
    106 
    107         private:
    108           values initial_;
    109         };
    110 
    111         // Infinite case
    112         template <class EventHandlers, class T, class Observable>
    113         struct infinite : public operator_base<T> {
    114           typedef rxu::decay_t<Observable> source_type;
    115 
    116           struct values {
    117             values(source_type s)
    118               : source(std::move(s)) {
    119             }
    120 
    121             static inline bool completed_predicate() {
    122               // Infinite never completes
    123               return false;
    124             }
    125 
    126             static inline void update() {
    127               // Infinite does not need to update state
    128             }
    129 
    130             source_type source;
    131           };
    132 
    133           infinite(source_type s) : initial_(std::move(s)) {
    134           }
    135 
    136           template<class Subscriber>
    137           void on_subscribe(const Subscriber& s) const {
    138             typedef state_type<values, Subscriber, EventHandlers, T> state_t;
    139             // take a copy of the values for each subscription
    140             auto state = std::make_shared<state_t>(initial_, s);
    141             // start the first iteration
    142             state->do_subscribe();
    143           }
    144 
    145         private:
    146           values initial_;
    147         };
    148 
    149 
    150       }
    151     }
    152   }
    153 }
    154