1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-window_time.hpp 6 7 \brief Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. 8 If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. 9 10 \tparam Duration the type of time intervals. 11 \tparam Coordination the type of the scheduler (optional). 12 13 \param period the period of time each window collects items before it is completed. 14 \param skip the period of time after which a new window will be created. 15 \param coordination the scheduler for the windows (optional). 16 17 \return Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable. 18 If the skip parameter is set, return an Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable. 19 20 \sample 21 \snippet window.cpp window period+skip+coordination sample 22 \snippet output.txt window period+skip+coordination sample 23 24 \sample 25 \snippet window.cpp window period+skip sample 26 \snippet output.txt window period+skip sample 27 28 \sample 29 \snippet window.cpp window period+coordination sample 30 \snippet output.txt window period+coordination sample 31 32 \sample 33 \snippet window.cpp window period sample 34 \snippet output.txt window period sample 35 */ 36 37 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP) 38 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP 39 40 #include "../rx-includes.hpp" 41 42 namespace rxcpp { 43 44 namespace operators { 45 46 namespace detail { 47 48 template<class... AN> 49 struct window_with_time_invalid_arguments {}; 50 51 template<class... AN> 52 struct window_with_time_invalid : public rxo::operator_base<window_with_time_invalid_arguments<AN...>> { 53 using type = observable<window_with_time_invalid_arguments<AN...>, window_with_time_invalid<AN...>>; 54 }; 55 template<class... AN> 56 using window_with_time_invalid_t = typename window_with_time_invalid<AN...>::type; 57 58 template<class T, class Duration, class Coordination> 59 struct window_with_time 60 { 61 typedef rxu::decay_t<T> source_value_type; 62 typedef observable<source_value_type> value_type; 63 typedef rxu::decay_t<Coordination> coordination_type; 64 typedef typename coordination_type::coordinator_type coordinator_type; 65 typedef rxu::decay_t<Duration> duration_type; 66 67 struct window_with_time_values 68 { 69 window_with_time_values(duration_type p, duration_type s, coordination_type c) 70 : period(p) 71 , skip(s) 72 , coordination(c) 73 { 74 } 75 duration_type period; 76 duration_type skip; 77 coordination_type coordination; 78 }; 79 window_with_time_values initial; 80 81 window_with_time(duration_type period, duration_type skip, coordination_type coordination) 82 : initial(period, skip, coordination) 83 { 84 } 85 86 template<class Subscriber> 87 struct window_with_time_observer 88 { 89 typedef window_with_time_observer<Subscriber> this_type; 90 typedef rxu::decay_t<T> value_type; 91 typedef rxu::decay_t<Subscriber> dest_type; 92 typedef observer<T, this_type> observer_type; 93 94 struct window_with_time_subscriber_values : public window_with_time_values 95 { 96 window_with_time_subscriber_values(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c) 97 : window_with_time_values(v) 98 , cs(std::move(cs)) 99 , dest(std::move(d)) 100 , coordinator(std::move(c)) 101 , worker(coordinator.get_worker()) 102 , expected(worker.now()) 103 { 104 } 105 composite_subscription cs; 106 dest_type dest; 107 coordinator_type coordinator; 108 rxsc::worker worker; 109 mutable std::deque<rxcpp::subjects::subject<T>> subj; 110 rxsc::scheduler::clock_type::time_point expected; 111 }; 112 std::shared_ptr<window_with_time_subscriber_values> state; 113 114 window_with_time_observer(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c) 115 : state(std::make_shared<window_with_time_subscriber_values>(window_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c)))) 116 { 117 auto localState = state; 118 119 auto disposer = [=](const rxsc::schedulable&){ 120 localState->cs.unsubscribe(); 121 localState->dest.unsubscribe(); 122 localState->worker.unsubscribe(); 123 }; 124 auto selectedDisposer = on_exception( 125 [&](){return localState->coordinator.act(disposer);}, 126 localState->dest); 127 if (selectedDisposer.empty()) { 128 return; 129 } 130 131 localState->dest.add([=](){ 132 localState->worker.schedule(selectedDisposer.get()); 133 }); 134 localState->cs.add([=](){ 135 localState->worker.schedule(selectedDisposer.get()); 136 }); 137 138 // 139 // The scheduler is FIFO for any time T. Since the observer is scheduling 140 // on_next/on_error/oncompleted the timed schedule calls must be resheduled 141 // when they occur to ensure that production happens after on_next/on_error/oncompleted 142 // 143 144 auto release_window = [localState](const rxsc::schedulable&) { 145 localState->worker.schedule([localState](const rxsc::schedulable&) { 146 localState->subj[0].get_subscriber().on_completed(); 147 localState->subj.pop_front(); 148 }); 149 }; 150 auto selectedRelease = on_exception( 151 [&](){return localState->coordinator.act(release_window);}, 152 localState->dest); 153 if (selectedRelease.empty()) { 154 return; 155 } 156 157 auto create_window = [localState, selectedRelease](const rxsc::schedulable&) { 158 localState->subj.push_back(rxcpp::subjects::subject<T>()); 159 localState->dest.on_next(localState->subj[localState->subj.size() - 1].get_observable().as_dynamic()); 160 161 auto produce_at = localState->expected + localState->period; 162 localState->expected += localState->skip; 163 localState->worker.schedule(produce_at, [localState, selectedRelease](const rxsc::schedulable&) { 164 localState->worker.schedule(selectedRelease.get()); 165 }); 166 }; 167 auto selectedCreate = on_exception( 168 [&](){return localState->coordinator.act(create_window);}, 169 localState->dest); 170 if (selectedCreate.empty()) { 171 return; 172 } 173 174 state->worker.schedule_periodically( 175 state->expected, 176 state->skip, 177 [localState, selectedCreate](const rxsc::schedulable&) { 178 localState->worker.schedule(selectedCreate.get()); 179 }); 180 } 181 182 void on_next(T v) const { 183 auto localState = state; 184 auto work = [v, localState](const rxsc::schedulable&){ 185 for (auto s : localState->subj) { 186 s.get_subscriber().on_next(v); 187 } 188 }; 189 auto selectedWork = on_exception( 190 [&](){return localState->coordinator.act(work);}, 191 localState->dest); 192 if (selectedWork.empty()) { 193 return; 194 } 195 localState->worker.schedule(selectedWork.get()); 196 } 197 198 void on_error(rxu::error_ptr e) const { 199 auto localState = state; 200 auto work = [e, localState](const rxsc::schedulable&){ 201 for (auto s : localState->subj) { 202 s.get_subscriber().on_error(e); 203 } 204 localState->dest.on_error(e); 205 }; 206 auto selectedWork = on_exception( 207 [&](){return localState->coordinator.act(work);}, 208 localState->dest); 209 if (selectedWork.empty()) { 210 return; 211 } 212 localState->worker.schedule(selectedWork.get()); 213 } 214 215 void on_completed() const { 216 auto localState = state; 217 auto work = [localState](const rxsc::schedulable&){ 218 for (auto s : localState->subj) { 219 s.get_subscriber().on_completed(); 220 } 221 localState->dest.on_completed(); 222 }; 223 auto selectedWork = on_exception( 224 [&](){return localState->coordinator.act(work);}, 225 localState->dest); 226 if (selectedWork.empty()) { 227 return; 228 } 229 localState->worker.schedule(selectedWork.get()); 230 } 231 232 static subscriber<T, observer_type> make(dest_type d, window_with_time_values v) { 233 auto cs = composite_subscription(); 234 auto coordinator = v.coordination.create_coordinator(); 235 236 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator)))); 237 } 238 }; 239 240 template<class Subscriber> 241 auto operator()(Subscriber dest) const 242 -> decltype(window_with_time_observer<Subscriber>::make(std::move(dest), initial)) { 243 return window_with_time_observer<Subscriber>::make(std::move(dest), initial); 244 } 245 }; 246 247 } 248 249 /*! @copydoc rx-window_time.hpp 250 */ 251 template<class... AN> 252 auto window_with_time(AN&&... an) 253 -> operator_factory<window_with_time_tag, AN...> { 254 return operator_factory<window_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 255 } 256 257 } 258 259 template<> 260 struct member_overload<window_with_time_tag> 261 { 262 template<class Observable, class Duration, 263 class Enabled = rxu::enable_if_all_true_type_t< 264 is_observable<Observable>, 265 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>, 266 class SourceValue = rxu::value_type_t<Observable>, 267 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>, 268 class Value = rxu::value_type_t<WindowWithTime>> 269 static auto member(Observable&& o, Duration period) 270 -> decltype(o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()))) { 271 return o.template lift<Value>(WindowWithTime(period, period, identity_current_thread())); 272 } 273 274 template<class Observable, class Duration, class Coordination, 275 class Enabled = rxu::enable_if_all_true_type_t< 276 is_observable<Observable>, 277 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>, 278 is_coordination<Coordination>>, 279 class SourceValue = rxu::value_type_t<Observable>, 280 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>, 281 class Value = rxu::value_type_t<WindowWithTime>> 282 static auto member(Observable&& o, Duration period, Coordination&& cn) 283 -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) { 284 return o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn))); 285 } 286 287 template<class Observable, class Duration, 288 class Enabled = rxu::enable_if_all_true_type_t< 289 is_observable<Observable>, 290 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>, 291 class SourceValue = rxu::value_type_t<Observable>, 292 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>, 293 class Value = rxu::value_type_t<WindowWithTime>> 294 static auto member(Observable&& o, Duration&& period, Duration&& skip) 295 -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) { 296 return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread())); 297 } 298 299 template<class Observable, class Duration, class Coordination, 300 class Enabled = rxu::enable_if_all_true_type_t< 301 is_observable<Observable>, 302 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>, 303 is_coordination<Coordination>>, 304 class SourceValue = rxu::value_type_t<Observable>, 305 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>, 306 class Value = rxu::value_type_t<WindowWithTime>> 307 static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn) 308 -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) { 309 return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn))); 310 } 311 312 template<class... AN> 313 static operators::detail::window_with_time_invalid_t<AN...> member(AN...) { 314 std::terminate(); 315 return {}; 316 static_assert(sizeof...(AN) == 10000, "window_with_time takes (Duration, optional Duration, optional Coordination)"); 317 } 318 }; 319 320 } 321 322 #endif 323