Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-concat_map.hpp
      6 
      7     \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
      8            For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
      9 
     10     \tparam CollectionSelector  the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
     11     \tparam ResultSelector      the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
     12     \tparam Coordination        the type of the scheduler (optional).
     13 
     14     \param  s   a function that returns an observable for each item emitted by the source observable.
     15     \param  rs  a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
     16     \param  cn  the scheduler to synchronize sources from different contexts (optional).
     17 
     18     \return  Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
     19 
     20     Observables produced by the CollectionSelector are concatenated. There is another operator, rxcpp::observable<T,SourceType>::flat_map, that works similarly but merges the observables instead.
     21 
     22     \sample
     23     \snippet concat_map.cpp concat_map sample
     24     \snippet output.txt concat_map sample
     25 
     26     \sample
     27     \snippet concat_map.cpp threaded concat_map sample
     28     \snippet output.txt threaded concat_map sample
     29 */
     30 
     31 #if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP)
     32 #define RXCPP_OPERATORS_RX_CONCATMAP_HPP
     33 
     34 #include "../rx-includes.hpp"
     35 
     36 namespace rxcpp {
     37 
     38 namespace operators {
     39 
     40 namespace detail {
     41 
     42 template<class... AN>
     43 struct concat_map_invalid_arguments {};
     44 
     45 template<class... AN>
     46 struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> {
     47     using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>;
     48 };
     49 template<class... AN>
     50 using concat_map_invalid_t = typename concat_map_invalid<AN...>::type;
     51 
     52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
     53 struct concat_traits {
     54     typedef rxu::decay_t<Observable> source_type;
     55     typedef rxu::decay_t<CollectionSelector> collection_selector_type;
     56     typedef rxu::decay_t<ResultSelector> result_selector_type;
     57     typedef rxu::decay_t<Coordination> coordination_type;
     58 
     59     typedef typename source_type::value_type source_value_type;
     60 
     61     struct tag_not_valid {};
     62     template<class CV, class CCS>
     63     static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr));
     64     template<class CV, class CCS>
     65     static tag_not_valid collection_check(...);
     66 
     67     static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "concat_map CollectionSelector must be a function with the signature observable(concat_map::source_value_type)");
     68 
     69     typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
     70 
     71 //#if _MSC_VER >= 1900
     72     static_assert(is_observable<collection_type>::value, "concat_map CollectionSelector must return an observable");
     73 //#endif
     74 
     75     typedef typename collection_type::value_type collection_value_type;
     76 
     77     template<class CV, class CCV, class CRS>
     78     static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr));
     79     template<class CV, class CCV, class CRS>
     80     static tag_not_valid result_check(...);
     81 
     82     static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
     83 
     84     typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
     85 };
     86 
     87 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
     88 struct concat_map
     89     : public operator_base<rxu::value_type_t<concat_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
     90 {
     91     typedef concat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
     92     typedef concat_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
     93 
     94     typedef typename traits::source_type source_type;
     95     typedef typename traits::collection_selector_type collection_selector_type;
     96     typedef typename traits::result_selector_type result_selector_type;
     97 
     98     typedef typename traits::source_value_type source_value_type;
     99     typedef typename traits::collection_type collection_type;
    100     typedef typename traits::collection_value_type collection_value_type;
    101 
    102     typedef typename traits::coordination_type coordination_type;
    103     typedef typename coordination_type::coordinator_type coordinator_type;
    104 
    105     struct values
    106     {
    107         values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
    108             : source(std::move(o))
    109             , selectCollection(std::move(s))
    110             , selectResult(std::move(rs))
    111             , coordination(std::move(sf))
    112         {
    113         }
    114         source_type source;
    115         collection_selector_type selectCollection;
    116         result_selector_type selectResult;
    117         coordination_type coordination;
    118     private:
    119         values& operator=(const values&) RXCPP_DELETE;
    120     };
    121     values initial;
    122 
    123     concat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
    124         : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
    125     {
    126     }
    127 
    128     template<class Subscriber>
    129     void on_subscribe(Subscriber scbr) const {
    130         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
    131 
    132         typedef Subscriber output_type;
    133 
    134         struct concat_map_state_type
    135             : public std::enable_shared_from_this<concat_map_state_type>
    136             , public values
    137         {
    138             concat_map_state_type(values i, coordinator_type coor, output_type oarg)
    139                 : values(std::move(i))
    140                 , sourceLifetime(composite_subscription::empty())
    141                 , collectionLifetime(composite_subscription::empty())
    142                 , coordinator(std::move(coor))
    143                 , out(std::move(oarg))
    144             {
    145             }
    146 
    147             void subscribe_to(source_value_type st)
    148             {
    149                 auto state = this->shared_from_this();
    150 
    151                 auto selectedCollection = on_exception(
    152                     [&](){return state->selectCollection(st);},
    153                     state->out);
    154                 if (selectedCollection.empty()) {
    155                     return;
    156                 }
    157 
    158                 collectionLifetime = composite_subscription();
    159 
    160                 // when the out observer is unsubscribed all the
    161                 // inner subscriptions are unsubscribed as well
    162                 auto innercstoken = state->out.add(collectionLifetime);
    163 
    164                 collectionLifetime.add(make_subscription([state, innercstoken](){
    165                     state->out.remove(innercstoken);
    166                 }));
    167 
    168                 auto selectedSource = on_exception(
    169                     [&](){return state->coordinator.in(selectedCollection.get());},
    170                     state->out);
    171                 if (selectedSource.empty()) {
    172                     return;
    173                 }
    174 
    175                 // this subscribe does not share the source subscription
    176                 // so that when it is unsubscribed the source will continue
    177                 auto sinkInner = make_subscriber<collection_value_type>(
    178                     state->out,
    179                     collectionLifetime,
    180                 // on_next
    181                     [state, st](collection_value_type ct) {
    182                         auto selectedResult = state->selectResult(st, std::move(ct));
    183                         state->out.on_next(std::move(selectedResult));
    184                     },
    185                 // on_error
    186                     [state](rxu::error_ptr e) {
    187                         state->out.on_error(e);
    188                     },
    189                 //on_completed
    190                     [state](){
    191                         if (!state->selectedCollections.empty()) {
    192                             auto value = state->selectedCollections.front();
    193                             state->selectedCollections.pop_front();
    194                             state->collectionLifetime.unsubscribe();
    195                             state->subscribe_to(value);
    196                         } else if (!state->sourceLifetime.is_subscribed()) {
    197                             state->out.on_completed();
    198                         }
    199                     }
    200                 );
    201                 auto selectedSinkInner = on_exception(
    202                     [&](){return state->coordinator.out(sinkInner);},
    203                     state->out);
    204                 if (selectedSinkInner.empty()) {
    205                     return;
    206                 }
    207                 selectedSource->subscribe(std::move(selectedSinkInner.get()));
    208             }
    209             composite_subscription sourceLifetime;
    210             composite_subscription collectionLifetime;
    211             std::deque<source_value_type> selectedCollections;
    212             coordinator_type coordinator;
    213             output_type out;
    214         };
    215 
    216         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    217 
    218         // take a copy of the values for each subscription
    219         auto state = std::make_shared<concat_map_state_type>(initial, std::move(coordinator), std::move(scbr));
    220 
    221         state->sourceLifetime = composite_subscription();
    222 
    223         // when the out observer is unsubscribed all the
    224         // inner subscriptions are unsubscribed as well
    225         state->out.add(state->sourceLifetime);
    226 
    227         auto source = on_exception(
    228             [&](){return state->coordinator.in(state->source);},
    229             state->out);
    230         if (source.empty()) {
    231             return;
    232         }
    233 
    234         // this subscribe does not share the observer subscription
    235         // so that when it is unsubscribed the observer can be called
    236         // until the inner subscriptions have finished
    237         auto sink = make_subscriber<source_value_type>(
    238             state->out,
    239             state->sourceLifetime,
    240         // on_next
    241             [state](source_value_type st) {
    242                 if (state->collectionLifetime.is_subscribed()) {
    243                     state->selectedCollections.push_back(st);
    244                 } else if (state->selectedCollections.empty()) {
    245                     state->subscribe_to(st);
    246                 }
    247             },
    248         // on_error
    249             [state](rxu::error_ptr e) {
    250                 state->out.on_error(e);
    251             },
    252         // on_completed
    253             [state]() {
    254                 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
    255                     state->out.on_completed();
    256                 }
    257             }
    258         );
    259         auto selectedSink = on_exception(
    260             [&](){return state->coordinator.out(sink);},
    261             state->out);
    262         if (selectedSink.empty()) {
    263             return;
    264         }
    265         source->subscribe(std::move(selectedSink.get()));
    266 
    267     }
    268 private:
    269     concat_map& operator=(const concat_map&) RXCPP_DELETE;
    270 };
    271 
    272 }
    273 
    274 /*! @copydoc rx-concat_map.hpp
    275 */
    276 template<class... AN>
    277 auto concat_map(AN&&... an)
    278 ->     operator_factory<concat_map_tag, AN...> {
    279     return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    280 }
    281 
    282 /*! @copydoc rx-concat_map.hpp
    283 */
    284 template<class... AN>
    285 auto concat_transform(AN&&... an)
    286 ->     operator_factory<concat_map_tag, AN...> {
    287     return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    288 }
    289 
    290 }
    291 
    292 template<>
    293 struct member_overload<concat_map_tag>
    294 {
    295     template<class Observable, class CollectionSelector,
    296         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    297         class SourceValue = rxu::value_type_t<Observable>,
    298         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    299         class ResultSelectorType = rxu::detail::take_at<1>,
    300         class Enabled = rxu::enable_if_all_true_type_t<
    301             all_observables<Observable, CollectionType>>,
    302         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
    303         class CollectionValueType = rxu::value_type_t<CollectionType>,
    304         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    305         class Result = observable<Value, ConcatMap>
    306     >
    307     static Result member(Observable&& o, CollectionSelector&& s) {
    308         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
    309     }
    310 
    311     template<class Observable, class CollectionSelector, class Coordination,
    312         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    313         class SourceValue = rxu::value_type_t<Observable>,
    314         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    315         class ResultSelectorType = rxu::detail::take_at<1>,
    316         class Enabled = rxu::enable_if_all_true_type_t<
    317             all_observables<Observable, CollectionType>,
    318             is_coordination<Coordination>>,
    319         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
    320         class CollectionValueType = rxu::value_type_t<CollectionType>,
    321         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    322         class Result = observable<Value, ConcatMap>
    323     >
    324     static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
    325         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
    326     }
    327 
    328     template<class Observable, class CollectionSelector, class ResultSelector,
    329         class IsCoordination = is_coordination<ResultSelector>,
    330         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    331         class SourceValue = rxu::value_type_t<Observable>,
    332         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    333         class Enabled = rxu::enable_if_all_true_type_t<
    334             all_observables<Observable, CollectionType>,
    335             rxu::negation<IsCoordination>>,
    336         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
    337         class CollectionValueType = rxu::value_type_t<CollectionType>,
    338         class ResultSelectorType = rxu::decay_t<ResultSelector>,
    339         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    340         class Result = observable<Value, ConcatMap>
    341     >
    342     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
    343         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
    344     }
    345 
    346     template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
    347         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    348         class SourceValue = rxu::value_type_t<Observable>,
    349         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    350         class Enabled = rxu::enable_if_all_true_type_t<
    351             all_observables<Observable, CollectionType>,
    352             is_coordination<Coordination>>,
    353         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
    354         class CollectionValueType = rxu::value_type_t<CollectionType>,
    355         class ResultSelectorType = rxu::decay_t<ResultSelector>,
    356         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    357         class Result = observable<Value, ConcatMap>
    358     >
    359     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
    360         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
    361     }
    362 
    363     template<class... AN>
    364     static operators::detail::concat_map_invalid_t<AN...> member(AN...) {
    365         std::terminate();
    366         return {};
    367         static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
    368     }
    369 };
    370 
    371 }
    372 
    373 #endif
    374