1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP) 6 #define RXCPP_RX_SUBSCRIPTION_HPP 7 8 #include "rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace detail { 13 14 template<class F> 15 struct is_unsubscribe_function 16 { 17 struct not_void {}; 18 template<class CF> 19 static auto check(int) -> decltype((*(CF*)nullptr)()); 20 template<class CF> 21 static not_void check(...); 22 23 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value; 24 }; 25 26 } 27 28 struct tag_subscription {}; 29 struct subscription_base {typedef tag_subscription subscription_tag;}; 30 template<class T> 31 class is_subscription 32 { 33 template<class C> 34 static typename C::subscription_tag* check(int); 35 template<class C> 36 static void check(...); 37 public: 38 static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value; 39 }; 40 41 template<class Unsubscribe> 42 class static_subscription 43 { 44 typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type; 45 unsubscribe_call_type unsubscribe_call; 46 static_subscription() 47 { 48 } 49 public: 50 static_subscription(const static_subscription& o) 51 : unsubscribe_call(o.unsubscribe_call) 52 { 53 } 54 static_subscription(static_subscription&& o) 55 : unsubscribe_call(std::move(o.unsubscribe_call)) 56 { 57 } 58 static_subscription(unsubscribe_call_type s) 59 : unsubscribe_call(std::move(s)) 60 { 61 } 62 void unsubscribe() const { 63 unsubscribe_call(); 64 } 65 }; 66 67 class subscription : public subscription_base 68 { 69 class base_subscription_state : public std::enable_shared_from_this<base_subscription_state> 70 { 71 base_subscription_state(); 72 public: 73 74 explicit base_subscription_state(bool initial) 75 : issubscribed(initial) 76 { 77 } 78 virtual ~base_subscription_state() {} 79 virtual void unsubscribe() { 80 } 81 std::atomic<bool> issubscribed; 82 }; 83 public: 84 typedef std::weak_ptr<base_subscription_state> weak_state_type; 85 86 private: 87 template<class I> 88 struct subscription_state : public base_subscription_state 89 { 90 typedef rxu::decay_t<I> inner_t; 91 subscription_state(inner_t i) 92 : base_subscription_state(true) 93 , inner(std::move(i)) 94 { 95 } 96 virtual void unsubscribe() { 97 if (issubscribed.exchange(false)) { 98 trace_activity().unsubscribe_enter(*this); 99 inner.unsubscribe(); 100 trace_activity().unsubscribe_return(*this); 101 } 102 } 103 inner_t inner; 104 }; 105 106 protected: 107 std::shared_ptr<base_subscription_state> state; 108 109 friend bool operator<(const subscription&, const subscription&); 110 friend bool operator==(const subscription&, const subscription&); 111 112 private: 113 subscription(weak_state_type w) 114 : state(w.lock()) 115 { 116 if (!state) { 117 std::terminate(); 118 } 119 } 120 121 explicit subscription(std::shared_ptr<base_subscription_state> s) 122 : state(std::move(s)) 123 { 124 if (!state) { 125 std::terminate(); 126 } 127 } 128 public: 129 130 subscription() 131 : state(std::make_shared<base_subscription_state>(false)) 132 { 133 if (!state) { 134 std::terminate(); 135 } 136 } 137 template<class U> 138 explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr) 139 : state(std::make_shared<subscription_state<U>>(std::move(u))) 140 { 141 if (!state) { 142 std::terminate(); 143 } 144 } 145 template<class U> 146 explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr) 147 // intentionally slice 148 : state(std::move((*static_cast<subscription*>(&u)).state)) 149 { 150 if (!state) { 151 std::terminate(); 152 } 153 } 154 subscription(const subscription& o) 155 : state(o.state) 156 { 157 if (!state) { 158 std::terminate(); 159 } 160 } 161 subscription(subscription&& o) 162 : state(std::move(o.state)) 163 { 164 if (!state) { 165 std::terminate(); 166 } 167 } 168 subscription& operator=(subscription o) { 169 state = std::move(o.state); 170 return *this; 171 } 172 bool is_subscribed() const { 173 if (!state) { 174 std::terminate(); 175 } 176 return state->issubscribed; 177 } 178 void unsubscribe() const { 179 if (!state) { 180 std::terminate(); 181 } 182 auto keepAlive = state; 183 state->unsubscribe(); 184 } 185 186 weak_state_type get_weak() { 187 return state; 188 } 189 190 // Atomically promote weak subscription to strong. 191 // Calls std::terminate if w has already expired. 192 static subscription lock(weak_state_type w) { 193 return subscription(w); 194 } 195 196 // Atomically try to promote weak subscription to strong. 197 // Returns an empty maybe<> if w has already expired. 198 static rxu::maybe<subscription> maybe_lock(weak_state_type w) { 199 auto strong_subscription = w.lock(); 200 if (!strong_subscription) { 201 return rxu::detail::maybe<subscription>{}; 202 } else { 203 return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}}; 204 } 205 } 206 }; 207 208 inline bool operator<(const subscription& lhs, const subscription& rhs) { 209 return lhs.state < rhs.state; 210 } 211 inline bool operator==(const subscription& lhs, const subscription& rhs) { 212 return lhs.state == rhs.state; 213 } 214 inline bool operator!=(const subscription& lhs, const subscription& rhs) { 215 return !(lhs == rhs); 216 } 217 218 219 inline auto make_subscription() 220 -> subscription { 221 return subscription(); 222 } 223 template<class I> 224 auto make_subscription(I&& i) 225 -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value, 226 subscription>::type { 227 return subscription(std::forward<I>(i)); 228 } 229 template<class Unsubscribe> 230 auto make_subscription(Unsubscribe&& u) 231 -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value, 232 subscription>::type { 233 return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u))); 234 } 235 236 class composite_subscription; 237 238 namespace detail { 239 240 struct tag_composite_subscription_empty {}; 241 242 class composite_subscription_inner 243 { 244 private: 245 typedef subscription::weak_state_type weak_subscription; 246 struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state> 247 { 248 // invariant: cannot access this data without the lock held. 249 std::set<subscription> subscriptions; 250 // double checked locking: 251 // issubscribed must be loaded again after each lock acquisition. 252 // invariant: 253 // never call subscription::unsubscribe with lock held. 254 std::mutex lock; 255 // invariant: transitions from 'true' to 'false' exactly once, at any time. 256 std::atomic<bool> issubscribed; 257 258 ~composite_subscription_state() 259 { 260 std::unique_lock<decltype(lock)> guard(lock); 261 subscriptions.clear(); 262 } 263 264 composite_subscription_state() 265 : issubscribed(true) 266 { 267 } 268 composite_subscription_state(tag_composite_subscription_empty) 269 : issubscribed(false) 270 { 271 } 272 273 // Atomically add 's' to the set of subscriptions. 274 // 275 // If unsubscribe() has already occurred, this immediately 276 // calls s.unsubscribe(). 277 // 278 // cs.unsubscribe() [must] happens-before s.unsubscribe() 279 // 280 // Due to the un-atomic nature of calling 's.unsubscribe()', 281 // it is possible to observe the unintuitive 282 // add(s)=>s.unsubscribe() prior 283 // to any of the unsubscribe()=>sN.unsubscribe(). 284 inline weak_subscription add(subscription s) { 285 if (!issubscribed) { // load.acq [seq_cst] 286 s.unsubscribe(); 287 } else if (s.is_subscribed()) { 288 std::unique_lock<decltype(lock)> guard(lock); 289 if (!issubscribed) { // load.acq [seq_cst] 290 // unsubscribe was called concurrently. 291 guard.unlock(); 292 // invariant: do not call unsubscribe with lock held. 293 s.unsubscribe(); 294 } else { 295 subscriptions.insert(s); 296 } 297 } 298 return s.get_weak(); 299 } 300 301 // Atomically remove 'w' from the set of subscriptions. 302 // 303 // This does nothing if 'w' was already previously removed, 304 // or refers to an expired value. 305 inline void remove(weak_subscription w) { 306 if (issubscribed) { // load.acq [seq_cst] 307 rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w); 308 309 if (maybe_subscription.empty()) { 310 // Do nothing if the subscription has already expired. 311 return; 312 } 313 314 std::unique_lock<decltype(lock)> guard(lock); 315 // invariant: subscriptions must be accessed under the lock. 316 317 if (issubscribed) { // load.acq [seq_cst] 318 subscription& s = maybe_subscription.get(); 319 subscriptions.erase(std::move(s)); 320 } // else unsubscribe() was called concurrently; this becomes a no-op. 321 } 322 } 323 324 // Atomically clear all subscriptions that were observably added 325 // (and not subsequently observably removed). 326 // 327 // Un-atomically call unsubscribe on those subscriptions. 328 // 329 // forall subscriptions in {add(s1),add(s2),...} 330 // - {remove(s3), remove(s4), ...}: 331 // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() 332 // 333 // cs.unsubscribe() observed-before cs.clear ==> do nothing. 334 inline void clear() { 335 if (issubscribed) { // load.acq [seq_cst] 336 std::unique_lock<decltype(lock)> guard(lock); 337 338 if (!issubscribed) { // load.acq [seq_cst] 339 // unsubscribe was called concurrently. 340 return; 341 } 342 343 std::set<subscription> v(std::move(subscriptions)); 344 // invariant: do not call unsubscribe with lock held. 345 guard.unlock(); 346 std::for_each(v.begin(), v.end(), 347 [](const subscription& s) { 348 s.unsubscribe(); }); 349 } 350 } 351 352 // Atomically clear all subscriptions that were observably added 353 // (and not subsequently observably removed). 354 // 355 // Un-atomically call unsubscribe on those subscriptions. 356 // 357 // Switches to an 'unsubscribed' state, all subsequent 358 // adds are immediately unsubscribed. 359 // 360 // cs.unsubscribe() [must] happens-before 361 // cs.add(s) ==> s.unsubscribe() 362 // 363 // forall subscriptions in {add(s1),add(s2),...} 364 // - {remove(s3), remove(s4), ...}: 365 // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() 366 inline void unsubscribe() { 367 if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst] 368 std::unique_lock<decltype(lock)> guard(lock); 369 370 // is_subscribed can only transition to 'false' once, 371 // does not need an extra atomic access here. 372 373 std::set<subscription> v(std::move(subscriptions)); 374 // invariant: do not call unsubscribe with lock held. 375 guard.unlock(); 376 std::for_each(v.begin(), v.end(), 377 [](const subscription& s) { 378 s.unsubscribe(); }); 379 } 380 } 381 }; 382 383 public: 384 typedef std::shared_ptr<composite_subscription_state> shared_state_type; 385 386 protected: 387 mutable shared_state_type state; 388 389 public: 390 composite_subscription_inner() 391 : state(std::make_shared<composite_subscription_state>()) 392 { 393 } 394 composite_subscription_inner(tag_composite_subscription_empty et) 395 : state(std::make_shared<composite_subscription_state>(et)) 396 { 397 } 398 399 composite_subscription_inner(const composite_subscription_inner& o) 400 : state(o.state) 401 { 402 if (!state) { 403 std::terminate(); 404 } 405 } 406 composite_subscription_inner(composite_subscription_inner&& o) 407 : state(std::move(o.state)) 408 { 409 if (!state) { 410 std::terminate(); 411 } 412 } 413 414 composite_subscription_inner& operator=(composite_subscription_inner o) 415 { 416 state = std::move(o.state); 417 if (!state) { 418 std::terminate(); 419 } 420 return *this; 421 } 422 423 inline weak_subscription add(subscription s) const { 424 if (!state) { 425 std::terminate(); 426 } 427 return state->add(std::move(s)); 428 } 429 inline void remove(weak_subscription w) const { 430 if (!state) { 431 std::terminate(); 432 } 433 state->remove(std::move(w)); 434 } 435 inline void clear() const { 436 if (!state) { 437 std::terminate(); 438 } 439 state->clear(); 440 } 441 inline void unsubscribe() { 442 if (!state) { 443 std::terminate(); 444 } 445 state->unsubscribe(); 446 } 447 }; 448 449 inline composite_subscription shared_empty(); 450 451 } 452 453 /*! 454 \brief controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe. 455 456 \ingroup group-core 457 458 */ 459 class composite_subscription 460 : protected detail::composite_subscription_inner 461 , public subscription 462 { 463 typedef detail::composite_subscription_inner inner_type; 464 public: 465 typedef subscription::weak_state_type weak_subscription; 466 467 composite_subscription(detail::tag_composite_subscription_empty et) 468 : inner_type(et) 469 , subscription() // use empty base 470 { 471 } 472 473 public: 474 475 composite_subscription() 476 : inner_type() 477 , subscription(*static_cast<const inner_type*>(this)) 478 { 479 } 480 481 composite_subscription(const composite_subscription& o) 482 : inner_type(o) 483 , subscription(static_cast<const subscription&>(o)) 484 { 485 } 486 composite_subscription(composite_subscription&& o) 487 : inner_type(std::move(o)) 488 , subscription(std::move(static_cast<subscription&>(o))) 489 { 490 } 491 492 composite_subscription& operator=(composite_subscription o) 493 { 494 inner_type::operator=(std::move(o)); 495 subscription::operator=(std::move(*static_cast<subscription*>(&o))); 496 return *this; 497 } 498 499 static inline composite_subscription empty() { 500 return detail::shared_empty(); 501 } 502 503 using subscription::is_subscribed; 504 using subscription::unsubscribe; 505 506 using inner_type::clear; 507 508 inline weak_subscription add(subscription s) const { 509 if (s == static_cast<const subscription&>(*this)) { 510 // do not nest the same subscription 511 std::terminate(); 512 //return s.get_weak(); 513 } 514 auto that = this->subscription::state.get(); 515 trace_activity().subscription_add_enter(*that, s); 516 auto w = inner_type::add(std::move(s)); 517 trace_activity().subscription_add_return(*that); 518 return w; 519 } 520 521 template<class F> 522 auto add(F f) const 523 -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { 524 return add(make_subscription(std::move(f))); 525 } 526 527 inline void remove(weak_subscription w) const { 528 auto that = this->subscription::state.get(); 529 trace_activity().subscription_remove_enter(*that, w); 530 inner_type::remove(w); 531 trace_activity().subscription_remove_return(*that); 532 } 533 }; 534 535 inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) { 536 return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs); 537 } 538 inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) { 539 return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs); 540 } 541 inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) { 542 return !(lhs == rhs); 543 } 544 545 namespace detail { 546 547 inline composite_subscription shared_empty() { 548 static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty()); 549 return shared_empty; 550 } 551 552 } 553 554 template<class T> 555 class resource : public subscription_base 556 { 557 public: 558 typedef typename composite_subscription::weak_subscription weak_subscription; 559 560 resource() 561 : lifetime(composite_subscription()) 562 , value(std::make_shared<rxu::detail::maybe<T>>()) 563 { 564 } 565 566 explicit resource(T t, composite_subscription cs = composite_subscription()) 567 : lifetime(std::move(cs)) 568 , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t)))) 569 { 570 auto localValue = value; 571 lifetime.add( 572 [localValue](){ 573 localValue->reset(); 574 } 575 ); 576 } 577 578 T& get() { 579 return value.get()->get(); 580 } 581 composite_subscription& get_subscription() { 582 return lifetime; 583 } 584 585 bool is_subscribed() const { 586 return lifetime.is_subscribed(); 587 } 588 weak_subscription add(subscription s) const { 589 return lifetime.add(std::move(s)); 590 } 591 template<class F> 592 auto add(F f) const 593 -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { 594 return lifetime.add(make_subscription(std::move(f))); 595 } 596 void remove(weak_subscription w) const { 597 return lifetime.remove(std::move(w)); 598 } 599 void clear() const { 600 return lifetime.clear(); 601 } 602 void unsubscribe() const { 603 return lifetime.unsubscribe(); 604 } 605 606 protected: 607 composite_subscription lifetime; 608 std::shared_ptr<rxu::detail::maybe<T>> value; 609 }; 610 611 } 612 613 #endif 614