Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-merge_delay_error.hpp
      6 
      7         \brief For each given observable subscribe.
      8                    For each item emitted from all of the given observables, deliver from the new observable that is returned.
      9                    The first error to occur is held off until all of the given non-error-emitting observables have finished their emission.
     10 
     11                    There are 2 variants of the operator:
     12                    - The source observable emits nested observables, nested observables are merged.
     13                    - The source observable and the arguments v0...vn are used to provide the observables to merge.
     14 
     15         \tparam Coordination  the type of the scheduler (optional).
     16         \tparam Value0  ... (optional).
     17         \tparam ValueN  types of source observables (optional).
     18 
     19         \param  cn      the scheduler to synchronize sources from different contexts (optional).
     20         \param  v0      ... (optional).
     21         \param  vn      source observables (optional).
     22 
     23         \return  Observable that emits items that are the result of flattening the observables emitted by the source observable.
     24 
     25         If scheduler is omitted, identity_current_thread is used.
     26 
     27         \sample
     28         \snippet merge_delay_error.cpp threaded implicit merge sample
     29         \snippet output.txt threaded implicit merge sample
     30 
     31         \sample
     32         \snippet merge_delay_error.cpp implicit merge sample
     33         \snippet output.txt implicit merge sample
     34 
     35         \sample
     36         \snippet merge_delay_error.cpp merge sample
     37         \snippet output.txt merge sample
     38 
     39         \sample
     40         \snippet merge_delay_error.cpp threaded merge sample
     41         \snippet output.txt threaded merge sample
     42 */
     43 
     44 #if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP)
     45 #define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP
     46 
     47 #include "rx-merge.hpp"
     48 
     49 #include "../rx-composite_exception.hpp"
     50 
     51 namespace rxcpp {
     52 
     53 namespace operators {
     54 
     55 namespace detail {
     56 
     57 template<class T, class Observable, class Coordination>
     58 struct merge_delay_error
     59         : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
     60 {
     61         //static_assert(is_observable<Observable>::value, "merge requires an observable");
     62         //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
     63 
     64         typedef merge_delay_error<T, Observable, Coordination> this_type;
     65 
     66         typedef rxu::decay_t<T> source_value_type;
     67         typedef rxu::decay_t<Observable> source_type;
     68 
     69         typedef typename source_type::source_operator_type source_operator_type;
     70         typedef typename source_value_type::value_type value_type;
     71 
     72         typedef rxu::decay_t<Coordination> coordination_type;
     73         typedef typename coordination_type::coordinator_type coordinator_type;
     74 
     75         struct values
     76         {
     77                 values(source_operator_type o, coordination_type sf)
     78                         : source_operator(std::move(o))
     79                         , coordination(std::move(sf))
     80                 {
     81                 }
     82                 source_operator_type source_operator;
     83                 coordination_type coordination;
     84         };
     85         values initial;
     86 
     87         merge_delay_error(const source_type& o, coordination_type sf)
     88                 : initial(o.source_operator, std::move(sf))
     89         {
     90         }
     91 
     92         template<class Subscriber>
     93         void on_subscribe(Subscriber scbr) const {
     94                 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
     95 
     96                 typedef Subscriber output_type;
     97 
     98                 struct merge_state_type
     99                         : public std::enable_shared_from_this<merge_state_type>
    100                         , public values
    101                 {
    102                         merge_state_type(values i, coordinator_type coor, output_type oarg)
    103                                 : values(i)
    104                                 , source(i.source_operator)
    105                                 , pendingCompletions(0)
    106                                 , coordinator(std::move(coor))
    107                                 , out(std::move(oarg))
    108                         {
    109                         }
    110                         observable<source_value_type, source_operator_type> source;
    111                         // on_completed on the output must wait until all the
    112                         // subscriptions have received on_completed
    113                         int pendingCompletions;
    114                         composite_exception exception;;
    115                         coordinator_type coordinator;
    116                         output_type out;
    117                 };
    118 
    119                 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    120 
    121                 // take a copy of the values for each subscription
    122                 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
    123 
    124                 composite_subscription outercs;
    125 
    126                 // when the out observer is unsubscribed all the
    127                 // inner subscriptions are unsubscribed as well
    128                 state->out.add(outercs);
    129 
    130                 auto source = on_exception(
    131                         [&](){return state->coordinator.in(state->source);},
    132                         state->out);
    133                 if (source.empty()) {
    134                         return;
    135                 }
    136 
    137                 ++state->pendingCompletions;
    138                 // this subscribe does not share the observer subscription
    139                 // so that when it is unsubscribed the observer can be called
    140                 // until the inner subscriptions have finished
    141                 auto sink = make_subscriber<source_value_type>(
    142                         state->out,
    143                         outercs,
    144                 // on_next
    145                         [state](source_value_type st) {
    146 
    147                                 composite_subscription innercs;
    148 
    149                                 // when the out observer is unsubscribed all the
    150                                 // inner subscriptions are unsubscribed as well
    151                                 auto innercstoken = state->out.add(innercs);
    152 
    153                                 innercs.add(make_subscription([state, innercstoken](){
    154                                         state->out.remove(innercstoken);
    155                                 }));
    156 
    157                                 auto selectedSource = state->coordinator.in(st);
    158 
    159                                 ++state->pendingCompletions;
    160                                 // this subscribe does not share the source subscription
    161                                 // so that when it is unsubscribed the source will continue
    162                                 auto sinkInner = make_subscriber<value_type>(
    163                                         state->out,
    164                                         innercs,
    165                                 // on_next
    166                                         [state, st](value_type ct) {
    167                                                 state->out.on_next(std::move(ct));
    168                                         },
    169                                 // on_error
    170                                         [state](rxu::error_ptr e) {
    171                                                 if(--state->pendingCompletions == 0) {
    172                                                     state->out.on_error(
    173                                                         rxu::make_error_ptr(std::move(state->exception.add(e))));
    174                                                 } else {
    175                                                         state->exception.add(e);
    176                                                 }
    177                                         },
    178                                 //on_completed
    179                                         [state](){
    180                                                 if (--state->pendingCompletions == 0) {
    181                                                         if(!state->exception.empty()) {
    182                                                             state->out.on_error(
    183                                                                 rxu::make_error_ptr(std::move(state->exception)));
    184                                                         } else {
    185                                                                 state->out.on_completed();
    186                                                         }
    187                                                 }
    188                                         }
    189                                 );
    190 
    191                                 auto selectedSinkInner = state->coordinator.out(sinkInner);
    192                                 selectedSource.subscribe(std::move(selectedSinkInner));
    193                         },
    194                 // on_error
    195                         [state](rxu::error_ptr e) {
    196                             if(--state->pendingCompletions == 0) {
    197                                 state->out.on_error(
    198                                     rxu::make_error_ptr(std::move(state->exception.add(e))));
    199                             } else {
    200                                 state->exception.add(e);
    201                             }
    202                         },
    203                 // on_completed
    204                         [state]() {
    205                             if (--state->pendingCompletions == 0) {
    206                                 if(!state->exception.empty()) {
    207                                     state->out.on_error(
    208                                         rxu::make_error_ptr(std::move(state->exception)));
    209                                 } else {
    210                                     state->out.on_completed();
    211                                 }
    212                             }
    213                         }
    214                 );
    215                 auto selectedSink = on_exception(
    216                         [&](){return state->coordinator.out(sink);},
    217                         state->out);
    218                 if (selectedSink.empty()) {
    219                         return;
    220                 }
    221                 source->subscribe(std::move(selectedSink.get()));
    222         }
    223 };
    224 
    225 }
    226 
    227 /*! @copydoc rx-merge_delay_error.hpp
    228 */
    229 template<class... AN>
    230 auto merge_delay_error(AN&&... an)
    231         ->         operator_factory<merge_delay_error_tag, AN...> {
    232         return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    233 }
    234 
    235 }
    236 
    237 template<>
    238 struct member_overload<merge_delay_error_tag>
    239 {
    240         template<class Observable,
    241                 class Enabled = rxu::enable_if_all_true_type_t<
    242                         is_observable<Observable>>,
    243                 class SourceValue = rxu::value_type_t<Observable>,
    244                 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
    245                 class Value = rxu::value_type_t<SourceValue>,
    246                 class Result = observable<Value, Merge>
    247         >
    248         static Result member(Observable&& o) {
    249                 return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
    250         }
    251 
    252         template<class Observable, class Coordination,
    253                 class Enabled = rxu::enable_if_all_true_type_t<
    254                         is_observable<Observable>,
    255                         is_coordination<Coordination>>,
    256                 class SourceValue = rxu::value_type_t<Observable>,
    257                 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
    258                 class Value = rxu::value_type_t<SourceValue>,
    259                 class Result = observable<Value, Merge>
    260         >
    261         static Result member(Observable&& o, Coordination&& cn) {
    262                 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
    263         }
    264 
    265         template<class Observable, class Value0, class... ValueN,
    266                 class Enabled = rxu::enable_if_all_true_type_t<
    267                         all_observables<Observable, Value0, ValueN...>>,
    268                 class EmittedValue = rxu::value_type_t<Observable>,
    269                 class SourceValue = observable<EmittedValue>,
    270                 class ObservableObservable = observable<SourceValue>,
    271                 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, identity_one_worker>::type,
    272                 class Value = rxu::value_type_t<Merge>,
    273                 class Result = observable<Value, Merge>
    274         >
    275         static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
    276                 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
    277         }
    278 
    279         template<class Observable, class Coordination, class Value0, class... ValueN,
    280                 class Enabled = rxu::enable_if_all_true_type_t<
    281                         all_observables<Observable, Value0, ValueN...>,
    282                         is_coordination<Coordination>>,
    283                 class EmittedValue = rxu::value_type_t<Observable>,
    284                 class SourceValue = observable<EmittedValue>,
    285                 class ObservableObservable = observable<SourceValue>,
    286                 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
    287                 class Value = rxu::value_type_t<Merge>,
    288                 class Result = observable<Value, Merge>
    289         >
    290         static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
    291                 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
    292         }
    293 
    294         template<class... AN>
    295         static operators::detail::merge_invalid_t<AN...> member(AN...) {
    296                 std::terminate();
    297                 return {};
    298                 static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)");
    299         }
    300 };
    301 
    302 }
    303 
    304 #endif
    305