Home | History | Annotate | Download | only in sources
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_SOURCES_RX_INTERVAL_HPP)
      6 #define RXCPP_SOURCES_RX_INTERVAL_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 /*! \file rx-interval.hpp
     11 
     12     \brief Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
     13 
     14     \tparam Coordination  the type of the scheduler (optional)
     15 
     16     \param  period   period between emitted values
     17     \param  cn       the scheduler to use for scheduling the items (optional)
     18 
     19     \return  Observable that sends a sequential integer each time interval
     20 
     21     \sample
     22     \snippet interval.cpp interval sample
     23     \snippet output.txt interval sample
     24 
     25     \sample
     26     \snippet interval.cpp immediate interval sample
     27     \snippet output.txt immediate interval sample
     28 
     29     \sample
     30     \snippet interval.cpp threaded interval sample
     31     \snippet output.txt threaded interval sample
     32 
     33     \sample
     34     \snippet interval.cpp threaded immediate interval sample
     35     \snippet output.txt threaded immediate interval sample
     36 */
     37 
     38 namespace rxcpp {
     39 
     40 namespace sources {
     41 
     42 namespace detail {
     43 
     44 template<class Coordination>
     45 struct interval : public source_base<long>
     46 {
     47     typedef interval<Coordination> this_type;
     48 
     49     typedef rxu::decay_t<Coordination> coordination_type;
     50     typedef typename coordination_type::coordinator_type coordinator_type;
     51 
     52     struct interval_initial_type
     53     {
     54         interval_initial_type(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn)
     55             : initial(i)
     56             , period(p)
     57             , coordination(std::move(cn))
     58         {
     59         }
     60         rxsc::scheduler::clock_type::time_point initial;
     61         rxsc::scheduler::clock_type::duration period;
     62         coordination_type coordination;
     63     };
     64     interval_initial_type initial;
     65 
     66     interval(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn)
     67         : initial(i, p, std::move(cn))
     68     {
     69     }
     70     template<class Subscriber>
     71     void on_subscribe(Subscriber o) const {
     72         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
     73 
     74         // creates a worker whose lifetime is the same as this subscription
     75         auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
     76 
     77         auto controller = coordinator.get_worker();
     78 
     79         auto counter = std::make_shared<long>(0);
     80 
     81         auto producer = [o, counter](const rxsc::schedulable&) {
     82             // send next value
     83             o.on_next(++(*counter));
     84         };
     85 
     86         auto selectedProducer = on_exception(
     87             [&](){return coordinator.act(producer);},
     88             o);
     89         if (selectedProducer.empty()) {
     90             return;
     91         }
     92 
     93         controller.schedule_periodically(initial.initial, initial.period, selectedProducer.get());
     94     }
     95 };
     96 
     97 template<class Duration, class Coordination>
     98 struct defer_interval : public defer_observable<
     99     rxu::all_true<
    100         std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value,
    101         is_coordination<Coordination>::value>,
    102     void,
    103     interval, Coordination>
    104 {
    105 };
    106 
    107 }
    108 
    109 
    110 /*! @copydoc rx-interval.hpp
    111  */
    112 template<class Duration>
    113 auto interval(Duration period)
    114     ->  typename std::enable_if<
    115                     detail::defer_interval<Duration, identity_one_worker>::value,
    116         typename    detail::defer_interval<Duration, identity_one_worker>::observable_type>::type {
    117     return          detail::defer_interval<Duration, identity_one_worker>::make(identity_current_thread().now(), period, identity_current_thread());
    118 }
    119 
    120 /*! @copydoc rx-interval.hpp
    121  */
    122 template<class Coordination>
    123 auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
    124     ->  typename std::enable_if<
    125                     detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value,
    126         typename    detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type {
    127     return          detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(cn.now(), period, std::move(cn));
    128 }
    129 
    130 /*! @copydoc rx-interval.hpp
    131  */
    132 template<class Duration>
    133 auto interval(rxsc::scheduler::clock_type::time_point when, Duration period)
    134     ->  typename std::enable_if<
    135                     detail::defer_interval<Duration, identity_one_worker>::value,
    136         typename    detail::defer_interval<Duration, identity_one_worker>::observable_type>::type {
    137     return          detail::defer_interval<Duration, identity_one_worker>::make(when, period, identity_current_thread());
    138 }
    139 
    140 /*! @copydoc rx-interval.hpp
    141  */
    142 template<class Coordination>
    143 auto interval(rxsc::scheduler::clock_type::time_point when, rxsc::scheduler::clock_type::duration period, Coordination cn)
    144     ->  typename std::enable_if<
    145                     detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value,
    146         typename    detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type {
    147     return          detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(when, period, std::move(cn));
    148 }
    149 
    150 }
    151 
    152 }
    153 
    154 #endif
    155