Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP)
      6 #define RXCPP_OPERATORS_RX_ZIP_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 /*! \file rx-zip.hpp
     11 
     12     \brief Take one item at a time from each of the given observables and select a value to emit from the new observable that is returned.
     13 
     14     \tparam AN  types of scheduler (optional), aggregate function (optional), and source observables
     15 
     16     \param  an  scheduler (optional), aggregation function (optional), and source observables
     17 
     18     \return  Observable that emits the result of combining the items taken one at a time from each of the source observables.
     19 
     20     If scheduler is omitted, identity_current_thread is used.
     21 
     22     If aggregation function is omitted, the resulting observable returns tuples of emitted items.
     23 
     24     \sample
     25 
     26     Neither scheduler nor aggregation function are present:
     27     \snippet zip.cpp zip sample
     28     \snippet output.txt zip sample
     29 
     30     Only scheduler is present:
     31     \snippet zip.cpp Coordination zip sample
     32     \snippet output.txt Coordination zip sample
     33 
     34     Only aggregation function is present:
     35     \snippet zip.cpp Selector zip sample
     36     \snippet output.txt Selector zip sample
     37 
     38     Both scheduler and aggregation function are present:
     39     \snippet zip.cpp Coordination+Selector zip sample
     40     \snippet output.txt Coordination+Selector zip sample
     41 */
     42 
     43 namespace rxcpp {
     44 
     45 namespace operators {
     46 
     47 namespace detail {
     48 
     49 template<class Observable>
     50 struct zip_source_state
     51 {
     52     using value_type = rxu::value_type_t<Observable>;
     53     zip_source_state()
     54         : completed(false)
     55     {
     56     }
     57     std::list<value_type> values;
     58     bool completed;
     59 };
     60 
     61 struct values_not_empty {
     62     template<class Observable>
     63     bool operator()(zip_source_state<Observable>& source) const {
     64         return !source.values.empty();
     65     }
     66 };
     67 
     68 struct source_completed_values_empty {
     69     template<class Observable>
     70     bool operator()(zip_source_state<Observable>& source) const {
     71         return source.completed && source.values.empty();
     72     }
     73 };
     74 
     75 struct extract_value_front {
     76     template<class Observable, class Value = rxu::value_type_t<Observable>>
     77     Value operator()(zip_source_state<Observable>& source) const {
     78         auto val = std::move(source.values.front());
     79         source.values.pop_front();
     80         return val;
     81     }
     82 };
     83 
     84 template<class... AN>
     85 struct zip_invalid_arguments {};
     86 
     87 template<class... AN>
     88 struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> {
     89     using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>;
     90 };
     91 template<class... AN>
     92 using zip_invalid_t = typename zip_invalid<AN...>::type;
     93 
     94 template<class Selector, class... ObservableN>
     95 struct is_zip_selector_check {
     96     typedef rxu::decay_t<Selector> selector_type;
     97 
     98     struct tag_not_valid;
     99     template<class CS, class... CON>
    100     static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
    101     template<class CS, class... CON>
    102     static tag_not_valid check(...);
    103 
    104     using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
    105 
    106     static const bool value = !std::is_same<type, tag_not_valid>::value;
    107 };
    108 
    109 template<class Selector, class... ObservableN>
    110 struct invalid_zip_selector {
    111     static const bool value = false;
    112 };
    113 
    114 template<class Selector, class... ObservableN>
    115 struct is_zip_selector : public std::conditional<
    116     is_zip_selector_check<Selector, ObservableN...>::value,
    117     is_zip_selector_check<Selector, ObservableN...>,
    118     invalid_zip_selector<Selector, ObservableN...>>::type {
    119 };
    120 
    121 template<class Selector, class... ON>
    122 using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type;
    123 
    124 template<class Coordination, class Selector, class... ObservableN>
    125 struct zip_traits {
    126     typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type;
    127     typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type;
    128 
    129     typedef rxu::decay_t<Selector> selector_type;
    130     typedef rxu::decay_t<Coordination> coordination_type;
    131 
    132     typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type;
    133 };
    134 
    135 template<class Coordination, class Selector, class... ObservableN>
    136 struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>>
    137 {
    138     typedef zip<Coordination, Selector, ObservableN...> this_type;
    139 
    140     typedef zip_traits<Coordination, Selector, ObservableN...> traits;
    141 
    142     typedef typename traits::tuple_source_type tuple_source_type;
    143     typedef typename traits::tuple_source_values_type tuple_source_values_type;
    144 
    145     typedef typename traits::selector_type selector_type;
    146 
    147     typedef typename traits::coordination_type coordination_type;
    148     typedef typename coordination_type::coordinator_type coordinator_type;
    149 
    150     struct values
    151     {
    152         values(tuple_source_type o, selector_type s, coordination_type sf)
    153             : source(std::move(o))
    154             , selector(std::move(s))
    155             , coordination(std::move(sf))
    156         {
    157         }
    158         tuple_source_type source;
    159         selector_type selector;
    160         coordination_type coordination;
    161     };
    162     values initial;
    163 
    164     zip(coordination_type sf, selector_type s, tuple_source_type ts)
    165         : initial(std::move(ts), std::move(s), std::move(sf))
    166     {
    167     }
    168 
    169     template<int Index, class State>
    170     void subscribe_one(std::shared_ptr<State> state) const {
    171 
    172         typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
    173 
    174         composite_subscription innercs;
    175 
    176         // when the out observer is unsubscribed all the
    177         // inner subscriptions are unsubscribed as well
    178         state->out.add(innercs);
    179 
    180         auto source = on_exception(
    181             [&](){return state->coordinator.in(std::get<Index>(state->source));},
    182             state->out);
    183         if (source.empty()) {
    184             return;
    185         }
    186 
    187         // this subscribe does not share the observer subscription
    188         // so that when it is unsubscribed the observer can be called
    189         // until the inner subscriptions have finished
    190         auto sink = make_subscriber<source_value_type>(
    191             state->out,
    192             innercs,
    193         // on_next
    194             [state](source_value_type st) {
    195                 auto& values = std::get<Index>(state->pending).values;
    196                 values.push_back(st);
    197                 if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) {
    198                     auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector);
    199                     state->out.on_next(selectedResult);
    200                 }
    201                 if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) {
    202                     state->out.on_completed();
    203                 }
    204             },
    205         // on_error
    206             [state](rxu::error_ptr e) {
    207                 state->out.on_error(e);
    208             },
    209         // on_completed
    210             [state]() {
    211                 auto& completed = std::get<Index>(state->pending).completed;
    212                 completed = true;
    213                 if (--state->pendingCompletions == 0) {
    214                     state->out.on_completed();
    215                 }
    216             }
    217         );
    218         auto selectedSink = on_exception(
    219             [&](){return state->coordinator.out(sink);},
    220             state->out);
    221         if (selectedSink.empty()) {
    222             return;
    223         }
    224         source->subscribe(std::move(selectedSink.get()));
    225     }
    226     template<class State, int... IndexN>
    227     void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
    228         bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
    229         subscribed[0] = (*subscribed); // silence warning
    230     }
    231 
    232     template<class Subscriber>
    233     void on_subscribe(Subscriber scbr) const {
    234         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
    235 
    236         typedef Subscriber output_type;
    237 
    238         struct zip_state_type
    239             : public std::enable_shared_from_this<zip_state_type>
    240             , public values
    241         {
    242             zip_state_type(values i, coordinator_type coor, output_type oarg)
    243                 : values(std::move(i))
    244                 , pendingCompletions(sizeof... (ObservableN))
    245                 , valuesSet(0)
    246                 , coordinator(std::move(coor))
    247                 , out(std::move(oarg))
    248             {
    249             }
    250 
    251             // on_completed on the output must wait until all the
    252             // subscriptions have received on_completed
    253             mutable int pendingCompletions;
    254             mutable int valuesSet;
    255             mutable tuple_source_values_type pending;
    256             coordinator_type coordinator;
    257             output_type out;
    258         };
    259 
    260         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    261 
    262         // take a copy of the values for each subscription
    263         auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr));
    264 
    265         subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
    266     }
    267 };
    268 
    269 }
    270 
    271 /*! @copydoc rx-zip.hpp
    272 */
    273 template<class... AN>
    274 auto zip(AN&&... an)
    275     ->     operator_factory<zip_tag, AN...> {
    276     return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    277 }
    278 
    279 }
    280 
    281 template<>
    282 struct member_overload<zip_tag>
    283 {
    284     template<class Observable, class... ObservableN,
    285         class Enabled = rxu::enable_if_all_true_type_t<
    286             all_observables<Observable, ObservableN...>>,
    287         class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    288         class Value = rxu::value_type_t<Zip>,
    289         class Result = observable<Value, Zip>>
    290     static Result member(Observable&& o, ObservableN&&... on)
    291     {
    292         return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    293     }
    294 
    295     template<class Observable, class Selector, class... ObservableN,
    296         class Enabled = rxu::enable_if_all_true_type_t<
    297             operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
    298             all_observables<Observable, ObservableN...>>,
    299         class ResolvedSelector = rxu::decay_t<Selector>,
    300         class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    301         class Value = rxu::value_type_t<Zip>,
    302         class Result = observable<Value, Zip>>
    303     static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
    304     {
    305         return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    306     }
    307 
    308     template<class Coordination, class Observable, class... ObservableN,
    309         class Enabled = rxu::enable_if_all_true_type_t<
    310             is_coordination<Coordination>,
    311             all_observables<Observable, ObservableN...>>,
    312         class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    313         class Value = rxu::value_type_t<Zip>,
    314         class Result = observable<Value, Zip>>
    315     static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
    316     {
    317         return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    318     }
    319 
    320     template<class Coordination, class Selector, class Observable, class... ObservableN,
    321         class Enabled = rxu::enable_if_all_true_type_t<
    322             is_coordination<Coordination>,
    323             operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
    324             all_observables<Observable, ObservableN...>>,
    325         class ResolvedSelector = rxu::decay_t<Selector>,
    326         class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
    327         class Value = rxu::value_type_t<Zip>,
    328         class Result = observable<Value, Zip>>
    329     static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
    330     {
    331         return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
    332     }
    333 
    334     template<class... AN>
    335     static operators::detail::zip_invalid_t<AN...> member(const AN&...) {
    336         std::terminate();
    337         return {};
    338         static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
    339     }
    340 };
    341 
    342 }
    343 
    344 #endif
    345