Home | History | Annotate | Download | only in io
      1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
#include "tensorflow/core/lib/io/table.h"

#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/io/block.h"
#include "tensorflow/core/lib/io/block_builder.h"
#include "tensorflow/core/lib/io/format.h"
#include "tensorflow/core/lib/io/iterator.h"
#include "tensorflow/core/lib/io/table_builder.h"
#include "tensorflow/core/lib/random/simple_philox.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/snappy.h"
#include "tensorflow/core/platform/test.h"
     33 
     34 namespace tensorflow {
     35 namespace table {
     36 
     37 namespace {
     38 typedef std::pair<StringPiece, StringPiece> StringPiecePair;
     39 }
     40 
     41 namespace test {
     42 static StringPiece RandomString(random::SimplePhilox* rnd, int len,
     43                                 string* dst) {
     44   dst->resize(len);
     45   for (int i = 0; i < len; i++) {
     46     (*dst)[i] = static_cast<char>(' ' + rnd->Uniform(95));  // ' ' .. '~'
     47   }
     48   return StringPiece(*dst);
     49 }
     50 static string RandomKey(random::SimplePhilox* rnd, int len) {
     51   // Make sure to generate a wide variety of characters so we
     52   // test the boundary conditions for short-key optimizations.
     53   static const char kTestChars[] = {'\0', '\1', 'a',    'b',    'c',
     54                                     'd',  'e',  '\xfd', '\xfe', '\xff'};
     55   string result;
     56   for (int i = 0; i < len; i++) {
     57     result += kTestChars[rnd->Uniform(sizeof(kTestChars))];
     58   }
     59   return result;
     60 }
     61 static StringPiece CompressibleString(random::SimplePhilox* rnd,
     62                                       double compressed_fraction, size_t len,
     63                                       string* dst) {
     64   int raw = static_cast<int>(len * compressed_fraction);
     65   if (raw < 1) raw = 1;
     66   string raw_data;
     67   RandomString(rnd, raw, &raw_data);
     68 
     69   // Duplicate the random data until we have filled "len" bytes
     70   dst->clear();
     71   while (dst->size() < len) {
     72     dst->append(raw_data);
     73   }
     74   dst->resize(len);
     75   return StringPiece(*dst);
     76 }
     77 }  // namespace test
     78 
     79 static void Increment(string* key) { key->push_back('\0'); }
     80 
     81 // An STL comparator that compares two StringPieces
     82 namespace {
     83 struct STLLessThan {
     84   STLLessThan() {}
     85   bool operator()(const string& a, const string& b) const {
     86     return StringPiece(a).compare(StringPiece(b)) < 0;
     87   }
     88 };
     89 }  // namespace
     90 
     91 class StringSink : public WritableFile {
     92  public:
     93   ~StringSink() override {}
     94 
     95   const string& contents() const { return contents_; }
     96 
     97   Status Close() override { return Status::OK(); }
     98   Status Flush() override { return Status::OK(); }
     99   Status Sync() override { return Status::OK(); }
    100 
    101   Status Append(const StringPiece& data) override {
    102     contents_.append(data.data(), data.size());
    103     return Status::OK();
    104   }
    105 
    106  private:
    107   string contents_;
    108 };
    109 
    110 class StringSource : public RandomAccessFile {
    111  public:
    112   explicit StringSource(const StringPiece& contents)
    113       : contents_(contents.data(), contents.size()), bytes_read_(0) {}
    114 
    115   ~StringSource() override {}
    116 
    117   uint64 Size() const { return contents_.size(); }
    118 
    119   Status Read(uint64 offset, size_t n, StringPiece* result,
    120               char* scratch) const override {
    121     if (offset > contents_.size()) {
    122       return errors::InvalidArgument("invalid Read offset");
    123     }
    124     if (offset + n > contents_.size()) {
    125       n = contents_.size() - offset;
    126     }
    127     memcpy(scratch, &contents_[offset], n);
    128     *result = StringPiece(scratch, n);
    129     bytes_read_ += n;
    130     return Status::OK();
    131   }
    132 
    133   uint64 BytesRead() const { return bytes_read_; }
    134 
    135  private:
    136   string contents_;
    137   mutable uint64 bytes_read_;
    138 };
    139 
    140 typedef std::map<string, string, STLLessThan> KVMap;
    141 
    142 // Helper class for tests to unify the interface between
    143 // BlockBuilder/TableBuilder and Block/Table.
    144 class Constructor {
    145  public:
    146   explicit Constructor() : data_(STLLessThan()) {}
    147   virtual ~Constructor() {}
    148 
    149   void Add(const string& key, const StringPiece& value) {
    150     data_[key] = value.ToString();
    151   }
    152 
    153   // Finish constructing the data structure with all the keys that have
    154   // been added so far.  Returns the keys in sorted order in "*keys"
    155   // and stores the key/value pairs in "*kvmap"
    156   void Finish(const Options& options, std::vector<string>* keys, KVMap* kvmap) {
    157     *kvmap = data_;
    158     keys->clear();
    159     for (KVMap::const_iterator it = data_.begin(); it != data_.end(); ++it) {
    160       keys->push_back(it->first);
    161     }
    162     data_.clear();
    163     Status s = FinishImpl(options, *kvmap);
    164     ASSERT_TRUE(s.ok()) << s.ToString();
    165   }
    166 
    167   // Construct the data structure from the data in "data"
    168   virtual Status FinishImpl(const Options& options, const KVMap& data) = 0;
    169 
    170   virtual Iterator* NewIterator() const = 0;
    171 
    172   virtual const KVMap& data() { return data_; }
    173 
    174  private:
    175   KVMap data_;
    176 };
    177 
    178 class BlockConstructor : public Constructor {
    179  public:
    180   BlockConstructor() : block_(nullptr) {}
    181   ~BlockConstructor() override { delete block_; }
    182   Status FinishImpl(const Options& options, const KVMap& data) override {
    183     delete block_;
    184     block_ = nullptr;
    185     BlockBuilder builder(&options);
    186 
    187     for (KVMap::const_iterator it = data.begin(); it != data.end(); ++it) {
    188       builder.Add(it->first, it->second);
    189     }
    190     // Open the block
    191     data_ = builder.Finish().ToString();
    192     BlockContents contents;
    193     contents.data = data_;
    194     contents.cachable = false;
    195     contents.heap_allocated = false;
    196     block_ = new Block(contents);
    197     return Status::OK();
    198   }
    199   Iterator* NewIterator() const override { return block_->NewIterator(); }
    200 
    201  private:
    202   string data_;
    203   Block* block_;
    204 };
    205 
    206 class TableConstructor : public Constructor {
    207  public:
    208   TableConstructor() : source_(nullptr), table_(nullptr) {}
    209   ~TableConstructor() override { Reset(); }
    210   Status FinishImpl(const Options& options, const KVMap& data) override {
    211     Reset();
    212     StringSink sink;
    213     TableBuilder builder(options, &sink);
    214 
    215     for (KVMap::const_iterator it = data.begin(); it != data.end(); ++it) {
    216       builder.Add(it->first, it->second);
    217       TF_CHECK_OK(builder.status());
    218     }
    219     Status s = builder.Finish();
    220     TF_CHECK_OK(s) << s.ToString();
    221 
    222     CHECK_EQ(sink.contents().size(), builder.FileSize());
    223 
    224     // Open the table
    225     source_ = new StringSource(sink.contents());
    226     Options table_options;
    227     return Table::Open(table_options, source_, sink.contents().size(), &table_);
    228   }
    229 
    230   Iterator* NewIterator() const override { return table_->NewIterator(); }
    231 
    232   uint64 ApproximateOffsetOf(const StringPiece& key) const {
    233     return table_->ApproximateOffsetOf(key);
    234   }
    235 
    236   uint64 BytesRead() const { return source_->BytesRead(); }
    237 
    238  private:
    239   void Reset() {
    240     delete table_;
    241     delete source_;
    242     table_ = nullptr;
    243     source_ = nullptr;
    244   }
    245 
    246   StringSource* source_;
    247   Table* table_;
    248 };
    249 
    250 enum TestType { TABLE_TEST, BLOCK_TEST };
    251 
    252 struct TestArgs {
    253   TestType type;
    254   int restart_interval;
    255 };
    256 
    257 static const TestArgs kTestArgList[] = {
    258     {TABLE_TEST, 16}, {TABLE_TEST, 1}, {TABLE_TEST, 1024},
    259     {BLOCK_TEST, 16}, {BLOCK_TEST, 1}, {BLOCK_TEST, 1024},
    260 };
    261 static const int kNumTestArgs = sizeof(kTestArgList) / sizeof(kTestArgList[0]);
    262 
    263 class Harness : public ::testing::Test {
    264  public:
    265   Harness() : constructor_(nullptr) {}
    266 
    267   void Init(const TestArgs& args) {
    268     delete constructor_;
    269     constructor_ = nullptr;
    270     options_ = Options();
    271 
    272     options_.block_restart_interval = args.restart_interval;
    273     // Use shorter block size for tests to exercise block boundary
    274     // conditions more.
    275     options_.block_size = 256;
    276     switch (args.type) {
    277       case TABLE_TEST:
    278         constructor_ = new TableConstructor();
    279         break;
    280       case BLOCK_TEST:
    281         constructor_ = new BlockConstructor();
    282         break;
    283     }
    284   }
    285 
    286   ~Harness() override { delete constructor_; }
    287 
    288   void Add(const string& key, const string& value) {
    289     constructor_->Add(key, value);
    290   }
    291 
    292   void Test(random::SimplePhilox* rnd, int num_random_access_iters = 200) {
    293     std::vector<string> keys;
    294     KVMap data;
    295     constructor_->Finish(options_, &keys, &data);
    296 
    297     TestForwardScan(keys, data);
    298     TestRandomAccess(rnd, keys, data, num_random_access_iters);
    299   }
    300 
    301   void TestForwardScan(const std::vector<string>& keys, const KVMap& data) {
    302     Iterator* iter = constructor_->NewIterator();
    303     ASSERT_TRUE(!iter->Valid());
    304     iter->SeekToFirst();
    305     for (KVMap::const_iterator model_iter = data.begin();
    306          model_iter != data.end(); ++model_iter) {
    307       ASSERT_EQ(ToStringPiecePair(data, model_iter), ToStringPiecePair(iter));
    308       iter->Next();
    309     }
    310     ASSERT_TRUE(!iter->Valid());
    311     delete iter;
    312   }
    313 
    314   void TestRandomAccess(random::SimplePhilox* rnd,
    315                         const std::vector<string>& keys, const KVMap& data,
    316                         int num_random_access_iters) {
    317     static const bool kVerbose = false;
    318     Iterator* iter = constructor_->NewIterator();
    319     ASSERT_TRUE(!iter->Valid());
    320     KVMap::const_iterator model_iter = data.begin();
    321     if (kVerbose) fprintf(stderr, "---\n");
    322     for (int i = 0; i < num_random_access_iters; i++) {
    323       const int toss = rnd->Uniform(3);
    324       switch (toss) {
    325         case 0: {
    326           if (iter->Valid()) {
    327             if (kVerbose) fprintf(stderr, "Next\n");
    328             iter->Next();
    329             ++model_iter;
    330             ASSERT_EQ(ToStringPiecePair(data, model_iter),
    331                       ToStringPiecePair(iter));
    332           }
    333           break;
    334         }
    335 
    336         case 1: {
    337           if (kVerbose) fprintf(stderr, "SeekToFirst\n");
    338           iter->SeekToFirst();
    339           model_iter = data.begin();
    340           ASSERT_EQ(ToStringPiecePair(data, model_iter),
    341                     ToStringPiecePair(iter));
    342           break;
    343         }
    344 
    345         case 2: {
    346           string key = PickRandomKey(rnd, keys);
    347           model_iter = data.lower_bound(key);
    348           if (kVerbose)
    349             fprintf(stderr, "Seek '%s'\n", str_util::CEscape(key).c_str());
    350           iter->Seek(StringPiece(key));
    351           ASSERT_EQ(ToStringPiecePair(data, model_iter),
    352                     ToStringPiecePair(iter));
    353           break;
    354         }
    355       }
    356     }
    357     delete iter;
    358   }
    359 
    360   StringPiecePair ToStringPiecePair(const KVMap& data,
    361                                     const KVMap::const_iterator& it) {
    362     if (it == data.end()) {
    363       return StringPiecePair("END", "");
    364     } else {
    365       return StringPiecePair(it->first, it->second);
    366     }
    367   }
    368 
    369   StringPiecePair ToStringPiecePair(const KVMap& data,
    370                                     const KVMap::const_reverse_iterator& it) {
    371     if (it == data.rend()) {
    372       return StringPiecePair("END", "");
    373     } else {
    374       return StringPiecePair(it->first, it->second);
    375     }
    376   }
    377 
    378   StringPiecePair ToStringPiecePair(const Iterator* it) {
    379     if (!it->Valid()) {
    380       return StringPiecePair("END", "");
    381     } else {
    382       return StringPiecePair(it->key(), it->value());
    383     }
    384   }
    385 
    386   string PickRandomKey(random::SimplePhilox* rnd,
    387                        const std::vector<string>& keys) {
    388     if (keys.empty()) {
    389       return "foo";
    390     } else {
    391       const int index = rnd->Uniform(keys.size());
    392       string result = keys[index];
    393       switch (rnd->Uniform(3)) {
    394         case 0:
    395           // Return an existing key
    396           break;
    397         case 1: {
    398           // Attempt to return something smaller than an existing key
    399           if (!result.empty() && result[result.size() - 1] > '\0') {
    400             result[result.size() - 1]--;
    401           }
    402           break;
    403         }
    404         case 2: {
    405           // Return something larger than an existing key
    406           Increment(&result);
    407           break;
    408         }
    409       }
    410       return result;
    411     }
    412   }
    413 
    414  private:
    415   Options options_;
    416   Constructor* constructor_;
    417 };
    418 
    419 // Test empty table/block.
    420 TEST_F(Harness, Empty) {
    421   for (int i = 0; i < kNumTestArgs; i++) {
    422     Init(kTestArgList[i]);
    423     random::PhiloxRandom philox(testing::RandomSeed() + 1, 17);
    424     random::SimplePhilox rnd(&philox);
    425     Test(&rnd);
    426   }
    427 }
    428 
    429 // Special test for a block with no restart entries.  The C++ leveldb
    430 // code never generates such blocks, but the Java version of leveldb
    431 // seems to.
    432 TEST_F(Harness, ZeroRestartPointsInBlock) {
    433   char data[sizeof(uint32)];
    434   memset(data, 0, sizeof(data));
    435   BlockContents contents;
    436   contents.data = StringPiece(data, sizeof(data));
    437   contents.cachable = false;
    438   contents.heap_allocated = false;
    439   Block block(contents);
    440   Iterator* iter = block.NewIterator();
    441   iter->SeekToFirst();
    442   ASSERT_TRUE(!iter->Valid());
    443   iter->Seek("foo");
    444   ASSERT_TRUE(!iter->Valid());
    445   delete iter;
    446 }
    447 
    448 // Test the empty key
    449 TEST_F(Harness, SimpleEmptyKey) {
    450   for (int i = 0; i < kNumTestArgs; i++) {
    451     Init(kTestArgList[i]);
    452     random::PhiloxRandom philox(testing::RandomSeed() + 1, 17);
    453     random::SimplePhilox rnd(&philox);
    454     Add("", "v");
    455     Test(&rnd);
    456   }
    457 }
    458 
    459 TEST_F(Harness, SimpleSingle) {
    460   for (int i = 0; i < kNumTestArgs; i++) {
    461     Init(kTestArgList[i]);
    462     random::PhiloxRandom philox(testing::RandomSeed() + 2, 17);
    463     random::SimplePhilox rnd(&philox);
    464     Add("abc", "v");
    465     Test(&rnd);
    466   }
    467 }
    468 
    469 TEST_F(Harness, SimpleMulti) {
    470   for (int i = 0; i < kNumTestArgs; i++) {
    471     Init(kTestArgList[i]);
    472     random::PhiloxRandom philox(testing::RandomSeed() + 3, 17);
    473     random::SimplePhilox rnd(&philox);
    474     Add("abc", "v");
    475     Add("abcd", "v");
    476     Add("ac", "v2");
    477     Test(&rnd);
    478   }
    479 }
    480 
    481 TEST_F(Harness, SimpleMultiBigValues) {
    482   for (int i = 0; i < kNumTestArgs; i++) {
    483     Init(kTestArgList[i]);
    484     random::PhiloxRandom philox(testing::RandomSeed() + 3, 17);
    485     random::SimplePhilox rnd(&philox);
    486     Add("ainitial", "tiny");
    487     Add("anext", string(10000000, 'a'));
    488     Add("anext2", string(10000000, 'b'));
    489     Add("azz", "tiny");
    490     Test(&rnd, 100 /* num_random_access_iters */);
    491   }
    492 }
    493 
    494 TEST_F(Harness, SimpleSpecialKey) {
    495   for (int i = 0; i < kNumTestArgs; i++) {
    496     Init(kTestArgList[i]);
    497     random::PhiloxRandom philox(testing::RandomSeed() + 4, 17);
    498     random::SimplePhilox rnd(&philox);
    499     Add("\xff\xff", "v3");
    500     Test(&rnd);
    501   }
    502 }
    503 
    504 TEST_F(Harness, Randomized) {
    505   for (int i = 0; i < kNumTestArgs; i++) {
    506     Init(kTestArgList[i]);
    507     random::PhiloxRandom philox(testing::RandomSeed() + 5, 17);
    508     random::SimplePhilox rnd(&philox);
    509     for (int num_entries = 0; num_entries < 2000;
    510          num_entries += (num_entries < 50 ? 1 : 200)) {
    511       if ((num_entries % 10) == 0) {
    512         fprintf(stderr, "case %d of %d: num_entries = %d\n", (i + 1),
    513                 int(kNumTestArgs), num_entries);
    514       }
    515       for (int e = 0; e < num_entries; e++) {
    516         string v;
    517         Add(test::RandomKey(&rnd, rnd.Skewed(4)),
    518             test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
    519       }
    520       Test(&rnd);
    521     }
    522   }
    523 }
    524 
    525 static bool Between(uint64 val, uint64 low, uint64 high) {
    526   bool result = (val >= low) && (val <= high);
    527   if (!result) {
    528     fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
    529             static_cast<unsigned long long>(val),
    530             static_cast<unsigned long long>(low),
    531             static_cast<unsigned long long>(high));
    532   }
    533   return result;
    534 }
    535 
    536 class TableTest {};
    537 
    538 TEST(TableTest, ApproximateOffsetOfPlain) {
    539   TableConstructor c;
    540   c.Add("k01", "hello");
    541   c.Add("k02", "hello2");
    542   c.Add("k03", string(10000, 'x'));
    543   c.Add("k04", string(200000, 'x'));
    544   c.Add("k05", string(300000, 'x'));
    545   c.Add("k06", "hello3");
    546   c.Add("k07", string(100000, 'x'));
    547   std::vector<string> keys;
    548   KVMap kvmap;
    549   Options options;
    550   options.block_size = 1024;
    551   options.compression = kNoCompression;
    552   c.Finish(options, &keys, &kvmap);
    553 
    554   ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
    555   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));
    556   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01a"), 0, 0));
    557   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, 0));
    558   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 10, 500));
    559   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 10000, 11000));
    560   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04a"), 210000, 211000));
    561   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k05"), 210000, 211000));
    562   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000));
    563   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000));
    564   ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
    565 }
    566 
    567 static bool SnappyCompressionSupported() {
    568   string out;
    569   StringPiece in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    570   return port::Snappy_Compress(in.data(), in.size(), &out);
    571 }
    572 
    573 TEST(TableTest, ApproximateOffsetOfCompressed) {
    574   if (!SnappyCompressionSupported()) {
    575     fprintf(stderr, "skipping compression tests\n");
    576     return;
    577   }
    578 
    579   random::PhiloxRandom philox(301, 17);
    580   random::SimplePhilox rnd(&philox);
    581   TableConstructor c;
    582   string tmp;
    583   c.Add("k01", "hello");
    584   c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
    585   c.Add("k03", "hello3");
    586   c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
    587   std::vector<string> keys;
    588   KVMap kvmap;
    589   Options options;
    590   options.block_size = 1024;
    591   options.compression = kSnappyCompression;
    592   c.Finish(options, &keys, &kvmap);
    593   ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
    594   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));
    595   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 10, 100));
    596   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 4000));
    597   ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 4000));
    598   ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 7000));
    599 }
    600 
    601 TEST(TableTest, SeekToFirstKeyDoesNotReadTooMuch) {
    602   random::PhiloxRandom philox(301, 17);
    603   random::SimplePhilox rnd(&philox);
    604   string tmp;
    605   TableConstructor c;
    606   c.Add("k01", "firstvalue");
    607   c.Add("k03", test::CompressibleString(&rnd, 0.25, 1000000, &tmp));
    608   c.Add("k04", "abc");
    609   std::vector<string> keys;
    610   KVMap kvmap;
    611   Options options;
    612   options.block_size = 1024;
    613   options.compression = kNoCompression;
    614   c.Finish(options, &keys, &kvmap);
    615 
    616   Iterator* iter = c.NewIterator();
    617   iter->Seek("k01");
    618   delete iter;
    619   // Make sure we don't read the big second block when just trying to
    620   // retrieve the data in the first key
    621   EXPECT_LT(c.BytesRead(), 200);
    622 }
    623 
    624 }  // namespace table
    625 }  // namespace tensorflow
    626