1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 /*! \file rx-reduce.hpp 6 7 \brief For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. 8 9 \tparam Seed the type of the initial value for the accumulator 10 \tparam Accumulator the type of the data accumulating function 11 \tparam ResultSelector the type of the result producing function 12 13 \param seed the initial value for the accumulator 14 \param a an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call 15 \param rs a result producing function that makes the final value from the last accumulator call result 16 17 \return An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable. 18 19 Some basic reduce-type operators have already been implemented: 20 - rxcpp::operators::first 21 - rxcpp::operators::last 22 - rxcpp::operators::count 23 - rxcpp::operators::sum 24 - rxcpp::operators::average 25 - rxcpp::operators::min 26 - rxcpp::operators::max 27 28 \sample 29 Geometric mean of source values: 30 \snippet reduce.cpp reduce sample 31 \snippet output.txt reduce sample 32 33 If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector: 34 \snippet reduce.cpp reduce empty sample 35 \snippet output.txt reduce empty sample 36 37 If the accumulator raises an exception, it is returned by the resulting observable in on_error: 38 \snippet reduce.cpp reduce exception from accumulator sample 39 \snippet output.txt reduce exception from accumulator sample 40 41 The same for exceptions raised by the result selector: 42 \snippet reduce.cpp reduce exception from result selector sample 43 \snippet output.txt reduce exception from result selector sample 44 */ 45 46 #if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP) 47 #define RXCPP_OPERATORS_RX_REDUCE_HPP 48 49 #include "../rx-includes.hpp" 50 51 namespace rxcpp { 52 53 namespace operators { 54 55 namespace detail { 56 57 template<class... AN> 58 struct reduce_invalid_arguments {}; 59 60 template<class... AN> 61 struct reduce_invalid : public rxo::operator_base<reduce_invalid_arguments<AN...>> { 62 using type = observable<reduce_invalid_arguments<AN...>, reduce_invalid<AN...>>; 63 }; 64 template<class... AN> 65 using reduce_invalid_t = typename reduce_invalid<AN...>::type; 66 67 template<class Seed, class ResultSelector> 68 struct is_result_function_for { 69 70 typedef rxu::decay_t<ResultSelector> result_selector_type; 71 typedef rxu::decay_t<Seed> seed_type; 72 73 struct tag_not_valid {}; 74 75 template<class CS, class CRS> 76 static auto check(int) -> decltype((*(CRS*)nullptr)(*(CS*)nullptr)); 77 template<class CS, class CRS> 78 static tag_not_valid check(...); 79 80 typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type; 81 static const bool value = !std::is_same<type, tag_not_valid>::value; 82 }; 83 84 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed> 85 struct reduce_traits 86 { 87 typedef rxu::decay_t<Observable> source_type; 88 typedef rxu::decay_t<Accumulator> accumulator_type; 89 typedef rxu::decay_t<ResultSelector> result_selector_type; 90 typedef rxu::decay_t<Seed> seed_type; 91 92 typedef T source_value_type; 93 94 typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type; 95 }; 96 97 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed> 98 struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>> 99 { 100 typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type; 101 typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits; 102 103 typedef typename traits::source_type source_type; 104 typedef typename traits::accumulator_type accumulator_type; 105 typedef typename traits::result_selector_type result_selector_type; 106 typedef typename traits::seed_type seed_type; 107 108 typedef typename traits::source_value_type source_value_type; 109 typedef typename traits::value_type value_type; 110 111 struct reduce_initial_type 112 { 113 ~reduce_initial_type() 114 { 115 } 116 reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s) 117 : source(std::move(o)) 118 , accumulator(std::move(a)) 119 , result_selector(std::move(rs)) 120 , seed(std::move(s)) 121 { 122 } 123 source_type source; 124 accumulator_type accumulator; 125 result_selector_type result_selector; 126 seed_type seed; 127 128 private: 129 reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE; 130 }; 131 reduce_initial_type initial; 132 133 ~reduce() 134 { 135 } 136 reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s) 137 : initial(std::move(o), std::move(a), std::move(rs), std::move(s)) 138 { 139 } 140 template<class Subscriber> 141 void on_subscribe(Subscriber o) const { 142 struct reduce_state_type 143 : public reduce_initial_type 144 , public std::enable_shared_from_this<reduce_state_type> 145 { 146 reduce_state_type(reduce_initial_type i, Subscriber scrbr) 147 : reduce_initial_type(i) 148 , source(i.source) 149 , current(reduce_initial_type::seed) 150 , out(std::move(scrbr)) 151 { 152 } 153 source_type source; 154 seed_type current; 155 Subscriber out; 156 157 private: 158 reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE; 159 }; 160 auto state = std::make_shared<reduce_state_type>(initial, std::move(o)); 161 state->source.subscribe( 162 state->out, 163 // on_next 164 [state](T t) { 165 seed_type next = state->accumulator(std::move(state->current), std::move(t)); 166 state->current = std::move(next); 167 }, 168 // on_error 169 [state](rxu::error_ptr e) { 170 state->out.on_error(e); 171 }, 172 // on_completed 173 [state]() { 174 auto result = on_exception( 175 [&](){return state->result_selector(std::move(state->current));}, 176 state->out); 177 if (result.empty()) { 178 return; 179 } 180 state->out.on_next(std::move(result.get())); 181 state->out.on_completed(); 182 } 183 ); 184 } 185 private: 186 reduce& operator=(reduce o) RXCPP_DELETE; 187 }; 188 189 template<class T> 190 struct initialize_seeder { 191 typedef T seed_type; 192 static seed_type seed() { 193 return seed_type{}; 194 } 195 }; 196 197 template<class T> 198 struct average { 199 struct seed_type 200 { 201 seed_type() 202 : value() 203 , count(0) 204 { 205 } 206 rxu::maybe<T> value; 207 int count; 208 rxu::detail::maybe<double> stage; 209 }; 210 static seed_type seed() { 211 return seed_type{}; 212 } 213 template<class U> 214 seed_type operator()(seed_type a, U&& v) { 215 if (a.count != 0 && 216 (a.count == std::numeric_limits<int>::max() || 217 ((v > 0) && (*(a.value) > (std::numeric_limits<T>::max() - v))) || 218 ((v < 0) && (*(a.value) < (std::numeric_limits<T>::min() - v))))) { 219 // would overflow, calc existing and reset for next batch 220 // this will add error to the final result, but the alternative 221 // is to fail on overflow 222 double avg = static_cast<double>(*(a.value)) / a.count; 223 if (!a.stage.empty()) { 224 a.stage.reset((*a.stage + avg) / 2); 225 } else { 226 a.stage.reset(avg); 227 } 228 a.value.reset(std::forward<U>(v)); 229 a.count = 1; 230 } else if (a.value.empty()) { 231 a.value.reset(std::forward<U>(v)); 232 a.count = 1; 233 } else { 234 *(a.value) += v; 235 ++a.count; 236 } 237 return a; 238 } 239 double operator()(seed_type a) { 240 if (!a.value.empty()) { 241 double avg = static_cast<double>(*(a.value)) / a.count; 242 if (!a.stage.empty()) { 243 avg = (*a.stage + avg) / 2; 244 } 245 return avg; 246 } 247 rxu::throw_exception(rxcpp::empty_error("average() requires a stream with at least one value")); 248 } 249 }; 250 251 template<class T> 252 struct sum { 253 typedef rxu::maybe<T> seed_type; 254 static seed_type seed() { 255 return seed_type(); 256 } 257 template<class U> 258 seed_type operator()(seed_type a, U&& v) const { 259 if (a.empty()) 260 a.reset(std::forward<U>(v)); 261 else 262 *a = *a + v; 263 return a; 264 } 265 T operator()(seed_type a) const { 266 if (a.empty()) 267 rxu::throw_exception(rxcpp::empty_error("sum() requires a stream with at least one value")); 268 return *a; 269 } 270 }; 271 272 template<class T> 273 struct max { 274 typedef rxu::maybe<T> seed_type; 275 static seed_type seed() { 276 return seed_type(); 277 } 278 template<class U> 279 seed_type operator()(seed_type a, U&& v) { 280 if (a.empty() || *a < v) 281 a.reset(std::forward<U>(v)); 282 return a; 283 } 284 T operator()(seed_type a) { 285 if (a.empty()) 286 rxu::throw_exception(rxcpp::empty_error("max() requires a stream with at least one value")); 287 return *a; 288 } 289 }; 290 291 template<class T> 292 struct min { 293 typedef rxu::maybe<T> seed_type; 294 static seed_type seed() { 295 return seed_type(); 296 } 297 template<class U> 298 seed_type operator()(seed_type a, U&& v) { 299 if (a.empty() || v < *a) 300 a.reset(std::forward<U>(v)); 301 return a; 302 } 303 T operator()(seed_type a) { 304 if (a.empty()) 305 rxu::throw_exception(rxcpp::empty_error("min() requires a stream with at least one value")); 306 return *a; 307 } 308 }; 309 310 template<class T> 311 struct first { 312 using seed_type = rxu::maybe<T>; 313 static seed_type seed() { 314 return seed_type(); 315 } 316 template<class U> 317 seed_type operator()(seed_type a, U&& v) { 318 a.reset(std::forward<U>(v)); 319 return a; 320 } 321 T operator()(seed_type a) { 322 if (a.empty()) { 323 rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value")); 324 } 325 return *a; 326 } 327 }; 328 329 template<class T> 330 struct last { 331 using seed_type = rxu::maybe<T>; 332 static seed_type seed() { 333 return seed_type(); 334 } 335 template<class U> 336 seed_type operator()(seed_type a, U&& v) { 337 a.reset(std::forward<U>(v)); 338 return a; 339 } 340 T operator()(seed_type a) { 341 if (a.empty()) { 342 rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value")); 343 } 344 return *a; 345 } 346 }; 347 348 } 349 350 /*! @copydoc rx-reduce.hpp 351 */ 352 template<class... AN> 353 auto reduce(AN&&... an) 354 -> operator_factory<reduce_tag, AN...> { 355 return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 356 } 357 358 /*! @copydoc rx-reduce.hpp 359 */ 360 template<class... AN> 361 auto accumulate(AN&&... an) 362 -> operator_factory<reduce_tag, AN...> { 363 return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); 364 } 365 366 /*! \brief For each item from this observable reduce it by sending only the first item. 367 368 \return An observable that emits only the very first item emitted by the source observable. 369 370 \sample 371 \snippet math.cpp first sample 372 \snippet output.txt first sample 373 374 When the source observable calls on_error: 375 \snippet math.cpp first empty sample 376 \snippet output.txt first empty sample 377 */ 378 inline auto first() 379 -> operator_factory<first_tag> { 380 return operator_factory<first_tag>(std::tuple<>{}); 381 } 382 383 /*! \brief For each item from this observable reduce it by sending only the last item. 384 385 \return An observable that emits only the very last item emitted by the source observable. 386 387 \sample 388 \snippet math.cpp last sample 389 \snippet output.txt last sample 390 391 When the source observable calls on_error: 392 \snippet math.cpp last empty sample 393 \snippet output.txt last empty sample 394 */ 395 inline auto last() 396 -> operator_factory<last_tag> { 397 return operator_factory<last_tag>(std::tuple<>{}); 398 } 399 400 /*! \brief For each item from this observable reduce it by incrementing a count. 401 402 \return An observable that emits a single item: the number of elements emitted by the source observable. 403 404 \sample 405 \snippet math.cpp count sample 406 \snippet output.txt count sample 407 408 When the source observable calls on_error: 409 \snippet math.cpp count error sample 410 \snippet output.txt count error sample 411 */ 412 inline auto count() 413 -> operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>> { 414 return operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>(std::make_tuple(0, rxu::count(), rxu::take_at<0>())); 415 } 416 417 /*! \brief For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. 418 419 \return An observable that emits a single item: the average of elements emitted by the source observable. 420 421 \sample 422 \snippet math.cpp average sample 423 \snippet output.txt average sample 424 425 When the source observable completes without emitting any items: 426 \snippet math.cpp average empty sample 427 \snippet output.txt average empty sample 428 429 When the source observable calls on_error: 430 \snippet math.cpp average error sample 431 \snippet output.txt average error sample 432 */ 433 inline auto average() 434 -> operator_factory<average_tag> { 435 return operator_factory<average_tag>(std::tuple<>{}); 436 } 437 438 /*! \brief For each item from this observable reduce it by adding to the previous items. 439 440 \return An observable that emits a single item: the sum of elements emitted by the source observable. 441 442 \sample 443 \snippet math.cpp sum sample 444 \snippet output.txt sum sample 445 446 When the source observable completes without emitting any items: 447 \snippet math.cpp sum empty sample 448 \snippet output.txt sum empty sample 449 450 When the source observable calls on_error: 451 \snippet math.cpp sum error sample 452 \snippet output.txt sum error sample 453 */ 454 inline auto sum() 455 -> operator_factory<sum_tag> { 456 return operator_factory<sum_tag>(std::tuple<>{}); 457 } 458 459 /*! \brief For each item from this observable reduce it by taking the min value of the previous items. 460 461 \return An observable that emits a single item: the min of elements emitted by the source observable. 462 463 \sample 464 \snippet math.cpp min sample 465 \snippet output.txt min sample 466 467 When the source observable completes without emitting any items: 468 \snippet math.cpp min empty sample 469 \snippet output.txt min empty sample 470 471 When the source observable calls on_error: 472 \snippet math.cpp min error sample 473 \snippet output.txt min error sample 474 */ 475 inline auto min() 476 -> operator_factory<min_tag> { 477 return operator_factory<min_tag>(std::tuple<>{}); 478 } 479 480 /*! \brief For each item from this observable reduce it by taking the max value of the previous items. 481 482 \return An observable that emits a single item: the max of elements emitted by the source observable. 483 484 \sample 485 \snippet math.cpp max sample 486 \snippet output.txt max sample 487 488 When the source observable completes without emitting any items: 489 \snippet math.cpp max empty sample 490 \snippet output.txt max empty sample 491 492 When the source observable calls on_error: 493 \snippet math.cpp max error sample 494 \snippet output.txt max error sample 495 */ 496 inline auto max() 497 -> operator_factory<max_tag> { 498 return operator_factory<max_tag>(std::tuple<>{}); 499 } 500 501 } 502 503 template<> 504 struct member_overload<reduce_tag> 505 { 506 507 template<class Observable, class Seed, class Accumulator, class ResultSelector, 508 class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 509 class Value = rxu::value_type_t<Reduce>, 510 class Result = observable<Value, Reduce>> 511 static Result member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r) 512 { 513 return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s))); 514 } 515 516 template<class Observable, class Seed, class Accumulator, 517 class ResultSelector=rxu::detail::take_at<0>, 518 class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 519 class Value = rxu::value_type_t<Reduce>, 520 class Result = observable<Value, Reduce>> 521 static Result member(Observable&& o, Seed&& s, Accumulator&& a) 522 { 523 return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s))); 524 } 525 526 template<class... AN> 527 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 528 std::terminate(); 529 return {}; 530 static_assert(sizeof...(AN) == 10000, "reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue"); 531 } 532 }; 533 534 template<> 535 struct member_overload<first_tag> 536 { 537 template<class Observable, 538 class SValue = rxu::value_type_t<Observable>, 539 class Operation = operators::detail::first<SValue>, 540 class Seed = decltype(Operation::seed()), 541 class Accumulator = Operation, 542 class ResultSelector = Operation, 543 class TakeOne = decltype(((rxu::decay_t<Observable>*)nullptr)->take(1)), 544 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<TakeOne>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 545 class RValue = rxu::value_type_t<Reduce>, 546 class Result = observable<RValue, Reduce>> 547 static Result member(Observable&& o) 548 { 549 return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed())); 550 } 551 552 template<class... AN> 553 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 554 std::terminate(); 555 return {}; 556 static_assert(sizeof...(AN) == 10000, "first does not support Observable::value_type"); 557 } 558 }; 559 560 template<> 561 struct member_overload<last_tag> 562 { 563 template<class Observable, 564 class SValue = rxu::value_type_t<Observable>, 565 class Operation = operators::detail::last<SValue>, 566 class Seed = decltype(Operation::seed()), 567 class Accumulator = Operation, 568 class ResultSelector = Operation, 569 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 570 class RValue = rxu::value_type_t<Reduce>, 571 class Result = observable<RValue, Reduce>> 572 static Result member(Observable&& o) 573 { 574 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); 575 } 576 577 template<class... AN> 578 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 579 std::terminate(); 580 return {}; 581 static_assert(sizeof...(AN) == 10000, "last does not support Observable::value_type"); 582 } 583 }; 584 585 template<> 586 struct member_overload<sum_tag> 587 { 588 template<class Observable, 589 class SValue = rxu::value_type_t<Observable>, 590 class Operation = operators::detail::sum<SValue>, 591 class Seed = decltype(Operation::seed()), 592 class Accumulator = Operation, 593 class ResultSelector = Operation, 594 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 595 class RValue = rxu::value_type_t<Reduce>, 596 class Result = observable<RValue, Reduce>> 597 static Result member(Observable&& o) 598 { 599 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); 600 } 601 602 template<class... AN> 603 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 604 std::terminate(); 605 return {}; 606 static_assert(sizeof...(AN) == 10000, "sum does not support Observable::value_type"); 607 } 608 }; 609 610 template<> 611 struct member_overload<average_tag> 612 { 613 template<class Observable, 614 class SValue = rxu::value_type_t<Observable>, 615 class Operation = operators::detail::average<SValue>, 616 class Seed = decltype(Operation::seed()), 617 class Accumulator = Operation, 618 class ResultSelector = Operation, 619 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 620 class RValue = rxu::value_type_t<Reduce>, 621 class Result = observable<RValue, Reduce>> 622 static Result member(Observable&& o) 623 { 624 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); 625 } 626 627 template<class... AN> 628 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 629 std::terminate(); 630 return {}; 631 static_assert(sizeof...(AN) == 10000, "average does not support Observable::value_type"); 632 } 633 }; 634 635 template<> 636 struct member_overload<max_tag> 637 { 638 template<class Observable, 639 class SValue = rxu::value_type_t<Observable>, 640 class Operation = operators::detail::max<SValue>, 641 class Seed = decltype(Operation::seed()), 642 class Accumulator = Operation, 643 class ResultSelector = Operation, 644 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 645 class RValue = rxu::value_type_t<Reduce>, 646 class Result = observable<RValue, Reduce>> 647 static Result member(Observable&& o) 648 { 649 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); 650 } 651 652 template<class... AN> 653 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 654 std::terminate(); 655 return {}; 656 static_assert(sizeof...(AN) == 10000, "max does not support Observable::value_type"); 657 } 658 }; 659 660 template<> 661 struct member_overload<min_tag> 662 { 663 template<class Observable, 664 class SValue = rxu::value_type_t<Observable>, 665 class Operation = operators::detail::min<SValue>, 666 class Seed = decltype(Operation::seed()), 667 class Accumulator = Operation, 668 class ResultSelector = Operation, 669 class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, 670 class RValue = rxu::value_type_t<Reduce>, 671 class Result = observable<RValue, Reduce>> 672 static Result member(Observable&& o) 673 { 674 return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); 675 } 676 677 template<class... AN> 678 static operators::detail::reduce_invalid_t<AN...> member(AN...) { 679 std::terminate(); 680 return {}; 681 static_assert(sizeof...(AN) == 10000, "min does not support Observable::value_type"); 682 } 683 }; 684 685 } 686 687 #endif 688