Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-flat_map.hpp
      6 
      7     \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
      8            For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
      9 
     10     \tparam CollectionSelector  the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type)
     11     \tparam ResultSelector      the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
     12     \tparam Coordination        the type of the scheduler (optional).
     13 
     14     \param  s   a function that returns an observable for each item emitted by the source observable.
     15     \param  rs  a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
     16     \param  cn  the scheduler to synchronize sources from different contexts (optional).
     17 
     18     \return  Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
     19 
    Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similarly but concatenates the observables.
     21 
     22     \sample
     23     \snippet flat_map.cpp flat_map sample
     24     \snippet output.txt flat_map sample
     25 
     26     \sample
     27     \snippet flat_map.cpp threaded flat_map sample
     28     \snippet output.txt threaded flat_map sample
     29 */
     30 
     31 #if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP)
     32 #define RXCPP_OPERATORS_RX_FLATMAP_HPP
     33 
     34 #include "../rx-includes.hpp"
     35 
     36 namespace rxcpp {
     37 
     38 namespace operators {
     39 
     40 namespace detail {
     41 
     42 template<class... AN>
     43 struct flat_map_invalid_arguments {};
     44 
     45 template<class... AN>
     46 struct flat_map_invalid : public rxo::operator_base<flat_map_invalid_arguments<AN...>> {
     47     using type = observable<flat_map_invalid_arguments<AN...>, flat_map_invalid<AN...>>;
     48 };
     49 template<class... AN>
     50 using flat_map_invalid_t = typename flat_map_invalid<AN...>::type;
     51 
     52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
     53 struct flat_map_traits {
     54     typedef rxu::decay_t<Observable> source_type;
     55     typedef rxu::decay_t<CollectionSelector> collection_selector_type;
     56     typedef rxu::decay_t<ResultSelector> result_selector_type;
     57     typedef rxu::decay_t<Coordination> coordination_type;
     58 
     59     typedef typename source_type::value_type source_value_type;
     60 
     61     struct tag_not_valid {};
     62     template<class CV, class CCS>
     63     static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr));
     64     template<class CV, class CCS>
     65     static tag_not_valid collection_check(...);
     66 
     67     static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
     68 
     69     typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
     70 
     71     static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
     72 
     73     typedef typename collection_type::value_type collection_value_type;
     74 
     75     template<class CV, class CCV, class CRS>
     76     static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr));
     77     template<class CV, class CCV, class CRS>
     78     static tag_not_valid result_check(...);
     79 
     80     static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
     81 
     82     typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
     83 };
     84 
     85 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
     86 struct flat_map
     87     : public operator_base<rxu::value_type_t<flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
     88 {
     89     typedef flat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
     90     typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
     91 
     92     typedef typename traits::source_type source_type;
     93     typedef typename traits::collection_selector_type collection_selector_type;
     94     typedef typename traits::result_selector_type result_selector_type;
     95 
     96     typedef typename traits::source_value_type source_value_type;
     97     typedef typename traits::collection_type collection_type;
     98     typedef typename traits::collection_value_type collection_value_type;
     99 
    100     typedef typename traits::coordination_type coordination_type;
    101     typedef typename coordination_type::coordinator_type coordinator_type;
    102 
    103     struct values
    104     {
    105         values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
    106             : source(std::move(o))
    107             , selectCollection(std::move(s))
    108             , selectResult(std::move(rs))
    109             , coordination(std::move(sf))
    110         {
    111         }
    112         source_type source;
    113         collection_selector_type selectCollection;
    114         result_selector_type selectResult;
    115         coordination_type coordination;
    116     };
    117     values initial;
    118 
    119     flat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
    120         : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
    121     {
    122     }
    123 
    124     template<class Subscriber>
    125     void on_subscribe(Subscriber scbr) const {
    126         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
    127 
    128         typedef Subscriber output_type;
    129 
    130         struct state_type
    131             : public std::enable_shared_from_this<state_type>
    132             , public values
    133         {
    134             state_type(values i, coordinator_type coor, output_type oarg)
    135                 : values(std::move(i))
    136                 , pendingCompletions(0)
    137                 , coordinator(std::move(coor))
    138                 , out(std::move(oarg))
    139             {
    140             }
    141             // on_completed on the output must wait until all the
    142             // subscriptions have received on_completed
    143             int pendingCompletions;
    144             coordinator_type coordinator;
    145             output_type out;
    146         };
    147 
    148         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
    149 
    150         // take a copy of the values for each subscription
    151         auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(scbr));
    152 
    153         composite_subscription outercs;
    154 
    155         // when the out observer is unsubscribed all the
    156         // inner subscriptions are unsubscribed as well
    157         state->out.add(outercs);
    158 
    159         auto source = on_exception(
    160             [&](){return state->coordinator.in(state->source);},
    161             state->out);
    162         if (source.empty()) {
    163             return;
    164         }
    165 
    166         ++state->pendingCompletions;
    167         // this subscribe does not share the observer subscription
    168         // so that when it is unsubscribed the observer can be called
    169         // until the inner subscriptions have finished
    170         auto sink = make_subscriber<source_value_type>(
    171             state->out,
    172             outercs,
    173         // on_next
    174             [state](source_value_type st) {
    175 
    176                 composite_subscription innercs;
    177 
    178                 // when the out observer is unsubscribed all the
    179                 // inner subscriptions are unsubscribed as well
    180                 auto innercstoken = state->out.add(innercs);
    181 
    182                 innercs.add(make_subscription([state, innercstoken](){
    183                     state->out.remove(innercstoken);
    184                 }));
    185 
    186                 auto selectedCollection = state->selectCollection(st);
    187                 auto selectedSource = state->coordinator.in(selectedCollection);
    188 
    189                 ++state->pendingCompletions;
    190                 // this subscribe does not share the source subscription
    191                 // so that when it is unsubscribed the source will continue
    192                 auto sinkInner = make_subscriber<collection_value_type>(
    193                     state->out,
    194                     innercs,
    195                 // on_next
    196                     [state, st](collection_value_type ct) {
    197                         auto selectedResult = state->selectResult(st, std::move(ct));
    198                         state->out.on_next(std::move(selectedResult));
    199                     },
    200                 // on_error
    201                     [state](rxu::error_ptr e) {
    202                         state->out.on_error(e);
    203                     },
    204                 //on_completed
    205                     [state](){
    206                         if (--state->pendingCompletions == 0) {
    207                             state->out.on_completed();
    208                         }
    209                     }
    210                 );
    211 
    212                 auto selectedSinkInner = state->coordinator.out(sinkInner);
    213                 selectedSource.subscribe(std::move(selectedSinkInner));
    214             },
    215         // on_error
    216             [state](rxu::error_ptr e) {
    217                 state->out.on_error(e);
    218             },
    219         // on_completed
    220             [state]() {
    221                 if (--state->pendingCompletions == 0) {
    222                     state->out.on_completed();
    223                 }
    224             }
    225         );
    226 
    227         auto selectedSink = on_exception(
    228             [&](){return state->coordinator.out(sink);},
    229             state->out);
    230         if (selectedSink.empty()) {
    231             return;
    232         }
    233 
    234         source->subscribe(std::move(selectedSink.get()));
    235 
    236     }
    237 };
    238 
    239 }
    240 
    241 /*! @copydoc rx-flat_map.hpp
    242 */
    243 template<class... AN>
    244 auto flat_map(AN&&... an)
    245 ->     operator_factory<flat_map_tag, AN...> {
    246     return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    247 }
    248 
    249 /*! @copydoc rx-flat_map.hpp
    250 */
    251 template<class... AN>
    252 auto merge_transform(AN&&... an)
    253 ->     operator_factory<flat_map_tag, AN...> {
    254     return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    255 }
    256 
    257 }
    258 
    259 template<>
    260 struct member_overload<flat_map_tag>
    261 {
    262     template<class Observable, class CollectionSelector,
    263         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    264         class SourceValue = rxu::value_type_t<Observable>,
    265         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    266         class ResultSelectorType = rxu::detail::take_at<1>,
    267         class Enabled = rxu::enable_if_all_true_type_t<
    268             all_observables<Observable, CollectionType>>,
    269         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
    270         class CollectionValueType = rxu::value_type_t<CollectionType>,
    271         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    272         class Result = observable<Value, FlatMap>
    273     >
    274     static Result member(Observable&& o, CollectionSelector&& s) {
    275         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
    276     }
    277 
    278     template<class Observable, class CollectionSelector, class Coordination,
    279         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    280         class SourceValue = rxu::value_type_t<Observable>,
    281         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    282         class ResultSelectorType = rxu::detail::take_at<1>,
    283         class Enabled = rxu::enable_if_all_true_type_t<
    284             all_observables<Observable, CollectionType>,
    285             is_coordination<Coordination>>,
    286         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
    287         class CollectionValueType = rxu::value_type_t<CollectionType>,
    288         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    289         class Result = observable<Value, FlatMap>
    290     >
    291     static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
    292         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
    293     }
    294 
    295     template<class Observable, class CollectionSelector, class ResultSelector,
    296         class IsCoordination = is_coordination<ResultSelector>,
    297         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    298         class SourceValue = rxu::value_type_t<Observable>,
    299         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    300         class Enabled = rxu::enable_if_all_true_type_t<
    301             all_observables<Observable, CollectionType>,
    302             rxu::negation<IsCoordination>>,
    303         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
    304         class CollectionValueType = rxu::value_type_t<CollectionType>,
    305         class ResultSelectorType = rxu::decay_t<ResultSelector>,
    306         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    307         class Result = observable<Value, FlatMap>
    308     >
    309     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
    310         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
    311     }
    312 
    313     template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
    314         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
    315         class SourceValue = rxu::value_type_t<Observable>,
    316         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
    317         class Enabled = rxu::enable_if_all_true_type_t<
    318             all_observables<Observable, CollectionType>,
    319             is_coordination<Coordination>>,
    320         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
    321         class CollectionValueType = rxu::value_type_t<CollectionType>,
    322         class ResultSelectorType = rxu::decay_t<ResultSelector>,
    323         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
    324         class Result = observable<Value, FlatMap>
    325     >
    326     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
    327         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
    328     }
    329 
    330     template<class... AN>
    331     static operators::detail::flat_map_invalid_t<AN...> member(AN...) {
    332         std::terminate();
    333         return {};
    334         static_assert(sizeof...(AN) == 10000, "flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
    335     }
    336 };
    337 
    338 }
    339 
    340 #endif
    341