1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_SOURCES_RX_INTERVAL_HPP) 6 #define RXCPP_SOURCES_RX_INTERVAL_HPP 7 8 #include "../rx-includes.hpp" 9 10 /*! \file rx-interval.hpp 11 12 \brief Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler. 13 14 \tparam Coordination the type of the scheduler (optional) 15 16 \param period period between emitted values 17 \param cn the scheduler to use for scheduling the items (optional) 18 19 \return Observable that sends a sequential integer each time interval 20 21 \sample 22 \snippet interval.cpp interval sample 23 \snippet output.txt interval sample 24 25 \sample 26 \snippet interval.cpp immediate interval sample 27 \snippet output.txt immediate interval sample 28 29 \sample 30 \snippet interval.cpp threaded interval sample 31 \snippet output.txt threaded interval sample 32 33 \sample 34 \snippet interval.cpp threaded immediate interval sample 35 \snippet output.txt threaded immediate interval sample 36 */ 37 38 namespace rxcpp { 39 40 namespace sources { 41 42 namespace detail { 43 44 template<class Coordination> 45 struct interval : public source_base<long> 46 { 47 typedef interval<Coordination> this_type; 48 49 typedef rxu::decay_t<Coordination> coordination_type; 50 typedef typename coordination_type::coordinator_type coordinator_type; 51 52 struct interval_initial_type 53 { 54 interval_initial_type(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn) 55 : initial(i) 56 , period(p) 57 , coordination(std::move(cn)) 58 { 59 } 60 rxsc::scheduler::clock_type::time_point initial; 61 rxsc::scheduler::clock_type::duration period; 62 coordination_type coordination; 63 }; 64 interval_initial_type initial; 65 66 interval(rxsc::scheduler::clock_type::time_point i, rxsc::scheduler::clock_type::duration p, coordination_type cn) 67 : initial(i, p, std::move(cn)) 68 { 69 } 70 template<class Subscriber> 71 void on_subscribe(Subscriber o) const { 72 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); 73 74 // creates a worker whose lifetime is the same as this subscription 75 auto coordinator = initial.coordination.create_coordinator(o.get_subscription()); 76 77 auto controller = coordinator.get_worker(); 78 79 auto counter = std::make_shared<long>(0); 80 81 auto producer = [o, counter](const rxsc::schedulable&) { 82 // send next value 83 o.on_next(++(*counter)); 84 }; 85 86 auto selectedProducer = on_exception( 87 [&](){return coordinator.act(producer);}, 88 o); 89 if (selectedProducer.empty()) { 90 return; 91 } 92 93 controller.schedule_periodically(initial.initial, initial.period, selectedProducer.get()); 94 } 95 }; 96 97 template<class Duration, class Coordination> 98 struct defer_interval : public defer_observable< 99 rxu::all_true< 100 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>::value, 101 is_coordination<Coordination>::value>, 102 void, 103 interval, Coordination> 104 { 105 }; 106 107 } 108 109 110 /*! @copydoc rx-interval.hpp 111 */ 112 template<class Duration> 113 auto interval(Duration period) 114 -> typename std::enable_if< 115 detail::defer_interval<Duration, identity_one_worker>::value, 116 typename detail::defer_interval<Duration, identity_one_worker>::observable_type>::type { 117 return detail::defer_interval<Duration, identity_one_worker>::make(identity_current_thread().now(), period, identity_current_thread()); 118 } 119 120 /*! @copydoc rx-interval.hpp 121 */ 122 template<class Coordination> 123 auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) 124 -> typename std::enable_if< 125 detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value, 126 typename detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type { 127 return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(cn.now(), period, std::move(cn)); 128 } 129 130 /*! @copydoc rx-interval.hpp 131 */ 132 template<class Duration> 133 auto interval(rxsc::scheduler::clock_type::time_point when, Duration period) 134 -> typename std::enable_if< 135 detail::defer_interval<Duration, identity_one_worker>::value, 136 typename detail::defer_interval<Duration, identity_one_worker>::observable_type>::type { 137 return detail::defer_interval<Duration, identity_one_worker>::make(when, period, identity_current_thread()); 138 } 139 140 /*! @copydoc rx-interval.hpp 141 */ 142 template<class Coordination> 143 auto interval(rxsc::scheduler::clock_type::time_point when, rxsc::scheduler::clock_type::duration period, Coordination cn) 144 -> typename std::enable_if< 145 detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::value, 146 typename detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::observable_type>::type { 147 return detail::defer_interval<rxsc::scheduler::clock_type::duration, Coordination>::make(when, period, std::move(cn)); 148 } 149 150 } 151 152 } 153 154 #endif 155