1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("subscribe by subscriber"){ 7 printf("//! [subscribe by subscriber]\n"); 8 auto subscriber = rxcpp::make_subscriber<int>( 9 [](int v){printf("OnNext: %d\n", v);}, 10 [](){printf("OnCompleted\n");}); 11 auto values = rxcpp::observable<>::range(1, 3); 12 values.subscribe(subscriber); 13 printf("//! [subscribe by subscriber]\n"); 14 } 15 16 SCENARIO("subscribe by observer"){ 17 printf("//! [subscribe by observer]\n"); 18 auto subscriber = rxcpp::make_subscriber<int>( 19 [](int v){printf("OnNext: %d\n", v);}, 20 [](){printf("OnCompleted\n");}); 21 auto values1 = rxcpp::observable<>::range(1, 3); 22 auto values2 = rxcpp::observable<>::range(4, 6); 23 values1.subscribe(subscriber.get_observer()); 24 values2.subscribe(subscriber.get_observer()); 25 printf("//! [subscribe by observer]\n"); 26 } 27 28 SCENARIO("subscribe by on_next"){ 29 printf("//! [subscribe by on_next]\n"); 30 auto values = rxcpp::observable<>::range(1, 3); 31 values.subscribe( 32 [](int v){printf("OnNext: %d\n", v);}); 33 printf("//! [subscribe by on_next]\n"); 34 } 35 36 SCENARIO("subscribe by on_next and on_error"){ 37 printf("//! [subscribe by on_next and on_error]\n"); 38 auto values = rxcpp::observable<>::range(1, 3). 39 concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); 40 values.subscribe( 41 [](int v){printf("OnNext: %d\n", v);}, 42 [](std::exception_ptr ep){ 43 try {std::rethrow_exception(ep);} 44 catch (const std::exception& ex) { 45 printf("OnError: %s\n", ex.what()); 46 } 47 }); 48 printf("//! [subscribe by on_next and on_error]\n"); 49 } 50 51 SCENARIO("subscribe by on_next and on_completed"){ 52 printf("//! [subscribe by on_next and on_completed]\n"); 53 auto values = rxcpp::observable<>::range(1, 3); 54 values.subscribe( 55 [](int v){printf("OnNext: %d\n", v);}, 56 [](){printf("OnCompleted\n");}); 57 printf("//! [subscribe by on_next and on_completed]\n"); 58 } 59 60 SCENARIO("subscribe by subscription, on_next, and on_completed"){ 61 printf("//! [subscribe by subscription, on_next, and on_completed]\n"); 62 auto subscription = rxcpp::composite_subscription(); 63 auto values = rxcpp::observable<>::range(1, 5); 64 values.subscribe( 65 subscription, 66 [&subscription](int v){ 67 printf("OnNext: %d\n", v); 68 if (v == 3) 69 subscription.unsubscribe(); 70 }, 71 [](){printf("OnCompleted\n");}); 72 printf("//! [subscribe by subscription, on_next, and on_completed]\n"); 73 } 74 75 SCENARIO("subscribe by on_next, on_error, and on_completed"){ 76 printf("//! [subscribe by on_next, on_error, and on_completed]\n"); 77 auto values = rxcpp::observable<>::range(1, 3). 78 concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); 79 values.subscribe( 80 [](int v){printf("OnNext: %d\n", v);}, 81 [](std::exception_ptr ep){ 82 try {std::rethrow_exception(ep);} 83 catch (const std::exception& ex) { 84 printf("OnError: %s\n", ex.what()); 85 } 86 }, 87 [](){printf("OnCompleted\n");}); 88 printf("//! [subscribe by on_next, on_error, and on_completed]\n"); 89 } 90 91 SCENARIO("subscribe unsubscribe"){ 92 printf("//! [subscribe unsubscribe]\n"); 93 auto values = rxcpp::observable<>::range(1, 3). 94 concat(rxcpp::observable<>::never<int>()). 95 finally([](){printf("The final action\n");}); 96 auto subscription = values.subscribe( 97 [](int v){printf("OnNext: %d\n", v);}, 98 [](){printf("OnCompleted\n");}); 99 subscription.unsubscribe(); 100 printf("//! [subscribe unsubscribe]\n"); 101 } 102