1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-merge.hpp 6 7 \brief For each given observable subscribe. 8 For each item emitted from all of the given observables, deliver from the new observable that is returned. 9 10 There are 2 variants of the operator: 11 - The source observable emits nested observables, nested observables are merged. 12 - The source observable and the arguments v0...vn are used to provide the observables to merge. 13 14 \tparam Coordination the type of the scheduler (optional). 15 \tparam Value0 ... (optional). 16 \tparam ValueN types of source observables (optional). 17 18 \param cn the scheduler to synchronize sources from different contexts (optional). 19 \param v0 ... (optional). 20 \param vn source observables (optional). 21 22 \return Observable that emits items that are the result of flattening the observables emitted by the source observable. 23 24 If scheduler is omitted, identity_current_thread is used. 25 26 \sample 27 \snippet merge.cpp threaded implicit merge sample 28 \snippet output.txt threaded implicit merge sample 29 30 \sample 31 \snippet merge.cpp implicit merge sample 32 \snippet output.txt implicit merge sample 33 34 \sample 35 \snippet merge.cpp merge sample 36 \snippet output.txt merge sample 37 38 \sample 39 \snippet merge.cpp threaded merge sample 40 \snippet output.txt threaded merge sample 41 */ 42 43 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP) 44 #define RXCPP_OPERATORS_RX_MERGE_HPP 45 46 #include "../rx-includes.hpp" 47 48 namespace rxcpp { 49 50 namespace operators { 51 52 namespace detail { 53 54 template<class... AN> 55 struct merge_invalid_arguments {}; 56 57 template<class... AN> 58 struct merge_invalid : public rxo::operator_base<merge_invalid_arguments<AN...>> { 59 using type = observable<merge_invalid_arguments<AN...>, merge_invalid<AN...>>; 60 }; 61 template<class... AN> 62 using merge_invalid_t = typename merge_invalid<AN...>::type; 63 64 template<class T, class Observable, class Coordination> 65 struct merge 66 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> 67 { 68 //static_assert(is_observable<Observable>::value, "merge requires an observable"); 69 //static_assert(is_observable<T>::value, "merge requires an observable that contains observables"); 70 71 typedef merge<T, Observable, Coordination> this_type; 72 73 typedef rxu::decay_t<T> source_value_type; 74 typedef rxu::decay_t<Observable> source_type; 75 76 typedef typename source_type::source_operator_type source_operator_type; 77 typedef typename source_value_type::value_type value_type; 78 79 typedef rxu::decay_t<Coordination> coordination_type; 80 typedef typename coordination_type::coordinator_type coordinator_type; 81 82 struct values 83 { 84 values(source_operator_type o, coordination_type sf) 85 : source_operator(std::move(o)) 86 , coordination(std::move(sf)) 87 { 88 } 89 source_operator_type source_operator; 90 coordination_type coordination; 91 }; 92 values initial; 93 94 merge(const source_type& o, coordination_type sf) 95 : initial(o.source_operator, std::move(sf)) 96 { 97 } 98 99 template<class Subscriber> 100 void on_subscribe(Subscriber scbr) const { 101 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 102 103 typedef Subscriber output_type; 104 105 struct merge_state_type 106 : public std::enable_shared_from_this<merge_state_type> 107 , public values 108 { 109 merge_state_type(values i, coordinator_type coor, output_type oarg) 110 : values(i) 111 , source(i.source_operator) 112 , pendingCompletions(0) 113 , coordinator(std::move(coor)) 114 , out(std::move(oarg)) 115 { 116 } 117 observable<source_value_type, source_operator_type> source; 118 // on_completed on the output must wait until all the 119 // subscriptions have received on_completed 120 int pendingCompletions; 121 coordinator_type coordinator; 122 output_type out; 123 }; 124 125 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 126 127 // take a copy of the values for each subscription 128 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr)); 129 130 composite_subscription outercs; 131 132 // when the out observer is unsubscribed all the 133 // inner subscriptions are unsubscribed as well 134 state->out.add(outercs); 135 136 auto source = on_exception( 137 [&](){return state->coordinator.in(state->source);}, 138 state->out); 139 if (source.empty()) { 140 return; 141 } 142 143 ++state->pendingCompletions; 144 // this subscribe does not share the observer subscription 145 // so that when it is unsubscribed the observer can be called 146 // until the inner subscriptions have finished 147 auto sink = make_subscriber<source_value_type>( 148 state->out, 149 outercs, 150 // on_next 151 [state](source_value_type st) { 152 153 composite_subscription innercs; 154 155 // when the out observer is unsubscribed all the 156 // inner subscriptions are unsubscribed as well 157 auto innercstoken = state->out.add(innercs); 158 159 innercs.add(make_subscription([state, innercstoken](){ 160 state->out.remove(innercstoken); 161 })); 162 163 auto selectedSource = state->coordinator.in(st); 164 165 ++state->pendingCompletions; 166 // this subscribe does not share the source subscription 167 // so that when it is unsubscribed the source will continue 168 auto sinkInner = make_subscriber<value_type>( 169 state->out, 170 innercs, 171 // on_next 172 [state, st](value_type ct) { 173 state->out.on_next(std::move(ct)); 174 }, 175 // on_error 176 [state](rxu::error_ptr e) { 177 state->out.on_error(e); 178 }, 179 //on_completed 180 [state](){ 181 if (--state->pendingCompletions == 0) { 182 state->out.on_completed(); 183 } 184 } 185 ); 186 187 auto selectedSinkInner = state->coordinator.out(sinkInner); 188 selectedSource.subscribe(std::move(selectedSinkInner)); 189 }, 190 // on_error 191 [state](rxu::error_ptr e) { 192 state->out.on_error(e); 193 }, 194 // on_completed 195 [state]() { 196 if (--state->pendingCompletions == 0) { 197 state->out.on_completed(); 198 } 199 } 200 ); 201 auto selectedSink = on_exception( 202 [&](){return state->coordinator.out(sink);}, 203 state->out); 204 if (selectedSink.empty()) { 205 return; 206 } 207 source->subscribe(std::move(selectedSink.get())); 208 } 209 }; 210 211 } 212 213 /*! @copydoc rx-merge.hpp 214 */ 215 template<class... AN> 216 auto merge(AN&&... an) 217 -> operator_factory<merge_tag, AN...> { 218 return operator_factory<merge_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 219 } 220 221 } 222 223 template<> 224 struct member_overload<merge_tag> 225 { 226 template<class Observable, 227 class Enabled = rxu::enable_if_all_true_type_t< 228 is_observable<Observable>>, 229 class SourceValue = rxu::value_type_t<Observable>, 230 class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, 231 class Value = rxu::value_type_t<SourceValue>, 232 class Result = observable<Value, Merge> 233 > 234 static Result member(Observable&& o) { 235 return Result(Merge(std::forward<Observable>(o), identity_current_thread())); 236 } 237 238 template<class Observable, class Coordination, 239 class Enabled = rxu::enable_if_all_true_type_t< 240 is_observable<Observable>, 241 is_coordination<Coordination>>, 242 class SourceValue = rxu::value_type_t<Observable>, 243 class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, 244 class Value = rxu::value_type_t<SourceValue>, 245 class Result = observable<Value, Merge> 246 > 247 static Result member(Observable&& o, Coordination&& cn) { 248 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn))); 249 } 250 251 template<class Observable, class Value0, class... ValueN, 252 class Enabled = rxu::enable_if_all_true_type_t< 253 all_observables<Observable, Value0, ValueN...>>, 254 class EmittedValue = rxu::value_type_t<Observable>, 255 class SourceValue = observable<EmittedValue>, 256 class ObservableObservable = observable<SourceValue>, 257 class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, identity_one_worker>::type, 258 class Value = rxu::value_type_t<Merge>, 259 class Result = observable<Value, Merge> 260 > 261 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { 262 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); 263 } 264 265 template<class Observable, class Coordination, class Value0, class... ValueN, 266 class Enabled = rxu::enable_if_all_true_type_t< 267 all_observables<Observable, Value0, ValueN...>, 268 is_coordination<Coordination>>, 269 class EmittedValue = rxu::value_type_t<Observable>, 270 class SourceValue = observable<EmittedValue>, 271 class ObservableObservable = observable<SourceValue>, 272 class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, 273 class Value = rxu::value_type_t<Merge>, 274 class Result = observable<Value, Merge> 275 > 276 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { 277 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); 278 } 279 280 template<class... AN> 281 static operators::detail::merge_invalid_t<AN...> member(AN...) { 282 std::terminate(); 283 return {}; 284 static_assert(sizeof...(AN) == 10000, "merge takes (optional Coordination, optional Value0, optional ValueN...)"); 285 } 286 }; 287 288 } 289 290 #endif 291