1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-buffer_count.hpp 6 7 \brief Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable. 8 If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable. 9 10 \param count the maximum size of each buffers before it should be emitted. 11 \param skip how many items need to be skipped before starting a new buffers (optional). 12 13 \return Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable. 14 If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable. 15 16 \sample 17 \snippet buffer.cpp buffer count sample 18 \snippet output.txt buffer count sample 19 20 \sample 21 \snippet buffer.cpp buffer count+skip sample 22 \snippet output.txt buffer count+skip sample 23 */ 24 25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP) 26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP 27 28 #include "../rx-includes.hpp" 29 30 namespace rxcpp { 31 32 namespace operators { 33 34 namespace detail { 35 36 template<class... AN> 37 struct buffer_count_invalid_arguments {}; 38 39 template<class... AN> 40 struct buffer_count_invalid : public rxo::operator_base<buffer_count_invalid_arguments<AN...>> { 41 using type = observable<buffer_count_invalid_arguments<AN...>, buffer_count_invalid<AN...>>; 42 }; 43 template<class... AN> 44 using buffer_count_invalid_t = typename buffer_count_invalid<AN...>::type; 45 46 template<class T> 47 struct buffer_count 48 { 49 typedef rxu::decay_t<T> source_value_type; 50 typedef std::vector<source_value_type> value_type; 51 52 struct buffer_count_values 53 { 54 buffer_count_values(int c, int s) 55 : count(c) 56 , skip(s) 57 { 58 } 59 int count; 60 int skip; 61 }; 62 63 buffer_count_values initial; 64 65 buffer_count(int count, int skip) 66 : initial(count, skip) 67 { 68 } 69 70 template<class Subscriber> 71 struct buffer_count_observer : public buffer_count_values 72 { 73 typedef buffer_count_observer<Subscriber> this_type; 74 typedef std::vector<T> value_type; 75 typedef rxu::decay_t<Subscriber> dest_type; 76 typedef observer<value_type, this_type> observer_type; 77 dest_type dest; 78 mutable int cursor; 79 mutable std::deque<value_type> chunks; 80 81 buffer_count_observer(dest_type d, buffer_count_values v) 82 : buffer_count_values(v) 83 , dest(std::move(d)) 84 , cursor(0) 85 { 86 } 87 void on_next(T v) const { 88 if (cursor++ % this->skip == 0) { 89 chunks.emplace_back(); 90 } 91 for(auto& chunk : chunks) { 92 chunk.push_back(v); 93 } 94 while (!chunks.empty() && int(chunks.front().size()) == this->count) { 95 dest.on_next(std::move(chunks.front())); 96 chunks.pop_front(); 97 } 98 } 99 void on_error(rxu::error_ptr e) const { 100 dest.on_error(e); 101 } 102 void on_completed() const { 103 auto done = on_exception( 104 [&](){ 105 while (!chunks.empty()) { 106 dest.on_next(std::move(chunks.front())); 107 chunks.pop_front(); 108 } 109 return true; 110 }, 111 dest); 112 if (done.empty()) { 113 return; 114 } 115 dest.on_completed(); 116 } 117 118 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) { 119 auto cs = d.get_subscription(); 120 return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v))); 121 } 122 }; 123 124 template<class Subscriber> 125 auto operator()(Subscriber dest) const 126 -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) { 127 return buffer_count_observer<Subscriber>::make(std::move(dest), initial); 128 } 129 }; 130 131 } 132 133 /*! @copydoc rx-buffer_count.hpp 134 */ 135 template<class... AN> 136 auto buffer(AN&&... an) 137 -> operator_factory<buffer_count_tag, AN...> { 138 return operator_factory<buffer_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 139 } 140 141 } 142 143 template<> 144 struct member_overload<buffer_count_tag> 145 { 146 template<class Observable, 147 class Enabled = rxu::enable_if_all_true_type_t< 148 is_observable<Observable>>, 149 class SourceValue = rxu::value_type_t<Observable>, 150 class BufferCount = rxo::detail::buffer_count<SourceValue>, 151 class Value = rxu::value_type_t<BufferCount>> 152 static auto member(Observable&& o, int count, int skip) 153 -> decltype(o.template lift<Value>(BufferCount(count, skip))) { 154 return o.template lift<Value>(BufferCount(count, skip)); 155 } 156 157 template<class Observable, 158 class Enabled = rxu::enable_if_all_true_type_t< 159 is_observable<Observable>>, 160 class SourceValue = rxu::value_type_t<Observable>, 161 class BufferCount = rxo::detail::buffer_count<SourceValue>, 162 class Value = rxu::value_type_t<BufferCount>> 163 static auto member(Observable&& o, int count) 164 -> decltype(o.template lift<Value>(BufferCount(count, count))) { 165 return o.template lift<Value>(BufferCount(count, count)); 166 } 167 168 template<class... AN> 169 static operators::detail::buffer_count_invalid_t<AN...> member(AN...) { 170 std::terminate(); 171 return {}; 172 static_assert(sizeof...(AN) == 10000, "buffer takes (Count, optional Skip)"); 173 } 174 }; 175 176 } 177 178 #endif 179