1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-timestamp.hpp 6 7 \brief Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. 8 9 \tparam Coordination the type of the scheduler (optional). 10 11 \param coordination the scheduler to manage timeout for each event (optional). 12 13 \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. 14 15 \sample 16 \snippet timestamp.cpp timestamp sample 17 \snippet output.txt timestamp sample 18 */ 19 20 #if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP) 21 #define RXCPP_OPERATORS_RX_TIMESTAMP_HPP 22 23 #include "../rx-includes.hpp" 24 25 namespace rxcpp { 26 27 namespace operators { 28 29 namespace detail { 30 31 template<class... AN> 32 struct timestamp_invalid_arguments {}; 33 34 template<class... AN> 35 struct timestamp_invalid : public rxo::operator_base<timestamp_invalid_arguments<AN...>> { 36 using type = observable<timestamp_invalid_arguments<AN...>, timestamp_invalid<AN...>>; 37 }; 38 template<class... AN> 39 using timestamp_invalid_t = typename timestamp_invalid<AN...>::type; 40 41 template<class T, class Coordination> 42 struct timestamp 43 { 44 typedef rxu::decay_t<T> source_value_type; 45 typedef rxu::decay_t<Coordination> coordination_type; 46 47 struct timestamp_values { 48 timestamp_values(coordination_type c) 49 : coordination(c) 50 { 51 } 52 53 coordination_type coordination; 54 }; 55 timestamp_values initial; 56 57 timestamp(coordination_type coordination) 58 : initial(coordination) 59 { 60 } 61 62 template<class Subscriber> 63 struct timestamp_observer 64 { 65 typedef timestamp_observer<Subscriber> this_type; 66 typedef source_value_type value_type; 67 typedef rxu::decay_t<Subscriber> dest_type; 68 typedef observer<value_type, this_type> observer_type; 69 dest_type dest; 70 coordination_type coord; 71 72 timestamp_observer(dest_type d, coordination_type coordination) 73 : dest(std::move(d)), 74 coord(std::move(coordination)) 75 { 76 } 77 78 void on_next(source_value_type v) const { 79 dest.on_next(std::make_pair(v, coord.now())); 80 } 81 void on_error(rxu::error_ptr e) const { 82 dest.on_error(e); 83 } 84 void on_completed() const { 85 dest.on_completed(); 86 } 87 88 static subscriber<value_type, observer_type> make(dest_type d, timestamp_values v) { 89 return make_subscriber<value_type>(d, this_type(d, v.coordination)); 90 } 91 }; 92 93 template<class Subscriber> 94 auto operator()(Subscriber dest) const 95 -> decltype(timestamp_observer<Subscriber>::make(std::move(dest), initial)) { 96 return timestamp_observer<Subscriber>::make(std::move(dest), initial); 97 } 98 }; 99 100 } 101 102 /*! @copydoc rx-timestamp.hpp 103 */ 104 template<class... AN> 105 auto timestamp(AN&&... an) 106 -> operator_factory<timestamp_tag, AN...> { 107 return operator_factory<timestamp_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 108 } 109 110 } 111 112 template<> 113 struct member_overload<timestamp_tag> 114 { 115 template<class Observable, 116 class Enabled = rxu::enable_if_all_true_type_t< 117 is_observable<Observable>>, 118 class SourceValue = rxu::value_type_t<Observable>, 119 class Timestamp = rxo::detail::timestamp<SourceValue, identity_one_worker>, 120 class Clock = typename rxsc::scheduler::clock_type::time_point, 121 class Value = std::pair<SourceValue, Clock>> 122 static auto member(Observable&& o) 123 -> decltype(o.template lift<Value>(Timestamp(identity_current_thread()))) { 124 return o.template lift<Value>(Timestamp(identity_current_thread())); 125 } 126 127 template<class Observable, class Coordination, 128 class Enabled = rxu::enable_if_all_true_type_t< 129 is_observable<Observable>, 130 is_coordination<Coordination>>, 131 class SourceValue = rxu::value_type_t<Observable>, 132 class Timestamp = rxo::detail::timestamp<SourceValue, rxu::decay_t<Coordination>>, 133 class Clock = typename rxsc::scheduler::clock_type::time_point, 134 class Value = std::pair<SourceValue, Clock>> 135 static auto member(Observable&& o, Coordination&& cn) 136 -> decltype(o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)))) { 137 return o.template lift<Value>(Timestamp(std::forward<Coordination>(cn))); 138 } 139 140 template<class... AN> 141 static operators::detail::timestamp_invalid_t<AN...> member(AN...) { 142 std::terminate(); 143 return {}; 144 static_assert(sizeof...(AN) == 10000, "timestamp takes (optional Coordination)"); 145 } 146 }; 147 148 } 149 150 #endif 151