1 #include "../test.h" 2 #include <rxcpp/operators/rx-concat.hpp> 3 #include <rxcpp/operators/rx-group_by.hpp> 4 #include <rxcpp/operators/rx-reduce.hpp> 5 #include <rxcpp/operators/rx-map.hpp> 6 #include <rxcpp/operators/rx-merge.hpp> 7 #include <rxcpp/operators/rx-take.hpp> 8 #include <rxcpp/operators/rx-start_with.hpp> 9 #include <rxcpp/operators/rx-observe_on.hpp> 10 11 #include <locale> 12 #include <sstream> 13 14 SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){ 15 GIVEN("a for loop"){ 16 WHEN("partitioning pi series across all hardware threads"){ 17 18 std::atomic_int c; 19 c = 0; 20 auto pi = [&](int k) { 21 ++c; 22 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L ); 23 }; 24 25 using namespace std::chrono; 26 auto start = steady_clock::now(); 27 28 // share an output thread across all the producer threads 29 auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler()); 30 31 struct work 32 { 33 int index; 34 int first; 35 int last; 36 }; 37 38 // use all available hardware threads 39 auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1). 40 map( 41 [](int index){ 42 static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency()); 43 int first = (chunk * index) + 1; 44 int last = chunk * (index + 1); 45 return work{index, first, last};} 46 ). 47 group_by( 48 [](work w) -> int {return w.index % std::thread::hardware_concurrency();}, 49 [](work w){return w;}). 50 map( 51 [=](rxcpp::grouped_observable<int, work> onproc) { 52 auto key = onproc.get_key(); 53 // share a producer thread across all the ranges in this group of chunks 54 auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler()); 55 return onproc. 56 map( 57 [=](work w){ 58 std::stringstream message; 59 message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last; 60 61 return rxcpp::observable<>::range(w.first, w.last, producerthread). 62 map(pi). 63 sum(). // each thread maps and reduces its contribution to the answer 64 map( 65 [=](long double v){ 66 std::stringstream message; 67 message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v; 68 return std::make_tuple(message.str(), v); 69 }). 70 start_with(std::make_tuple(message.str(), 0.0L)). 71 as_dynamic(); 72 }). 73 concat(). // only subscribe to one range at a time in this group. 74 observe_on(outputthread). 75 map(rxcpp::util::apply_to( 76 [](std::string message, long double v){ 77 std::cout << message << std::endl; 78 return v; 79 })). 80 as_dynamic(); 81 }). 82 merge(). 83 sum(). // reduces the contributions from all the threads to the answer 84 as_blocking(). 85 last(); 86 87 std::cout << std::setprecision(16) << "Pi: " << total << std::endl; 88 auto finish = steady_clock::now(); 89 auto msElapsed = duration_cast<milliseconds>(finish-start); 90 std::cout << "pi using group_by and concat to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 91 92 } 93 } 94 } 95 96 SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){ 97 GIVEN("a for loop"){ 98 WHEN("partitioning pi series across all hardware threads"){ 99 100 std::atomic_int c; 101 c = 0; 102 auto pi = [&](int k) { 103 ++c; 104 return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L ); 105 }; 106 107 using namespace std::chrono; 108 auto start = steady_clock::now(); 109 110 struct work 111 { 112 int index; 113 int first; 114 int last; 115 }; 116 117 // use all available hardware threads 118 auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1). 119 map( 120 [](int index){ 121 static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency()); 122 int first = (chunk * index) + 1; 123 int last = chunk * (index + 1); 124 return work{index, first, last}; 125 }). 126 map( 127 [=](work w){ 128 std::stringstream message; 129 message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last; 130 131 // create a new thread for every chunk 132 return rxcpp::observable<>::range(w.first, w.last, rxcpp::observe_on_new_thread()). 133 map(pi). 134 sum(). // each thread maps and reduces its contribution to the answer 135 map( 136 [=](long double v){ 137 std::stringstream message; 138 message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v; 139 return std::make_tuple(message.str(), v); 140 }). 141 start_with(std::make_tuple(message.str(), 0.0L)). 142 as_dynamic(); 143 }). 144 merge(rxcpp::observe_on_new_thread()). 145 map(rxcpp::util::apply_to( 146 [](std::string message, long double v){ 147 std::cout << message << std::endl; 148 return v; 149 })). 150 sum(). // reduces the contributions from all the threads to the answer 151 as_blocking(). 152 last(); 153 154 std::cout << std::setprecision(16) << "Pi: " << total << std::endl; 155 auto finish = steady_clock::now(); 156 auto msElapsed = duration_cast<milliseconds>(finish-start); 157 std::cout << "pi using division of the whole range to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 158 159 } 160 } 161 } 162 163 char whitespace(char c) { 164 return std::isspace<char>(c, std::locale::classic()); 165 } 166 167 std::string trim(std::string s) { 168 auto first = std::find_if_not(s.begin(), s.end(), whitespace); 169 auto last = std::find_if_not(s.rbegin(), s.rend(), whitespace); 170 if (last != s.rend()) { 171 s.erase(s.end() - (last-s.rbegin()), s.end()); 172 } 173 s.erase(s.begin(), first); 174 return s; 175 } 176 177 bool tolowerLess(char lhs, char rhs) { 178 return std::tolower(lhs, std::locale::classic()) < std::tolower(rhs, std::locale::classic()); 179 } 180 181 bool tolowerStringLess(const std::string& lhs, const std::string& rhs) { 182 return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), tolowerLess); 183 } 184 185 SCENARIO("group_by", "[group_by][operators]"){ 186 GIVEN("1 hot observable of ints."){ 187 auto sc = rxsc::make_test(); 188 auto w = sc.create_worker(); 189 const rxsc::test::messages<std::string> on; 190 int keyInvoked = 0; 191 int marbleInvoked = 0; 192 193 auto xs = sc.make_hot_observable({ 194 on.next(90, "error"), 195 on.next(110, "error"), 196 on.next(130, "error"), 197 on.next(220, " foo"), 198 on.next(240, " FoO "), 199 on.next(270, "baR "), 200 on.next(310, "foO "), 201 on.next(350, " Baz "), 202 on.next(360, " qux "), 203 on.next(390, " bar"), 204 on.next(420, " BAR "), 205 on.next(470, "FOO "), 206 on.next(480, "baz "), 207 on.next(510, " bAZ "), 208 on.next(530, " fOo "), 209 on.completed(570), 210 on.next(580, "error"), 211 on.completed(600), 212 on.error(650, std::runtime_error("error in completed sequence")) 213 }); 214 215 WHEN("group normalized strings"){ 216 217 auto res = w.start( 218 [&]() { 219 return xs 220 .group_by( 221 [&](std::string v){ 222 ++keyInvoked; 223 return trim(std::move(v)); 224 }, 225 [&](std::string v){ 226 ++marbleInvoked; 227 std::reverse(v.begin(), v.end()); 228 return v; 229 }, 230 tolowerStringLess) 231 .map([](const rxcpp::grouped_observable<std::string, std::string>& g){return g.get_key();}) 232 // forget type to workaround lambda deduction bug on msvc 2013 233 .as_dynamic(); 234 } 235 ); 236 237 THEN("the output contains groups of group keys"){ 238 auto required = rxu::to_vector({ 239 on.next(220, "foo"), 240 on.next(270, "baR"), 241 on.next(350, "Baz"), 242 on.next(360, "qux"), 243 on.completed(570) 244 }); 245 auto actual = res.get_observer().messages(); 246 REQUIRE(required == actual); 247 } 248 249 THEN("there was one subscription and one unsubscription to the xs"){ 250 auto required = rxu::to_vector({ 251 on.subscribe(200, 570) 252 }); 253 auto actual = xs.subscriptions(); 254 REQUIRE(required == actual); 255 } 256 257 THEN("key selector was invoked for each value"){ 258 REQUIRE(12 == keyInvoked); 259 } 260 261 THEN("marble selector was invoked for each value"){ 262 REQUIRE(12 == marbleInvoked); 263 } 264 } 265 } 266 } 267 268 SCENARIO("group_by take 1", "[group_by][take][operators]"){ 269 GIVEN("1 hot observable of ints."){ 270 auto sc = rxsc::make_test(); 271 auto w = sc.create_worker(); 272 const rxsc::test::messages<long> on; 273 int keyInvoked = 0; 274 int marbleInvoked = 0; 275 int groupEmitted = 0; 276 277 auto xs = sc.make_hot_observable({ 278 on.next(130, -1), 279 on.next(220, 0), 280 on.next(240, -1), 281 on.next(270, 2), 282 on.next(310, -3), 283 on.next(350, 4), 284 on.next(360, -5), 285 on.next(390, 6), 286 on.next(420, -7), 287 on.next(470, 8), 288 on.next(480, -9), 289 on.completed(570) 290 }); 291 292 WHEN("1 group of ints is emitted"){ 293 294 auto res = w.start( 295 [&]() { 296 return xs 297 | rxo::group_by( 298 [&](long v) { 299 ++keyInvoked; 300 return v % 2; 301 }, 302 [&](long v){ 303 ++marbleInvoked; 304 return v; 305 }) 306 | rxo::take(1) 307 | rxo::map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> { 308 ++groupEmitted; 309 return g; 310 }) 311 | rxo::merge() 312 // forget type to workaround lambda deduction bug on msvc 2013 313 | rxo::as_dynamic(); 314 } 315 ); 316 317 THEN("the output contains groups of ints"){ 318 auto required = rxu::to_vector({ 319 on.next(220, 0), 320 on.next(270, 2), 321 on.next(350, 4), 322 on.next(390, 6), 323 on.next(470, 8), 324 on.completed(570) 325 }); 326 auto actual = res.get_observer().messages(); 327 REQUIRE(required == actual); 328 } 329 330 THEN("there was one subscription and one unsubscription to the xs"){ 331 auto required = rxu::to_vector({ 332 on.subscribe(200, 570) 333 }); 334 auto actual = xs.subscriptions(); 335 REQUIRE(required == actual); 336 } 337 338 THEN("key selector was invoked for each value"){ 339 REQUIRE(10 == keyInvoked); 340 } 341 342 THEN("marble selector was invoked for each value"){ 343 REQUIRE(5 == marbleInvoked); 344 } 345 346 THEN("1 group emitted"){ 347 REQUIRE(1 == groupEmitted); 348 } 349 } 350 } 351 } 352 353 SCENARIO("group_by take 1 take 4", "[group_by][take][operators]"){ 354 GIVEN("1 hot observable of ints."){ 355 auto sc = rxsc::make_test(); 356 auto w = sc.create_worker(); 357 const rxsc::test::messages<long> on; 358 int keyInvoked = 0; 359 int marbleInvoked = 0; 360 int groupEmitted = 0; 361 362 auto xs = sc.make_hot_observable({ 363 on.next(130, -1), 364 on.next(220, 0), 365 on.next(240, -1), 366 on.next(270, 2), 367 on.next(310, -3), 368 on.next(350, 4), 369 on.next(360, -5), 370 on.next(390, 6), 371 on.next(420, -7), 372 }); 373 374 WHEN("1 group of ints is emitted"){ 375 376 auto res = w.start( 377 [&]() { 378 return xs 379 .group_by( 380 [&](long v) { 381 ++keyInvoked; 382 return v % 2; 383 }, 384 [&](long v){ 385 ++marbleInvoked; 386 return v; 387 }) 388 .take(1) 389 .map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> { 390 ++groupEmitted; 391 return g.take(4); 392 }) 393 .merge() 394 // forget type to workaround lambda deduction bug on msvc 2013 395 .as_dynamic(); 396 } 397 ); 398 399 THEN("the output contains groups of ints"){ 400 auto required = rxu::to_vector({ 401 on.next(220, 0), 402 on.next(270, 2), 403 on.next(350, 4), 404 on.next(390, 6), 405 on.completed(390) 406 }); 407 auto actual = res.get_observer().messages(); 408 REQUIRE(required == actual); 409 } 410 411 THEN("there was one subscription and one unsubscription to the xs"){ 412 auto required = rxu::to_vector({ 413 on.subscribe(200, 390) 414 }); 415 auto actual = xs.subscriptions(); 416 REQUIRE(required == actual); 417 } 418 419 THEN("key selector was invoked for each value"){ 420 REQUIRE(7 == keyInvoked); 421 } 422 423 THEN("marble selector was invoked for each value"){ 424 REQUIRE(4 == marbleInvoked); 425 } 426 427 THEN("1 group emitted"){ 428 REQUIRE(1 == groupEmitted); 429 } 430 } 431 } 432 }