1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_NOTIFICATION_HPP) 6 #define RXCPP_RX_NOTIFICATION_HPP 7 8 #include "rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace notifications { 13 14 class subscription 15 { 16 long s; 17 long u; 18 19 public: 20 explicit inline subscription(long s) 21 : s(s), u(std::numeric_limits<long>::max()) { 22 } 23 inline subscription(long s, long u) 24 : s(s), u(u) { 25 } 26 inline long subscribe() const { 27 return s; 28 } 29 inline long unsubscribe() const { 30 return u; 31 } 32 }; 33 34 inline bool operator == (subscription lhs, subscription rhs) { 35 return lhs.subscribe() == rhs.subscribe() && lhs.unsubscribe() == rhs.unsubscribe(); 36 } 37 38 inline std::ostream& operator<< (std::ostream& out, const subscription& s) { 39 out << s.subscribe() << "-" << s.unsubscribe(); 40 return out; 41 } 42 43 namespace detail { 44 45 template<typename T> 46 struct notification_base 47 : public std::enable_shared_from_this<notification_base<T>> 48 { 49 typedef subscriber<T> observer_type; 50 typedef std::shared_ptr<notification_base<T>> type; 51 52 virtual ~notification_base() {} 53 54 virtual void out(std::ostream& out) const =0; 55 virtual bool equals(const type& other) const = 0; 56 virtual void accept(const observer_type& o) const =0; 57 }; 58 59 template<class T> 60 std::ostream& operator<< (std::ostream& out, const std::vector<T>& v); 61 62 template<class T> 63 auto to_stream(std::ostream& os, const T& t, int, int) 64 -> decltype(os << t) { 65 return os << t; 66 } 67 68 #if RXCPP_USE_RTTI 69 template<class T> 70 std::ostream& to_stream(std::ostream& os, const T&, int, ...) { 71 return os << "< " << typeid(T).name() << " does not support ostream>"; 72 } 73 #endif 74 75 template<class T> 76 std::ostream& to_stream(std::ostream& os, const T&, ...) { 77 return os << "<the value does not support ostream>"; 78 } 79 80 template<class T> 81 inline std::ostream& ostreamvector (std::ostream& os, const std::vector<T>& v) { 82 os << "["; 83 bool doemit = false; 84 for(auto& i : v) { 85 if (doemit) { 86 os << ", "; 87 } else { 88 doemit = true; 89 } 90 to_stream(os, i, 0, 0); 91 } 92 os << "]"; 93 return os; 94 } 95 96 template<class T> 97 inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) { 98 return ostreamvector(os, v); 99 } 100 101 template<class T> 102 auto equals(const T& lhs, const T& rhs, int) 103 -> decltype(bool(lhs == rhs)) { 104 return lhs == rhs; 105 } 106 107 template<class T> 108 bool equals(const T&, const T&, ...) { 109 rxu::throw_exception(std::runtime_error("value does not support equality tests")); 110 return false; 111 } 112 113 } 114 115 template<typename T> 116 struct notification 117 { 118 typedef typename detail::notification_base<T>::type type; 119 typedef typename detail::notification_base<T>::observer_type observer_type; 120 121 private: 122 typedef detail::notification_base<T> base; 123 124 struct on_next_notification : public base { 125 on_next_notification(T value) : value(std::move(value)) { 126 } 127 on_next_notification(const on_next_notification& o) : value(o.value) {} 128 on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {} 129 on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; } 130 virtual void out(std::ostream& os) const { 131 os << "on_next( "; 132 detail::to_stream(os, value, 0, 0); 133 os << ")"; 134 } 135 virtual bool equals(const typename base::type& other) const { 136 bool result = false; 137 other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T v) { 138 result = detail::equals(this->value, v, 0); 139 }))); 140 return result; 141 } 142 virtual void accept(const typename base::observer_type& o) const { 143 o.on_next(value); 144 } 145 const T value; 146 }; 147 148 struct on_error_notification : public base { 149 on_error_notification(rxu::error_ptr ep) : ep(ep) { 150 } 151 on_error_notification(const on_error_notification& o) : ep(o.ep) {} 152 on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {} 153 on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; } 154 virtual void out(std::ostream& os) const { 155 os << "on_error("; 156 os << rxu::what(ep); 157 os << ")"; 158 } 159 virtual bool equals(const typename base::type& other) const { 160 bool result = false; 161 // not trying to compare exceptions 162 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](rxu::error_ptr){ 163 result = true; 164 }))); 165 return result; 166 } 167 virtual void accept(const typename base::observer_type& o) const { 168 o.on_error(ep); 169 } 170 const rxu::error_ptr ep; 171 }; 172 173 struct on_completed_notification : public base { 174 on_completed_notification() { 175 } 176 virtual void out(std::ostream& os) const { 177 os << "on_completed()"; 178 } 179 virtual bool equals(const typename base::type& other) const { 180 bool result = false; 181 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){ 182 result = true; 183 }))); 184 return result; 185 } 186 virtual void accept(const typename base::observer_type& o) const { 187 o.on_completed(); 188 } 189 }; 190 191 struct exception_tag {}; 192 193 template<typename Exception> 194 static 195 type make_on_error(exception_tag&&, Exception&& e) { 196 rxu::error_ptr ep = rxu::make_error_ptr(std::forward<Exception>(e)); 197 return std::make_shared<on_error_notification>(ep); 198 } 199 200 struct exception_ptr_tag {}; 201 202 static 203 type make_on_error(exception_ptr_tag&&, rxu::error_ptr ep) { 204 return std::make_shared<on_error_notification>(ep); 205 } 206 207 public: 208 template<typename U> 209 static type on_next(U value) { 210 return std::make_shared<on_next_notification>(std::move(value)); 211 } 212 213 static type on_completed() { 214 return std::make_shared<on_completed_notification>(); 215 } 216 217 template<typename Exception> 218 static type on_error(Exception&& e) { 219 return make_on_error(typename std::conditional< 220 std::is_same<rxu::decay_t<Exception>, rxu::error_ptr>::value, 221 exception_ptr_tag, exception_tag>::type(), 222 std::forward<Exception>(e)); 223 } 224 }; 225 226 template<class T> 227 bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) { 228 if (!lhs && !rhs) {return true;} 229 if (!lhs || !rhs) {return false;} 230 return lhs->equals(rhs); 231 } 232 233 template<class T> 234 std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) { 235 n->out(os); 236 return os; 237 } 238 239 240 template<class T> 241 class recorded 242 { 243 long t; 244 T v; 245 public: 246 recorded(long t, T v) 247 : t(t), v(v) { 248 } 249 long time() const { 250 return t; 251 } 252 const T& value() const { 253 return v; 254 } 255 }; 256 257 template<class T> 258 bool operator == (recorded<T> lhs, recorded<T> rhs) { 259 return lhs.time() == rhs.time() && lhs.value() == rhs.value(); 260 } 261 262 template<class T> 263 std::ostream& operator<< (std::ostream& out, const recorded<T>& r) { 264 out << "@" << r.time() << "-" << r.value(); 265 return out; 266 } 267 268 } 269 namespace rxn=notifications; 270 271 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) { 272 return rxcpp::notifications::detail::ostreamvector(out, vs); 273 } 274 template<class T> 275 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) { 276 return rxcpp::notifications::detail::ostreamvector(out, vr); 277 } 278 279 } 280 281 #endif 282