1 #include "../test.h" 2 #include "rxcpp/operators/rx-timeout.hpp" 3 4 using namespace std::chrono; 5 6 SCENARIO("should timeout if the source never emits any items", "[timeout][operators]"){ 7 GIVEN("a source"){ 8 auto sc = rxsc::make_test(); 9 auto so = rx::synchronize_in_one_worker(sc); 10 auto w = sc.create_worker(); 11 const rxsc::test::messages<int> on; 12 13 rxcpp::timeout_error ex("timeout has occurred"); 14 15 auto xs = sc.make_hot_observable({ 16 on.next(150, 1) 17 }); 18 19 WHEN("timeout is set"){ 20 21 auto res = w.start( 22 [so, xs]() { 23 return xs 24 | rxo::timeout(milliseconds(10), so); 25 } 26 ); 27 28 THEN("the error notification message is captured"){ 29 auto required = rxu::to_vector({ 30 on.error(211, ex) 31 }); 32 auto actual = res.get_observer().messages(); 33 REQUIRE(required == actual); 34 } 35 36 THEN("there was 1 subscription/unsubscription to the source"){ 37 auto required = rxu::to_vector({ 38 on.subscribe(200, 212) 39 }); 40 auto actual = xs.subscriptions(); 41 REQUIRE(required == actual); 42 } 43 } 44 } 45 } 46 47 SCENARIO("should not timeout if completed before the specified timeout duration", "[timeout][operators]"){ 48 GIVEN("a source"){ 49 auto sc = rxsc::make_test(); 50 auto so = rx::synchronize_in_one_worker(sc); 51 auto w = sc.create_worker(); 52 const rxsc::test::messages<int> on; 53 54 auto xs = sc.make_hot_observable({ 55 on.next(150, 1), 56 on.completed(250) 57 }); 58 59 WHEN("timeout is set"){ 60 61 auto res = w.start( 62 [so, xs]() { 63 return xs.timeout(so, milliseconds(100)); 64 } 65 ); 66 67 THEN("the output only contains complete message"){ 68 auto required = rxu::to_vector({ 69 on.completed(251) 70 }); 71 auto actual = res.get_observer().messages(); 72 REQUIRE(required == actual); 73 } 74 75 THEN("there was 1 subscription/unsubscription to the source"){ 76 auto required = rxu::to_vector({ 77 on.subscribe(200, 250) 78 }); 79 auto actual = xs.subscriptions(); 80 REQUIRE(required == actual); 81 } 82 83 } 84 } 85 } 86 87 
// Items arrive at ticks 210 and 240 and the source completes at tick 250,
// with a 40ms timeout. The expected output is both items and the completion,
// each recorded one tick after the corresponding source message.
SCENARIO("should not timeout if all items are emitted within the specified timeout duration", "[timeout][operators]"){
    GIVEN("a source"){
        auto scheduler = rxsc::make_test();
        auto coordination = rx::synchronize_in_one_worker(scheduler);
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(240, 3),
            on.completed(250)
        });

        WHEN("timeout is set"){

            auto res = worker.start(
                [coordination, source]() {
                    return source.timeout(milliseconds(40), coordination);
                }
            );

            THEN("the output contains the emitted items while subscribed"){
                const auto required = rxu::to_vector({
                    on.next(211, 2),
                    on.next(241, 3),
                    on.completed(251)
                });
                const auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                const auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                const auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}

// After the item at tick 240 the source emits nothing until it completes at
// tick 300, and the timeout is 40ms. The expected output is the two items
// followed by a timeout_error at tick 281, with the source subscription
// ending at tick 282.
SCENARIO("should timeout if there are no emitted items within the timeout duration", "[timeout][operators]"){
    GIVEN("a source"){
        auto scheduler = rxsc::make_test();
        auto coordination = rx::synchronize_in_one_worker(scheduler);
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        rxcpp::timeout_error ex("timeout has occurred");

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(240, 3),
            // -- no emissions
            on.completed(300)
        });

        WHEN("timeout is set"){

            auto res = worker.start(
                [coordination, source]() {
                    return source.timeout(milliseconds(40), coordination);
                }
            );

            THEN("an error notification message is captured"){
                const auto required = rxu::to_vector({
                    on.next(211, 2),
                    on.next(241, 3),
                    on.error(281, ex)
                });
                const auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                const auto required = rxu::to_vector({
                    on.subscribe(200, 282)
                });
                const auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}

// The source fails with a runtime_error at tick 250 and the timeout is 100ms.
// The expected output is that same error at tick 251 rather than a
// timeout_error, with the source subscription spanning ticks 200 to 250.
SCENARIO("should not timeout if there is an error", "[timeout][operators]"){
    GIVEN("a source"){
        auto scheduler = rxsc::make_test();
        auto coordination = rx::synchronize_in_one_worker(scheduler);
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        std::runtime_error ex("on_error from source");

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.error(250, ex)
        });

        WHEN("timeout is set"){

            auto res = worker.start(
                [coordination, source]() {
                    return source.timeout(milliseconds(100), coordination);
                }
            );

            THEN("the output contains only an error message"){
                const auto required = rxu::to_vector({
                    on.error(251, ex)
                });
                const auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                const auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                const auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}