Home | History | Annotate | Download | only in linesfrombytes
      1 
      2 #include <rxcpp/rx-lite.hpp>
      3 #include <rxcpp/operators/rx-reduce.hpp>
      4 #include <rxcpp/operators/rx-filter.hpp>
      5 #include <rxcpp/operators/rx-map.hpp>
      6 #include <rxcpp/operators/rx-tap.hpp>
      7 #include <rxcpp/operators/rx-concat_map.hpp>
      8 #include <rxcpp/operators/rx-flat_map.hpp>
      9 #include <rxcpp/operators/rx-concat.hpp>
     10 #include <rxcpp/operators/rx-merge.hpp>
     11 #include <rxcpp/operators/rx-repeat.hpp>
     12 #include <rxcpp/operators/rx-publish.hpp>
     13 #include <rxcpp/operators/rx-ref_count.hpp>
     14 #include <rxcpp/operators/rx-window.hpp>
     15 #include <rxcpp/operators/rx-window_toggle.hpp>
     16 #include <rxcpp/operators/rx-start_with.hpp>
     17 namespace Rx {
     18 using namespace rxcpp;
     19 using namespace rxcpp::sources;
     20 using namespace rxcpp::operators;
     21 using namespace rxcpp::util;
     22 }
     23 using namespace Rx;
     24 
     25 #include <regex>
     26 #include <random>
     27 using namespace std;
     28 using namespace std::chrono;
     29 
     30 int main()
     31 {
     32     random_device rd;   // non-deterministic generator
     33     mt19937 gen(rd());
     34     uniform_int_distribution<> dist(4, 18);
     35 
     36     // for testing purposes, produce byte stream that from lines of text
     37     auto bytes = range(0, 10) |
     38         flat_map([&](int i){
     39             auto body = from((uint8_t)('A' + i)) |
     40                 repeat(dist(gen)) |
     41                 as_dynamic();
     42             auto delim = from((uint8_t)'\r');
     43             return from(body, delim) | concat();
     44         }) |
     45         window(17) |
     46         flat_map([](observable<uint8_t> w){
     47             return w |
     48                 reduce(
     49                     vector<uint8_t>(),
     50                     [](vector<uint8_t> v, uint8_t b){
     51                         v.push_back(b);
     52                         return v;
     53                     }) |
     54                 as_dynamic();
     55         }) |
     56         tap([](vector<uint8_t>& v){
     57             // print input packet of bytes
     58             copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
     59             cout << endl;
     60         });
     61 
     62     //
     63     // recover lines of text from byte stream
     64     //
     65 
     66     auto removespaces = [](string s){
     67         s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
     68         return s;
     69     };
     70 
     71     // create strings split on \r
     72     auto strings = bytes |
     73         concat_map([](vector<uint8_t> v){
     74             string s(v.begin(), v.end());
     75             regex delim(R"/(\r)/");
     76             cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
     77             cregex_token_iterator end;
     78             vector<string> splits(cursor, end);
     79             return iterate(move(splits));
     80         }) |
     81         filter([](const string& s){
     82             return !s.empty();
     83         }) |
     84         publish() |
     85         ref_count();
     86 
     87     // filter to last string in each line
     88     auto closes = strings |
     89         filter(
     90             [](const string& s){
     91                 return s.back() == '\r';
     92             }) |
     93         Rx::map([](const string&){return 0;});
     94 
     95     // group strings by line
     96     auto linewindows = strings |
     97         window_toggle(closes | start_with(0), [=](int){return closes;});
     98 
     99     // reduce the strings for a line into one string
    100     auto lines = linewindows |
    101         flat_map([&](observable<string> w) {
    102             return w | start_with<string>("") | sum() | Rx::map(removespaces);
    103         });
    104 
    105     // print result
    106     lines |
    107         subscribe<string>(println(cout));
    108 
    109     return 0;
    110 }
    111