Home | History | Annotate | Download | only in rxcpp
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP)
      6 #define RXCPP_RX_SUBSCRIPTION_HPP
      7 
      8 #include "rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace detail {
     13 
     14 template<class F>
     15 struct is_unsubscribe_function
     16 {
     17     struct not_void {};
     18     template<class CF>
     19     static auto check(int) -> decltype((*(CF*)nullptr)());
     20     template<class CF>
     21     static not_void check(...);
     22 
     23     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
     24 };
     25 
     26 }
     27 
     28 struct tag_subscription {};
     29 struct subscription_base {typedef tag_subscription subscription_tag;};
     30 template<class T>
     31 class is_subscription
     32 {
     33     template<class C>
     34     static typename C::subscription_tag* check(int);
     35     template<class C>
     36     static void check(...);
     37 public:
     38     static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value;
     39 };
     40 
     41 template<class Unsubscribe>
     42 class static_subscription
     43 {
     44     typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type;
     45     unsubscribe_call_type unsubscribe_call;
     46     static_subscription()
     47     {
     48     }
     49 public:
     50     static_subscription(const static_subscription& o)
     51         : unsubscribe_call(o.unsubscribe_call)
     52     {
     53     }
     54     static_subscription(static_subscription&& o)
     55         : unsubscribe_call(std::move(o.unsubscribe_call))
     56     {
     57     }
     58     static_subscription(unsubscribe_call_type s)
     59         : unsubscribe_call(std::move(s))
     60     {
     61     }
     62     void unsubscribe() const {
     63         unsubscribe_call();
     64     }
     65 };
     66 
     67 class subscription : public subscription_base
     68 {
     69     class base_subscription_state : public std::enable_shared_from_this<base_subscription_state>
     70     {
     71         base_subscription_state();
     72     public:
     73 
     74         explicit base_subscription_state(bool initial)
     75             : issubscribed(initial)
     76         {
     77         }
     78         virtual ~base_subscription_state() {}
     79         virtual void unsubscribe() {
     80         }
     81         std::atomic<bool> issubscribed;
     82     };
     83 public:
     84     typedef std::weak_ptr<base_subscription_state> weak_state_type;
     85 
     86 private:
     87     template<class I>
     88     struct subscription_state : public base_subscription_state
     89     {
     90         typedef rxu::decay_t<I> inner_t;
     91         subscription_state(inner_t i)
     92             : base_subscription_state(true)
     93             , inner(std::move(i))
     94         {
     95         }
     96         virtual void unsubscribe() {
     97             if (issubscribed.exchange(false)) {
     98                 trace_activity().unsubscribe_enter(*this);
     99                 inner.unsubscribe();
    100                 trace_activity().unsubscribe_return(*this);
    101             }
    102         }
    103         inner_t inner;
    104     };
    105 
    106 protected:
    107     std::shared_ptr<base_subscription_state> state;
    108 
    109     friend bool operator<(const subscription&, const subscription&);
    110     friend bool operator==(const subscription&, const subscription&);
    111 
    112 private:
    113     subscription(weak_state_type w)
    114         : state(w.lock())
    115     {
    116         if (!state) {
    117             std::terminate();
    118         }
    119     }
    120 
    121     explicit subscription(std::shared_ptr<base_subscription_state> s)
    122         : state(std::move(s))
    123     {
    124         if (!state) {
    125             std::terminate();
    126         }
    127     }
    128 public:
    129 
    130     subscription()
    131         : state(std::make_shared<base_subscription_state>(false))
    132     {
    133         if (!state) {
    134             std::terminate();
    135         }
    136     }
    137     template<class U>
    138     explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr)
    139         : state(std::make_shared<subscription_state<U>>(std::move(u)))
    140     {
    141         if (!state) {
    142             std::terminate();
    143         }
    144     }
    145     template<class U>
    146     explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr)
    147         // intentionally slice
    148         : state(std::move((*static_cast<subscription*>(&u)).state))
    149     {
    150         if (!state) {
    151             std::terminate();
    152         }
    153     }
    154     subscription(const subscription& o)
    155         : state(o.state)
    156     {
    157         if (!state) {
    158             std::terminate();
    159         }
    160     }
    161     subscription(subscription&& o)
    162         : state(std::move(o.state))
    163     {
    164         if (!state) {
    165             std::terminate();
    166         }
    167     }
    168     subscription& operator=(subscription o) {
    169         state = std::move(o.state);
    170         return *this;
    171     }
    172     bool is_subscribed() const {
    173         if (!state) {
    174             std::terminate();
    175         }
    176         return state->issubscribed;
    177     }
    178     void unsubscribe() const {
    179         if (!state) {
    180             std::terminate();
    181         }
    182         auto keepAlive = state;
    183         state->unsubscribe();
    184     }
    185 
    186     weak_state_type get_weak() {
    187         return state;
    188     }
    189 
    190     // Atomically promote weak subscription to strong.
    191     // Calls std::terminate if w has already expired.
    192     static subscription lock(weak_state_type w) {
    193         return subscription(w);
    194     }
    195 
    196     // Atomically try to promote weak subscription to strong.
    197     // Returns an empty maybe<> if w has already expired.
    198     static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
    199         auto strong_subscription = w.lock();
    200         if (!strong_subscription) {
    201             return rxu::detail::maybe<subscription>{};
    202         } else {
    203             return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
    204         }
    205     }
    206 };
    207 
    208 inline bool operator<(const subscription& lhs, const subscription& rhs) {
    209     return lhs.state < rhs.state;
    210 }
    211 inline bool operator==(const subscription& lhs, const subscription& rhs) {
    212     return lhs.state == rhs.state;
    213 }
    214 inline bool operator!=(const subscription& lhs, const subscription& rhs) {
    215     return !(lhs == rhs);
    216 }
    217 
    218 
    219 inline auto make_subscription()
    220     ->      subscription {
    221     return  subscription();
    222 }
    223 template<class I>
    224 auto make_subscription(I&& i)
    225     -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
    226             subscription>::type {
    227     return  subscription(std::forward<I>(i));
    228 }
    229 template<class Unsubscribe>
    230 auto make_subscription(Unsubscribe&& u)
    231     -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
    232             subscription>::type {
    233     return  subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
    234 }
    235 
    236 class composite_subscription;
    237 
    238 namespace detail {
    239 
    240 struct tag_composite_subscription_empty {};
    241 
    242 class composite_subscription_inner
    243 {
    244 private:
    245     typedef subscription::weak_state_type weak_subscription;
    246     struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
    247     {
    248         // invariant: cannot access this data without the lock held.
    249         std::set<subscription> subscriptions;
    250         // double checked locking:
    251         //    issubscribed must be loaded again after each lock acquisition.
    252         // invariant:
    253         //    never call subscription::unsubscribe with lock held.
    254         std::mutex lock;
    255         // invariant: transitions from 'true' to 'false' exactly once, at any time.
    256         std::atomic<bool> issubscribed;
    257 
    258         ~composite_subscription_state()
    259         {
    260             std::unique_lock<decltype(lock)> guard(lock);
    261             subscriptions.clear();
    262         }
    263 
    264         composite_subscription_state()
    265             : issubscribed(true)
    266         {
    267         }
    268         composite_subscription_state(tag_composite_subscription_empty)
    269             : issubscribed(false)
    270         {
    271         }
    272 
    273         // Atomically add 's' to the set of subscriptions.
    274         //
    275         // If unsubscribe() has already occurred, this immediately
    276         // calls s.unsubscribe().
    277         //
    278         // cs.unsubscribe() [must] happens-before s.unsubscribe()
    279         //
    280         // Due to the un-atomic nature of calling 's.unsubscribe()',
    281         // it is possible to observe the unintuitive
    282         // add(s)=>s.unsubscribe() prior
    283         // to any of the unsubscribe()=>sN.unsubscribe().
    284         inline weak_subscription add(subscription s) {
    285             if (!issubscribed) {  // load.acq [seq_cst]
    286                 s.unsubscribe();
    287             } else if (s.is_subscribed()) {
    288                 std::unique_lock<decltype(lock)> guard(lock);
    289                 if (!issubscribed) {  // load.acq [seq_cst]
    290                     // unsubscribe was called concurrently.
    291                     guard.unlock();
    292                     // invariant: do not call unsubscribe with lock held.
    293                     s.unsubscribe();
    294                 } else {
    295                     subscriptions.insert(s);
    296                 }
    297             }
    298             return s.get_weak();
    299         }
    300 
    301         // Atomically remove 'w' from the set of subscriptions.
    302         //
    303         // This does nothing if 'w' was already previously removed,
    304         // or refers to an expired value.
    305         inline void remove(weak_subscription w) {
    306             if (issubscribed) { // load.acq [seq_cst]
    307                 rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
    308 
    309                 if (maybe_subscription.empty()) {
    310                   // Do nothing if the subscription has already expired.
    311                   return;
    312                 }
    313 
    314                 std::unique_lock<decltype(lock)> guard(lock);
    315                 // invariant: subscriptions must be accessed under the lock.
    316 
    317                 if (issubscribed) { // load.acq [seq_cst]
    318                   subscription& s = maybe_subscription.get();
    319                   subscriptions.erase(std::move(s));
    320                 } // else unsubscribe() was called concurrently; this becomes a no-op.
    321             }
    322         }
    323 
    324         // Atomically clear all subscriptions that were observably added
    325         // (and not subsequently observably removed).
    326         //
    327         // Un-atomically call unsubscribe on those subscriptions.
    328         //
    329         // forall subscriptions in {add(s1),add(s2),...}
    330         //                         - {remove(s3), remove(s4), ...}:
    331         //   cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
    332         //
    333         // cs.unsubscribe() observed-before cs.clear ==> do nothing.
    334         inline void clear() {
    335             if (issubscribed) { // load.acq [seq_cst]
    336                 std::unique_lock<decltype(lock)> guard(lock);
    337 
    338                 if (!issubscribed) { // load.acq [seq_cst]
    339                   // unsubscribe was called concurrently.
    340                   return;
    341                 }
    342 
    343                 std::set<subscription> v(std::move(subscriptions));
    344                 // invariant: do not call unsubscribe with lock held.
    345                 guard.unlock();
    346                 std::for_each(v.begin(), v.end(),
    347                               [](const subscription& s) {
    348                                 s.unsubscribe(); });
    349             }
    350         }
    351 
    352         // Atomically clear all subscriptions that were observably added
    353         // (and not subsequently observably removed).
    354         //
    355         // Un-atomically call unsubscribe on those subscriptions.
    356         //
    357         // Switches to an 'unsubscribed' state, all subsequent
    358         // adds are immediately unsubscribed.
    359         //
    360         // cs.unsubscribe() [must] happens-before
    361         //     cs.add(s) ==> s.unsubscribe()
    362         //
    363         // forall subscriptions in {add(s1),add(s2),...}
    364         //                         - {remove(s3), remove(s4), ...}:
    365         //   cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
    366         inline void unsubscribe() {
    367             if (issubscribed.exchange(false)) {  // cas.acq_rel [seq_cst]
    368                 std::unique_lock<decltype(lock)> guard(lock);
    369 
    370                 // is_subscribed can only transition to 'false' once,
    371                 // does not need an extra atomic access here.
    372 
    373                 std::set<subscription> v(std::move(subscriptions));
    374                 // invariant: do not call unsubscribe with lock held.
    375                 guard.unlock();
    376                 std::for_each(v.begin(), v.end(),
    377                               [](const subscription& s) {
    378                                 s.unsubscribe(); });
    379             }
    380         }
    381     };
    382 
    383 public:
    384     typedef std::shared_ptr<composite_subscription_state> shared_state_type;
    385 
    386 protected:
    387     mutable shared_state_type state;
    388 
    389 public:
    390     composite_subscription_inner()
    391         : state(std::make_shared<composite_subscription_state>())
    392     {
    393     }
    394     composite_subscription_inner(tag_composite_subscription_empty et)
    395         : state(std::make_shared<composite_subscription_state>(et))
    396     {
    397     }
    398 
    399     composite_subscription_inner(const composite_subscription_inner& o)
    400         : state(o.state)
    401     {
    402         if (!state) {
    403             std::terminate();
    404         }
    405     }
    406     composite_subscription_inner(composite_subscription_inner&& o)
    407         : state(std::move(o.state))
    408     {
    409         if (!state) {
    410             std::terminate();
    411         }
    412     }
    413 
    414     composite_subscription_inner& operator=(composite_subscription_inner o)
    415     {
    416         state = std::move(o.state);
    417         if (!state) {
    418             std::terminate();
    419         }
    420         return *this;
    421     }
    422 
    423     inline weak_subscription add(subscription s) const {
    424         if (!state) {
    425             std::terminate();
    426         }
    427         return state->add(std::move(s));
    428     }
    429     inline void remove(weak_subscription w) const {
    430         if (!state) {
    431             std::terminate();
    432         }
    433         state->remove(std::move(w));
    434     }
    435     inline void clear() const {
    436         if (!state) {
    437             std::terminate();
    438         }
    439         state->clear();
    440     }
    441     inline void unsubscribe() {
    442         if (!state) {
    443             std::terminate();
    444         }
    445         state->unsubscribe();
    446     }
    447 };
    448 
    449 inline composite_subscription shared_empty();
    450 
    451 }
    452 
    453 /*!
    454     \brief controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
    455 
    456     \ingroup group-core
    457 
    458 */
    459 class composite_subscription
    460     : protected detail::composite_subscription_inner
    461     , public subscription
    462 {
    463     typedef detail::composite_subscription_inner inner_type;
    464 public:
    465     typedef subscription::weak_state_type weak_subscription;
    466 
    467     composite_subscription(detail::tag_composite_subscription_empty et)
    468         : inner_type(et)
    469         , subscription() // use empty base
    470     {
    471     }
    472 
    473 public:
    474 
    475     composite_subscription()
    476         : inner_type()
    477         , subscription(*static_cast<const inner_type*>(this))
    478     {
    479     }
    480 
    481     composite_subscription(const composite_subscription& o)
    482         : inner_type(o)
    483         , subscription(static_cast<const subscription&>(o))
    484     {
    485     }
    486     composite_subscription(composite_subscription&& o)
    487         : inner_type(std::move(o))
    488         , subscription(std::move(static_cast<subscription&>(o)))
    489     {
    490     }
    491 
    492     composite_subscription& operator=(composite_subscription o)
    493     {
    494         inner_type::operator=(std::move(o));
    495         subscription::operator=(std::move(*static_cast<subscription*>(&o)));
    496         return *this;
    497     }
    498 
    499     static inline composite_subscription empty() {
    500         return detail::shared_empty();
    501     }
    502 
    503     using subscription::is_subscribed;
    504     using subscription::unsubscribe;
    505 
    506     using inner_type::clear;
    507 
    508     inline weak_subscription add(subscription s) const {
    509         if (s == static_cast<const subscription&>(*this)) {
    510             // do not nest the same subscription
    511             std::terminate();
    512             //return s.get_weak();
    513         }
    514         auto that = this->subscription::state.get();
    515         trace_activity().subscription_add_enter(*that, s);
    516         auto w = inner_type::add(std::move(s));
    517         trace_activity().subscription_add_return(*that);
    518         return w;
    519     }
    520 
    521     template<class F>
    522     auto add(F f) const
    523     -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
    524         return add(make_subscription(std::move(f)));
    525     }
    526 
    527     inline void remove(weak_subscription w) const {
    528         auto that = this->subscription::state.get();
    529         trace_activity().subscription_remove_enter(*that, w);
    530         inner_type::remove(w);
    531         trace_activity().subscription_remove_return(*that);
    532     }
    533 };
    534 
    535 inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) {
    536     return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs);
    537 }
    538 inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
    539     return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs);
    540 }
    541 inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
    542     return !(lhs == rhs);
    543 }
    544 
    545 namespace detail {
    546 
    547 inline composite_subscription shared_empty() {
    548     static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
    549     return shared_empty;
    550 }
    551 
    552 }
    553 
    554 template<class T>
    555 class resource : public subscription_base
    556 {
    557 public:
    558     typedef typename composite_subscription::weak_subscription weak_subscription;
    559 
    560     resource()
    561         : lifetime(composite_subscription())
    562         , value(std::make_shared<rxu::detail::maybe<T>>())
    563     {
    564     }
    565 
    566     explicit resource(T t, composite_subscription cs = composite_subscription())
    567         : lifetime(std::move(cs))
    568         , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
    569     {
    570         auto localValue = value;
    571         lifetime.add(
    572             [localValue](){
    573                 localValue->reset();
    574             }
    575         );
    576     }
    577 
    578     T& get() {
    579         return value.get()->get();
    580     }
    581     composite_subscription& get_subscription() {
    582         return lifetime;
    583     }
    584 
    585     bool is_subscribed() const {
    586         return lifetime.is_subscribed();
    587     }
    588     weak_subscription add(subscription s) const {
    589         return lifetime.add(std::move(s));
    590     }
    591     template<class F>
    592     auto add(F f) const
    593     -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
    594         return lifetime.add(make_subscription(std::move(f)));
    595     }
    596     void remove(weak_subscription w) const {
    597         return lifetime.remove(std::move(w));
    598     }
    599     void clear() const {
    600         return lifetime.clear();
    601     }
    602     void unsubscribe() const {
    603         return lifetime.unsubscribe();
    604     }
    605 
    606 protected:
    607     composite_subscription lifetime;
    608     std::shared_ptr<rxu::detail::maybe<T>> value;
    609 };
    610 
    611 }
    612 
    613 #endif
    614