Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-buffer_count.hpp
      6 
      7     \brief Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
      8            If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.
      9 
     10     \param count  the maximum size of each buffers before it should be emitted.
     11     \param skip   how many items need to be skipped before starting a new buffers (optional).
     12 
     13     \return  Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable.
     14              If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable.
     15 
     16     \sample
     17     \snippet buffer.cpp buffer count sample
     18     \snippet output.txt buffer count sample
     19 
     20     \sample
     21     \snippet buffer.cpp buffer count+skip sample
     22     \snippet output.txt buffer count+skip sample
     23 */
     24 
     25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP)
     26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP
     27 
     28 #include "../rx-includes.hpp"
     29 
     30 namespace rxcpp {
     31 
     32 namespace operators {
     33 
     34 namespace detail {
     35 
     36 template<class... AN>
     37 struct buffer_count_invalid_arguments {};
     38 
     39 template<class... AN>
     40 struct buffer_count_invalid : public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
     41     using type = observable<buffer_count_invalid_arguments<AN...>, buffer_count_invalid<AN...>>;
     42 };
     43 template<class... AN>
     44 using buffer_count_invalid_t = typename buffer_count_invalid<AN...>::type;
     45 
     46 template<class T>
     47 struct buffer_count
     48 {
     49     typedef rxu::decay_t<T> source_value_type;
     50     typedef std::vector<source_value_type> value_type;
     51 
     52     struct buffer_count_values
     53     {
     54         buffer_count_values(int c, int s)
     55             : count(c)
     56             , skip(s)
     57         {
     58         }
     59         int count;
     60         int skip;
     61     };
     62 
     63     buffer_count_values initial;
     64 
     65     buffer_count(int count, int skip)
     66         : initial(count, skip)
     67     {
     68     }
     69 
     70     template<class Subscriber>
     71     struct buffer_count_observer : public buffer_count_values
     72     {
     73         typedef buffer_count_observer<Subscriber> this_type;
     74         typedef std::vector<T> value_type;
     75         typedef rxu::decay_t<Subscriber> dest_type;
     76         typedef observer<value_type, this_type> observer_type;
     77         dest_type dest;
     78         mutable int cursor;
     79         mutable std::deque<value_type> chunks;
     80 
     81         buffer_count_observer(dest_type d, buffer_count_values v)
     82             : buffer_count_values(v)
     83             , dest(std::move(d))
     84             , cursor(0)
     85         {
     86         }
     87         void on_next(T v) const {
     88             if (cursor++ % this->skip == 0) {
     89                 chunks.emplace_back();
     90             }
     91             for(auto& chunk : chunks) {
     92                 chunk.push_back(v);
     93             }
     94             while (!chunks.empty() && int(chunks.front().size()) == this->count) {
     95                 dest.on_next(std::move(chunks.front()));
     96                 chunks.pop_front();
     97             }
     98         }
     99         void on_error(rxu::error_ptr e) const {
    100             dest.on_error(e);
    101         }
    102         void on_completed() const {
    103             auto done = on_exception(
    104                 [&](){
    105                     while (!chunks.empty()) {
    106                         dest.on_next(std::move(chunks.front()));
    107                         chunks.pop_front();
    108                     }
    109                     return true;
    110                 },
    111                 dest);
    112             if (done.empty()) {
    113                 return;
    114             }
    115             dest.on_completed();
    116         }
    117 
    118         static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
    119             auto cs = d.get_subscription();
    120             return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
    121         }
    122     };
    123 
    124     template<class Subscriber>
    125     auto operator()(Subscriber dest) const
    126         -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
    127         return      buffer_count_observer<Subscriber>::make(std::move(dest), initial);
    128     }
    129 };
    130 
    131 }
    132 
    133 /*! @copydoc rx-buffer_count.hpp
    134 */
    135 template<class... AN>
    136 auto buffer(AN&&... an)
    137     ->      operator_factory<buffer_count_tag, AN...> {
    138      return operator_factory<buffer_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    139 }
    140 
    141 }
    142 
    143 template<>
    144 struct member_overload<buffer_count_tag>
    145 {
    146     template<class Observable,
    147         class Enabled = rxu::enable_if_all_true_type_t<
    148             is_observable<Observable>>,
    149         class SourceValue = rxu::value_type_t<Observable>,
    150         class BufferCount = rxo::detail::buffer_count<SourceValue>,
    151         class Value = rxu::value_type_t<BufferCount>>
    152     static auto member(Observable&& o, int count, int skip)
    153         -> decltype(o.template lift<Value>(BufferCount(count, skip))) {
    154         return      o.template lift<Value>(BufferCount(count, skip));
    155     }
    156 
    157      template<class Observable,
    158         class Enabled = rxu::enable_if_all_true_type_t<
    159             is_observable<Observable>>,
    160         class SourceValue = rxu::value_type_t<Observable>,
    161         class BufferCount = rxo::detail::buffer_count<SourceValue>,
    162         class Value = rxu::value_type_t<BufferCount>>
    163     static auto member(Observable&& o, int count)
    164         -> decltype(o.template lift<Value>(BufferCount(count, count))) {
    165         return      o.template lift<Value>(BufferCount(count, count));
    166     }
    167 
    168     template<class... AN>
    169     static operators::detail::buffer_count_invalid_t<AN...> member(AN...) {
    170         std::terminate();
    171         return {};
    172         static_assert(sizeof...(AN) == 10000, "buffer takes (Count, optional Skip)");
    173     }
    174 };
    175 
    176 }
    177 
    178 #endif
    179