1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-merge_delay_error.hpp 6 7 \brief For each given observable subscribe. 8 For each item emitted from all of the given observables, deliver from the new observable that is returned. 9 The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission. 10 11 There are 2 variants of the operator: 12 - The source observable emits nested observables, nested observables are merged. 13 - The source observable and the arguments v0...vn are used to provide the observables to merge. 14 15 \tparam Coordination the type of the scheduler (optional). 16 \tparam Value0 ... (optional). 17 \tparam ValueN types of source observables (optional). 18 19 \param cn the scheduler to synchronize sources from different contexts (optional). 20 \param v0 ... (optional). 21 \param vn source observables (optional). 22 23 \return Observable that emits items that are the result of flattening the observables emitted by the source observable. 24 25 If scheduler is omitted, identity_current_thread is used. 26 27 \sample 28 \snippet merge_delay_error.cpp threaded implicit merge sample 29 \snippet output.txt threaded implicit merge sample 30 31 \sample 32 \snippet merge_delay_error.cpp implicit merge sample 33 \snippet output.txt implicit merge sample 34 35 \sample 36 \snippet merge_delay_error.cpp merge sample 37 \snippet output.txt merge sample 38 39 \sample 40 \snippet merge_delay_error.cpp threaded merge sample 41 \snippet output.txt threaded merge sample 42 */ 43 44 #if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP) 45 #define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP 46 47 #include "rx-merge.hpp" 48 49 #include "../rx-composite_exception.hpp" 50 51 namespace rxcpp { 52 53 namespace operators { 54 55 namespace detail { 56 57 template<class T, class Observable, class Coordination> 58 struct merge_delay_error 59 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> 60 { 61 //static_assert(is_observable<Observable>::value, "merge requires an observable"); 62 //static_assert(is_observable<T>::value, "merge requires an observable that contains observables"); 63 64 typedef merge_delay_error<T, Observable, Coordination> this_type; 65 66 typedef rxu::decay_t<T> source_value_type; 67 typedef rxu::decay_t<Observable> source_type; 68 69 typedef typename source_type::source_operator_type source_operator_type; 70 typedef typename source_value_type::value_type value_type; 71 72 typedef rxu::decay_t<Coordination> coordination_type; 73 typedef typename coordination_type::coordinator_type coordinator_type; 74 75 struct values 76 { 77 values(source_operator_type o, coordination_type sf) 78 : source_operator(std::move(o)) 79 , coordination(std::move(sf)) 80 { 81 } 82 source_operator_type source_operator; 83 coordination_type coordination; 84 }; 85 values initial; 86 87 merge_delay_error(const source_type& o, coordination_type sf) 88 : initial(o.source_operator, std::move(sf)) 89 { 90 } 91 92 template<class Subscriber> 93 void on_subscribe(Subscriber scbr) const { 94 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 95 96 typedef Subscriber output_type; 97 98 struct merge_state_type 99 : public std::enable_shared_from_this<merge_state_type> 100 , public values 101 { 102 merge_state_type(values i, coordinator_type coor, output_type oarg) 103 : values(i) 104 , source(i.source_operator) 105 , pendingCompletions(0) 106 , coordinator(std::move(coor)) 107 , out(std::move(oarg)) 108 { 109 } 110 observable<source_value_type, source_operator_type> source; 111 // on_completed on the output must wait until all the 112 // subscriptions have received on_completed 113 int pendingCompletions; 114 composite_exception exception;; 115 coordinator_type coordinator; 116 output_type out; 117 }; 118 119 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 120 121 // take a copy of the values for each subscription 122 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr)); 123 124 composite_subscription outercs; 125 126 // when the out observer is unsubscribed all the 127 // inner subscriptions are unsubscribed as well 128 state->out.add(outercs); 129 130 auto source = on_exception( 131 [&](){return state->coordinator.in(state->source);}, 132 state->out); 133 if (source.empty()) { 134 return; 135 } 136 137 ++state->pendingCompletions; 138 // this subscribe does not share the observer subscription 139 // so that when it is unsubscribed the observer can be called 140 // until the inner subscriptions have finished 141 auto sink = make_subscriber<source_value_type>( 142 state->out, 143 outercs, 144 // on_next 145 [state](source_value_type st) { 146 147 composite_subscription innercs; 148 149 // when the out observer is unsubscribed all the 150 // inner subscriptions are unsubscribed as well 151 auto innercstoken = state->out.add(innercs); 152 153 innercs.add(make_subscription([state, innercstoken](){ 154 state->out.remove(innercstoken); 155 })); 156 157 auto selectedSource = state->coordinator.in(st); 158 159 ++state->pendingCompletions; 160 // this subscribe does not share the source subscription 161 // so that when it is unsubscribed the source will continue 162 auto sinkInner = make_subscriber<value_type>( 163 state->out, 164 innercs, 165 // on_next 166 [state, st](value_type ct) { 167 state->out.on_next(std::move(ct)); 168 }, 169 // on_error 170 [state](rxu::error_ptr e) { 171 if(--state->pendingCompletions == 0) { 172 state->out.on_error( 173 rxu::make_error_ptr(std::move(state->exception.add(e)))); 174 } else { 175 state->exception.add(e); 176 } 177 }, 178 //on_completed 179 [state](){ 180 if (--state->pendingCompletions == 0) { 181 if(!state->exception.empty()) { 182 state->out.on_error( 183 rxu::make_error_ptr(std::move(state->exception))); 184 } else { 185 state->out.on_completed(); 186 } 187 } 188 } 189 ); 190 191 auto selectedSinkInner = state->coordinator.out(sinkInner); 192 selectedSource.subscribe(std::move(selectedSinkInner)); 193 }, 194 // on_error 195 [state](rxu::error_ptr e) { 196 if(--state->pendingCompletions == 0) { 197 state->out.on_error( 198 rxu::make_error_ptr(std::move(state->exception.add(e)))); 199 } else { 200 state->exception.add(e); 201 } 202 }, 203 // on_completed 204 [state]() { 205 if (--state->pendingCompletions == 0) { 206 if(!state->exception.empty()) { 207 state->out.on_error( 208 rxu::make_error_ptr(std::move(state->exception))); 209 } else { 210 state->out.on_completed(); 211 } 212 } 213 } 214 ); 215 auto selectedSink = on_exception( 216 [&](){return state->coordinator.out(sink);}, 217 state->out); 218 if (selectedSink.empty()) { 219 return; 220 } 221 source->subscribe(std::move(selectedSink.get())); 222 } 223 }; 224 225 } 226 227 /*! @copydoc rx-merge-delay-error.hpp 228 */ 229 template<class... AN> 230 auto merge_delay_error(AN&&... an) 231 -> operator_factory<merge_delay_error_tag, AN...> { 232 return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 233 } 234 235 } 236 237 template<> 238 struct member_overload<merge_delay_error_tag> 239 { 240 template<class Observable, 241 class Enabled = rxu::enable_if_all_true_type_t< 242 is_observable<Observable>>, 243 class SourceValue = rxu::value_type_t<Observable>, 244 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, 245 class Value = rxu::value_type_t<SourceValue>, 246 class Result = observable<Value, Merge> 247 > 248 static Result member(Observable&& o) { 249 return Result(Merge(std::forward<Observable>(o), identity_current_thread())); 250 } 251 252 template<class Observable, class Coordination, 253 class Enabled = rxu::enable_if_all_true_type_t< 254 is_observable<Observable>, 255 is_coordination<Coordination>>, 256 class SourceValue = rxu::value_type_t<Observable>, 257 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, 258 class Value = rxu::value_type_t<SourceValue>, 259 class Result = observable<Value, Merge> 260 > 261 static Result member(Observable&& o, Coordination&& cn) { 262 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn))); 263 } 264 265 template<class Observable, class Value0, class... ValueN, 266 class Enabled = rxu::enable_if_all_true_type_t< 267 all_observables<Observable, Value0, ValueN...>>, 268 class EmittedValue = rxu::value_type_t<Observable>, 269 class SourceValue = observable<EmittedValue>, 270 class ObservableObservable = observable<SourceValue>, 271 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, identity_one_worker>::type, 272 class Value = rxu::value_type_t<Merge>, 273 class Result = observable<Value, Merge> 274 > 275 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { 276 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); 277 } 278 279 template<class Observable, class Coordination, class Value0, class... ValueN, 280 class Enabled = rxu::enable_if_all_true_type_t< 281 all_observables<Observable, Value0, ValueN...>, 282 is_coordination<Coordination>>, 283 class EmittedValue = rxu::value_type_t<Observable>, 284 class SourceValue = observable<EmittedValue>, 285 class ObservableObservable = observable<SourceValue>, 286 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, 287 class Value = rxu::value_type_t<Merge>, 288 class Result = observable<Value, Merge> 289 > 290 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { 291 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); 292 } 293 294 template<class... AN> 295 static operators::detail::merge_invalid_t<AN...> member(AN...) { 296 std::terminate(); 297 return {}; 298 static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)"); 299 } 300 }; 301 302 } 303 304 #endif 305