1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-concat.hpp 6 7 \brief For each item from this observable subscribe to one at a time, in the order received. 8 For each item from all of the given observables deliver from the new observable that is returned. 9 10 There are 2 variants of the operator: 11 - The source observable emits nested observables, nested observables are concatenated. 12 - The source observable and the arguments v0...vn are used to provide the observables to concatenate. 13 14 \tparam Coordination the type of the scheduler (optional). 15 \tparam Value0 ... (optional). 16 \tparam ValueN types of source observables (optional). 17 18 \param cn the scheduler to synchronize sources from different contexts (optional). 19 \param v0 ... (optional). 20 \param vn source observables (optional). 21 22 \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them. 23 24 \sample 25 \snippet concat.cpp implicit concat sample 26 \snippet output.txt implicit concat sample 27 28 \sample 29 \snippet concat.cpp threaded implicit concat sample 30 \snippet output.txt threaded implicit concat sample 31 32 \sample 33 \snippet concat.cpp concat sample 34 \snippet output.txt concat sample 35 36 \sample 37 \snippet concat.cpp threaded concat sample 38 \snippet output.txt threaded concat sample 39 */ 40 41 #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP) 42 #define RXCPP_OPERATORS_RX_CONCAT_HPP 43 44 #include "../rx-includes.hpp" 45 46 namespace rxcpp { 47 48 namespace operators { 49 50 namespace detail { 51 52 template<class... AN> 53 struct concat_invalid_arguments {}; 54 55 template<class... AN> 56 struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> { 57 using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>; 58 }; 59 template<class... AN> 60 using concat_invalid_t = typename concat_invalid<AN...>::type; 61 62 template<class T, class Observable, class Coordination> 63 struct concat 64 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> 65 { 66 typedef concat<T, Observable, Coordination> this_type; 67 68 typedef rxu::decay_t<T> source_value_type; 69 typedef rxu::decay_t<Observable> source_type; 70 typedef rxu::decay_t<Coordination> coordination_type; 71 72 typedef typename coordination_type::coordinator_type coordinator_type; 73 74 typedef typename source_type::source_operator_type source_operator_type; 75 typedef source_value_type collection_type; 76 typedef typename collection_type::value_type value_type; 77 78 struct values 79 { 80 values(source_operator_type o, coordination_type sf) 81 : source_operator(std::move(o)) 82 , coordination(std::move(sf)) 83 { 84 } 85 source_operator_type source_operator; 86 coordination_type coordination; 87 }; 88 values initial; 89 90 concat(const source_type& o, coordination_type sf) 91 : initial(o.source_operator, std::move(sf)) 92 { 93 } 94 95 template<class Subscriber> 96 void on_subscribe(Subscriber scbr) const { 97 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 98 99 typedef Subscriber output_type; 100 101 struct concat_state_type 102 : public std::enable_shared_from_this<concat_state_type> 103 , public values 104 { 105 concat_state_type(values i, coordinator_type coor, output_type oarg) 106 : values(i) 107 , source(i.source_operator) 108 , sourceLifetime(composite_subscription::empty()) 109 , collectionLifetime(composite_subscription::empty()) 110 , coordinator(std::move(coor)) 111 , out(std::move(oarg)) 112 { 113 } 114 115 void subscribe_to(collection_type st) 116 { 117 auto state = this->shared_from_this(); 118 119 collectionLifetime = composite_subscription(); 120 121 // when the out observer is unsubscribed all the 122 // inner subscriptions are unsubscribed as well 123 auto innercstoken = state->out.add(collectionLifetime); 124 125 collectionLifetime.add(make_subscription([state, innercstoken](){ 126 state->out.remove(innercstoken); 127 })); 128 129 auto selectedSource = on_exception( 130 [&](){return state->coordinator.in(std::move(st));}, 131 state->out); 132 if (selectedSource.empty()) { 133 return; 134 } 135 136 // this subscribe does not share the out subscription 137 // so that when it is unsubscribed the out will continue 138 auto sinkInner = make_subscriber<value_type>( 139 state->out, 140 collectionLifetime, 141 // on_next 142 [state, st](value_type ct) { 143 state->out.on_next(ct); 144 }, 145 // on_error 146 [state](rxu::error_ptr e) { 147 state->out.on_error(e); 148 }, 149 //on_completed 150 [state](){ 151 if (!state->selectedCollections.empty()) { 152 auto value = state->selectedCollections.front(); 153 state->selectedCollections.pop_front(); 154 state->collectionLifetime.unsubscribe(); 155 state->subscribe_to(value); 156 } else if (!state->sourceLifetime.is_subscribed()) { 157 state->out.on_completed(); 158 } 159 } 160 ); 161 auto selectedSinkInner = on_exception( 162 [&](){return state->coordinator.out(sinkInner);}, 163 state->out); 164 if (selectedSinkInner.empty()) { 165 return; 166 } 167 selectedSource->subscribe(std::move(selectedSinkInner.get())); 168 } 169 observable<source_value_type, source_operator_type> source; 170 composite_subscription sourceLifetime; 171 composite_subscription collectionLifetime; 172 std::deque<collection_type> selectedCollections; 173 coordinator_type coordinator; 174 output_type out; 175 }; 176 177 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); 178 179 // take a copy of the values for each subscription 180 auto state = std::make_shared<concat_state_type>(initial, std::move(coordinator), std::move(scbr)); 181 182 state->sourceLifetime = composite_subscription(); 183 184 // when the out observer is unsubscribed all the 185 // inner subscriptions are unsubscribed as well 186 state->out.add(state->sourceLifetime); 187 188 auto source = on_exception( 189 [&](){return state->coordinator.in(state->source);}, 190 state->out); 191 if (source.empty()) { 192 return; 193 } 194 195 // this subscribe does not share the observer subscription 196 // so that when it is unsubscribed the observer can be called 197 // until the inner subscriptions have finished 198 auto sink = make_subscriber<collection_type>( 199 state->out, 200 state->sourceLifetime, 201 // on_next 202 [state](collection_type st) { 203 if (state->collectionLifetime.is_subscribed()) { 204 state->selectedCollections.push_back(st); 205 } else if (state->selectedCollections.empty()) { 206 state->subscribe_to(st); 207 } 208 }, 209 // on_error 210 [state](rxu::error_ptr e) { 211 state->out.on_error(e); 212 }, 213 // on_completed 214 [state]() { 215 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) { 216 state->out.on_completed(); 217 } 218 } 219 ); 220 auto selectedSink = on_exception( 221 [&](){return state->coordinator.out(sink);}, 222 state->out); 223 if (selectedSink.empty()) { 224 return; 225 } 226 source->subscribe(std::move(selectedSink.get())); 227 } 228 }; 229 230 } 231 232 /*! @copydoc rx-concat.hpp 233 */ 234 template<class... AN> 235 auto concat(AN&&... an) 236 -> operator_factory<concat_tag, AN...> { 237 return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 238 } 239 240 } 241 242 template<> 243 struct member_overload<concat_tag> 244 { 245 template<class Observable, 246 class Enabled = rxu::enable_if_all_true_type_t< 247 is_observable<Observable>>, 248 class SourceValue = rxu::value_type_t<Observable>, 249 class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, 250 class Value = rxu::value_type_t<SourceValue>, 251 class Result = observable<Value, Concat> 252 > 253 static Result member(Observable&& o) { 254 return Result(Concat(std::forward<Observable>(o), identity_current_thread())); 255 } 256 257 template<class Observable, class Coordination, 258 class Enabled = rxu::enable_if_all_true_type_t< 259 is_observable<Observable>, 260 is_coordination<Coordination>>, 261 class SourceValue = rxu::value_type_t<Observable>, 262 class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, 263 class Value = rxu::value_type_t<SourceValue>, 264 class Result = observable<Value, Concat> 265 > 266 static Result member(Observable&& o, Coordination&& cn) { 267 return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn))); 268 } 269 270 template<class Observable, class Value0, class... ValueN, 271 class Enabled = rxu::enable_if_all_true_type_t< 272 all_observables<Observable, Value0, ValueN...>>, 273 class EmittedValue = rxu::value_type_t<Observable>, 274 class SourceValue = observable<EmittedValue>, 275 class ObservableObservable = observable<SourceValue>, 276 class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, identity_one_worker>::type, 277 class Value = rxu::value_type_t<Concat>, 278 class Result = observable<Value, Concat> 279 > 280 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { 281 return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); 282 } 283 284 template<class Observable, class Coordination, class Value0, class... ValueN, 285 class Enabled = rxu::enable_if_all_true_type_t< 286 all_observables<Observable, Value0, ValueN...>, 287 is_coordination<Coordination>>, 288 class EmittedValue = rxu::value_type_t<Observable>, 289 class SourceValue = observable<EmittedValue>, 290 class ObservableObservable = observable<SourceValue>, 291 class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, 292 class Value = rxu::value_type_t<Concat>, 293 class Result = observable<Value, Concat> 294 > 295 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { 296 return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); 297 } 298 299 template<class... AN> 300 static operators::detail::concat_invalid_t<AN...> member(AN...) { 301 std::terminate(); 302 return {}; 303 static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)"); 304 } 305 }; 306 307 } 308 309 #endif 310