Home | History | Annotate | Download | only in subjects
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_BEHAVIOR_HPP)
      6 #define RXCPP_RX_BEHAVIOR_HPP
      7 
      8 #include "../rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace subjects {
     13 
     14 namespace detail {
     15 
     16 template<class T>
     17 class behavior_observer : public detail::multicast_observer<T>
     18 {
     19     typedef behavior_observer<T> this_type;
     20     typedef detail::multicast_observer<T> base_type;
     21 
     22     class behavior_observer_state : public std::enable_shared_from_this<behavior_observer_state>
     23     {
     24         mutable std::mutex lock;
     25         mutable T value;
     26 
     27     public:
     28         behavior_observer_state(T first)
     29             : value(first)
     30         {
     31         }
     32 
     33         void reset(T v) const {
     34             std::unique_lock<std::mutex> guard(lock);
     35             value = std::move(v);
     36         }
     37         T get() const {
     38             std::unique_lock<std::mutex> guard(lock);
     39             return value;
     40         }
     41     };
     42 
     43     std::shared_ptr<behavior_observer_state> state;
     44 
     45 public:
     46     behavior_observer(T f, composite_subscription l)
     47         : base_type(l)
     48         , state(std::make_shared<behavior_observer_state>(std::move(f)))
     49     {
     50     }
     51 
     52     subscriber<T> get_subscriber() const {
     53         return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).as_dynamic();
     54     }
     55 
     56     T get_value() const {
     57         return state->get();
     58     }
     59 
     60     template<class V>
     61     void on_next(V v) const {
     62         state->reset(v);
     63         base_type::on_next(std::move(v));
     64     }
     65 };
     66 
     67 }
     68 
     69 template<class T>
     70 class behavior
     71 {
     72     detail::behavior_observer<T> s;
     73 
     74 public:
     75     explicit behavior(T f, composite_subscription cs = composite_subscription())
     76         : s(std::move(f), cs)
     77     {
     78     }
     79 
     80     bool has_observers() const {
     81         return s.has_observers();
     82     }
     83 
     84     T get_value() const {
     85         return s.get_value();
     86     }
     87 
     88     subscriber<T> get_subscriber() const {
     89         return s.get_subscriber();
     90     }
     91 
     92     observable<T> get_observable() const {
     93         auto keepAlive = s;
     94         return make_observable_dynamic<T>([=](subscriber<T> o){
     95             if (keepAlive.get_subscription().is_subscribed()) {
     96                 o.on_next(get_value());
     97             }
     98             keepAlive.add(s.get_subscriber(), std::move(o));
     99         });
    100     }
    101 };
    102 
    103 }
    104 
    105 }
    106 
    107 #endif
    108