Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-distinct.hpp
      6 
      7     \brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
      8 
      9     \return Observable that emits those items from the source observable that are distinct.
     10 
     11     \note istinct keeps an unordered_set<T> of past values. Due to an issue in multiple implementations of std::hash<T>, rxcpp maintains a whitelist of hashable types. new types can be added by specializing rxcpp::filtered_hash<T>
     12 
     13     \sample
     14     \snippet distinct.cpp distinct sample
     15     \snippet output.txt distinct sample
     16 */
     17 
     18 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_HPP)
     19 #define RXCPP_OPERATORS_RX_DISTINCT_HPP
     20 
     21 #include "../rx-includes.hpp"
     22 
     23 namespace rxcpp {
     24 
     25 namespace operators {
     26 
     27 namespace detail {
     28 
     29 template<class... AN>
     30 struct distinct_invalid_arguments {};
     31 
     32 template<class... AN>
     33 struct distinct_invalid : public rxo::operator_base<distinct_invalid_arguments<AN...>> {
     34     using type = observable<distinct_invalid_arguments<AN...>, distinct_invalid<AN...>>;
     35 };
     36 template<class... AN>
     37 using distinct_invalid_t = typename distinct_invalid<AN...>::type;
     38 
     39 template<class T>
     40 struct distinct
     41 {
     42     typedef rxu::decay_t<T> source_value_type;
     43 
     44     template<class Subscriber>
     45     struct distinct_observer
     46     {
     47         typedef distinct_observer<Subscriber> this_type;
     48         typedef source_value_type value_type;
     49         typedef rxu::decay_t<Subscriber> dest_type;
     50         typedef observer<value_type, this_type> observer_type;
     51         dest_type dest;
     52         mutable std::unordered_set<source_value_type, rxcpp::filtered_hash<source_value_type>> remembered;
     53 
     54         distinct_observer(dest_type d)
     55                 : dest(d)
     56         {
     57         }
     58         void on_next(source_value_type v) const {
     59             if (remembered.empty() || remembered.count(v) == 0) {
     60                 remembered.insert(v);
     61                 dest.on_next(v);
     62             }
     63         }
     64         void on_error(rxu::error_ptr e) const {
     65             dest.on_error(e);
     66         }
     67         void on_completed() const {
     68             dest.on_completed();
     69         }
     70 
     71         static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) {
     72             return make_subscriber<value_type>(d, this_type(d));
     73         }
     74     };
     75 
     76     template<class Subscriber>
     77     auto operator()(Subscriber dest) const
     78     -> decltype(distinct_observer<Subscriber>::make(std::move(dest))) {
     79         return      distinct_observer<Subscriber>::make(std::move(dest));
     80     }
     81 };
     82 
     83 }
     84 
     85 /*! @copydoc rx-distinct.hpp
     86 */
     87 template<class... AN>
     88 auto distinct(AN&&... an)
     89     ->     operator_factory<distinct_tag, AN...> {
     90     return operator_factory<distinct_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
     91 }
     92 
     93 }
     94 
     95 template<>
     96 struct member_overload<distinct_tag>
     97 {
     98     template<class Observable,
     99             class SourceValue = rxu::value_type_t<Observable>,
    100             class Enabled = rxu::enable_if_all_true_type_t<
    101                 is_observable<Observable>,
    102                 is_hashable<SourceValue>>,
    103             class Distinct = rxo::detail::distinct<SourceValue>>
    104     static auto member(Observable&& o)
    105     -> decltype(o.template lift<SourceValue>(Distinct())) {
    106         return  o.template lift<SourceValue>(Distinct());
    107     }
    108 
    109     template<class... AN>
    110     static operators::detail::distinct_invalid_t<AN...> member(AN...) {
    111         std::terminate();
    112         return {};
    113         static_assert(sizeof...(AN) == 10000, "distinct takes no arguments");
    114     }
    115 };
    116 
    117 }
    118 
    119 #endif
    120