1 2 #include <rxcpp/rx-lite.hpp> 3 #include <rxcpp/operators/rx-reduce.hpp> 4 #include <rxcpp/operators/rx-filter.hpp> 5 #include <rxcpp/operators/rx-map.hpp> 6 #include <rxcpp/operators/rx-tap.hpp> 7 #include <rxcpp/operators/rx-concat_map.hpp> 8 #include <rxcpp/operators/rx-flat_map.hpp> 9 #include <rxcpp/operators/rx-concat.hpp> 10 #include <rxcpp/operators/rx-merge.hpp> 11 #include <rxcpp/operators/rx-repeat.hpp> 12 #include <rxcpp/operators/rx-publish.hpp> 13 #include <rxcpp/operators/rx-ref_count.hpp> 14 #include <rxcpp/operators/rx-window.hpp> 15 #include <rxcpp/operators/rx-window_toggle.hpp> 16 #include <rxcpp/operators/rx-start_with.hpp> 17 namespace Rx { 18 using namespace rxcpp; 19 using namespace rxcpp::sources; 20 using namespace rxcpp::operators; 21 using namespace rxcpp::util; 22 } 23 using namespace Rx; 24 25 #include <regex> 26 #include <random> 27 using namespace std; 28 using namespace std::chrono; 29 30 int main() 31 { 32 random_device rd; // non-deterministic generator 33 mt19937 gen(rd()); 34 uniform_int_distribution<> dist(4, 18); 35 36 // for testing purposes, produce byte stream that from lines of text 37 auto bytes = range(0, 10) | 38 flat_map([&](int i){ 39 auto body = from((uint8_t)('A' + i)) | 40 repeat(dist(gen)) | 41 as_dynamic(); 42 auto delim = from((uint8_t)'\r'); 43 return from(body, delim) | concat(); 44 }) | 45 window(17) | 46 flat_map([](observable<uint8_t> w){ 47 return w | 48 reduce( 49 vector<uint8_t>(), 50 [](vector<uint8_t> v, uint8_t b){ 51 v.push_back(b); 52 return v; 53 }) | 54 as_dynamic(); 55 }) | 56 tap([](vector<uint8_t>& v){ 57 // print input packet of bytes 58 copy(v.begin(), v.end(), ostream_iterator<long>(cout, " ")); 59 cout << endl; 60 }); 61 62 // 63 // recover lines of text from byte stream 64 // 65 66 auto removespaces = [](string s){ 67 s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end()); 68 return s; 69 }; 70 71 // create strings split on \r 72 auto strings = bytes | 73 concat_map([](vector<uint8_t> v){ 74 string s(v.begin(), v.end()); 75 regex delim(R"/(\r)/"); 76 cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0}); 77 cregex_token_iterator end; 78 vector<string> splits(cursor, end); 79 return iterate(move(splits)); 80 }) | 81 filter([](const string& s){ 82 return !s.empty(); 83 }) | 84 publish() | 85 ref_count(); 86 87 // filter to last string in each line 88 auto closes = strings | 89 filter( 90 [](const string& s){ 91 return s.back() == '\r'; 92 }) | 93 Rx::map([](const string&){return 0;}); 94 95 // group strings by line 96 auto linewindows = strings | 97 window_toggle(closes | start_with(0), [=](int){return closes;}); 98 99 // reduce the strings for a line into one string 100 auto lines = linewindows | 101 flat_map([&](observable<string> w) { 102 return w | start_with<string>("") | sum() | Rx::map(removespaces); 103 }); 104 105 // print result 106 lines | 107 subscribe<string>(println(cout)); 108 109 return 0; 110 } 111