Home | History | Annotate | Download | only in rxcpp
      1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
      2 
      3 #pragma once
      4 
      5 #if !defined(RXCPP_RX_NOTIFICATION_HPP)
      6 #define RXCPP_RX_NOTIFICATION_HPP
      7 
      8 #include "rx-includes.hpp"
      9 
     10 namespace rxcpp {
     11 
     12 namespace notifications {
     13 
     14 class subscription
     15 {
     16     long s;
     17     long u;
     18 
     19 public:
     20     explicit inline subscription(long s)
     21         : s(s), u(std::numeric_limits<long>::max()) {
     22     }
     23     inline subscription(long s, long u)
     24         : s(s), u(u) {
     25     }
     26     inline long subscribe() const {
     27         return s;
     28     }
     29     inline long unsubscribe() const {
     30         return u;
     31     }
     32 };
     33 
     34 inline bool operator == (subscription lhs, subscription rhs) {
     35     return lhs.subscribe() == rhs.subscribe() && lhs.unsubscribe() == rhs.unsubscribe();
     36 }
     37 
     38 inline std::ostream& operator<< (std::ostream& out, const subscription& s) {
     39     out << s.subscribe() << "-" << s.unsubscribe();
     40     return out;
     41 }
     42 
     43 namespace detail {
     44 
     45 template<typename T>
     46 struct notification_base
     47     : public std::enable_shared_from_this<notification_base<T>>
     48 {
     49     typedef subscriber<T> observer_type;
     50     typedef std::shared_ptr<notification_base<T>> type;
     51 
     52     virtual ~notification_base() {}
     53 
     54     virtual void out(std::ostream& out) const =0;
     55     virtual bool equals(const type& other) const = 0;
     56     virtual void accept(const observer_type& o) const =0;
     57 };
     58 
     59 template<class T>
     60 std::ostream& operator<< (std::ostream& out, const std::vector<T>& v);
     61 
     62 template<class T>
     63 auto to_stream(std::ostream& os, const T& t, int, int)
     64     -> decltype(os << t) {
     65     return      os << t;
     66 }
     67 
     68 #if RXCPP_USE_RTTI
     69 template<class T>
     70 std::ostream& to_stream(std::ostream& os, const T&, int, ...) {
     71     return os << "< " << typeid(T).name() << " does not support ostream>";
     72 }
     73 #endif
     74 
     75 template<class T>
     76 std::ostream& to_stream(std::ostream& os, const T&, ...) {
     77     return os << "<the value does not support ostream>";
     78 }
     79 
     80 template<class T>
     81 inline std::ostream& ostreamvector (std::ostream& os, const std::vector<T>& v) {
     82     os << "[";
     83     bool doemit = false;
     84     for(auto& i : v) {
     85         if (doemit) {
     86             os << ", ";
     87         } else {
     88             doemit = true;
     89         }
     90         to_stream(os, i, 0, 0);
     91     }
     92     os << "]";
     93     return os;
     94 }
     95 
     96 template<class T>
     97 inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) {
     98     return ostreamvector(os, v);
     99 }
    100 
    101 template<class T>
    102 auto equals(const T& lhs, const T& rhs, int)
    103     -> decltype(bool(lhs == rhs)) {
    104     return lhs == rhs;
    105 }
    106 
    107 template<class T>
    108 bool equals(const T&, const T&, ...) {
    109     rxu::throw_exception(std::runtime_error("value does not support equality tests"));
    110     return false;
    111 }
    112 
    113 }
    114 
    115 template<typename T>
    116 struct notification
    117 {
    118     typedef typename detail::notification_base<T>::type type;
    119     typedef typename detail::notification_base<T>::observer_type observer_type;
    120 
    121 private:
    122     typedef detail::notification_base<T> base;
    123 
    124     struct on_next_notification : public base {
    125         on_next_notification(T value) : value(std::move(value)) {
    126         }
    127         on_next_notification(const on_next_notification& o) : value(o.value) {}
    128         on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {}
    129         on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; }
    130         virtual void out(std::ostream& os) const {
    131             os << "on_next( ";
    132             detail::to_stream(os, value, 0, 0);
    133             os << ")";
    134         }
    135         virtual bool equals(const typename base::type& other) const {
    136             bool result = false;
    137             other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T v) {
    138                     result = detail::equals(this->value, v, 0);
    139                 })));
    140             return result;
    141         }
    142         virtual void accept(const typename base::observer_type& o) const {
    143             o.on_next(value);
    144         }
    145         const T value;
    146     };
    147 
    148     struct on_error_notification : public base {
    149         on_error_notification(rxu::error_ptr ep) : ep(ep) {
    150         }
    151         on_error_notification(const on_error_notification& o) : ep(o.ep) {}
    152         on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {}
    153         on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; }
    154         virtual void out(std::ostream& os) const {
    155             os << "on_error(";
    156             os << rxu::what(ep);
    157             os << ")";
    158         }
    159         virtual bool equals(const typename base::type& other) const {
    160             bool result = false;
    161             // not trying to compare exceptions
    162             other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](rxu::error_ptr){
    163                 result = true;
    164             })));
    165             return result;
    166         }
    167         virtual void accept(const typename base::observer_type& o) const {
    168             o.on_error(ep);
    169         }
    170         const rxu::error_ptr ep;
    171     };
    172 
    173     struct on_completed_notification : public base {
    174         on_completed_notification() {
    175         }
    176         virtual void out(std::ostream& os) const {
    177             os << "on_completed()";
    178         }
    179         virtual bool equals(const typename base::type& other) const {
    180             bool result = false;
    181             other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
    182                 result = true;
    183             })));
    184             return result;
    185         }
    186         virtual void accept(const typename base::observer_type& o) const {
    187             o.on_completed();
    188         }
    189     };
    190 
    191     struct exception_tag {};
    192 
    193     template<typename Exception>
    194     static
    195     type make_on_error(exception_tag&&, Exception&& e) {
    196         rxu::error_ptr ep = rxu::make_error_ptr(std::forward<Exception>(e));
    197         return std::make_shared<on_error_notification>(ep);
    198     }
    199 
    200     struct exception_ptr_tag {};
    201 
    202     static
    203     type make_on_error(exception_ptr_tag&&, rxu::error_ptr ep) {
    204         return std::make_shared<on_error_notification>(ep);
    205     }
    206 
    207 public:
    208     template<typename U>
    209     static type on_next(U value) {
    210         return std::make_shared<on_next_notification>(std::move(value));
    211     }
    212 
    213     static type on_completed() {
    214         return std::make_shared<on_completed_notification>();
    215     }
    216 
    217     template<typename Exception>
    218     static type on_error(Exception&& e) {
    219         return make_on_error(typename std::conditional<
    220             std::is_same<rxu::decay_t<Exception>, rxu::error_ptr>::value,
    221                 exception_ptr_tag, exception_tag>::type(),
    222             std::forward<Exception>(e));
    223     }
    224 };
    225 
    226 template<class T>
    227 bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) {
    228     if (!lhs && !rhs) {return true;}
    229     if (!lhs || !rhs) {return false;}
    230     return lhs->equals(rhs);
    231 }
    232 
    233 template<class T>
    234 std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) {
    235     n->out(os);
    236     return os;
    237 }
    238 
    239 
    240 template<class T>
    241 class recorded
    242 {
    243     long t;
    244     T v;
    245 public:
    246     recorded(long t, T v)
    247         : t(t), v(v) {
    248     }
    249     long time() const {
    250         return t;
    251     }
    252     const T& value() const {
    253         return v;
    254     }
    255 };
    256 
    257 template<class T>
    258 bool operator == (recorded<T> lhs, recorded<T> rhs) {
    259     return lhs.time() == rhs.time() && lhs.value() == rhs.value();
    260 }
    261 
    262 template<class T>
    263 std::ostream& operator<< (std::ostream& out, const recorded<T>& r) {
    264     out << "@" << r.time() << "-" << r.value();
    265     return out;
    266 }
    267 
    268 }
    269 namespace rxn=notifications;
    270 
    271 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) {
    272     return rxcpp::notifications::detail::ostreamvector(out, vs);
    273 }
    274 template<class T>
    275 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) {
    276     return rxcpp::notifications::detail::ostreamvector(out, vr);
    277 }
    278 
    279 }
    280 
    281 #endif
    282