1 #include "../test.h" 2 #include <rxcpp/operators/rx-delay.hpp> 3 4 using namespace std::chrono; 5 6 SCENARIO("delay - never", "[delay][operators]"){ 7 GIVEN("a source"){ 8 auto sc = rxsc::make_test(); 9 auto so = rx::synchronize_in_one_worker(sc); 10 auto w = sc.create_worker(); 11 const rxsc::test::messages<int> on; 12 13 auto xs = sc.make_hot_observable({ 14 on.next(150, 1) 15 }); 16 17 WHEN("values are delayed"){ 18 19 auto res = w.start( 20 [so, xs]() { 21 return xs | rxo::delay(milliseconds(10), so); 22 } 23 ); 24 25 THEN("the output is empty"){ 26 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 27 auto actual = res.get_observer().messages(); 28 REQUIRE(required == actual); 29 } 30 31 THEN("there was 1 subscription/unsubscription to the source"){ 32 auto required = rxu::to_vector({ 33 on.subscribe(200, 1001) 34 }); 35 auto actual = xs.subscriptions(); 36 REQUIRE(required == actual); 37 } 38 } 39 } 40 } 41 42 SCENARIO("delay - empty", "[delay][operators]"){ 43 GIVEN("a source"){ 44 auto sc = rxsc::make_test(); 45 auto so = rx::synchronize_in_one_worker(sc); 46 auto w = sc.create_worker(); 47 const rxsc::test::messages<int> on; 48 49 auto xs = sc.make_hot_observable({ 50 on.next(150, 1), 51 on.completed(250) 52 }); 53 54 WHEN("values are delayed"){ 55 56 auto res = w.start( 57 [so, xs]() { 58 return xs.delay(so, milliseconds(10)); 59 } 60 ); 61 62 THEN("the output only contains complete message"){ 63 auto required = rxu::to_vector({ 64 on.completed(260) 65 }); 66 auto actual = res.get_observer().messages(); 67 REQUIRE(required == actual); 68 } 69 70 THEN("there was 1 subscription/unsubscription to the source"){ 71 auto required = rxu::to_vector({ 72 on.subscribe(200, 250) 73 }); 74 auto actual = xs.subscriptions(); 75 REQUIRE(required == actual); 76 } 77 78 } 79 } 80 } 81 82 SCENARIO("delay - return", "[delay][operators]"){ 83 GIVEN("a source"){ 84 auto sc = rxsc::make_test(); 85 auto so = rx::synchronize_in_one_worker(sc); 86 auto w = sc.create_worker(); 87 const rxsc::test::messages<int> on; 88 89 auto xs = sc.make_hot_observable({ 90 on.next(150, 1), 91 on.next(210, 2), 92 on.next(240, 3), 93 on.completed(300) 94 }); 95 96 WHEN("values are delayed"){ 97 98 auto res = w.start( 99 [so, xs]() { 100 return xs.delay(milliseconds(10), so); 101 } 102 ); 103 104 THEN("the output only contains delayed items sent while subscribed"){ 105 auto required = rxu::to_vector({ 106 on.next(220, 2), 107 on.next(250, 3), 108 on.completed(310) 109 }); 110 auto actual = res.get_observer().messages(); 111 REQUIRE(required == actual); 112 } 113 114 THEN("there was 1 subscription/unsubscription to the source"){ 115 auto required = rxu::to_vector({ 116 on.subscribe(200, 300) 117 }); 118 auto actual = xs.subscriptions(); 119 REQUIRE(required == actual); 120 } 121 122 } 123 } 124 } 125 126 SCENARIO("delay - throw", "[delay][operators]"){ 127 GIVEN("a source"){ 128 auto sc = rxsc::make_test(); 129 auto so = rx::synchronize_in_one_worker(sc); 130 auto w = sc.create_worker(); 131 const rxsc::test::messages<int> on; 132 133 std::runtime_error ex("delay on_error from source"); 134 135 auto xs = sc.make_hot_observable({ 136 on.next(150, 1), 137 on.error(250, ex) 138 }); 139 140 WHEN("values are delayed"){ 141 142 auto res = w.start( 143 [so, xs]() { 144 return xs.delay(milliseconds(10), so); 145 } 146 ); 147 148 THEN("the output only contains only error"){ 149 auto required = rxu::to_vector({ 150 on.error(251, ex) 151 }); 152 auto actual = res.get_observer().messages(); 153 REQUIRE(required == actual); 154 } 155 156 THEN("there was 1 subscription/unsubscription to the source"){ 157 auto required = rxu::to_vector({ 158 on.subscribe(200, 250) 159 }); 160 auto actual = xs.subscriptions(); 161 REQUIRE(required == actual); 162 } 163 164 } 165 } 166 } 167