1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-amb.hpp 6 7 \brief For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler. 8 9 There are 2 variants of the operator: 10 - The source observable emits nested observables, one of the nested observables is selected. 11 - The source observable and the arguments v0...vn are used to provide the observables to select from. 12 13 \tparam Coordination the type of the scheduler (optional). 14 \tparam Value0 ... (optional). 15 \tparam ValueN types of source observables (optional). 16 17 \param cn the scheduler to synchronize sources from different contexts (optional). 18 \param v0 ... (optional). 19 \param vn source observables (optional). 20 21 \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification. 22 23 If scheduler is omitted, identity_current_thread is used. 24 25 \sample 26 \snippet amb.cpp threaded implicit amb sample 27 \snippet output.txt threaded implicit amb sample 28 29 \snippet amb.cpp implicit amb sample 30 \snippet output.txt implicit amb sample 31 32 \snippet amb.cpp amb sample 33 \snippet output.txt amb sample 34 35 \snippet amb.cpp threaded amb sample 36 \snippet output.txt threaded amb sample 37 */ 38 39 #if !defined(RXCPP_OPERATORS_RX_AMB_HPP) 40 #define RXCPP_OPERATORS_RX_AMB_HPP 41 42 #include "../rx-includes.hpp" 43 44 namespace rxcpp { 45 46 namespace operators { 47 48 namespace detail { 49 50 template<class... AN> 51 struct amb_invalid_arguments {}; 52 53 template<class... AN> 54 struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> { 55 using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>; 56 }; 57 template<class... AN> 58 using amb_invalid_t = typename amb_invalid<AN...>::type; 59 60 template<class T, class Observable, class Coordination> 61 struct amb 62 : public operator_base<rxu::value_type_t<T>> 63 { 64 //static_assert(is_observable<Observable>::value, "amb requires an observable"); 65 //static_assert(is_observable<T>::value, "amb requires an observable that contains observables"); 66 67 typedef amb<T, Observable, Coordination> this_type; 68 69 typedef rxu::decay_t<T> source_value_type; 70 typedef rxu::decay_t<Observable> source_type; 71 72 typedef typename source_type::source_operator_type source_operator_type; 73 typedef typename source_value_type::value_type value_type; 74 75 typedef rxu::decay_t<Coordination> coordination_type; 76 typedef typename coordination_type::coordinator_type coordinator_type; 77 78 struct values 79 { 80 values(source_operator_type o, coordination_type sf) 81 : source_operator(std::move(o)) 82 , coordination(std::move(sf)) 83 { 84 } 85 source_operator_type source_operator; 86 coordination_type coordination; 87 }; 88 values initial; 89 90 amb(const source_type& o, coordination_type sf) 91 : initial(o.source_operator, std::move(sf)) 92 { 93 } 94 95 template<class Subscriber> 96 void on_subscribe(Subscriber scbr) const { 97 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 98 99 typedef Subscriber output_type; 100 101 struct amb_state_type 102 : public std::enable_shared_from_this<amb_state_type> 103 , public values 104 { 105 amb_state_type(values i, coordinator_type coor, output_type oarg) 106 : values(i) 107 , source(i.source_operator) 108 , coordinator(std::move(coor)) 109 , out(std::move(oarg)) 110 , pendingObservables(0) 111 , firstEmitted(false) 112 { 113 } 114 observable<source_value_type, source_operator_type> source; 115 coordinator_type coordinator; 116 output_type out; 117 int pendingObservables; 118 bool firstEmitted; 119 std::vector<composite_subscription> innerSubscriptions; 120 }; 121 122 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 123 124 // take a copy of the values for each subscription 125 auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr)); 126 127 composite_subscription outercs; 128 129 // when the out observer is unsubscribed all the 130 // inner subscriptions are unsubscribed as well 131 state->out.add(outercs); 132 133 auto source = on_exception( 134 [&](){return state->coordinator.in(state->source);}, 135 state->out); 136 if (source.empty()) { 137 return; 138 } 139 140 // this subscribe does not share the observer subscription 141 // so that when it is unsubscribed the observer can be called 142 // until the inner subscriptions have finished 143 auto sink = make_subscriber<source_value_type>( 144 state->out, 145 outercs, 146 // on_next 147 [state](source_value_type st) { 148 149 if (state->firstEmitted) 150 return; 151 152 composite_subscription innercs; 153 154 state->innerSubscriptions.push_back(innercs); 155 156 // when the out observer is unsubscribed all the 157 // inner subscriptions are unsubscribed as well 158 auto innercstoken = state->out.add(innercs); 159 160 innercs.add(make_subscription([state, innercstoken](){ 161 state->out.remove(innercstoken); 162 })); 163 164 auto selectedSource = state->coordinator.in(st); 165 166 auto current_id = state->pendingObservables++; 167 168 // this subscribe does not share the source subscription 169 // so that when it is unsubscribed the source will continue 170 auto sinkInner = make_subscriber<value_type>( 171 state->out, 172 innercs, 173 // on_next 174 [state, st, current_id](value_type ct) { 175 state->out.on_next(std::move(ct)); 176 if (!state->firstEmitted) { 177 state->firstEmitted = true; 178 auto do_unsubscribe = [](composite_subscription cs) { 179 cs.unsubscribe(); 180 }; 181 std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe); 182 std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe); 183 } 184 }, 185 // on_error 186 [state](rxu::error_ptr e) { 187 state->out.on_error(e); 188 }, 189 //on_completed 190 [state](){ 191 state->out.on_completed(); 192 } 193 ); 194 195 auto selectedSinkInner = state->coordinator.out(sinkInner); 196 selectedSource.subscribe(std::move(selectedSinkInner)); 197 }, 198 // on_error 199 [state](rxu::error_ptr e) { 200 state->out.on_error(e); 201 }, 202 // on_completed 203 [state]() { 204 if (state->pendingObservables == 0) { 205 state->out.on_completed(); 206 } 207 } 208 ); 209 auto selectedSink = on_exception( 210 [&](){return state->coordinator.out(sink);}, 211 state->out); 212 if (selectedSink.empty()) { 213 return; 214 } 215 source->subscribe(std::move(selectedSink.get())); 216 } 217 }; 218 219 } 220 221 /*! @copydoc rx-amb.hpp 222 */ 223 template<class... AN> 224 auto amb(AN&&... an) 225 -> operator_factory<amb_tag, AN...> { 226 return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 227 } 228 229 } 230 231 template<> 232 struct member_overload<amb_tag> 233 { 234 template<class Observable, 235 class Enabled = rxu::enable_if_all_true_type_t< 236 is_observable<Observable>>, 237 class SourceValue = rxu::value_type_t<Observable>, 238 class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, 239 class Value = rxu::value_type_t<SourceValue>, 240 class Result = observable<Value, Amb> 241 > 242 static Result member(Observable&& o) { 243 return Result(Amb(std::forward<Observable>(o), identity_current_thread())); 244 } 245 246 template<class Observable, class Coordination, 247 class Enabled = rxu::enable_if_all_true_type_t< 248 is_observable<Observable>, 249 is_coordination<Coordination>>, 250 class SourceValue = rxu::value_type_t<Observable>, 251 class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, 252 class Value = rxu::value_type_t<SourceValue>, 253 class Result = observable<Value, Amb> 254 > 255 static Result member(Observable&& o, Coordination&& cn) { 256 return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn))); 257 } 258 259 template<class Observable, class Value0, class... ValueN, 260 class Enabled = rxu::enable_if_all_true_type_t< 261 all_observables<Observable, Value0, ValueN...>>, 262 class EmittedValue = rxu::value_type_t<Observable>, 263 class SourceValue = observable<EmittedValue>, 264 class ObservableObservable = observable<SourceValue>, 265 class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, identity_one_worker>::type, 266 class Value = rxu::value_type_t<Amb>, 267 class Result = observable<Value, Amb> 268 > 269 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { 270 return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); 271 } 272 273 template<class Observable, class Coordination, class Value0, class... ValueN, 274 class Enabled = rxu::enable_if_all_true_type_t< 275 all_observables<Observable, Value0, ValueN...>, 276 is_coordination<Coordination>>, 277 class EmittedValue = rxu::value_type_t<Observable>, 278 class SourceValue = observable<EmittedValue>, 279 class ObservableObservable = observable<SourceValue>, 280 class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, 281 class Value = rxu::value_type_t<Amb>, 282 class Result = observable<Value, Amb> 283 > 284 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { 285 return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); 286 } 287 288 template<class... AN> 289 static operators::detail::amb_invalid_t<AN...> member(AN...) { 290 std::terminate(); 291 return {}; 292 static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)"); 293 } 294 }; 295 296 } 297 298 #endif 299