Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-distinct_until_changed.hpp
      6 
      7     \brief For each item from this observable, filter out consecutively repeated values and emit only changes from the new observable that is returned.
      8 
      9     \tparam BinaryPredicate (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
     10 
     11     \param pred (optional) the function that implements comparison of two values.
     12 
     13     \return  Observable that emits those items from the source observable that are distinct from their immediate predecessors.
     14 
     15     \sample
     16     \snippet distinct_until_changed.cpp distinct_until_changed sample
     17     \snippet output.txt distinct_until_changed sample
     18 */
     19 
     20 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP)
     21 #define RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP
     22 
     23 #include "../rx-includes.hpp"
     24 
     25 namespace rxcpp {
     26 
     27 namespace operators {
     28 
     29 namespace detail {
     30 
     31 template<class... AN>
     32 struct distinct_until_changed_invalid_arguments {};
     33 
     34 template<class... AN>
     35 struct distinct_until_changed_invalid : public rxo::operator_base<distinct_until_changed_invalid_arguments<AN...>> {
     36     using type = observable<distinct_until_changed_invalid_arguments<AN...>, distinct_until_changed_invalid<AN...>>;
     37 };
     38 template<class... AN>
     39 using distinct_until_changed_invalid_t = typename distinct_until_changed_invalid<AN...>::type;
     40 
     41 template<class T, class BinaryPredicate>
     42 struct distinct_until_changed
     43 {
     44     typedef rxu::decay_t<T> source_value_type;
     45     typedef rxu::decay_t<BinaryPredicate> predicate_type;
     46 
     47     predicate_type pred;
     48 
     49     distinct_until_changed(predicate_type p)
     50     : pred(std::move(p))
     51     {
     52     }
     53 
     54     template<class Subscriber>
     55     struct distinct_until_changed_observer
     56     {
     57         typedef distinct_until_changed_observer<Subscriber> this_type;
     58         typedef source_value_type value_type;
     59         typedef rxu::decay_t<Subscriber> dest_type;
     60         typedef observer<value_type, this_type> observer_type;
     61 
     62         dest_type dest;
     63         predicate_type pred;
     64         mutable rxu::detail::maybe<source_value_type> remembered;
     65 
     66         distinct_until_changed_observer(dest_type d, predicate_type pred)
     67             : dest(std::move(d))
     68             , pred(std::move(pred))
     69         {
     70         }
     71         void on_next(source_value_type v) const {
     72             if (remembered.empty() || !pred(v, remembered.get())) {
     73                 remembered.reset(v);
     74                 dest.on_next(v);
     75             }
     76         }
     77         void on_error(rxu::error_ptr e) const {
     78             dest.on_error(e);
     79         }
     80         void on_completed() const {
     81             dest.on_completed();
     82         }
     83 
     84         static subscriber<value_type, observer_type> make(dest_type d, predicate_type p) {
     85             return make_subscriber<value_type>(d, this_type(d, std::move(p)));
     86         }
     87     };
     88 
     89     template<class Subscriber>
     90     auto operator()(Subscriber dest) const
     91         -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred)) {
     92         return      distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred);
     93     }
     94 };
     95 
     96 }
     97 
     98 /*! @copydoc rx-distinct_until_changed.hpp
     99 */
    100 template<class... AN>
    101 auto distinct_until_changed(AN&&... an)
    102     ->      operator_factory<distinct_until_changed_tag, AN...> {
    103      return operator_factory<distinct_until_changed_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    104 }
    105 
    106 }
    107 
    108 template<>
    109 struct member_overload<distinct_until_changed_tag>
    110 {
    111     template<class Observable,
    112             class SourceValue = rxu::value_type_t<Observable>,
    113             class Enabled = rxu::enable_if_all_true_type_t<
    114                 is_observable<Observable>>,
    115             class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, rxu::equal_to<>>>
    116     static auto member(Observable&& o)
    117     -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()))) {
    118         return  o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()));
    119     }
    120 
    121     template<class Observable,
    122             class BinaryPredicate,
    123             class SourceValue = rxu::value_type_t<Observable>,
    124             class Enabled = rxu::enable_if_all_true_type_t<
    125             is_observable<Observable>>,
    126             class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, BinaryPredicate>>
    127     static auto member(Observable&& o, BinaryPredicate&& pred)
    128     -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)))) {
    129         return  o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)));
    130     }
    131 
    132     template<class... AN>
    133     static operators::detail::distinct_until_changed_invalid_t<AN...> member(AN...) {
    134         std::terminate();
    135         return {};
    136         static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes (optional BinaryPredicate)");
    137     }
    138 };
    139 
    140 }
    141 
    142 #endif
    143