1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-combine_latest.hpp 6 7 \brief For each item from all of the observables select a value to emit from the new observable that is returned. 8 9 \tparam AN types of scheduler (optional), aggregate function (optional), and source observables 10 11 \param an scheduler (optional), aggregation function (optional), and source observables 12 13 \return Observable that emits items that are the result of combining the items emitted by the source observables. 14 15 If scheduler is omitted, identity_current_thread is used. 16 17 If aggregation function is omitted, the resulting observable returns tuples of emitted items. 18 19 \sample 20 21 Neither scheduler nor aggregation function are present: 22 \snippet combine_latest.cpp combine_latest sample 23 \snippet output.txt combine_latest sample 24 25 Only scheduler is present: 26 \snippet combine_latest.cpp Coordination combine_latest sample 27 \snippet output.txt Coordination combine_latest sample 28 29 Only aggregation function is present: 30 \snippet combine_latest.cpp Selector combine_latest sample 31 \snippet output.txt Selector combine_latest sample 32 33 Both scheduler and aggregation function are present: 34 \snippet combine_latest.cpp Coordination+Selector combine_latest sample 35 \snippet output.txt Coordination+Selector combine_latest sample 36 */ 37 38 #if !defined(RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP) 39 #define RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP 40 41 #include "../rx-includes.hpp" 42 43 namespace rxcpp { 44 45 namespace operators { 46 47 namespace detail { 48 49 template<class... AN> 50 struct combine_latest_invalid_arguments {}; 51 52 template<class... AN> 53 struct combine_latest_invalid : public rxo::operator_base<combine_latest_invalid_arguments<AN...>> { 54 using type = observable<combine_latest_invalid_arguments<AN...>, combine_latest_invalid<AN...>>; 55 }; 56 template<class... AN> 57 using combine_latest_invalid_t = typename combine_latest_invalid<AN...>::type; 58 59 template<class Selector, class... ObservableN> 60 struct is_combine_latest_selector_check { 61 typedef rxu::decay_t<Selector> selector_type; 62 63 struct tag_not_valid; 64 template<class CS, class... CON> 65 static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...)); 66 template<class CS, class... CON> 67 static tag_not_valid check(...); 68 69 using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0)); 70 71 static const bool value = !std::is_same<type, tag_not_valid>::value; 72 }; 73 74 template<class Selector, class... ObservableN> 75 struct invalid_combine_latest_selector { 76 static const bool value = false; 77 }; 78 79 template<class Selector, class... ObservableN> 80 struct is_combine_latest_selector : public std::conditional< 81 is_combine_latest_selector_check<Selector, ObservableN...>::value, 82 is_combine_latest_selector_check<Selector, ObservableN...>, 83 invalid_combine_latest_selector<Selector, ObservableN...>>::type { 84 }; 85 86 template<class Selector, class... ON> 87 using result_combine_latest_selector_t = typename is_combine_latest_selector<Selector, ON...>::type; 88 89 template<class Coordination, class Selector, class... ObservableN> 90 struct combine_latest_traits { 91 92 typedef std::tuple<ObservableN...> tuple_source_type; 93 typedef std::tuple<rxu::detail::maybe<typename ObservableN::value_type>...> tuple_source_value_type; 94 95 typedef rxu::decay_t<Selector> selector_type; 96 typedef rxu::decay_t<Coordination> coordination_type; 97 98 typedef typename is_combine_latest_selector<selector_type, ObservableN...>::type value_type; 99 }; 100 101 template<class Coordination, class Selector, class... ObservableN> 102 struct combine_latest : public operator_base<rxu::value_type_t<combine_latest_traits<Coordination, Selector, ObservableN...>>> 103 { 104 typedef combine_latest<Coordination, Selector, ObservableN...> this_type; 105 106 typedef combine_latest_traits<Coordination, Selector, ObservableN...> traits; 107 108 typedef typename traits::tuple_source_type tuple_source_type; 109 typedef typename traits::tuple_source_value_type tuple_source_value_type; 110 111 typedef typename traits::selector_type selector_type; 112 113 typedef typename traits::coordination_type coordination_type; 114 typedef typename coordination_type::coordinator_type coordinator_type; 115 116 struct values 117 { 118 values(tuple_source_type o, selector_type s, coordination_type sf) 119 : source(std::move(o)) 120 , selector(std::move(s)) 121 , coordination(std::move(sf)) 122 { 123 } 124 tuple_source_type source; 125 selector_type selector; 126 coordination_type coordination; 127 }; 128 values initial; 129 130 combine_latest(coordination_type sf, selector_type s, tuple_source_type ts) 131 : initial(std::move(ts), std::move(s), std::move(sf)) 132 { 133 } 134 135 template<int Index, class State> 136 void subscribe_one(std::shared_ptr<State> state) const { 137 138 typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type; 139 140 composite_subscription innercs; 141 142 // when the out observer is unsubscribed all the 143 // inner subscriptions are unsubscribed as well 144 state->out.add(innercs); 145 146 auto source = on_exception( 147 [&](){return state->coordinator.in(std::get<Index>(state->source));}, 148 state->out); 149 if (source.empty()) { 150 return; 151 } 152 153 // this subscribe does not share the observer subscription 154 // so that when it is unsubscribed the observer can be called 155 // until the inner subscriptions have finished 156 auto sink = make_subscriber<source_value_type>( 157 state->out, 158 innercs, 159 // on_next 160 [state](source_value_type st) { 161 auto& value = std::get<Index>(state->latest); 162 163 if (value.empty()) { 164 ++state->valuesSet; 165 } 166 167 value.reset(st); 168 169 if (state->valuesSet == sizeof... (ObservableN)) { 170 auto values = rxu::surely(state->latest); 171 auto selectedResult = rxu::apply(values, state->selector); 172 state->out.on_next(selectedResult); 173 } 174 }, 175 // on_error 176 [state](rxu::error_ptr e) { 177 state->out.on_error(e); 178 }, 179 // on_completed 180 [state]() { 181 if (--state->pendingCompletions == 0) { 182 state->out.on_completed(); 183 } 184 } 185 ); 186 auto selectedSink = on_exception( 187 [&](){return state->coordinator.out(sink);}, 188 state->out); 189 if (selectedSink.empty()) { 190 return; 191 } 192 source->subscribe(std::move(selectedSink.get())); 193 } 194 template<class State, int... IndexN> 195 void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const { 196 bool subscribed[] = {(subscribe_one<IndexN>(state), true)...}; 197 subscribed[0] = (*subscribed); // silence warning 198 } 199 200 template<class Subscriber> 201 void on_subscribe(Subscriber scbr) const { 202 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 203 204 typedef Subscriber output_type; 205 206 struct combine_latest_state_type 207 : public std::enable_shared_from_this<combine_latest_state_type> 208 , public values 209 { 210 combine_latest_state_type(values i, coordinator_type coor, output_type oarg) 211 : values(std::move(i)) 212 , pendingCompletions(sizeof... (ObservableN)) 213 , valuesSet(0) 214 , coordinator(std::move(coor)) 215 , out(std::move(oarg)) 216 { 217 } 218 219 // on_completed on the output must wait until all the 220 // subscriptions have received on_completed 221 mutable int pendingCompletions; 222 mutable int valuesSet; 223 mutable tuple_source_value_type latest; 224 coordinator_type coordinator; 225 output_type out; 226 }; 227 228 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 229 230 // take a copy of the values for each subscription 231 auto state = std::make_shared<combine_latest_state_type>(initial, std::move(coordinator), std::move(scbr)); 232 233 subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type()); 234 } 235 }; 236 237 } 238 239 /*! @copydoc rx-combine_latest.hpp 240 */ 241 template<class... AN> 242 auto combine_latest(AN&&... an) 243 -> operator_factory<combine_latest_tag, AN...> { 244 return operator_factory<combine_latest_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 245 } 246 247 } 248 249 template<> 250 struct member_overload<combine_latest_tag> 251 { 252 template<class Observable, class... ObservableN, 253 class Enabled = rxu::enable_if_all_true_type_t< 254 all_observables<Observable, ObservableN...>>, 255 class combine_latest = rxo::detail::combine_latest<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 256 class Value = rxu::value_type_t<combine_latest>, 257 class Result = observable<Value, combine_latest>> 258 static Result member(Observable&& o, ObservableN&&... on) 259 { 260 return Result(combine_latest(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 261 } 262 263 template<class Observable, class Selector, class... ObservableN, 264 class Enabled = rxu::enable_if_all_true_type_t< 265 operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>, 266 all_observables<Observable, ObservableN...>>, 267 class ResolvedSelector = rxu::decay_t<Selector>, 268 class combine_latest = rxo::detail::combine_latest<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 269 class Value = rxu::value_type_t<combine_latest>, 270 class Result = observable<Value, combine_latest>> 271 static Result member(Observable&& o, Selector&& s, ObservableN&&... on) 272 { 273 return Result(combine_latest(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 274 } 275 276 template<class Coordination, class Observable, class... ObservableN, 277 class Enabled = rxu::enable_if_all_true_type_t< 278 is_coordination<Coordination>, 279 all_observables<Observable, ObservableN...>>, 280 class combine_latest = rxo::detail::combine_latest<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 281 class Value = rxu::value_type_t<combine_latest>, 282 class Result = observable<Value, combine_latest>> 283 static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on) 284 { 285 return Result(combine_latest(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 286 } 287 288 template<class Coordination, class Selector, class Observable, class... ObservableN, 289 class Enabled = rxu::enable_if_all_true_type_t< 290 is_coordination<Coordination>, 291 operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>, 292 all_observables<Observable, ObservableN...>>, 293 class ResolvedSelector = rxu::decay_t<Selector>, 294 class combine_latest = rxo::detail::combine_latest<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 295 class Value = rxu::value_type_t<combine_latest>, 296 class Result = observable<Value, combine_latest>> 297 static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on) 298 { 299 return Result(combine_latest(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 300 } 301 302 template<class... AN> 303 static operators::detail::combine_latest_invalid_t<AN...> member(const AN&...) { 304 std::terminate(); 305 return {}; 306 static_assert(sizeof...(AN) == 10000, "combine_latest takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)"); 307 } 308 }; 309 310 } 311 312 #endif 313