Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-amb.hpp
      6 
      7     \brief For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.
      8 
      9            There are 2 variants of the operator:
     10            - The source observable emits nested observables, one of the nested observables is selected.
     11            - The source observable and the arguments v0...vn are used to provide the observables to select from.
     12 
     13     \tparam Coordination  the type of the scheduler (optional).
     14     \tparam Value0        ... (optional).
     15     \tparam ValueN        types of source observables (optional).
     16 
     17     \param  cn  the scheduler to synchronize sources from different contexts (optional).
     18     \param  v0  ... (optional).
     19     \param  vn  source observables (optional).
     20 
     21     \return  Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
     22 
     23     If scheduler is omitted, identity_current_thread is used.
     24 
     25     \sample
     26     \snippet amb.cpp threaded implicit amb sample
     27     \snippet output.txt threaded implicit amb sample
     28 
     29     \snippet amb.cpp implicit amb sample
     30     \snippet output.txt implicit amb sample
     31 
     32     \snippet amb.cpp amb sample
     33     \snippet output.txt amb sample
     34 
     35     \snippet amb.cpp threaded amb sample
     36     \snippet output.txt threaded amb sample
     37 */
     38 
     39 #if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
     40 #define RXCPP_OPERATORS_RX_AMB_HPP
     41 
     42 #include "../rx-includes.hpp"
     43 
     44 namespace rxcpp {
     45 
     46 namespace operators {
     47 
     48 namespace detail {
     49 
     50 template<class... AN>
     51 struct amb_invalid_arguments {};
     52 
     53 template<class... AN>
     54 struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> {
     55     using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>;
     56 };
     57 template<class... AN>
     58 using amb_invalid_t = typename amb_invalid<AN...>::type;
     59 
     60 template<class T, class Observable, class Coordination>
     61 struct amb
     62     : public operator_base<rxu::value_type_t<T>>
     63 {
     64     //static_assert(is_observable<Observable>::value, "amb requires an observable");
     65     //static_assert(is_observable<T>::value, "amb requires an observable that contains observables");
     66 
     67     typedef amb<T, Observable, Coordination> this_type;
     68 
     69     typedef rxu::decay_t<T> source_value_type;
     70     typedef rxu::decay_t<Observable> source_type;
     71 
     72     typedef typename source_type::source_operator_type source_operator_type;
     73     typedef typename source_value_type::value_type value_type;
     74 
     75     typedef rxu::decay_t<Coordination> coordination_type;
     76     typedef typename coordination_type::coordinator_type coordinator_type;
     77 
     78     struct values
     79     {
     80         values(source_operator_type o, coordination_type sf)
     81             : source_operator(std::move(o))
     82             , coordination(std::move(sf))
     83         {
     84         }
     85         source_operator_type source_operator;
     86         coordination_type coordination;
     87     };
     88     values initial;
     89 
     90     amb(const source_type& o, coordination_type sf)
     91         : initial(o.source_operator, std::move(sf))
     92     {
     93     }
     94 
     95     template<class Subscriber>
     96     void on_subscribe(Subscriber scbr) const {
     97         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
     98 
     99         typedef Subscriber output_type;
    100 
    101         struct amb_state_type
    102             : public std::enable_shared_from_this<amb_state_type>
    103             , public values
    104         {
    105             amb_state_type(values i, coordinator_type coor, output_type oarg)
    106                 : values(i)
    107                 , source(i.source_operator)
    108                 , coordinator(std::move(coor))
    109                 , out(std::move(oarg))
    110                 , pendingObservables(0)
    111                 , firstEmitted(false)
    112             {
    113             }
    114             observable<source_value_type, source_operator_type> source;
    115             coordinator_type coordinator;
    116             output_type out;
    117             int pendingObservables;
    118             bool firstEmitted;
    119             std::vector<composite_subscription> innerSubscriptions;
    120         };
    121 
    122         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    123 
    124         // take a copy of the values for each subscription
    125         auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr));
    126 
    127         composite_subscription outercs;
    128 
    129         // when the out observer is unsubscribed all the
    130         // inner subscriptions are unsubscribed as well
    131         state->out.add(outercs);
    132 
    133         auto source = on_exception(
    134             [&](){return state->coordinator.in(state->source);},
    135             state->out);
    136         if (source.empty()) {
    137             return;
    138         }
    139 
    140         // this subscribe does not share the observer subscription
    141         // so that when it is unsubscribed the observer can be called
    142         // until the inner subscriptions have finished
    143         auto sink = make_subscriber<source_value_type>(
    144             state->out,
    145             outercs,
    146         // on_next
    147             [state](source_value_type st) {
    148 
    149                 if (state->firstEmitted)
    150                     return;
    151 
    152                 composite_subscription innercs;
    153 
    154                 state->innerSubscriptions.push_back(innercs);
    155 
    156                 // when the out observer is unsubscribed all the
    157                 // inner subscriptions are unsubscribed as well
    158                 auto innercstoken = state->out.add(innercs);
    159 
    160                 innercs.add(make_subscription([state, innercstoken](){
    161                     state->out.remove(innercstoken);
    162                 }));
    163 
    164                 auto selectedSource = state->coordinator.in(st);
    165 
    166                 auto current_id = state->pendingObservables++;
    167 
    168                 // this subscribe does not share the source subscription
    169                 // so that when it is unsubscribed the source will continue
    170                 auto sinkInner = make_subscriber<value_type>(
    171                     state->out,
    172                     innercs,
    173                 // on_next
    174                     [state, st, current_id](value_type ct) {
    175                         state->out.on_next(std::move(ct));
    176                         if (!state->firstEmitted) {
    177                             state->firstEmitted = true;
    178                             auto do_unsubscribe = [](composite_subscription cs) {
    179                                 cs.unsubscribe();
    180                             };
    181                             std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe);
    182                             std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe);
    183                         }
    184                     },
    185                 // on_error
    186                     [state](rxu::error_ptr e) {
    187                         state->out.on_error(e);
    188                     },
    189                 //on_completed
    190                     [state](){
    191                         state->out.on_completed();
    192                     }
    193                 );
    194 
    195                 auto selectedSinkInner = state->coordinator.out(sinkInner);
    196                 selectedSource.subscribe(std::move(selectedSinkInner));
    197             },
    198         // on_error
    199             [state](rxu::error_ptr e) {
    200                 state->out.on_error(e);
    201             },
    202         // on_completed
    203             [state]() {
    204                 if (state->pendingObservables == 0) {
    205                     state->out.on_completed();
    206                 }
    207             }
    208         );
    209         auto selectedSink = on_exception(
    210             [&](){return state->coordinator.out(sink);},
    211             state->out);
    212         if (selectedSink.empty()) {
    213             return;
    214         }
    215         source->subscribe(std::move(selectedSink.get()));
    216     }
    217 };
    218 
    219 }
    220 
    221 /*! @copydoc rx-amb.hpp
    222 */
    223 template<class... AN>
    224 auto amb(AN&&... an)
    225     ->     operator_factory<amb_tag, AN...> {
    226     return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    227 }
    228 
    229 }
    230 
    231 template<>
    232 struct member_overload<amb_tag>
    233 {
    234     template<class Observable,
    235         class Enabled = rxu::enable_if_all_true_type_t<
    236             is_observable<Observable>>,
    237         class SourceValue = rxu::value_type_t<Observable>,
    238         class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
    239         class Value = rxu::value_type_t<SourceValue>,
    240         class Result = observable<Value, Amb>
    241     >
    242     static Result member(Observable&& o) {
    243         return Result(Amb(std::forward<Observable>(o), identity_current_thread()));
    244     }
    245 
    246     template<class Observable, class Coordination,
    247         class Enabled = rxu::enable_if_all_true_type_t<
    248             is_observable<Observable>,
    249             is_coordination<Coordination>>,
    250         class SourceValue = rxu::value_type_t<Observable>,
    251         class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
    252         class Value = rxu::value_type_t<SourceValue>,
    253         class Result = observable<Value, Amb>
    254     >
    255     static Result member(Observable&& o, Coordination&& cn) {
    256         return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn)));
    257     }
    258 
    259     template<class Observable, class Value0, class... ValueN,
    260         class Enabled = rxu::enable_if_all_true_type_t<
    261             all_observables<Observable, Value0, ValueN...>>,
    262         class EmittedValue = rxu::value_type_t<Observable>,
    263         class SourceValue = observable<EmittedValue>,
    264         class ObservableObservable = observable<SourceValue>,
    265         class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, identity_one_worker>::type,
    266         class Value = rxu::value_type_t<Amb>,
    267         class Result = observable<Value, Amb>
    268     >
    269     static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
    270         return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
    271     }
    272 
    273     template<class Observable, class Coordination, class Value0, class... ValueN,
    274         class Enabled = rxu::enable_if_all_true_type_t<
    275             all_observables<Observable, Value0, ValueN...>,
    276             is_coordination<Coordination>>,
    277         class EmittedValue = rxu::value_type_t<Observable>,
    278         class SourceValue = observable<EmittedValue>,
    279         class ObservableObservable = observable<SourceValue>,
    280         class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
    281         class Value = rxu::value_type_t<Amb>,
    282         class Result = observable<Value, Amb>
    283     >
    284     static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
    285         return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
    286     }
    287 
    288     template<class... AN>
    289     static operators::detail::amb_invalid_t<AN...> member(AN...) {
    290         std::terminate();
    291         return {};
    292         static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)");
    293     }
    294 };
    295 
    296 }
    297 
    298 #endif
    299