Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-combine_latest.hpp
      6 
      7     \brief For each item from all of the observables select a value to emit from the new observable that is returned.
      8 
      9     \tparam AN  types of scheduler (optional), aggregation function (optional), and source observables
     10 
     11     \param  an  scheduler (optional), aggregation function (optional), and source observables
     12 
     13     \return  Observable that emits items that are the result of combining the items emitted by the source observables.
     14 
     15     If scheduler is omitted, identity_current_thread is used.
     16 
     17     If aggregation function is omitted, the resulting observable returns tuples of emitted items.
     18 
     19     \sample
     20 
     21     Neither scheduler nor aggregation function is present:
     22     \snippet combine_latest.cpp combine_latest sample
     23     \snippet output.txt combine_latest sample
     24 
     25     Only scheduler is present:
     26     \snippet combine_latest.cpp Coordination combine_latest sample
     27     \snippet output.txt Coordination combine_latest sample
     28 
     29     Only aggregation function is present:
     30     \snippet combine_latest.cpp Selector combine_latest sample
     31     \snippet output.txt Selector combine_latest sample
     32 
     33     Both scheduler and aggregation function are present:
     34     \snippet combine_latest.cpp Coordination+Selector combine_latest sample
     35     \snippet output.txt Coordination+Selector combine_latest sample
     36 */
     37 
     38 #if !defined(RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP)
     39 #define RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP
     40 
     41 #include "../rx-includes.hpp"
     42 
     43 namespace rxcpp {
     44 
     45 namespace operators {
     46 
     47 namespace detail {
     48 
     49 template<class... AN>
     50 struct combine_latest_invalid_arguments {};
     51 
     52 template<class... AN>
     53 struct combine_latest_invalid : public rxo::operator_base<combine_latest_invalid_arguments<AN...>> {
     54     using type = observable<combine_latest_invalid_arguments<AN...>, combine_latest_invalid<AN...>>;
     55 };
     56 template<class... AN>
     57 using combine_latest_invalid_t = typename combine_latest_invalid<AN...>::type;
     58 
     59 template<class Selector, class... ObservableN>
     60 struct is_combine_latest_selector_check {
     61     typedef rxu::decay_t<Selector> selector_type;
     62 
     63     struct tag_not_valid;
     64     template<class CS, class... CON>
     65     static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
     66     template<class CS, class... CON>
     67     static tag_not_valid check(...);
     68 
     69     using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
     70 
     71     static const bool value = !std::is_same<type, tag_not_valid>::value;
     72 };
     73 
     74 template<class Selector, class... ObservableN>
     75 struct invalid_combine_latest_selector {
     76     static const bool value = false;
     77 };
     78 
     79 template<class Selector, class... ObservableN>
     80 struct is_combine_latest_selector : public std::conditional<
     81     is_combine_latest_selector_check<Selector, ObservableN...>::value,
     82     is_combine_latest_selector_check<Selector, ObservableN...>,
     83     invalid_combine_latest_selector<Selector, ObservableN...>>::type {
     84 };
     85 
     86 template<class Selector, class... ON>
     87 using result_combine_latest_selector_t = typename is_combine_latest_selector<Selector, ON...>::type;
     88 
     89 template<class Coordination, class Selector, class... ObservableN>
     90 struct combine_latest_traits {
     91 
     92     typedef std::tuple<ObservableN...> tuple_source_type;
     93     typedef std::tuple<rxu::detail::maybe<typename ObservableN::value_type>...> tuple_source_value_type;
     94 
     95     typedef rxu::decay_t<Selector> selector_type;
     96     typedef rxu::decay_t<Coordination> coordination_type;
     97 
     98     typedef typename is_combine_latest_selector<selector_type, ObservableN...>::type value_type;
     99 };
    100 
    101 template<class Coordination, class Selector, class... ObservableN>
    102 struct combine_latest : public operator_base<rxu::value_type_t<combine_latest_traits<Coordination, Selector, ObservableN...>>>
    103 {
    104     typedef combine_latest<Coordination, Selector, ObservableN...> this_type;
    105 
    106     typedef combine_latest_traits<Coordination, Selector, ObservableN...> traits;
    107 
    108     typedef typename traits::tuple_source_type tuple_source_type;
    109     typedef typename traits::tuple_source_value_type tuple_source_value_type;
    110 
    111     typedef typename traits::selector_type selector_type;
    112 
    113     typedef typename traits::coordination_type coordination_type;
    114     typedef typename coordination_type::coordinator_type coordinator_type;
    115 
    116     struct values
    117     {
    118         values(tuple_source_type o, selector_type s, coordination_type sf)
    119             : source(std::move(o))
    120             , selector(std::move(s))
    121             , coordination(std::move(sf))
    122         {
    123         }
    124         tuple_source_type source;
    125         selector_type selector;
    126         coordination_type coordination;
    127     };
    128     values initial;
    129 
    130     combine_latest(coordination_type sf, selector_type s, tuple_source_type ts)
    131         : initial(std::move(ts), std::move(s), std::move(sf))
    132     {
    133     }
    134 
    135     template<int Index, class State>
    136     void subscribe_one(std::shared_ptr<State> state) const {
    137 
    138         typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
    139 
    140         composite_subscription innercs;
    141 
    142         // when the out observer is unsubscribed all the
    143         // inner subscriptions are unsubscribed as well
    144         state->out.add(innercs);
    145 
    146         auto source = on_exception(
    147             [&](){return state->coordinator.in(std::get<Index>(state->source));},
    148             state->out);
    149         if (source.empty()) {
    150             return;
    151         }
    152 
    153         // this subscribe does not share the observer subscription
    154         // so that when it is unsubscribed the observer can be called
    155         // until the inner subscriptions have finished
    156         auto sink = make_subscriber<source_value_type>(
    157             state->out,
    158             innercs,
    159         // on_next
    160             [state](source_value_type st) {
    161                 auto& value = std::get<Index>(state->latest);
    162 
    163                 if (value.empty()) {
    164                     ++state->valuesSet;
    165                 }
    166 
    167                 value.reset(st);
    168 
    169                 if (state->valuesSet == sizeof... (ObservableN)) {
    170                     auto values = rxu::surely(state->latest);
    171                     auto selectedResult = rxu::apply(values, state->selector);
    172                     state->out.on_next(selectedResult);
    173                 }
    174             },
    175         // on_error
    176             [state](rxu::error_ptr e) {
    177                 state->out.on_error(e);
    178             },
    179         // on_completed
    180             [state]() {
    181                 if (--state->pendingCompletions == 0) {
    182                     state->out.on_completed();
    183                 }
    184             }
    185         );
    186         auto selectedSink = on_exception(
    187             [&](){return state->coordinator.out(sink);},
    188             state->out);
    189         if (selectedSink.empty()) {
    190             return;
    191         }
    192         source->subscribe(std::move(selectedSink.get()));
    193     }
    194     template<class State, int... IndexN>
    195     void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
    196         bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
    197         subscribed[0] = (*subscribed); // silence warning
    198     }
    199 
    200     template<class Subscriber>
    201     void on_subscribe(Subscriber scbr) const {
    202         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
    203 
    204         typedef Subscriber output_type;
    205 
    206         struct combine_latest_state_type
    207             : public std::enable_shared_from_this<combine_latest_state_type>
    208             , public values
    209         {
    210             combine_latest_state_type(values i, coordinator_type coor, output_type oarg)
    211                 : values(std::move(i))
    212                 , pendingCompletions(sizeof... (ObservableN))
    213                 , valuesSet(0)
    214                 , coordinator(std::move(coor))
    215                 , out(std::move(oarg))
    216             {
    217             }
    218 
    219             // on_completed on the output must wait until all the
    220             // subscriptions have received on_completed
    221             mutable int pendingCompletions;
    222             mutable int valuesSet;
    223             mutable tuple_source_value_type latest;
    224             coordinator_type coordinator;
    225             output_type out;
    226         };
    227 
    228         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    229 
    230         // take a copy of the values for each subscription
    231         auto state = std::make_shared<combine_latest_state_type>(initial, std::move(coordinator), std::move(scbr));
    232 
    233         subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
    234     }
    235 };
    236 
    237 }
    238 
    239 /*! @copydoc rx-combine_latest.hpp
    240 */
    241 template<class... AN>
    242 auto combine_latest(AN&&... an)
    243     ->     operator_factory<combine_latest_tag, AN...> {
    244     return operator_factory<combine_latest_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    245 }
    246 
    247 }
    248 
    249 template<>
    250 struct member_overload<combine_latest_tag>
    251 {
    252     template<class Observable, class... ObservableN,
    253         class Enabled = rxu::enable_if_all_true_type_t<
    254             all_observables<Observable, ObservableN...>>,
    255         class combine_latest = rxo::detail::combine_latest<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    256         class Value = rxu::value_type_t<combine_latest>,
    257         class Result = observable<Value, combine_latest>>
    258     static Result member(Observable&& o, ObservableN&&... on)
    259     {
    260         return Result(combine_latest(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    261     }
    262 
    263     template<class Observable, class Selector, class... ObservableN,
    264         class Enabled = rxu::enable_if_all_true_type_t<
    265             operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>,
    266             all_observables<Observable, ObservableN...>>,
    267         class ResolvedSelector = rxu::decay_t<Selector>,
    268         class combine_latest = rxo::detail::combine_latest<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    269         class Value = rxu::value_type_t<combine_latest>,
    270         class Result = observable<Value, combine_latest>>
    271     static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
    272     {
    273         return Result(combine_latest(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    274     }
    275 
    276     template<class Coordination, class Observable, class... ObservableN,
    277         class Enabled = rxu::enable_if_all_true_type_t<
    278             is_coordination<Coordination>,
    279             all_observables<Observable, ObservableN...>>,
    280         class combine_latest = rxo::detail::combine_latest<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    281         class Value = rxu::value_type_t<combine_latest>,
    282         class Result = observable<Value, combine_latest>>
    283     static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
    284     {
    285         return Result(combine_latest(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    286     }
    287 
    288     template<class Coordination, class Selector, class Observable, class... ObservableN,
    289         class Enabled = rxu::enable_if_all_true_type_t<
    290             is_coordination<Coordination>,
    291             operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>,
    292             all_observables<Observable, ObservableN...>>,
    293         class ResolvedSelector = rxu::decay_t<Selector>,
    294         class combine_latest = rxo::detail::combine_latest<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    295         class Value = rxu::value_type_t<combine_latest>,
    296         class Result = observable<Value, combine_latest>>
    297     static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
    298     {
    299         return Result(combine_latest(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    300     }
    301 
    302     template<class... AN>
    303     static operators::detail::combine_latest_invalid_t<AN...> member(const AN&...) {
    304         std::terminate();
    305         return {};
    306         static_assert(sizeof...(AN) == 10000, "combine_latest takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
    307     }
    308 };
    309 
    310 }
    311 
    312 #endif
    313