Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-merge.hpp
      6 
      7     \brief For each given observable subscribe.
      8            For each item emitted from all of the given observables, deliver from the new observable that is returned.
      9 
     10            There are 2 variants of the operator:
     11            - The source observable emits nested observables, nested observables are merged.
     12            - The source observable and the arguments v0...vn are used to provide the observables to merge.
     13 
     14     \tparam Coordination  the type of the scheduler (optional).
     15     \tparam Value0  ... (optional).
     16     \tparam ValueN  types of source observables (optional).
     17 
     18     \param  cn  the scheduler to synchronize sources from different contexts (optional).
     19     \param  v0  ... (optional).
     20     \param  vn  source observables (optional).
     21 
     22     \return  Observable that emits items that are the result of flattening the observables emitted by the source observable.
     23 
     24     If scheduler is omitted, identity_current_thread is used.
     25 
     26     \sample
     27     \snippet merge.cpp threaded implicit merge sample
     28     \snippet output.txt threaded implicit merge sample
     29 
     30     \sample
     31     \snippet merge.cpp implicit merge sample
     32     \snippet output.txt implicit merge sample
     33 
     34     \sample
     35     \snippet merge.cpp merge sample
     36     \snippet output.txt merge sample
     37 
     38     \sample
     39     \snippet merge.cpp threaded merge sample
     40     \snippet output.txt threaded merge sample
     41 */
     42 
     43 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP)
     44 #define RXCPP_OPERATORS_RX_MERGE_HPP
     45 
     46 #include "../rx-includes.hpp"
     47 
     48 namespace rxcpp {
     49 
     50 namespace operators {
     51 
     52 namespace detail {
     53 
     54 template<class... AN>
     55 struct merge_invalid_arguments {};
     56 
     57 template<class... AN>
     58 struct merge_invalid : public rxo::operator_base<merge_invalid_arguments<AN...>> {
     59     using type = observable<merge_invalid_arguments<AN...>, merge_invalid<AN...>>;
     60 };
     61 template<class... AN>
     62 using merge_invalid_t = typename merge_invalid<AN...>::type;
     63 
     64 template<class T, class Observable, class Coordination>
     65 struct merge
     66     : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
     67 {
     68     //static_assert(is_observable<Observable>::value, "merge requires an observable");
     69     //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
     70 
     71     typedef merge<T, Observable, Coordination> this_type;
     72 
     73     typedef rxu::decay_t<T> source_value_type;
     74     typedef rxu::decay_t<Observable> source_type;
     75 
     76     typedef typename source_type::source_operator_type source_operator_type;
     77     typedef typename source_value_type::value_type value_type;
     78 
     79     typedef rxu::decay_t<Coordination> coordination_type;
     80     typedef typename coordination_type::coordinator_type coordinator_type;
     81 
     82     struct values
     83     {
     84         values(source_operator_type o, coordination_type sf)
     85             : source_operator(std::move(o))
     86             , coordination(std::move(sf))
     87         {
     88         }
     89         source_operator_type source_operator;
     90         coordination_type coordination;
     91     };
     92     values initial;
     93 
     94     merge(const source_type& o, coordination_type sf)
     95         : initial(o.source_operator, std::move(sf))
     96     {
     97     }
     98 
     99     template<class Subscriber>
    100     void on_subscribe(Subscriber scbr) const {
    101         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
    102 
    103         typedef Subscriber output_type;
    104 
    105         struct merge_state_type
    106             : public std::enable_shared_from_this<merge_state_type>
    107             , public values
    108         {
    109             merge_state_type(values i, coordinator_type coor, output_type oarg)
    110                 : values(i)
    111                 , source(i.source_operator)
    112                 , pendingCompletions(0)
    113                 , coordinator(std::move(coor))
    114                 , out(std::move(oarg))
    115             {
    116             }
    117             observable<source_value_type, source_operator_type> source;
    118             // on_completed on the output must wait until all the
    119             // subscriptions have received on_completed
    120             int pendingCompletions;
    121             coordinator_type coordinator;
    122             output_type out;
    123         };
    124 
    125         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    126 
    127         // take a copy of the values for each subscription
    128         auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
    129 
    130         composite_subscription outercs;
    131 
    132         // when the out observer is unsubscribed all the
    133         // inner subscriptions are unsubscribed as well
    134         state->out.add(outercs);
    135 
    136         auto source = on_exception(
    137             [&](){return state->coordinator.in(state->source);},
    138             state->out);
    139         if (source.empty()) {
    140             return;
    141         }
    142 
    143         ++state->pendingCompletions;
    144         // this subscribe does not share the observer subscription
    145         // so that when it is unsubscribed the observer can be called
    146         // until the inner subscriptions have finished
    147         auto sink = make_subscriber<source_value_type>(
    148             state->out,
    149             outercs,
    150         // on_next
    151             [state](source_value_type st) {
    152 
    153                 composite_subscription innercs;
    154 
    155                 // when the out observer is unsubscribed all the
    156                 // inner subscriptions are unsubscribed as well
    157                 auto innercstoken = state->out.add(innercs);
    158 
    159                 innercs.add(make_subscription([state, innercstoken](){
    160                     state->out.remove(innercstoken);
    161                 }));
    162 
    163                 auto selectedSource = state->coordinator.in(st);
    164 
    165                 ++state->pendingCompletions;
    166                 // this subscribe does not share the source subscription
    167                 // so that when it is unsubscribed the source will continue
    168                 auto sinkInner = make_subscriber<value_type>(
    169                     state->out,
    170                     innercs,
    171                 // on_next
    172                     [state, st](value_type ct) {
    173                         state->out.on_next(std::move(ct));
    174                     },
    175                 // on_error
    176                     [state](rxu::error_ptr e) {
    177                         state->out.on_error(e);
    178                     },
    179                 //on_completed
    180                     [state](){
    181                         if (--state->pendingCompletions == 0) {
    182                             state->out.on_completed();
    183                         }
    184                     }
    185                 );
    186 
    187                 auto selectedSinkInner = state->coordinator.out(sinkInner);
    188                 selectedSource.subscribe(std::move(selectedSinkInner));
    189             },
    190         // on_error
    191             [state](rxu::error_ptr e) {
    192                 state->out.on_error(e);
    193             },
    194         // on_completed
    195             [state]() {
    196                 if (--state->pendingCompletions == 0) {
    197                     state->out.on_completed();
    198                 }
    199             }
    200         );
    201         auto selectedSink = on_exception(
    202             [&](){return state->coordinator.out(sink);},
    203             state->out);
    204         if (selectedSink.empty()) {
    205             return;
    206         }
    207         source->subscribe(std::move(selectedSink.get()));
    208     }
    209 };
    210 
    211 }
    212 
    213 /*! @copydoc rx-merge.hpp
    214 */
    215 template<class... AN>
    216 auto merge(AN&&... an)
    217     ->     operator_factory<merge_tag, AN...> {
    218     return operator_factory<merge_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    219 }
    220 
    221 }
    222 
    223 template<>
    224 struct member_overload<merge_tag>
    225 {
    226     template<class Observable,
    227         class Enabled = rxu::enable_if_all_true_type_t<
    228             is_observable<Observable>>,
    229         class SourceValue = rxu::value_type_t<Observable>,
    230         class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
    231         class Value = rxu::value_type_t<SourceValue>,
    232         class Result = observable<Value, Merge>
    233     >
    234     static Result member(Observable&& o) {
    235         return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
    236     }
    237 
    238     template<class Observable, class Coordination,
    239         class Enabled = rxu::enable_if_all_true_type_t<
    240             is_observable<Observable>,
    241             is_coordination<Coordination>>,
    242         class SourceValue = rxu::value_type_t<Observable>,
    243         class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
    244         class Value = rxu::value_type_t<SourceValue>,
    245         class Result = observable<Value, Merge>
    246     >
    247     static Result member(Observable&& o, Coordination&& cn) {
    248         return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
    249     }
    250 
    251     template<class Observable, class Value0, class... ValueN,
    252         class Enabled = rxu::enable_if_all_true_type_t<
    253             all_observables<Observable, Value0, ValueN...>>,
    254         class EmittedValue = rxu::value_type_t<Observable>,
    255         class SourceValue = observable<EmittedValue>,
    256         class ObservableObservable = observable<SourceValue>,
    257         class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, identity_one_worker>::type,
    258         class Value = rxu::value_type_t<Merge>,
    259         class Result = observable<Value, Merge>
    260     >
    261     static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
    262         return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
    263     }
    264 
    265     template<class Observable, class Coordination, class Value0, class... ValueN,
    266         class Enabled = rxu::enable_if_all_true_type_t<
    267             all_observables<Observable, Value0, ValueN...>,
    268             is_coordination<Coordination>>,
    269         class EmittedValue = rxu::value_type_t<Observable>,
    270         class SourceValue = observable<EmittedValue>,
    271         class ObservableObservable = observable<SourceValue>,
    272         class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
    273         class Value = rxu::value_type_t<Merge>,
    274         class Result = observable<Value, Merge>
    275     >
    276     static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
    277         return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
    278     }
    279 
    280     template<class... AN>
    281     static operators::detail::merge_invalid_t<AN...> member(AN...) {
    282         std::terminate();
    283         return {};
    284         static_assert(sizeof...(AN) == 10000, "merge takes (optional Coordination, optional Value0, optional ValueN...)");
    285     }
    286 };
    287 
    288 }
    289 
    290 #endif
    291