1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-flat_map.hpp 6 7 \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. 8 For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. 9 10 \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type) 11 \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type). 12 \tparam Coordination the type of the scheduler (optional). 13 14 \param s a function that returns an observable for each item emitted by the source observable. 15 \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional). 16 \param cn the scheduler to synchronize sources from different contexts (optional). 17 18 \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. 19 20 Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but concatenates the observables. 21 22 \sample 23 \snippet flat_map.cpp flat_map sample 24 \snippet output.txt flat_map sample 25 26 \sample 27 \snippet flat_map.cpp threaded flat_map sample 28 \snippet output.txt threaded flat_map sample 29 */ 30 31 #if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP) 32 #define RXCPP_OPERATORS_RX_FLATMAP_HPP 33 34 #include "../rx-includes.hpp" 35 36 namespace rxcpp { 37 38 namespace operators { 39 40 namespace detail { 41 42 template<class... AN> 43 struct flat_map_invalid_arguments {}; 44 45 template<class... AN> 46 struct flat_map_invalid : public rxo::operator_base<flat_map_invalid_arguments<AN...>> { 47 using type = observable<flat_map_invalid_arguments<AN...>, flat_map_invalid<AN...>>; 48 }; 49 template<class... AN> 50 using flat_map_invalid_t = typename flat_map_invalid<AN...>::type; 51 52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination> 53 struct flat_map_traits { 54 typedef rxu::decay_t<Observable> source_type; 55 typedef rxu::decay_t<CollectionSelector> collection_selector_type; 56 typedef rxu::decay_t<ResultSelector> result_selector_type; 57 typedef rxu::decay_t<Coordination> coordination_type; 58 59 typedef typename source_type::value_type source_value_type; 60 61 struct tag_not_valid {}; 62 template<class CV, class CCS> 63 static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr)); 64 template<class CV, class CCS> 65 static tag_not_valid collection_check(...); 66 67 static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)"); 68 69 typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type; 70 71 static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable"); 72 73 typedef typename collection_type::value_type collection_value_type; 74 75 template<class CV, class CCV, class CRS> 76 static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr)); 77 template<class CV, class CCV, class CRS> 78 static tag_not_valid result_check(...); 79 80 static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)"); 81 82 typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type; 83 }; 84 85 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination> 86 struct flat_map 87 : public operator_base<rxu::value_type_t<flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination>>> 88 { 89 typedef flat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type; 90 typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits; 91 92 typedef typename traits::source_type source_type; 93 typedef typename traits::collection_selector_type collection_selector_type; 94 typedef typename traits::result_selector_type result_selector_type; 95 96 typedef typename traits::source_value_type source_value_type; 97 typedef typename traits::collection_type collection_type; 98 typedef typename traits::collection_value_type collection_value_type; 99 100 typedef typename traits::coordination_type coordination_type; 101 typedef typename coordination_type::coordinator_type coordinator_type; 102 103 struct values 104 { 105 values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf) 106 : source(std::move(o)) 107 , selectCollection(std::move(s)) 108 , selectResult(std::move(rs)) 109 , coordination(std::move(sf)) 110 { 111 } 112 source_type source; 113 collection_selector_type selectCollection; 114 result_selector_type selectResult; 115 coordination_type coordination; 116 }; 117 values initial; 118 119 flat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf) 120 : initial(std::move(o), std::move(s), std::move(rs), std::move(sf)) 121 { 122 } 123 124 template<class Subscriber> 125 void on_subscribe(Subscriber scbr) const { 126 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 127 128 typedef Subscriber output_type; 129 130 struct state_type 131 : public std::enable_shared_from_this<state_type> 132 , public values 133 { 134 state_type(values i, coordinator_type coor, output_type oarg) 135 : values(std::move(i)) 136 , pendingCompletions(0) 137 , coordinator(std::move(coor)) 138 , out(std::move(oarg)) 139 { 140 } 141 // on_completed on the output must wait until all the 142 // subscriptions have received on_completed 143 int pendingCompletions; 144 coordinator_type coordinator; 145 output_type out; 146 }; 147 148 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 149 150 // take a copy of the values for each subscription 151 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(scbr)); 152 153 composite_subscription outercs; 154 155 // when the out observer is unsubscribed all the 156 // inner subscriptions are unsubscribed as well 157 state->out.add(outercs); 158 159 auto source = on_exception( 160 [&](){return state->coordinator.in(state->source);}, 161 state->out); 162 if (source.empty()) { 163 return; 164 } 165 166 ++state->pendingCompletions; 167 // this subscribe does not share the observer subscription 168 // so that when it is unsubscribed the observer can be called 169 // until the inner subscriptions have finished 170 auto sink = make_subscriber<source_value_type>( 171 state->out, 172 outercs, 173 // on_next 174 [state](source_value_type st) { 175 176 composite_subscription innercs; 177 178 // when the out observer is unsubscribed all the 179 // inner subscriptions are unsubscribed as well 180 auto innercstoken = state->out.add(innercs); 181 182 innercs.add(make_subscription([state, innercstoken](){ 183 state->out.remove(innercstoken); 184 })); 185 186 auto selectedCollection = state->selectCollection(st); 187 auto selectedSource = state->coordinator.in(selectedCollection); 188 189 ++state->pendingCompletions; 190 // this subscribe does not share the source subscription 191 // so that when it is unsubscribed the source will continue 192 auto sinkInner = make_subscriber<collection_value_type>( 193 state->out, 194 innercs, 195 // on_next 196 [state, st](collection_value_type ct) { 197 auto selectedResult = state->selectResult(st, std::move(ct)); 198 state->out.on_next(std::move(selectedResult)); 199 }, 200 // on_error 201 [state](rxu::error_ptr e) { 202 state->out.on_error(e); 203 }, 204 //on_completed 205 [state](){ 206 if (--state->pendingCompletions == 0) { 207 state->out.on_completed(); 208 } 209 } 210 ); 211 212 auto selectedSinkInner = state->coordinator.out(sinkInner); 213 selectedSource.subscribe(std::move(selectedSinkInner)); 214 }, 215 // on_error 216 [state](rxu::error_ptr e) { 217 state->out.on_error(e); 218 }, 219 // on_completed 220 [state]() { 221 if (--state->pendingCompletions == 0) { 222 state->out.on_completed(); 223 } 224 } 225 ); 226 227 auto selectedSink = on_exception( 228 [&](){return state->coordinator.out(sink);}, 229 state->out); 230 if (selectedSink.empty()) { 231 return; 232 } 233 234 source->subscribe(std::move(selectedSink.get())); 235 236 } 237 }; 238 239 } 240 241 /*! @copydoc rx-flat_map.hpp 242 */ 243 template<class... AN> 244 auto flat_map(AN&&... an) 245 -> operator_factory<flat_map_tag, AN...> { 246 return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 247 } 248 249 /*! @copydoc rx-flat_map.hpp 250 */ 251 template<class... AN> 252 auto merge_transform(AN&&... an) 253 -> operator_factory<flat_map_tag, AN...> { 254 return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 255 } 256 257 } 258 259 template<> 260 struct member_overload<flat_map_tag> 261 { 262 template<class Observable, class CollectionSelector, 263 class CollectionSelectorType = rxu::decay_t<CollectionSelector>, 264 class SourceValue = rxu::value_type_t<Observable>, 265 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, 266 class ResultSelectorType = rxu::detail::take_at<1>, 267 class Enabled = rxu::enable_if_all_true_type_t< 268 all_observables<Observable, CollectionType>>, 269 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>, 270 class CollectionValueType = rxu::value_type_t<CollectionType>, 271 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, 272 class Result = observable<Value, FlatMap> 273 > 274 static Result member(Observable&& o, CollectionSelector&& s) { 275 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread())); 276 } 277 278 template<class Observable, class CollectionSelector, class Coordination, 279 class CollectionSelectorType = rxu::decay_t<CollectionSelector>, 280 class SourceValue = rxu::value_type_t<Observable>, 281 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, 282 class ResultSelectorType = rxu::detail::take_at<1>, 283 class Enabled = rxu::enable_if_all_true_type_t< 284 all_observables<Observable, CollectionType>, 285 is_coordination<Coordination>>, 286 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>, 287 class CollectionValueType = rxu::value_type_t<CollectionType>, 288 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, 289 class Result = observable<Value, FlatMap> 290 > 291 static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) { 292 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn))); 293 } 294 295 template<class Observable, class CollectionSelector, class ResultSelector, 296 class IsCoordination = is_coordination<ResultSelector>, 297 class CollectionSelectorType = rxu::decay_t<CollectionSelector>, 298 class SourceValue = rxu::value_type_t<Observable>, 299 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, 300 class Enabled = rxu::enable_if_all_true_type_t< 301 all_observables<Observable, CollectionType>, 302 rxu::negation<IsCoordination>>, 303 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>, 304 class CollectionValueType = rxu::value_type_t<CollectionType>, 305 class ResultSelectorType = rxu::decay_t<ResultSelector>, 306 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, 307 class Result = observable<Value, FlatMap> 308 > 309 static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) { 310 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread())); 311 } 312 313 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination, 314 class CollectionSelectorType = rxu::decay_t<CollectionSelector>, 315 class SourceValue = rxu::value_type_t<Observable>, 316 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, 317 class Enabled = rxu::enable_if_all_true_type_t< 318 all_observables<Observable, CollectionType>, 319 is_coordination<Coordination>>, 320 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>, 321 class CollectionValueType = rxu::value_type_t<CollectionType>, 322 class ResultSelectorType = rxu::decay_t<ResultSelector>, 323 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, 324 class Result = observable<Value, FlatMap> 325 > 326 static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) { 327 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); 328 } 329 330 template<class... AN> 331 static operators::detail::flat_map_invalid_t<AN...> member(AN...) { 332 std::terminate(); 333 return {}; 334 static_assert(sizeof...(AN) == 10000, "flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)"); 335 } 336 }; 337 338 } 339 340 #endif 341