Home | History | Annotate | Download | only in rxcpp
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP)
      6 #define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP
      7 
      8 #include "rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace detail {
     13 
     14 template<class T>
     15 struct has_on_connect
     16 {
     17     struct not_void {};
     18     template<class CT>
     19     static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription()));
     20     template<class CT>
     21     static not_void check(...);
     22 
     23     typedef decltype(check<T>(0)) detail_result;
     24     static const bool value = std::is_same<detail_result, void>::value;
     25 };
     26 
     27 }
     28 
     29 template<class T>
     30 class dynamic_connectable_observable
     31     : public dynamic_observable<T>
     32 {
     33     struct state_type
     34         : public std::enable_shared_from_this<state_type>
     35     {
     36         typedef std::function<void(composite_subscription)> onconnect_type;
     37 
     38         onconnect_type on_connect;
     39     };
     40     std::shared_ptr<state_type> state;
     41 
     42     template<class U>
     43     void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
     44         state = o.state;
     45     }
     46 
     47     template<class U>
     48     void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
     49         state = std::move(o.state);
     50     }
     51 
     52     template<class SO>
     53     void construct(SO&& source, rxs::tag_source&&) {
     54         auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
     55         state->on_connect = [so](composite_subscription cs) mutable {
     56             so->on_connect(std::move(cs));
     57         };
     58     }
     59 
     60 public:
     61 
     62     typedef tag_dynamic_observable dynamic_observable_tag;
     63 
     64     dynamic_connectable_observable()
     65     {
     66     }
     67 
     68     template<class SOF>
     69     explicit dynamic_connectable_observable(SOF sof)
     70         : dynamic_observable<T>(sof)
     71         , state(std::make_shared<state_type>())
     72     {
     73         construct(std::move(sof),
     74                   typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
     75     }
     76 
     77     template<class SF, class CF>
     78     dynamic_connectable_observable(SF&& sf, CF&& cf)
     79         : dynamic_observable<T>(std::forward<SF>(sf))
     80         , state(std::make_shared<state_type>())
     81     {
     82         state->on_connect = std::forward<CF>(cf);
     83     }
     84 
     85     using dynamic_observable<T>::on_subscribe;
     86 
     87     void on_connect(composite_subscription cs) const {
     88         state->on_connect(std::move(cs));
     89     }
     90 };
     91 
     92 template<class T, class Source>
     93 connectable_observable<T> make_dynamic_connectable_observable(Source&& s) {
     94     return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s)));
     95 }
     96 
     97 
     98 /*!
     99     \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
    100 
    101     \ingroup group-observable
    102 
    103 */
    104 template<class T, class SourceOperator>
    105 class connectable_observable
    106     : public observable<T, SourceOperator>
    107 {
    108     typedef connectable_observable<T, SourceOperator> this_type;
    109     typedef observable<T, SourceOperator> base_type;
    110     typedef rxu::decay_t<SourceOperator> source_operator_type;
    111 
    112     static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)");
    113 
    114 public:
    115     typedef tag_connectable_observable observable_tag;
    116 
    117     connectable_observable()
    118     {
    119     }
    120 
    121     explicit connectable_observable(const SourceOperator& o)
    122         : base_type(o)
    123     {
    124     }
    125     explicit connectable_observable(SourceOperator&& o)
    126         : base_type(std::move(o))
    127     {
    128     }
    129 
    130     // implicit conversion between observables of the same value_type
    131     template<class SO>
    132     connectable_observable(const connectable_observable<T, SO>& o)
    133         : base_type(o)
    134     {}
    135     // implicit conversion between observables of the same value_type
    136     template<class SO>
    137     connectable_observable(connectable_observable<T, SO>&& o)
    138         : base_type(std::move(o))
    139     {}
    140 
    141     ///
    142     /// takes any function that will take this observable and produce a result value.
    143     /// this is intended to allow externally defined operators, that use subscribe,
    144     /// to be connected into the expression.
    145     ///
    146     template<class OperatorFactory>
    147     auto op(OperatorFactory&& of) const
    148         -> decltype(of(*(const this_type*)nullptr)) {
    149         return      of(*this);
    150         static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
    151     }
    152 
    153     ///
    154     /// performs type-forgetting conversion to a new composite_observable
    155     ///
    156     connectable_observable<T> as_dynamic() {
    157         return *this;
    158     }
    159 
    160     composite_subscription connect(composite_subscription cs = composite_subscription()) {
    161         base_type::source_operator.on_connect(cs);
    162         return cs;
    163     }
    164 
    165     /*! @copydoc rx-ref_count.hpp
    166      */
    167     template<class... AN>
    168     auto ref_count(AN... an) const
    169         /// \cond SHOW_SERVICE_MEMBERS
    170         -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
    171         /// \endcond
    172     {
    173         return      observable_member(ref_count_tag{},                *this, std::forward<AN>(an)...);
    174     }
    175 
    176     /*! @copydoc rx-connect_forever.hpp
    177      */
    178     template<class... AN>
    179     auto connect_forever(AN... an) const
    180         /// \cond SHOW_SERVICE_MEMBERS
    181         -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
    182         /// \endcond
    183     {
    184         return      observable_member(connect_forever_tag{},                *this, std::forward<AN>(an)...);
    185     }
    186 };
    187 
    188 
    189 }
    190 
    191 //
    192 // support range() >> filter() >> subscribe() syntax
    193 // '>>' is spelled 'stream'
    194 //
    195 template<class T, class SourceOperator, class OperatorFactory>
    196 auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
    197     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
    198     return      source.op(std::forward<OperatorFactory>(of));
    199 }
    200 
    201 //
    202 // support range() | filter() | subscribe() syntax
    203 // '|' is spelled 'pipe'
    204 //
    205 template<class T, class SourceOperator, class OperatorFactory>
    206 auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
    207     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
    208     return      source.op(std::forward<OperatorFactory>(of));
    209 }
    210 
    211 #endif
    212