1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-skip_while.hpp 6 7 \brief Discards the first items fulfilling the predicate from this observable emit them from the new observable that is returned. 8 9 \tparam Predicate the type of the predicate 10 11 \param t the predicate 12 13 \return An observable that discards the first items until condition emitted by the source Observable is not fulfilling the predicate, or all of the items from the source observable if the predicate never returns false 14 15 \sample 16 \snippet skip_while.cpp skip_while sample 17 \snippet output.txt skip_while sample 18 */ 19 20 #if !defined(RXCPP_OPERATORS_RX_SKIP_WHILE_HPP) 21 #define RXCPP_OPERATORS_RX_SKIP_WHILE_HPP 22 23 #include "../rx-includes.hpp" 24 25 namespace rxcpp { 26 27 namespace operators { 28 29 namespace detail { 30 31 template<class... AN> 32 struct skip_while_invalid_arguments {}; 33 34 template<class... AN> 35 struct skip_while_invalid : public rxo::operator_base<skip_while_invalid_arguments<AN...>> { 36 using type = observable<skip_while_invalid_arguments<AN...>, skip_while_invalid<AN...>>; 37 }; 38 template<class... AN> 39 using skip_while_invalid_t = typename skip_while_invalid<AN...>::type; 40 41 template<class T, class Predicate> 42 struct skip_while 43 { 44 typedef rxu::decay_t<T> source_value_type; 45 typedef rxu::decay_t<Predicate> test_type; 46 test_type test; 47 48 49 skip_while(test_type t) 50 : test(std::move(t)) 51 { 52 } 53 54 template<class Subscriber> 55 struct skip_while_observer 56 { 57 typedef skip_while_observer<Subscriber> this_type; 58 typedef source_value_type value_type; 59 typedef rxu::decay_t<Subscriber> dest_type; 60 typedef observer<value_type, this_type> observer_type; 61 dest_type dest; 62 test_type test; 63 bool pass; 64 65 skip_while_observer(dest_type d, test_type t) 66 : dest(std::move(d)) 67 , test(std::move(t)), 68 pass(false) 69 { 70 } 71 void on_next(source_value_type v) { 72 if(pass || !test(v)) 73 { 74 pass = true; 75 dest.on_next(v); 76 } 77 } 78 void on_error(rxu::error_ptr e) const { 79 dest.on_error(e); 80 } 81 void on_completed() const { 82 dest.on_completed(); 83 } 84 85 static subscriber<value_type, observer_type> make(dest_type d, test_type t) { 86 return make_subscriber<value_type>(d, this_type(d, std::move(t))); 87 } 88 }; 89 90 template<class Subscriber> 91 auto operator()(Subscriber dest) const 92 -> decltype(skip_while_observer<Subscriber>::make(std::move(dest), test)) { 93 return skip_while_observer<Subscriber>::make(std::move(dest), test); 94 } 95 }; 96 97 } 98 99 /*! @copydoc rx-skip_while.hpp 100 */ 101 template<class... AN> 102 auto skip_while(AN&&... an) 103 -> operator_factory<skip_while_tag, AN...> { 104 return operator_factory<skip_while_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 105 } 106 107 } 108 109 template<> 110 struct member_overload<skip_while_tag> 111 { 112 template<class Observable, class Predicate, 113 class SourceValue = rxu::value_type_t<Observable>, 114 class TakeWhile = rxo::detail::skip_while<SourceValue, rxu::decay_t<Predicate>>> 115 static auto member(Observable&& o, Predicate&& p) 116 -> decltype(o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)))) { 117 return o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p))); 118 } 119 120 template<class... AN> 121 static operators::detail::skip_while_invalid_t<AN...> member(const AN&...) { 122 std::terminate(); 123 return {}; 124 static_assert(sizeof...(AN) == 10000, "skip_while takes (Predicate)"); 125 } 126 }; 127 128 129 } 130 131 #endif 132