Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-window_time.hpp
      6 
      7     \brief Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
      8            If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
      9 
     10     \tparam Duration      the type of time intervals.
     11     \tparam Coordination  the type of the scheduler (optional).
     12 
     13     \param period        the period of time each window collects items before it is completed.
     14     \param skip          the period of time after which a new window will be created.
     15     \param coordination  the scheduler for the windows (optional).
     16 
    \return  Observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable.
             If the skip parameter is set, return an Observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
     19 
     20     \sample
     21     \snippet window.cpp window period+skip+coordination sample
     22     \snippet output.txt window period+skip+coordination sample
     23 
     24     \sample
     25     \snippet window.cpp window period+skip sample
     26     \snippet output.txt window period+skip sample
     27 
     28     \sample
     29     \snippet window.cpp window period+coordination sample
     30     \snippet output.txt window period+coordination sample
     31 
     32     \sample
     33     \snippet window.cpp window period sample
     34     \snippet output.txt window period sample
     35 */
     36 
     37 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP)
     38 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP
     39 
     40 #include "../rx-includes.hpp"
     41 
     42 namespace rxcpp {
     43 
     44 namespace operators {
     45 
     46 namespace detail {
     47 
     48 template<class... AN>
     49 struct window_with_time_invalid_arguments {};
     50 
     51 template<class... AN>
     52 struct window_with_time_invalid : public rxo::operator_base<window_with_time_invalid_arguments<AN...>> {
     53     using type = observable<window_with_time_invalid_arguments<AN...>, window_with_time_invalid<AN...>>;
     54 };
     55 template<class... AN>
     56 using window_with_time_invalid_t = typename window_with_time_invalid<AN...>::type;
     57 
     58 template<class T, class Duration, class Coordination>
     59 struct window_with_time
     60 {
     61     typedef rxu::decay_t<T> source_value_type;
     62     typedef observable<source_value_type> value_type;
     63     typedef rxu::decay_t<Coordination> coordination_type;
     64     typedef typename coordination_type::coordinator_type coordinator_type;
     65     typedef rxu::decay_t<Duration> duration_type;
     66 
     67     struct window_with_time_values
     68     {
     69         window_with_time_values(duration_type p, duration_type s, coordination_type c)
     70             : period(p)
     71             , skip(s)
     72             , coordination(c)
     73         {
     74         }
     75         duration_type period;
     76         duration_type skip;
     77         coordination_type coordination;
     78     };
     79     window_with_time_values initial;
     80 
     81     window_with_time(duration_type period, duration_type skip, coordination_type coordination)
     82         : initial(period, skip, coordination)
     83     {
     84     }
     85 
     86     template<class Subscriber>
     87     struct window_with_time_observer
     88     {
     89         typedef window_with_time_observer<Subscriber> this_type;
     90         typedef rxu::decay_t<T> value_type;
     91         typedef rxu::decay_t<Subscriber> dest_type;
     92         typedef observer<T, this_type> observer_type;
     93 
     94         struct window_with_time_subscriber_values : public window_with_time_values
     95         {
     96             window_with_time_subscriber_values(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
     97                 : window_with_time_values(v)
     98                 , cs(std::move(cs))
     99                 , dest(std::move(d))
    100                 , coordinator(std::move(c))
    101                 , worker(coordinator.get_worker())
    102                 , expected(worker.now())
    103             {
    104             }
    105             composite_subscription cs;
    106             dest_type dest;
    107             coordinator_type coordinator;
    108             rxsc::worker worker;
    109             mutable std::deque<rxcpp::subjects::subject<T>> subj;
    110             rxsc::scheduler::clock_type::time_point expected;
    111         };
    112         std::shared_ptr<window_with_time_subscriber_values> state;
    113 
    114         window_with_time_observer(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
    115             : state(std::make_shared<window_with_time_subscriber_values>(window_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
    116         {
    117             auto localState = state;
    118 
    119             auto disposer = [=](const rxsc::schedulable&){
    120                 localState->cs.unsubscribe();
    121                 localState->dest.unsubscribe();
    122                 localState->worker.unsubscribe();
    123             };
    124             auto selectedDisposer = on_exception(
    125                 [&](){return localState->coordinator.act(disposer);},
    126                 localState->dest);
    127             if (selectedDisposer.empty()) {
    128                 return;
    129             }
    130 
    131             localState->dest.add([=](){
    132                 localState->worker.schedule(selectedDisposer.get());
    133             });
    134             localState->cs.add([=](){
    135                 localState->worker.schedule(selectedDisposer.get());
    136             });
    137 
    138             //
    139             // The scheduler is FIFO for any time T. Since the observer is scheduling
    140             // on_next/on_error/oncompleted the timed schedule calls must be resheduled
    141             // when they occur to ensure that production happens after on_next/on_error/oncompleted
    142             //
    143 
    144             auto release_window = [localState](const rxsc::schedulable&) {
    145                 localState->worker.schedule([localState](const rxsc::schedulable&) {
    146                     localState->subj[0].get_subscriber().on_completed();
    147                     localState->subj.pop_front();
    148                 });
    149             };
    150             auto selectedRelease = on_exception(
    151                 [&](){return localState->coordinator.act(release_window);},
    152                 localState->dest);
    153             if (selectedRelease.empty()) {
    154                 return;
    155             }
    156 
    157             auto create_window = [localState, selectedRelease](const rxsc::schedulable&) {
    158                 localState->subj.push_back(rxcpp::subjects::subject<T>());
    159                 localState->dest.on_next(localState->subj[localState->subj.size() - 1].get_observable().as_dynamic());
    160 
    161                 auto produce_at = localState->expected + localState->period;
    162                 localState->expected += localState->skip;
    163                 localState->worker.schedule(produce_at, [localState, selectedRelease](const rxsc::schedulable&) {
    164                     localState->worker.schedule(selectedRelease.get());
    165                 });
    166             };
    167             auto selectedCreate = on_exception(
    168                 [&](){return localState->coordinator.act(create_window);},
    169                 localState->dest);
    170             if (selectedCreate.empty()) {
    171                 return;
    172             }
    173 
    174             state->worker.schedule_periodically(
    175                 state->expected,
    176                 state->skip,
    177                 [localState, selectedCreate](const rxsc::schedulable&) {
    178                     localState->worker.schedule(selectedCreate.get());
    179                 });
    180         }
    181 
    182         void on_next(T v) const {
    183             auto localState = state;
    184             auto work = [v, localState](const rxsc::schedulable&){
    185                 for (auto s : localState->subj) {
    186                     s.get_subscriber().on_next(v);
    187                 }
    188             };
    189             auto selectedWork = on_exception(
    190                 [&](){return localState->coordinator.act(work);},
    191                 localState->dest);
    192             if (selectedWork.empty()) {
    193                 return;
    194             }
    195             localState->worker.schedule(selectedWork.get());
    196         }
    197 
    198         void on_error(rxu::error_ptr e) const {
    199             auto localState = state;
    200             auto work = [e, localState](const rxsc::schedulable&){
    201                 for (auto s : localState->subj) {
    202                     s.get_subscriber().on_error(e);
    203                 }
    204                 localState->dest.on_error(e);
    205             };
    206             auto selectedWork = on_exception(
    207                 [&](){return localState->coordinator.act(work);},
    208                 localState->dest);
    209             if (selectedWork.empty()) {
    210                 return;
    211             }
    212             localState->worker.schedule(selectedWork.get());
    213         }
    214 
    215         void on_completed() const {
    216             auto localState = state;
    217             auto work = [localState](const rxsc::schedulable&){
    218                 for (auto s : localState->subj) {
    219                     s.get_subscriber().on_completed();
    220                 }
    221                 localState->dest.on_completed();
    222             };
    223             auto selectedWork = on_exception(
    224                 [&](){return localState->coordinator.act(work);},
    225                 localState->dest);
    226             if (selectedWork.empty()) {
    227                 return;
    228             }
    229             localState->worker.schedule(selectedWork.get());
    230         }
    231 
    232         static subscriber<T, observer_type> make(dest_type d, window_with_time_values v) {
    233             auto cs = composite_subscription();
    234             auto coordinator = v.coordination.create_coordinator();
    235 
    236             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
    237         }
    238     };
    239 
    240     template<class Subscriber>
    241     auto operator()(Subscriber dest) const
    242         -> decltype(window_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
    243         return      window_with_time_observer<Subscriber>::make(std::move(dest), initial);
    244     }
    245 };
    246 
    247 }
    248 
    249 /*! @copydoc rx-window_time.hpp
    250 */
    251 template<class... AN>
    252 auto window_with_time(AN&&... an)
    253     ->      operator_factory<window_with_time_tag, AN...> {
    254      return operator_factory<window_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    255 }
    256 
    257 }
    258 
    259 template<>
    260 struct member_overload<window_with_time_tag>
    261 {
    262     template<class Observable, class Duration,
    263         class Enabled = rxu::enable_if_all_true_type_t<
    264             is_observable<Observable>,
    265             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
    266         class SourceValue = rxu::value_type_t<Observable>,
    267         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
    268         class Value = rxu::value_type_t<WindowWithTime>>
    269     static auto member(Observable&& o, Duration period)
    270         -> decltype(o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()))) {
    271         return      o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()));
    272     }
    273 
    274     template<class Observable, class Duration, class Coordination,
    275         class Enabled = rxu::enable_if_all_true_type_t<
    276             is_observable<Observable>,
    277             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
    278             is_coordination<Coordination>>,
    279         class SourceValue = rxu::value_type_t<Observable>,
    280         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
    281         class Value = rxu::value_type_t<WindowWithTime>>
    282     static auto member(Observable&& o, Duration period, Coordination&& cn)
    283         -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) {
    284         return      o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)));
    285     }
    286 
    287     template<class Observable, class Duration,
    288         class Enabled = rxu::enable_if_all_true_type_t<
    289             is_observable<Observable>,
    290             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
    291         class SourceValue = rxu::value_type_t<Observable>,
    292         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
    293         class Value = rxu::value_type_t<WindowWithTime>>
    294     static auto member(Observable&& o, Duration&& period, Duration&& skip)
    295         -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
    296         return      o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
    297     }
    298 
    299     template<class Observable, class Duration, class Coordination,
    300         class Enabled = rxu::enable_if_all_true_type_t<
    301             is_observable<Observable>,
    302             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
    303             is_coordination<Coordination>>,
    304         class SourceValue = rxu::value_type_t<Observable>,
    305         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
    306         class Value = rxu::value_type_t<WindowWithTime>>
    307     static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
    308         -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
    309         return      o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
    310     }
    311 
    312     template<class... AN>
    313     static operators::detail::window_with_time_invalid_t<AN...> member(AN...) {
    314         std::terminate();
    315         return {};
    316         static_assert(sizeof...(AN) == 10000, "window_with_time takes (Duration, optional Duration, optional Coordination)");
    317     }
    318 };
    319 
    320 }
    321 
    322 #endif
    323