1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-distinct.hpp 6 7 \brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted. 8 9 \return Observable that emits those items from the source observable that are distinct. 10 11 \note istinct keeps an unordered_set<T> of past values. Due to an issue in multiple implementations of std::hash<T>, rxcpp maintains a whitelist of hashable types. new types can be added by specializing rxcpp::filtered_hash<T> 12 13 \sample 14 \snippet distinct.cpp distinct sample 15 \snippet output.txt distinct sample 16 */ 17 18 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_HPP) 19 #define RXCPP_OPERATORS_RX_DISTINCT_HPP 20 21 #include "../rx-includes.hpp" 22 23 namespace rxcpp { 24 25 namespace operators { 26 27 namespace detail { 28 29 template<class... AN> 30 struct distinct_invalid_arguments {}; 31 32 template<class... AN> 33 struct distinct_invalid : public rxo::operator_base<distinct_invalid_arguments<AN...>> { 34 using type = observable<distinct_invalid_arguments<AN...>, distinct_invalid<AN...>>; 35 }; 36 template<class... AN> 37 using distinct_invalid_t = typename distinct_invalid<AN...>::type; 38 39 template<class T> 40 struct distinct 41 { 42 typedef rxu::decay_t<T> source_value_type; 43 44 template<class Subscriber> 45 struct distinct_observer 46 { 47 typedef distinct_observer<Subscriber> this_type; 48 typedef source_value_type value_type; 49 typedef rxu::decay_t<Subscriber> dest_type; 50 typedef observer<value_type, this_type> observer_type; 51 dest_type dest; 52 mutable std::unordered_set<source_value_type, rxcpp::filtered_hash<source_value_type>> remembered; 53 54 distinct_observer(dest_type d) 55 : dest(d) 56 { 57 } 58 void on_next(source_value_type v) const { 59 if (remembered.empty() || remembered.count(v) == 0) { 60 remembered.insert(v); 61 dest.on_next(v); 62 } 63 } 64 void on_error(rxu::error_ptr e) const { 65 dest.on_error(e); 66 } 67 void on_completed() const { 68 dest.on_completed(); 69 } 70 71 static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) { 72 return make_subscriber<value_type>(d, this_type(d)); 73 } 74 }; 75 76 template<class Subscriber> 77 auto operator()(Subscriber dest) const 78 -> decltype(distinct_observer<Subscriber>::make(std::move(dest))) { 79 return distinct_observer<Subscriber>::make(std::move(dest)); 80 } 81 }; 82 83 } 84 85 /*! @copydoc rx-distinct.hpp 86 */ 87 template<class... AN> 88 auto distinct(AN&&... an) 89 -> operator_factory<distinct_tag, AN...> { 90 return operator_factory<distinct_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 91 } 92 93 } 94 95 template<> 96 struct member_overload<distinct_tag> 97 { 98 template<class Observable, 99 class SourceValue = rxu::value_type_t<Observable>, 100 class Enabled = rxu::enable_if_all_true_type_t< 101 is_observable<Observable>, 102 is_hashable<SourceValue>>, 103 class Distinct = rxo::detail::distinct<SourceValue>> 104 static auto member(Observable&& o) 105 -> decltype(o.template lift<SourceValue>(Distinct())) { 106 return o.template lift<SourceValue>(Distinct()); 107 } 108 109 template<class... AN> 110 static operators::detail::distinct_invalid_t<AN...> member(AN...) { 111 std::terminate(); 112 return {}; 113 static_assert(sizeof...(AN) == 10000, "distinct takes no arguments"); 114 } 115 }; 116 117 } 118 119 #endif 120