1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_BEHAVIOR_HPP) 6 #define RXCPP_RX_BEHAVIOR_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace subjects { 13 14 namespace detail { 15 16 template<class T> 17 class behavior_observer : public detail::multicast_observer<T> 18 { 19 typedef behavior_observer<T> this_type; 20 typedef detail::multicast_observer<T> base_type; 21 22 class behavior_observer_state : public std::enable_shared_from_this<behavior_observer_state> 23 { 24 mutable std::mutex lock; 25 mutable T value; 26 27 public: 28 behavior_observer_state(T first) 29 : value(first) 30 { 31 } 32 33 void reset(T v) const { 34 std::unique_lock<std::mutex> guard(lock); 35 value = std::move(v); 36 } 37 T get() const { 38 std::unique_lock<std::mutex> guard(lock); 39 return value; 40 } 41 }; 42 43 std::shared_ptr<behavior_observer_state> state; 44 45 public: 46 behavior_observer(T f, composite_subscription l) 47 : base_type(l) 48 , state(std::make_shared<behavior_observer_state>(std::move(f))) 49 { 50 } 51 52 subscriber<T> get_subscriber() const { 53 return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).as_dynamic(); 54 } 55 56 T get_value() const { 57 return state->get(); 58 } 59 60 template<class V> 61 void on_next(V v) const { 62 state->reset(v); 63 base_type::on_next(std::move(v)); 64 } 65 }; 66 67 } 68 69 template<class T> 70 class behavior 71 { 72 detail::behavior_observer<T> s; 73 74 public: 75 explicit behavior(T f, composite_subscription cs = composite_subscription()) 76 : s(std::move(f), cs) 77 { 78 } 79 80 bool has_observers() const { 81 return s.has_observers(); 82 } 83 84 T get_value() const { 85 return s.get_value(); 86 } 87 88 subscriber<T> get_subscriber() const { 89 return s.get_subscriber(); 90 } 91 92 observable<T> get_observable() const { 93 auto keepAlive = s; 94 return make_observable_dynamic<T>([=](subscriber<T> o){ 95 if (keepAlive.get_subscription().is_subscribed()) { 96 o.on_next(get_value()); 97 } 98 keepAlive.add(s.get_subscriber(), std::move(o)); 99 }); 100 } 101 }; 102 103 } 104 105 } 106 107 #endif 108