Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-reduce.hpp
      6 
      7     \brief For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
      8 
      9     \tparam Seed            the type of the initial value for the accumulator
     10     \tparam Accumulator     the type of the data accumulating function
     11     \tparam ResultSelector  the type of the result producing function
     12 
     13     \param seed  the initial value for the accumulator
     14     \param a     an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
     15     \param rs    a result producing function that makes the final value from the last accumulator call result
     16 
     17     \return  An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.
     18 
     19     Some basic reduce-type operators have already been implemented:
     20     - rxcpp::operators::first
     21     - rxcpp::operators::last
     22     - rxcpp::operators::count
     23     - rxcpp::operators::sum
     24     - rxcpp::operators::average
     25     - rxcpp::operators::min
     26     - rxcpp::operators::max
     27 
     28     \sample
     29     Geometric mean of source values:
     30     \snippet reduce.cpp reduce sample
     31     \snippet output.txt reduce sample
     32 
     33     If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
     34     \snippet reduce.cpp reduce empty sample
     35     \snippet output.txt reduce empty sample
     36 
     37     If the accumulator raises an exception, it is returned by the resulting observable in on_error:
     38     \snippet reduce.cpp reduce exception from accumulator sample
     39     \snippet output.txt reduce exception from accumulator sample
     40 
     41     The same for exceptions raised by the result selector:
     42     \snippet reduce.cpp reduce exception from result selector sample
     43     \snippet output.txt reduce exception from result selector sample
     44 */
     45 
     46 #if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP)
     47 #define RXCPP_OPERATORS_RX_REDUCE_HPP
     48 
     49 #include "../rx-includes.hpp"
     50 
     51 namespace rxcpp {
     52 
     53 namespace operators {
     54 
     55 namespace detail {
     56 
     57 template<class... AN>
     58 struct reduce_invalid_arguments {};
     59 
     60 template<class... AN>
     61 struct reduce_invalid : public rxo::operator_base<reduce_invalid_arguments<AN...>> {
     62     using type = observable<reduce_invalid_arguments<AN...>, reduce_invalid<AN...>>;
     63 };
     64 template<class... AN>
     65 using reduce_invalid_t = typename reduce_invalid<AN...>::type;
     66 
     67 template<class Seed, class ResultSelector>
     68 struct is_result_function_for {
     69 
     70     typedef rxu::decay_t<ResultSelector> result_selector_type;
     71     typedef rxu::decay_t<Seed> seed_type;
     72 
     73     struct tag_not_valid {};
     74 
     75     template<class CS, class CRS>
     76     static auto check(int) -> decltype((*(CRS*)nullptr)(*(CS*)nullptr));
     77     template<class CS, class CRS>
     78     static tag_not_valid check(...);
     79 
     80     typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
     81     static const bool value = !std::is_same<type, tag_not_valid>::value;
     82 };
     83 
     84 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
     85 struct reduce_traits
     86 {
     87     typedef rxu::decay_t<Observable> source_type;
     88     typedef rxu::decay_t<Accumulator> accumulator_type;
     89     typedef rxu::decay_t<ResultSelector> result_selector_type;
     90     typedef rxu::decay_t<Seed> seed_type;
     91 
     92     typedef T source_value_type;
     93 
     94     typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
     95 };
     96 
     97 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
     98 struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>>
     99 {
    100     typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type;
    101     typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits;
    102 
    103     typedef typename traits::source_type source_type;
    104     typedef typename traits::accumulator_type accumulator_type;
    105     typedef typename traits::result_selector_type result_selector_type;
    106     typedef typename traits::seed_type seed_type;
    107 
    108     typedef typename traits::source_value_type source_value_type;
    109     typedef typename traits::value_type value_type;
    110 
    111     struct reduce_initial_type
    112     {
    113         ~reduce_initial_type()
    114         {
    115         }
    116         reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
    117             : source(std::move(o))
    118             , accumulator(std::move(a))
    119             , result_selector(std::move(rs))
    120             , seed(std::move(s))
    121         {
    122         }
    123         source_type source;
    124         accumulator_type accumulator;
    125         result_selector_type result_selector;
    126         seed_type seed;
    127 
    128     private:
    129         reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
    130     };
    131     reduce_initial_type initial;
    132 
    133     ~reduce()
    134     {
    135     }
    136     reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
    137         : initial(std::move(o), std::move(a), std::move(rs), std::move(s))
    138     {
    139     }
    140     template<class Subscriber>
    141     void on_subscribe(Subscriber o) const {
    142         struct reduce_state_type
    143             : public reduce_initial_type
    144             , public std::enable_shared_from_this<reduce_state_type>
    145         {
    146             reduce_state_type(reduce_initial_type i, Subscriber scrbr)
    147                 : reduce_initial_type(i)
    148                 , source(i.source)
    149                 , current(reduce_initial_type::seed)
    150                 , out(std::move(scrbr))
    151             {
    152             }
    153             source_type source;
    154             seed_type current;
    155             Subscriber out;
    156 
    157         private:
    158             reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE;
    159         };
    160         auto state = std::make_shared<reduce_state_type>(initial, std::move(o));
    161         state->source.subscribe(
    162             state->out,
    163         // on_next
    164             [state](T t) {
    165                 seed_type next = state->accumulator(std::move(state->current), std::move(t));
    166                 state->current = std::move(next);
    167             },
    168         // on_error
    169             [state](rxu::error_ptr e) {
    170                 state->out.on_error(e);
    171             },
    172         // on_completed
    173             [state]() {
    174                 auto result = on_exception(
    175                     [&](){return state->result_selector(std::move(state->current));},
    176                     state->out);
    177                 if (result.empty()) {
    178                     return;
    179                 }
    180                 state->out.on_next(std::move(result.get()));
    181                 state->out.on_completed();
    182             }
    183         );
    184     }
    185 private:
    186     reduce& operator=(reduce o) RXCPP_DELETE;
    187 };
    188 
    189 template<class T>
    190 struct initialize_seeder {
    191     typedef T seed_type;
    192     static seed_type seed() {
    193         return seed_type{};
    194     }
    195 };
    196 
    197 template<class T>
    198 struct average {
    199     struct seed_type
    200     {
    201         seed_type()
    202             : value()
    203             , count(0)
    204         {
    205         }
    206         rxu::maybe<T> value;
    207         int count;
    208         rxu::detail::maybe<double> stage;
    209     };
    210     static seed_type seed() {
    211         return seed_type{};
    212     }
    213     template<class U>
    214     seed_type operator()(seed_type a, U&& v) {
    215         if (a.count != 0 &&
    216             (a.count == std::numeric_limits<int>::max() ||
    217             ((v > 0) && (*(a.value) > (std::numeric_limits<T>::max() - v))) ||
    218             ((v < 0) && (*(a.value) < (std::numeric_limits<T>::min() - v))))) {
    219             // would overflow, calc existing and reset for next batch
    220             // this will add error to the final result, but the alternative
    221             // is to fail on overflow
    222             double avg = static_cast<double>(*(a.value)) / a.count;
    223             if (!a.stage.empty()) {
    224                 a.stage.reset((*a.stage + avg) / 2);
    225             } else {
    226                 a.stage.reset(avg);
    227             }
    228             a.value.reset(std::forward<U>(v));
    229             a.count = 1;
    230         } else if (a.value.empty()) {
    231             a.value.reset(std::forward<U>(v));
    232             a.count = 1;
    233         } else {
    234             *(a.value) += v;
    235             ++a.count;
    236         }
    237         return a;
    238     }
    239     double operator()(seed_type a) {
    240         if (!a.value.empty()) {
    241             double avg = static_cast<double>(*(a.value)) / a.count;
    242             if (!a.stage.empty()) {
    243                 avg = (*a.stage + avg) / 2;
    244             }
    245             return avg;
    246         }
    247         rxu::throw_exception(rxcpp::empty_error("average() requires a stream with at least one value"));
    248     }
    249 };
    250 
    251 template<class T>
    252 struct sum {
    253     typedef rxu::maybe<T> seed_type;
    254     static seed_type seed() {
    255         return seed_type();
    256     }
    257     template<class U>
    258     seed_type operator()(seed_type a, U&& v) const {
    259         if (a.empty())
    260             a.reset(std::forward<U>(v));
    261         else
    262             *a = *a + v;
    263         return a;
    264     }
    265     T operator()(seed_type a) const {
    266         if (a.empty())
    267             rxu::throw_exception(rxcpp::empty_error("sum() requires a stream with at least one value"));
    268         return *a;
    269     }
    270 };
    271 
    272 template<class T>
    273 struct max {
    274     typedef rxu::maybe<T> seed_type;
    275     static seed_type seed() {
    276         return seed_type();
    277     }
    278     template<class U>
    279     seed_type operator()(seed_type a, U&& v) {
    280         if (a.empty() || *a < v)
    281             a.reset(std::forward<U>(v));
    282         return a;
    283     }
    284     T operator()(seed_type a) {
    285         if (a.empty())
    286             rxu::throw_exception(rxcpp::empty_error("max() requires a stream with at least one value"));
    287         return *a;
    288     }
    289 };
    290 
    291 template<class T>
    292 struct min {
    293     typedef rxu::maybe<T> seed_type;
    294     static seed_type seed() {
    295         return seed_type();
    296     }
    297     template<class U>
    298     seed_type operator()(seed_type a, U&& v) {
    299         if (a.empty() || v < *a)
    300             a.reset(std::forward<U>(v));
    301         return a;
    302     }
    303     T operator()(seed_type a) {
    304         if (a.empty())
    305             rxu::throw_exception(rxcpp::empty_error("min() requires a stream with at least one value"));
    306         return *a;
    307     }
    308 };
    309 
    310 template<class T>
    311 struct first {
    312     using seed_type = rxu::maybe<T>;
    313     static seed_type seed() {
    314         return seed_type();
    315     }
    316     template<class U>
    317     seed_type operator()(seed_type a, U&& v) {
    318         a.reset(std::forward<U>(v));
    319         return a;
    320     }
    321     T operator()(seed_type a) {
    322         if (a.empty()) {
    323             rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
    324         }
    325         return *a;
    326     }
    327 };
    328 
    329 template<class T>
    330 struct last {
    331     using seed_type = rxu::maybe<T>;
    332     static seed_type seed() {
    333         return seed_type();
    334     }
    335     template<class U>
    336     seed_type operator()(seed_type a, U&& v) {
    337         a.reset(std::forward<U>(v));
    338         return a;
    339     }
    340     T operator()(seed_type a) {
    341         if (a.empty()) {
    342             rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
    343         }
    344         return *a;
    345     }
    346 };
    347 
    348 }
    349 
    350 /*! @copydoc rx-reduce.hpp
    351 */
    352 template<class... AN>
    353 auto reduce(AN&&... an)
    354     ->     operator_factory<reduce_tag, AN...> {
    355     return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    356 }
    357 
    358 /*! @copydoc rx-reduce.hpp
    359 */
    360 template<class... AN>
    361 auto accumulate(AN&&... an)
    362     ->     operator_factory<reduce_tag, AN...> {
    363     return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    364 }
    365 
    366 /*! \brief For each item from this observable reduce it by sending only the first item.
    367 
    368     \return  An observable that emits only the very first item emitted by the source observable.
    369 
    370     \sample
    371     \snippet math.cpp first sample
    372     \snippet output.txt first sample
    373 
    374     When the source observable calls on_error:
    375     \snippet math.cpp first empty sample
    376     \snippet output.txt first empty sample
    377 */
    378 inline auto first()
    379     ->     operator_factory<first_tag> {
    380     return operator_factory<first_tag>(std::tuple<>{});
    381 }
    382 
    383 /*! \brief For each item from this observable reduce it by sending only the last item.
    384 
    385     \return  An observable that emits only the very last item emitted by the source observable.
    386 
    387     \sample
    388     \snippet math.cpp last sample
    389     \snippet output.txt last sample
    390 
    391     When the source observable calls on_error:
    392     \snippet math.cpp last empty sample
    393     \snippet output.txt last empty sample
    394 */
    395 inline auto last()
    396     ->     operator_factory<last_tag> {
    397     return operator_factory<last_tag>(std::tuple<>{});
    398 }
    399 
    400 /*! \brief For each item from this observable reduce it by incrementing a count.
    401 
    402     \return  An observable that emits a single item: the number of elements emitted by the source observable.
    403 
    404     \sample
    405     \snippet math.cpp count sample
    406     \snippet output.txt count sample
    407 
    408     When the source observable calls on_error:
    409     \snippet math.cpp count error sample
    410     \snippet output.txt count error sample
    411 */
    412 inline auto count()
    413     ->     operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>> {
    414     return operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>(std::make_tuple(0, rxu::count(), rxu::take_at<0>()));
    415 }
    416 
    417 /*! \brief For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
    418 
    419     \return  An observable that emits a single item: the average of elements emitted by the source observable.
    420 
    421     \sample
    422     \snippet math.cpp average sample
    423     \snippet output.txt average sample
    424 
    425     When the source observable completes without emitting any items:
    426     \snippet math.cpp average empty sample
    427     \snippet output.txt average empty sample
    428 
    429     When the source observable calls on_error:
    430     \snippet math.cpp average error sample
    431     \snippet output.txt average error sample
    432 */
    433 inline auto average()
    434     ->     operator_factory<average_tag> {
    435     return operator_factory<average_tag>(std::tuple<>{});
    436 }
    437 
    438 /*! \brief For each item from this observable reduce it by adding to the previous items.
    439 
    440     \return  An observable that emits a single item: the sum of elements emitted by the source observable.
    441 
    442     \sample
    443     \snippet math.cpp sum sample
    444     \snippet output.txt sum sample
    445 
    446     When the source observable completes without emitting any items:
    447     \snippet math.cpp sum empty sample
    448     \snippet output.txt sum empty sample
    449 
    450     When the source observable calls on_error:
    451     \snippet math.cpp sum error sample
    452     \snippet output.txt sum error sample
    453 */
    454 inline auto sum()
    455     ->     operator_factory<sum_tag> {
    456     return operator_factory<sum_tag>(std::tuple<>{});
    457 }
    458 
    459 /*! \brief For each item from this observable reduce it by taking the min value of the previous items.
    460 
    461     \return  An observable that emits a single item: the min of elements emitted by the source observable.
    462 
    463     \sample
    464     \snippet math.cpp min sample
    465     \snippet output.txt min sample
    466 
    467     When the source observable completes without emitting any items:
    468     \snippet math.cpp min empty sample
    469     \snippet output.txt min empty sample
    470 
    471     When the source observable calls on_error:
    472     \snippet math.cpp min error sample
    473     \snippet output.txt min error sample
    474 */
    475 inline auto min()
    476     ->     operator_factory<min_tag> {
    477     return operator_factory<min_tag>(std::tuple<>{});
    478 }
    479 
    480 /*! \brief For each item from this observable reduce it by taking the max value of the previous items.
    481 
    482     \return  An observable that emits a single item: the max of elements emitted by the source observable.
    483 
    484     \sample
    485     \snippet math.cpp max sample
    486     \snippet output.txt max sample
    487 
    488     When the source observable completes without emitting any items:
    489     \snippet math.cpp max empty sample
    490     \snippet output.txt max empty sample
    491 
    492     When the source observable calls on_error:
    493     \snippet math.cpp max error sample
    494     \snippet output.txt max error sample
    495 */
    496 inline auto max()
    497     ->     operator_factory<max_tag> {
    498     return operator_factory<max_tag>(std::tuple<>{});
    499 }
    500 
    501 }
    502 
    503 template<>
    504 struct member_overload<reduce_tag>
    505 {
    506 
    507     template<class Observable, class Seed, class Accumulator, class ResultSelector,
    508         class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    509         class Value = rxu::value_type_t<Reduce>,
    510         class Result = observable<Value, Reduce>>
    511     static Result member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r)
    512     {
    513         return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s)));
    514     }
    515 
    516     template<class Observable, class Seed, class Accumulator,
    517         class ResultSelector=rxu::detail::take_at<0>,
    518         class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    519         class Value = rxu::value_type_t<Reduce>,
    520         class Result = observable<Value, Reduce>>
    521     static Result member(Observable&& o, Seed&& s, Accumulator&& a)
    522     {
    523         return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s)));
    524     }
    525 
    526     template<class... AN>
    527     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    528         std::terminate();
    529         return {};
    530         static_assert(sizeof...(AN) == 10000, "reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue");
    531     }
    532 };
    533 
    534 template<>
    535 struct member_overload<first_tag>
    536 {
    537     template<class Observable,
    538         class SValue = rxu::value_type_t<Observable>,
    539         class Operation = operators::detail::first<SValue>,
    540         class Seed = decltype(Operation::seed()),
    541         class Accumulator = Operation,
    542         class ResultSelector = Operation,
    543         class TakeOne = decltype(((rxu::decay_t<Observable>*)nullptr)->take(1)),
    544         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<TakeOne>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    545         class RValue = rxu::value_type_t<Reduce>,
    546         class Result = observable<RValue, Reduce>>
    547     static Result member(Observable&& o)
    548     {
    549         return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed()));
    550     }
    551 
    552     template<class... AN>
    553     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    554         std::terminate();
    555         return {};
    556         static_assert(sizeof...(AN) == 10000, "first does not support Observable::value_type");
    557     }
    558 };
    559 
    560 template<>
    561 struct member_overload<last_tag>
    562 {
    563     template<class Observable,
    564         class SValue = rxu::value_type_t<Observable>,
    565         class Operation = operators::detail::last<SValue>,
    566         class Seed = decltype(Operation::seed()),
    567         class Accumulator = Operation,
    568         class ResultSelector = Operation,
    569         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    570         class RValue = rxu::value_type_t<Reduce>,
    571         class Result = observable<RValue, Reduce>>
    572     static Result member(Observable&& o)
    573     {
    574         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
    575     }
    576 
    577     template<class... AN>
    578     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    579         std::terminate();
    580         return {};
    581         static_assert(sizeof...(AN) == 10000, "last does not support Observable::value_type");
    582     }
    583 };
    584 
    585 template<>
    586 struct member_overload<sum_tag>
    587 {
    588     template<class Observable,
    589         class SValue = rxu::value_type_t<Observable>,
    590         class Operation = operators::detail::sum<SValue>,
    591         class Seed = decltype(Operation::seed()),
    592         class Accumulator = Operation,
    593         class ResultSelector = Operation,
    594         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    595         class RValue = rxu::value_type_t<Reduce>,
    596         class Result = observable<RValue, Reduce>>
    597     static Result member(Observable&& o)
    598     {
    599         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
    600     }
    601 
    602     template<class... AN>
    603     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    604         std::terminate();
    605         return {};
    606         static_assert(sizeof...(AN) == 10000, "sum does not support Observable::value_type");
    607     }
    608 };
    609 
    610 template<>
    611 struct member_overload<average_tag>
    612 {
    613     template<class Observable,
    614         class SValue = rxu::value_type_t<Observable>,
    615         class Operation = operators::detail::average<SValue>,
    616         class Seed = decltype(Operation::seed()),
    617         class Accumulator = Operation,
    618         class ResultSelector = Operation,
    619         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    620         class RValue = rxu::value_type_t<Reduce>,
    621         class Result = observable<RValue, Reduce>>
    622     static Result member(Observable&& o)
    623     {
    624         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
    625     }
    626 
    627     template<class... AN>
    628     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    629         std::terminate();
    630         return {};
    631         static_assert(sizeof...(AN) == 10000, "average does not support Observable::value_type");
    632     }
    633 };
    634 
    635 template<>
    636 struct member_overload<max_tag>
    637 {
    638     template<class Observable,
    639         class SValue = rxu::value_type_t<Observable>,
    640         class Operation = operators::detail::max<SValue>,
    641         class Seed = decltype(Operation::seed()),
    642         class Accumulator = Operation,
    643         class ResultSelector = Operation,
    644         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    645         class RValue = rxu::value_type_t<Reduce>,
    646         class Result = observable<RValue, Reduce>>
    647     static Result member(Observable&& o)
    648     {
    649         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
    650     }
    651 
    652     template<class... AN>
    653     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    654         std::terminate();
    655         return {};
    656         static_assert(sizeof...(AN) == 10000, "max does not support Observable::value_type");
    657     }
    658 };
    659 
    660 template<>
    661 struct member_overload<min_tag>
    662 {
    663     template<class Observable,
    664         class SValue = rxu::value_type_t<Observable>,
    665         class Operation = operators::detail::min<SValue>,
    666         class Seed = decltype(Operation::seed()),
    667         class Accumulator = Operation,
    668         class ResultSelector = Operation,
    669         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
    670         class RValue = rxu::value_type_t<Reduce>,
    671         class Result = observable<RValue, Reduce>>
    672     static Result member(Observable&& o)
    673     {
    674         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
    675     }
    676 
    677     template<class... AN>
    678     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
    679         std::terminate();
    680         return {};
    681         static_assert(sizeof...(AN) == 10000, "min does not support Observable::value_type");
    682     }
    683 };
    684 
    685 }
    686 
    687 #endif
    688