Home | History | Annotate | Download | only in operators
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 /*! \file rx-ref_count.hpp
      6 
      7     \brief  Make some \c connectable_observable behave like an ordinary \c observable.
      8             Uses a reference count of the subscribers to control the connection to the published observable.
      9 
     10             The first subscription will cause a call to \c connect(), and the last \c unsubscribe will unsubscribe the connection.
     11 
     12             There are 2 variants of the operator:
     13             \li \c ref_count(): calls \c connect on the \c source \c connectable_observable.
     14             \li \c ref_count(other): calls \c connect on the \c other \c connectable_observable.
     15 
     16     \tparam ConnectableObservable the type of the \c other \c connectable_observable (optional)
     17     \param  other \c connectable_observable to call \c connect on (optional)
     18 
     19     If \c other is omitted, then \c source is used instead (which must be a \c connectable_observable).
     20     Otherwise, \c source can be a regular \c observable.
     21 
     22     \return An \c observable that emits the items from its \c source.
     23 
     24     \sample
     25     \snippet ref_count.cpp ref_count other diamond sample
     26     \snippet output.txt ref_count other diamond sample
     27  */
     28 
     29 #if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
     30 #define RXCPP_OPERATORS_RX_REF_COUNT_HPP
     31 
     32 #include "../rx-includes.hpp"
     33 
     34 namespace rxcpp {
     35 
     36 namespace operators {
     37 
     38 namespace detail {
     39 
     40 template<class... AN>
     41 struct ref_count_invalid_arguments {};
     42 
     43 template<class... AN>
     44 struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments<AN...>> {
     45     using type = observable<ref_count_invalid_arguments<AN...>, ref_count_invalid<AN...>>;
     46 };
     47 template<class... AN>
     48 using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;
     49 
     50 // ref_count(other) takes a regular observable source, not a connectable_observable.
     51 // use template specialization to avoid instantiating 'subscribe' for two different types
     52 // which would cause a compilation error.
     53 template <typename connectable_type, typename observable_type>
     54 struct ref_count_state_base {
     55     ref_count_state_base(connectable_type other, observable_type source)
     56         : connectable(std::move(other))
     57         , subscribable(std::move(source)) {}
     58 
     59     connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
     60     observable_type subscribable; // subscribes to this if non-empty.
     61 
     62     template <typename Subscriber>
     63     void subscribe(Subscriber&& o) {
     64         subscribable.subscribe(std::forward<Subscriber>(o));
     65     }
     66 };
     67 
     68 // Note: explicit specializations have to be at namespace scope prior to C++17.
     69 template <typename connectable_type>
     70 struct ref_count_state_base<connectable_type, void> {
     71     explicit ref_count_state_base(connectable_type c)
     72         : connectable(std::move(c)) {}
     73 
     74     connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
     75 
     76     template <typename Subscriber>
     77     void subscribe(Subscriber&& o) {
     78         connectable.subscribe(std::forward<Subscriber>(o));
     79     }
     80 };
     81 
     82 template<class T,
     83          class ConnectableObservable,
     84          class Observable = void> // note: type order flipped versus the operator.
     85 struct ref_count : public operator_base<T>
     86 {
     87     typedef rxu::decay_t<Observable> observable_type;
     88     typedef rxu::decay_t<ConnectableObservable> connectable_type;
     89 
     90     // ref_count() == false
     91     // ref_count(other) == true
     92     using has_observable_t = rxu::negation<std::is_same<void, Observable>>;
     93     static constexpr bool has_observable_v = has_observable_t::value;
     94 
     95     struct ref_count_state : public std::enable_shared_from_this<ref_count_state>,
     96                              public ref_count_state_base<ConnectableObservable, Observable>
     97     {
     98         template <class HasObservable = has_observable_t,
     99                   class Enabled = rxu::enable_if_all_true_type_t<
    100                       rxu::negation<HasObservable>>>
    101         explicit ref_count_state(connectable_type source)
    102             : ref_count_state_base<ConnectableObservable, Observable>(std::move(source))
    103             , subscribers(0)
    104         {
    105         }
    106 
    107         template <bool HasObservableV = has_observable_v>
    108         ref_count_state(connectable_type other,
    109                         typename std::enable_if<HasObservableV, observable_type>::type source)
    110             : ref_count_state_base<ConnectableObservable, Observable>(std::move(other),
    111                                                                       std::move(source))
    112             , subscribers(0)
    113         {
    114         }
    115 
    116         std::mutex lock;
    117         long subscribers;
    118         composite_subscription connection;
    119     };
    120     std::shared_ptr<ref_count_state> state;
    121 
    122     // connectable_observable<T> source = ...;
    123     // source.ref_count();
    124     //
    125     // calls connect on source after the subscribe on source.
    126     template <class HasObservable = has_observable_t,
    127               class Enabled = rxu::enable_if_all_true_type_t<
    128                   rxu::negation<HasObservable>>>
    129     explicit ref_count(connectable_type source)
    130         : state(std::make_shared<ref_count_state>(std::move(source)))
    131     {
    132     }
    133 
    134     // connectable_observable<?> other = ...;
    135     // observable<T> source = ...;
    136     // source.ref_count(other);
    137     //
    138     // calls connect on 'other' after the subscribe on 'source'.
    139     template <bool HasObservableV = has_observable_v>
    140     ref_count(connectable_type other,
    141               typename std::enable_if<HasObservableV, observable_type>::type source)
    142         : state(std::make_shared<ref_count_state>(std::move(other), std::move(source)))
    143     {
    144     }
    145 
    146     template<class Subscriber>
    147     void on_subscribe(Subscriber&& o) const {
    148         std::unique_lock<std::mutex> guard(state->lock);
    149         auto needConnect = ++state->subscribers == 1;
    150         auto keepAlive = state;
    151         guard.unlock();
    152         o.add(
    153             [keepAlive](){
    154                 std::unique_lock<std::mutex> guard_unsubscribe(keepAlive->lock);
    155                 if (--keepAlive->subscribers == 0) {
    156                     keepAlive->connection.unsubscribe();
    157                     keepAlive->connection = composite_subscription();
    158                 }
    159             });
    160         keepAlive->subscribe(std::forward<Subscriber>(o));
    161         if (needConnect) {
    162             keepAlive->connectable.connect(keepAlive->connection);
    163         }
    164     }
    165 };
    166 
    167 }
    168 
    169 /*! @copydoc rx-ref_count.hpp
    170 */
    171 template<class... AN>
    172 auto ref_count(AN&&... an)
    173     ->     operator_factory<ref_count_tag, AN...> {
    174     return operator_factory<ref_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
    175 }
    176 
    177 }
    178 
    179 template<>
    180 struct member_overload<ref_count_tag>
    181 {
    182     template<class ConnectableObservable,
    183         class Enabled = rxu::enable_if_all_true_type_t<
    184             is_connectable_observable<ConnectableObservable>>,
    185         class SourceValue = rxu::value_type_t<ConnectableObservable>,
    186         class RefCount = rxo::detail::ref_count<SourceValue, rxu::decay_t<ConnectableObservable>>,
    187         class Value = rxu::value_type_t<RefCount>,
    188         class Result = observable<Value, RefCount>
    189         >
    190     static Result member(ConnectableObservable&& o) {
    191         return Result(RefCount(std::forward<ConnectableObservable>(o)));
    192     }
    193 
    194     template<class Observable,
    195         class ConnectableObservable,
    196         class Enabled = rxu::enable_if_all_true_type_t<
    197             is_observable<Observable>,
    198             is_connectable_observable<ConnectableObservable>>,
    199         class SourceValue = rxu::value_type_t<Observable>,
    200         class RefCount = rxo::detail::ref_count<SourceValue,
    201             rxu::decay_t<ConnectableObservable>,
    202             rxu::decay_t<Observable>>,
    203         class Value = rxu::value_type_t<RefCount>,
    204         class Result = observable<Value, RefCount>
    205         >
    206     static Result member(Observable&& o, ConnectableObservable&& other) {
    207         return Result(RefCount(std::forward<ConnectableObservable>(other),
    208                                std::forward<Observable>(o)));
    209     }
    210 
    211     template<class... AN>
    212     static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
    213         std::terminate();
    214         return {};
    215         static_assert(sizeof...(AN) == 10000, "ref_count takes (optional ConnectableObservable)");
    216     }
    217 };
    218 
    219 }
    220 
    221 #endif
    222