Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-concat.hpp
      6 
      7     \brief For each item from this observable subscribe to one at a time, in the order received.
      8            For each item from all of the given observables deliver from the new observable that is returned.
      9 
     10            There are 2 variants of the operator:
     11            - The source observable emits nested observables, nested observables are concatenated.
     12            - The source observable and the arguments v0...vn are used to provide the observables to concatenate.
     13 
     14     \tparam Coordination  the type of the scheduler (optional).
     15     \tparam Value0  ... (optional).
     16     \tparam ValueN  types of source observables (optional).
     17 
     18     \param  cn  the scheduler to synchronize sources from different contexts (optional).
     19     \param  v0  ... (optional).
     20     \param  vn  source observables (optional).
     21 
     22     \return  Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
     23 
     24     \sample
     25     \snippet concat.cpp implicit concat sample
     26     \snippet output.txt implicit concat sample
     27 
     28     \sample
     29     \snippet concat.cpp threaded implicit concat sample
     30     \snippet output.txt threaded implicit concat sample
     31 
     32     \sample
     33     \snippet concat.cpp concat sample
     34     \snippet output.txt concat sample
     35 
     36     \sample
     37     \snippet concat.cpp threaded concat sample
     38     \snippet output.txt threaded concat sample
     39 */
     40 
     41 #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP)
     42 #define RXCPP_OPERATORS_RX_CONCAT_HPP
     43 
     44 #include "../rx-includes.hpp"
     45 
     46 namespace rxcpp {
     47 
     48 namespace operators {
     49 
     50 namespace detail {
     51 
     52 template<class... AN>
     53 struct concat_invalid_arguments {};
     54 
     55 template<class... AN>
     56 struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> {
     57     using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>;
     58 };
     59 template<class... AN>
     60 using concat_invalid_t = typename concat_invalid<AN...>::type;
     61 
     62 template<class T, class Observable, class Coordination>
     63 struct concat
     64     : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
     65 {
     66     typedef concat<T, Observable, Coordination> this_type;
     67 
     68     typedef rxu::decay_t<T> source_value_type;
     69     typedef rxu::decay_t<Observable> source_type;
     70     typedef rxu::decay_t<Coordination> coordination_type;
     71 
     72     typedef typename coordination_type::coordinator_type coordinator_type;
     73 
     74     typedef typename source_type::source_operator_type source_operator_type;
     75     typedef source_value_type collection_type;
     76     typedef typename collection_type::value_type value_type;
     77 
     78     struct values
     79     {
     80         values(source_operator_type o, coordination_type sf)
     81             : source_operator(std::move(o))
     82             , coordination(std::move(sf))
     83         {
     84         }
     85         source_operator_type source_operator;
     86         coordination_type coordination;
     87     };
     88     values initial;
     89 
     90     concat(const source_type& o, coordination_type sf)
     91         : initial(o.source_operator, std::move(sf))
     92     {
     93     }
     94 
     95     template<class Subscriber>
     96     void on_subscribe(Subscriber scbr) const {
     97         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
     98 
     99         typedef Subscriber output_type;
    100 
    101         struct concat_state_type
    102             : public std::enable_shared_from_this<concat_state_type>
    103             , public values
    104         {
    105             concat_state_type(values i, coordinator_type coor, output_type oarg)
    106                 : values(i)
    107                 , source(i.source_operator)
    108                 , sourceLifetime(composite_subscription::empty())
    109                 , collectionLifetime(composite_subscription::empty())
    110                 , coordinator(std::move(coor))
    111                 , out(std::move(oarg))
    112             {
    113             }
    114 
    115             void subscribe_to(collection_type st)
    116             {
    117                 auto state = this->shared_from_this();
    118 
    119                 collectionLifetime = composite_subscription();
    120 
    121                 // when the out observer is unsubscribed all the
    122                 // inner subscriptions are unsubscribed as well
    123                 auto innercstoken = state->out.add(collectionLifetime);
    124 
    125                 collectionLifetime.add(make_subscription([state, innercstoken](){
    126                     state->out.remove(innercstoken);
    127                 }));
    128 
    129                 auto selectedSource = on_exception(
    130                     [&](){return state->coordinator.in(std::move(st));},
    131                     state->out);
    132                 if (selectedSource.empty()) {
    133                     return;
    134                 }
    135 
    136                 // this subscribe does not share the out subscription
    137                 // so that when it is unsubscribed the out will continue
    138                 auto sinkInner = make_subscriber<value_type>(
    139                     state->out,
    140                     collectionLifetime,
    141                 // on_next
    142                     [state, st](value_type ct) {
    143                         state->out.on_next(ct);
    144                     },
    145                 // on_error
    146                     [state](rxu::error_ptr e) {
    147                         state->out.on_error(e);
    148                     },
    149                 //on_completed
    150                     [state](){
    151                         if (!state->selectedCollections.empty()) {
    152                             auto value = state->selectedCollections.front();
    153                             state->selectedCollections.pop_front();
    154                             state->collectionLifetime.unsubscribe();
    155                             state->subscribe_to(value);
    156                         } else if (!state->sourceLifetime.is_subscribed()) {
    157                             state->out.on_completed();
    158                         }
    159                     }
    160                 );
    161                 auto selectedSinkInner = on_exception(
    162                     [&](){return state->coordinator.out(sinkInner);},
    163                     state->out);
    164                 if (selectedSinkInner.empty()) {
    165                     return;
    166                 }
    167                 selectedSource->subscribe(std::move(selectedSinkInner.get()));
    168             }
    169             observable<source_value_type, source_operator_type> source;
    170             composite_subscription sourceLifetime;
    171             composite_subscription collectionLifetime;
    172             std::deque<collection_type> selectedCollections;
    173             coordinator_type coordinator;
    174             output_type out;
    175         };
    176 
    177         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    178 
    179         // take a copy of the values for each subscription
    180         auto state = std::make_shared<concat_state_type>(initial, std::move(coordinator), std::move(scbr));
    181 
    182         state->sourceLifetime = composite_subscription();
    183 
    184         // when the out observer is unsubscribed all the
    185         // inner subscriptions are unsubscribed as well
    186         state->out.add(state->sourceLifetime);
    187 
    188         auto source = on_exception(
    189             [&](){return state->coordinator.in(state->source);},
    190             state->out);
    191         if (source.empty()) {
    192             return;
    193         }
    194 
    195         // this subscribe does not share the observer subscription
    196         // so that when it is unsubscribed the observer can be called
    197         // until the inner subscriptions have finished
    198         auto sink = make_subscriber<collection_type>(
    199             state->out,
    200             state->sourceLifetime,
    201         // on_next
    202             [state](collection_type st) {
    203                 if (state->collectionLifetime.is_subscribed()) {
    204                     state->selectedCollections.push_back(st);
    205                 } else if (state->selectedCollections.empty()) {
    206                     state->subscribe_to(st);
    207                 }
    208             },
    209         // on_error
    210             [state](rxu::error_ptr e) {
    211                 state->out.on_error(e);
    212             },
    213         // on_completed
    214             [state]() {
    215                 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
    216                     state->out.on_completed();
    217                 }
    218             }
    219         );
    220         auto selectedSink = on_exception(
    221             [&](){return state->coordinator.out(sink);},
    222             state->out);
    223         if (selectedSink.empty()) {
    224             return;
    225         }
    226         source->subscribe(std::move(selectedSink.get()));
    227     }
    228 };
    229 
    230 }
    231 
    232 /*! @copydoc rx-concat.hpp
    233 */
    234 template<class... AN>
    235 auto concat(AN&&... an)
    236     ->     operator_factory<concat_tag, AN...> {
    237     return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    238 }
    239 
    240 }
    241 
    242 template<>
    243 struct member_overload<concat_tag>
    244 {
    245     template<class Observable,
    246         class Enabled = rxu::enable_if_all_true_type_t<
    247             is_observable<Observable>>,
    248         class SourceValue = rxu::value_type_t<Observable>,
    249         class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
    250         class Value = rxu::value_type_t<SourceValue>,
    251         class Result = observable<Value, Concat>
    252     >
    253     static Result member(Observable&& o) {
    254         return Result(Concat(std::forward<Observable>(o), identity_current_thread()));
    255     }
    256 
    257     template<class Observable, class Coordination,
    258         class Enabled = rxu::enable_if_all_true_type_t<
    259             is_observable<Observable>,
    260             is_coordination<Coordination>>,
    261         class SourceValue = rxu::value_type_t<Observable>,
    262         class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
    263         class Value = rxu::value_type_t<SourceValue>,
    264         class Result = observable<Value, Concat>
    265     >
    266     static Result member(Observable&& o, Coordination&& cn) {
    267         return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn)));
    268     }
    269 
    270     template<class Observable, class Value0, class... ValueN,
    271         class Enabled = rxu::enable_if_all_true_type_t<
    272             all_observables<Observable, Value0, ValueN...>>,
    273         class EmittedValue = rxu::value_type_t<Observable>,
    274         class SourceValue = observable<EmittedValue>,
    275         class ObservableObservable = observable<SourceValue>,
    276         class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, identity_one_worker>::type,
    277         class Value = rxu::value_type_t<Concat>,
    278         class Result = observable<Value, Concat>
    279     >
    280     static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
    281         return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
    282     }
    283 
    284     template<class Observable, class Coordination, class Value0, class... ValueN,
    285         class Enabled = rxu::enable_if_all_true_type_t<
    286             all_observables<Observable, Value0, ValueN...>,
    287             is_coordination<Coordination>>,
    288         class EmittedValue = rxu::value_type_t<Observable>,
    289         class SourceValue = observable<EmittedValue>,
    290         class ObservableObservable = observable<SourceValue>,
    291         class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
    292         class Value = rxu::value_type_t<Concat>,
    293         class Result = observable<Value, Concat>
    294     >
    295     static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
    296         return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
    297     }
    298 
    299     template<class... AN>
    300     static operators::detail::concat_invalid_t<AN...> member(AN...) {
    301         std::terminate();
    302         return {};
    303         static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)");
    304     }
    305 };
    306 
    307 }
    308 
    309 #endif
    310