1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP) 6 #define RXCPP_OPERATORS_RX_ZIP_HPP 7 8 #include "../rx-includes.hpp" 9 10 /*! \file rx-zip.hpp 11 12 \brief Bring by one item from all given observables and select a value to emit from the new observable that is returned. 13 14 \tparam AN types of scheduler (optional), aggregate function (optional), and source observables 15 16 \param an scheduler (optional), aggregation function (optional), and source observables 17 18 \return Observable that emits the result of combining the items emitted and brought by one from each of the source observables. 19 20 If scheduler is omitted, identity_current_thread is used. 21 22 If aggregation function is omitted, the resulting observable returns tuples of emitted items. 23 24 \sample 25 26 Neither scheduler nor aggregation function are present: 27 \snippet zip.cpp zip sample 28 \snippet output.txt zip sample 29 30 Only scheduler is present: 31 \snippet zip.cpp Coordination zip sample 32 \snippet output.txt Coordination zip sample 33 34 Only aggregation function is present: 35 \snippet zip.cpp Selector zip sample 36 \snippet output.txt Selector zip sample 37 38 Both scheduler and aggregation function are present: 39 \snippet zip.cpp Coordination+Selector zip sample 40 \snippet output.txt Coordination+Selector zip sample 41 */ 42 43 namespace rxcpp { 44 45 namespace operators { 46 47 namespace detail { 48 49 template<class Observable> 50 struct zip_source_state 51 { 52 using value_type = rxu::value_type_t<Observable>; 53 zip_source_state() 54 : completed(false) 55 { 56 } 57 std::list<value_type> values; 58 bool completed; 59 }; 60 61 struct values_not_empty { 62 template<class Observable> 63 bool operator()(zip_source_state<Observable>& source) const { 64 return !source.values.empty(); 65 } 66 }; 67 68 struct source_completed_values_empty { 69 template<class Observable> 70 bool operator()(zip_source_state<Observable>& source) const { 71 return source.completed && source.values.empty(); 72 } 73 }; 74 75 struct extract_value_front { 76 template<class Observable, class Value = rxu::value_type_t<Observable>> 77 Value operator()(zip_source_state<Observable>& source) const { 78 auto val = std::move(source.values.front()); 79 source.values.pop_front(); 80 return val; 81 } 82 }; 83 84 template<class... AN> 85 struct zip_invalid_arguments {}; 86 87 template<class... AN> 88 struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> { 89 using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>; 90 }; 91 template<class... AN> 92 using zip_invalid_t = typename zip_invalid<AN...>::type; 93 94 template<class Selector, class... ObservableN> 95 struct is_zip_selector_check { 96 typedef rxu::decay_t<Selector> selector_type; 97 98 struct tag_not_valid; 99 template<class CS, class... CON> 100 static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...)); 101 template<class CS, class... CON> 102 static tag_not_valid check(...); 103 104 using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0)); 105 106 static const bool value = !std::is_same<type, tag_not_valid>::value; 107 }; 108 109 template<class Selector, class... ObservableN> 110 struct invalid_zip_selector { 111 static const bool value = false; 112 }; 113 114 template<class Selector, class... ObservableN> 115 struct is_zip_selector : public std::conditional< 116 is_zip_selector_check<Selector, ObservableN...>::value, 117 is_zip_selector_check<Selector, ObservableN...>, 118 invalid_zip_selector<Selector, ObservableN...>>::type { 119 }; 120 121 template<class Selector, class... ON> 122 using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type; 123 124 template<class Coordination, class Selector, class... ObservableN> 125 struct zip_traits { 126 typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type; 127 typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type; 128 129 typedef rxu::decay_t<Selector> selector_type; 130 typedef rxu::decay_t<Coordination> coordination_type; 131 132 typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type; 133 }; 134 135 template<class Coordination, class Selector, class... ObservableN> 136 struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>> 137 { 138 typedef zip<Coordination, Selector, ObservableN...> this_type; 139 140 typedef zip_traits<Coordination, Selector, ObservableN...> traits; 141 142 typedef typename traits::tuple_source_type tuple_source_type; 143 typedef typename traits::tuple_source_values_type tuple_source_values_type; 144 145 typedef typename traits::selector_type selector_type; 146 147 typedef typename traits::coordination_type coordination_type; 148 typedef typename coordination_type::coordinator_type coordinator_type; 149 150 struct values 151 { 152 values(tuple_source_type o, selector_type s, coordination_type sf) 153 : source(std::move(o)) 154 , selector(std::move(s)) 155 , coordination(std::move(sf)) 156 { 157 } 158 tuple_source_type source; 159 selector_type selector; 160 coordination_type coordination; 161 }; 162 values initial; 163 164 zip(coordination_type sf, selector_type s, tuple_source_type ts) 165 : initial(std::move(ts), std::move(s), std::move(sf)) 166 { 167 } 168 169 template<int Index, class State> 170 void subscribe_one(std::shared_ptr<State> state) const { 171 172 typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type; 173 174 composite_subscription innercs; 175 176 // when the out observer is unsubscribed all the 177 // inner subscriptions are unsubscribed as well 178 state->out.add(innercs); 179 180 auto source = on_exception( 181 [&](){return state->coordinator.in(std::get<Index>(state->source));}, 182 state->out); 183 if (source.empty()) { 184 return; 185 } 186 187 // this subscribe does not share the observer subscription 188 // so that when it is unsubscribed the observer can be called 189 // until the inner subscriptions have finished 190 auto sink = make_subscriber<source_value_type>( 191 state->out, 192 innercs, 193 // on_next 194 [state](source_value_type st) { 195 auto& values = std::get<Index>(state->pending).values; 196 values.push_back(st); 197 if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) { 198 auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector); 199 state->out.on_next(selectedResult); 200 } 201 if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) { 202 state->out.on_completed(); 203 } 204 }, 205 // on_error 206 [state](rxu::error_ptr e) { 207 state->out.on_error(e); 208 }, 209 // on_completed 210 [state]() { 211 auto& completed = std::get<Index>(state->pending).completed; 212 completed = true; 213 if (--state->pendingCompletions == 0) { 214 state->out.on_completed(); 215 } 216 } 217 ); 218 auto selectedSink = on_exception( 219 [&](){return state->coordinator.out(sink);}, 220 state->out); 221 if (selectedSink.empty()) { 222 return; 223 } 224 source->subscribe(std::move(selectedSink.get())); 225 } 226 template<class State, int... IndexN> 227 void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const { 228 bool subscribed[] = {(subscribe_one<IndexN>(state), true)...}; 229 subscribed[0] = (*subscribed); // silence warning 230 } 231 232 template<class Subscriber> 233 void on_subscribe(Subscriber scbr) const { 234 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 235 236 typedef Subscriber output_type; 237 238 struct zip_state_type 239 : public std::enable_shared_from_this<zip_state_type> 240 , public values 241 { 242 zip_state_type(values i, coordinator_type coor, output_type oarg) 243 : values(std::move(i)) 244 , pendingCompletions(sizeof... (ObservableN)) 245 , valuesSet(0) 246 , coordinator(std::move(coor)) 247 , out(std::move(oarg)) 248 { 249 } 250 251 // on_completed on the output must wait until all the 252 // subscriptions have received on_completed 253 mutable int pendingCompletions; 254 mutable int valuesSet; 255 mutable tuple_source_values_type pending; 256 coordinator_type coordinator; 257 output_type out; 258 }; 259 260 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 261 262 // take a copy of the values for each subscription 263 auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr)); 264 265 subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type()); 266 } 267 }; 268 269 } 270 271 /*! @copydoc rx-zip.hpp 272 */ 273 template<class... AN> 274 auto zip(AN&&... an) 275 -> operator_factory<zip_tag, AN...> { 276 return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 277 } 278 279 } 280 281 template<> 282 struct member_overload<zip_tag> 283 { 284 template<class Observable, class... ObservableN, 285 class Enabled = rxu::enable_if_all_true_type_t< 286 all_observables<Observable, ObservableN...>>, 287 class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 288 class Value = rxu::value_type_t<Zip>, 289 class Result = observable<Value, Zip>> 290 static Result member(Observable&& o, ObservableN&&... on) 291 { 292 return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 293 } 294 295 template<class Observable, class Selector, class... ObservableN, 296 class Enabled = rxu::enable_if_all_true_type_t< 297 operators::detail::is_zip_selector<Selector, Observable, ObservableN...>, 298 all_observables<Observable, ObservableN...>>, 299 class ResolvedSelector = rxu::decay_t<Selector>, 300 class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 301 class Value = rxu::value_type_t<Zip>, 302 class Result = observable<Value, Zip>> 303 static Result member(Observable&& o, Selector&& s, ObservableN&&... on) 304 { 305 return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 306 } 307 308 template<class Coordination, class Observable, class... ObservableN, 309 class Enabled = rxu::enable_if_all_true_type_t< 310 is_coordination<Coordination>, 311 all_observables<Observable, ObservableN...>>, 312 class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 313 class Value = rxu::value_type_t<Zip>, 314 class Result = observable<Value, Zip>> 315 static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on) 316 { 317 return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 318 } 319 320 template<class Coordination, class Selector, class Observable, class... ObservableN, 321 class Enabled = rxu::enable_if_all_true_type_t< 322 is_coordination<Coordination>, 323 operators::detail::is_zip_selector<Selector, Observable, ObservableN...>, 324 all_observables<Observable, ObservableN...>>, 325 class ResolvedSelector = rxu::decay_t<Selector>, 326 class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, 327 class Value = rxu::value_type_t<Zip>, 328 class Result = observable<Value, Zip>> 329 static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on) 330 { 331 return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); 332 } 333 334 template<class... AN> 335 static operators::detail::zip_invalid_t<AN...> member(const AN&...) { 336 std::terminate(); 337 return {}; 338 static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)"); 339 } 340 }; 341 342 } 343 344 #endif 345