// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "leveldb/db.h"
      6 #include "leveldb/filter_policy.h"
      7 #include "db/db_impl.h"
      8 #include "db/filename.h"
      9 #include "db/version_set.h"
     10 #include "db/write_batch_internal.h"
     11 #include "leveldb/cache.h"
     12 #include "leveldb/env.h"
     13 #include "leveldb/table.h"
     14 #include "util/hash.h"
     15 #include "util/logging.h"
     16 #include "util/mutexlock.h"
     17 #include "util/testharness.h"
     18 #include "util/testutil.h"
     19 
     20 namespace leveldb {
     21 
     22 static std::string RandomString(Random* rnd, int len) {
     23   std::string r;
     24   test::RandomString(rnd, len, &r);
     25   return r;
     26 }
     27 
     28 namespace {
     29 class AtomicCounter {
     30  private:
     31   port::Mutex mu_;
     32   int count_;
     33  public:
     34   AtomicCounter() : count_(0) { }
     35   void Increment() {
     36     IncrementBy(1);
     37   }
     38   void IncrementBy(int count) {
     39     MutexLock l(&mu_);
     40     count_ += count;
     41   }
     42   int Read() {
     43     MutexLock l(&mu_);
     44     return count_;
     45   }
     46   void Reset() {
     47     MutexLock l(&mu_);
     48     count_ = 0;
     49   }
     50 };
     51 
     52 void DelayMilliseconds(int millis) {
     53   Env::Default()->SleepForMicroseconds(millis * 1000);
     54 }
     55 }
     56 
     57 // Special Env used to delay background operations
     58 class SpecialEnv : public EnvWrapper {
     59  public:
     60   // sstable Sync() calls are blocked while this pointer is non-NULL.
     61   port::AtomicPointer delay_sstable_sync_;
     62 
     63   // Simulate no-space errors while this pointer is non-NULL.
     64   port::AtomicPointer no_space_;
     65 
     66   // Simulate non-writable file system while this pointer is non-NULL
     67   port::AtomicPointer non_writable_;
     68 
     69   // Force sync of manifest files to fail while this pointer is non-NULL
     70   port::AtomicPointer manifest_sync_error_;
     71 
     72   // Force write to manifest files to fail while this pointer is non-NULL
     73   port::AtomicPointer manifest_write_error_;
     74 
     75   bool count_random_reads_;
     76   AtomicCounter random_read_counter_;
     77 
     78   AtomicCounter sleep_counter_;
     79   AtomicCounter sleep_time_counter_;
     80 
     81   explicit SpecialEnv(Env* base) : EnvWrapper(base) {
     82     delay_sstable_sync_.Release_Store(NULL);
     83     no_space_.Release_Store(NULL);
     84     non_writable_.Release_Store(NULL);
     85     count_random_reads_ = false;
     86     manifest_sync_error_.Release_Store(NULL);
     87     manifest_write_error_.Release_Store(NULL);
     88   }
     89 
     90   Status NewWritableFile(const std::string& f, WritableFile** r) {
     91     class SSTableFile : public WritableFile {
     92      private:
     93       SpecialEnv* env_;
     94       WritableFile* base_;
     95 
     96      public:
     97       SSTableFile(SpecialEnv* env, WritableFile* base)
     98           : env_(env),
     99             base_(base) {
    100       }
    101       ~SSTableFile() { delete base_; }
    102       Status Append(const Slice& data) {
    103         if (env_->no_space_.Acquire_Load() != NULL) {
    104           // Drop writes on the floor
    105           return Status::OK();
    106         } else {
    107           return base_->Append(data);
    108         }
    109       }
    110       Status Close() { return base_->Close(); }
    111       Status Flush() { return base_->Flush(); }
    112       Status Sync() {
    113         while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
    114           DelayMilliseconds(100);
    115         }
    116         return base_->Sync();
    117       }
    118     };
    119     class ManifestFile : public WritableFile {
    120      private:
    121       SpecialEnv* env_;
    122       WritableFile* base_;
    123      public:
    124       ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
    125       ~ManifestFile() { delete base_; }
    126       Status Append(const Slice& data) {
    127         if (env_->manifest_write_error_.Acquire_Load() != NULL) {
    128           return Status::IOError("simulated writer error");
    129         } else {
    130           return base_->Append(data);
    131         }
    132       }
    133       Status Close() { return base_->Close(); }
    134       Status Flush() { return base_->Flush(); }
    135       Status Sync() {
    136         if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
    137           return Status::IOError("simulated sync error");
    138         } else {
    139           return base_->Sync();
    140         }
    141       }
    142     };
    143 
    144     if (non_writable_.Acquire_Load() != NULL) {
    145       return Status::IOError("simulated write error");
    146     }
    147 
    148     Status s = target()->NewWritableFile(f, r);
    149     if (s.ok()) {
    150       if (strstr(f.c_str(), ".sst") != NULL) {
    151         *r = new SSTableFile(this, *r);
    152       } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
    153         *r = new ManifestFile(this, *r);
    154       }
    155     }
    156     return s;
    157   }
    158 
    159   Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) {
    160     class CountingFile : public RandomAccessFile {
    161      private:
    162       RandomAccessFile* target_;
    163       AtomicCounter* counter_;
    164      public:
    165       CountingFile(RandomAccessFile* target, AtomicCounter* counter)
    166           : target_(target), counter_(counter) {
    167       }
    168       virtual ~CountingFile() { delete target_; }
    169       virtual Status Read(uint64_t offset, size_t n, Slice* result,
    170                           char* scratch) const {
    171         counter_->Increment();
    172         return target_->Read(offset, n, result, scratch);
    173       }
    174     };
    175 
    176     Status s = target()->NewRandomAccessFile(f, r);
    177     if (s.ok() && count_random_reads_) {
    178       *r = new CountingFile(*r, &random_read_counter_);
    179     }
    180     return s;
    181   }
    182 
    183   virtual void SleepForMicroseconds(int micros) {
    184     sleep_counter_.Increment();
    185     sleep_time_counter_.IncrementBy(micros);
    186   }
    187 
    188 };
    189 
    190 class DBTest {
    191  private:
    192   const FilterPolicy* filter_policy_;
    193 
    194   // Sequence of option configurations to try
    195   enum OptionConfig {
    196     kDefault,
    197     kFilter,
    198     kUncompressed,
    199     kEnd
    200   };
    201   int option_config_;
    202 
    203  public:
    204   std::string dbname_;
    205   SpecialEnv* env_;
    206   DB* db_;
    207 
    208   Options last_options_;
    209 
    210   DBTest() : option_config_(kDefault),
    211              env_(new SpecialEnv(Env::Default())) {
    212     filter_policy_ = NewBloomFilterPolicy(10);
    213     dbname_ = test::TmpDir() + "/db_test";
    214     DestroyDB(dbname_, Options());
    215     db_ = NULL;
    216     Reopen();
    217   }
    218 
    219   ~DBTest() {
    220     delete db_;
    221     DestroyDB(dbname_, Options());
    222     delete env_;
    223     delete filter_policy_;
    224   }
    225 
    226   // Switch to a fresh database with the next option configuration to
    227   // test.  Return false if there are no more configurations to test.
    228   bool ChangeOptions() {
    229     option_config_++;
    230     if (option_config_ >= kEnd) {
    231       return false;
    232     } else {
    233       DestroyAndReopen();
    234       return true;
    235     }
    236   }
    237 
    238   // Return the current option configuration.
    239   Options CurrentOptions() {
    240     Options options;
    241     switch (option_config_) {
    242       case kFilter:
    243         options.filter_policy = filter_policy_;
    244         break;
    245       case kUncompressed:
    246         options.compression = kNoCompression;
    247         break;
    248       default:
    249         break;
    250     }
    251     return options;
    252   }
    253 
    254   DBImpl* dbfull() {
    255     return reinterpret_cast<DBImpl*>(db_);
    256   }
    257 
    258   void Reopen(Options* options = NULL) {
    259     ASSERT_OK(TryReopen(options));
    260   }
    261 
    262   void Close() {
    263     delete db_;
    264     db_ = NULL;
    265   }
    266 
    267   void DestroyAndReopen(Options* options = NULL) {
    268     delete db_;
    269     db_ = NULL;
    270     DestroyDB(dbname_, Options());
    271     ASSERT_OK(TryReopen(options));
    272   }
    273 
    274   Status TryReopen(Options* options) {
    275     delete db_;
    276     db_ = NULL;
    277     Options opts;
    278     if (options != NULL) {
    279       opts = *options;
    280     } else {
    281       opts = CurrentOptions();
    282       opts.create_if_missing = true;
    283     }
    284     last_options_ = opts;
    285 
    286     return DB::Open(opts, dbname_, &db_);
    287   }
    288 
    289   Status Put(const std::string& k, const std::string& v) {
    290     return db_->Put(WriteOptions(), k, v);
    291   }
    292 
    293   Status Delete(const std::string& k) {
    294     return db_->Delete(WriteOptions(), k);
    295   }
    296 
    297   std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
    298     ReadOptions options;
    299     options.snapshot = snapshot;
    300     std::string result;
    301     Status s = db_->Get(options, k, &result);
    302     if (s.IsNotFound()) {
    303       result = "NOT_FOUND";
    304     } else if (!s.ok()) {
    305       result = s.ToString();
    306     }
    307     return result;
    308   }
    309 
    310   // Return a string that contains all key,value pairs in order,
    311   // formatted like "(k1->v1)(k2->v2)".
    312   std::string Contents() {
    313     std::vector<std::string> forward;
    314     std::string result;
    315     Iterator* iter = db_->NewIterator(ReadOptions());
    316     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    317       std::string s = IterStatus(iter);
    318       result.push_back('(');
    319       result.append(s);
    320       result.push_back(')');
    321       forward.push_back(s);
    322     }
    323 
    324     // Check reverse iteration results are the reverse of forward results
    325     int matched = 0;
    326     for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
    327       ASSERT_LT(matched, forward.size());
    328       ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
    329       matched++;
    330     }
    331     ASSERT_EQ(matched, forward.size());
    332 
    333     delete iter;
    334     return result;
    335   }
    336 
    337   std::string AllEntriesFor(const Slice& user_key) {
    338     Iterator* iter = dbfull()->TEST_NewInternalIterator();
    339     InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
    340     iter->Seek(target.Encode());
    341     std::string result;
    342     if (!iter->status().ok()) {
    343       result = iter->status().ToString();
    344     } else {
    345       result = "[ ";
    346       bool first = true;
    347       while (iter->Valid()) {
    348         ParsedInternalKey ikey;
    349         if (!ParseInternalKey(iter->key(), &ikey)) {
    350           result += "CORRUPTED";
    351         } else {
    352           if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) {
    353             break;
    354           }
    355           if (!first) {
    356             result += ", ";
    357           }
    358           first = false;
    359           switch (ikey.type) {
    360             case kTypeValue:
    361               result += iter->value().ToString();
    362               break;
    363             case kTypeDeletion:
    364               result += "DEL";
    365               break;
    366           }
    367         }
    368         iter->Next();
    369       }
    370       if (!first) {
    371         result += " ";
    372       }
    373       result += "]";
    374     }
    375     delete iter;
    376     return result;
    377   }
    378 
    379   int NumTableFilesAtLevel(int level) {
    380     std::string property;
    381     ASSERT_TRUE(
    382         db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
    383                          &property));
    384     return atoi(property.c_str());
    385   }
    386 
    387   int TotalTableFiles() {
    388     int result = 0;
    389     for (int level = 0; level < config::kNumLevels; level++) {
    390       result += NumTableFilesAtLevel(level);
    391     }
    392     return result;
    393   }
    394 
    395   // Return spread of files per level
    396   std::string FilesPerLevel() {
    397     std::string result;
    398     int last_non_zero_offset = 0;
    399     for (int level = 0; level < config::kNumLevels; level++) {
    400       int f = NumTableFilesAtLevel(level);
    401       char buf[100];
    402       snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
    403       result += buf;
    404       if (f > 0) {
    405         last_non_zero_offset = result.size();
    406       }
    407     }
    408     result.resize(last_non_zero_offset);
    409     return result;
    410   }
    411 
    412   int CountFiles() {
    413     std::vector<std::string> files;
    414     env_->GetChildren(dbname_, &files);
    415     return static_cast<int>(files.size());
    416   }
    417 
    418   uint64_t Size(const Slice& start, const Slice& limit) {
    419     Range r(start, limit);
    420     uint64_t size;
    421     db_->GetApproximateSizes(&r, 1, &size);
    422     return size;
    423   }
    424 
    425   void Compact(const Slice& start, const Slice& limit) {
    426     db_->CompactRange(&start, &limit);
    427   }
    428 
    429   // Do n memtable compactions, each of which produces an sstable
    430   // covering the range [small,large].
    431   void MakeTables(int n, const std::string& small, const std::string& large) {
    432     for (int i = 0; i < n; i++) {
    433       Put(small, "begin");
    434       Put(large, "end");
    435       dbfull()->TEST_CompactMemTable();
    436     }
    437   }
    438 
    439   // Prevent pushing of new sstables into deeper levels by adding
    440   // tables that cover a specified range to all levels.
    441   void FillLevels(const std::string& smallest, const std::string& largest) {
    442     MakeTables(config::kNumLevels, smallest, largest);
    443   }
    444 
    445   void DumpFileCounts(const char* label) {
    446     fprintf(stderr, "---\n%s:\n", label);
    447     fprintf(stderr, "maxoverlap: %lld\n",
    448             static_cast<long long>(
    449                 dbfull()->TEST_MaxNextLevelOverlappingBytes()));
    450     for (int level = 0; level < config::kNumLevels; level++) {
    451       int num = NumTableFilesAtLevel(level);
    452       if (num > 0) {
    453         fprintf(stderr, "  level %3d : %d files\n", level, num);
    454       }
    455     }
    456   }
    457 
    458   std::string DumpSSTableList() {
    459     std::string property;
    460     db_->GetProperty("leveldb.sstables", &property);
    461     return property;
    462   }
    463 
    464   std::string IterStatus(Iterator* iter) {
    465     std::string result;
    466     if (iter->Valid()) {
    467       result = iter->key().ToString() + "->" + iter->value().ToString();
    468     } else {
    469       result = "(invalid)";
    470     }
    471     return result;
    472   }
    473 
    474   bool DeleteAnSSTFile() {
    475     std::vector<std::string> filenames;
    476     ASSERT_OK(env_->GetChildren(dbname_, &filenames));
    477     uint64_t number;
    478     FileType type;
    479     for (size_t i = 0; i < filenames.size(); i++) {
    480       if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
    481         ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
    482         return true;
    483       }
    484     }
    485     return false;
    486   }
    487 };
    488 
    489 TEST(DBTest, Empty) {
    490   do {
    491     ASSERT_TRUE(db_ != NULL);
    492     ASSERT_EQ("NOT_FOUND", Get("foo"));
    493   } while (ChangeOptions());
    494 }
    495 
    496 TEST(DBTest, ReadWrite) {
    497   do {
    498     ASSERT_OK(Put("foo", "v1"));
    499     ASSERT_EQ("v1", Get("foo"));
    500     ASSERT_OK(Put("bar", "v2"));
    501     ASSERT_OK(Put("foo", "v3"));
    502     ASSERT_EQ("v3", Get("foo"));
    503     ASSERT_EQ("v2", Get("bar"));
    504   } while (ChangeOptions());
    505 }
    506 
    507 TEST(DBTest, PutDeleteGet) {
    508   do {
    509     ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
    510     ASSERT_EQ("v1", Get("foo"));
    511     ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
    512     ASSERT_EQ("v2", Get("foo"));
    513     ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
    514     ASSERT_EQ("NOT_FOUND", Get("foo"));
    515   } while (ChangeOptions());
    516 }
    517 
    518 TEST(DBTest, GetFromImmutableLayer) {
    519   do {
    520     Options options = CurrentOptions();
    521     options.env = env_;
    522     options.write_buffer_size = 100000;  // Small write buffer
    523     Reopen(&options);
    524 
    525     ASSERT_OK(Put("foo", "v1"));
    526     ASSERT_EQ("v1", Get("foo"));
    527 
    528     env_->delay_sstable_sync_.Release_Store(env_);   // Block sync calls
    529     Put("k1", std::string(100000, 'x'));             // Fill memtable
    530     Put("k2", std::string(100000, 'y'));             // Trigger compaction
    531     ASSERT_EQ("v1", Get("foo"));
    532     env_->delay_sstable_sync_.Release_Store(NULL);   // Release sync calls
    533   } while (ChangeOptions());
    534 }
    535 
    536 TEST(DBTest, GetFromVersions) {
    537   do {
    538     ASSERT_OK(Put("foo", "v1"));
    539     dbfull()->TEST_CompactMemTable();
    540     ASSERT_EQ("v1", Get("foo"));
    541   } while (ChangeOptions());
    542 }
    543 
    544 TEST(DBTest, GetSnapshot) {
    545   do {
    546     // Try with both a short key and a long key
    547     for (int i = 0; i < 2; i++) {
    548       std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
    549       ASSERT_OK(Put(key, "v1"));
    550       const Snapshot* s1 = db_->GetSnapshot();
    551       ASSERT_OK(Put(key, "v2"));
    552       ASSERT_EQ("v2", Get(key));
    553       ASSERT_EQ("v1", Get(key, s1));
    554       dbfull()->TEST_CompactMemTable();
    555       ASSERT_EQ("v2", Get(key));
    556       ASSERT_EQ("v1", Get(key, s1));
    557       db_->ReleaseSnapshot(s1);
    558     }
    559   } while (ChangeOptions());
    560 }
    561 
    562 TEST(DBTest, GetLevel0Ordering) {
    563   do {
    564     // Check that we process level-0 files in correct order.  The code
    565     // below generates two level-0 files where the earlier one comes
    566     // before the later one in the level-0 file list since the earlier
    567     // one has a smaller "smallest" key.
    568     ASSERT_OK(Put("bar", "b"));
    569     ASSERT_OK(Put("foo", "v1"));
    570     dbfull()->TEST_CompactMemTable();
    571     ASSERT_OK(Put("foo", "v2"));
    572     dbfull()->TEST_CompactMemTable();
    573     ASSERT_EQ("v2", Get("foo"));
    574   } while (ChangeOptions());
    575 }
    576 
    577 TEST(DBTest, GetOrderedByLevels) {
    578   do {
    579     ASSERT_OK(Put("foo", "v1"));
    580     Compact("a", "z");
    581     ASSERT_EQ("v1", Get("foo"));
    582     ASSERT_OK(Put("foo", "v2"));
    583     ASSERT_EQ("v2", Get("foo"));
    584     dbfull()->TEST_CompactMemTable();
    585     ASSERT_EQ("v2", Get("foo"));
    586   } while (ChangeOptions());
    587 }
    588 
    589 TEST(DBTest, GetPicksCorrectFile) {
    590   do {
    591     // Arrange to have multiple files in a non-level-0 level.
    592     ASSERT_OK(Put("a", "va"));
    593     Compact("a", "b");
    594     ASSERT_OK(Put("x", "vx"));
    595     Compact("x", "y");
    596     ASSERT_OK(Put("f", "vf"));
    597     Compact("f", "g");
    598     ASSERT_EQ("va", Get("a"));
    599     ASSERT_EQ("vf", Get("f"));
    600     ASSERT_EQ("vx", Get("x"));
    601   } while (ChangeOptions());
    602 }
    603 
    604 TEST(DBTest, GetEncountersEmptyLevel) {
    605   do {
    606     // Arrange for the following to happen:
    607     //   * sstable A in level 0
    608     //   * nothing in level 1
    609     //   * sstable B in level 2
    610     // Then do enough Get() calls to arrange for an automatic compaction
    611     // of sstable A.  A bug would cause the compaction to be marked as
    612     // occuring at level 1 (instead of the correct level 0).
    613 
    614     // Step 1: First place sstables in levels 0 and 2
    615     int compaction_count = 0;
    616     while (NumTableFilesAtLevel(0) == 0 ||
    617            NumTableFilesAtLevel(2) == 0) {
    618       ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
    619       compaction_count++;
    620       Put("a", "begin");
    621       Put("z", "end");
    622       dbfull()->TEST_CompactMemTable();
    623     }
    624 
    625     // Step 2: clear level 1 if necessary.
    626     dbfull()->TEST_CompactRange(1, NULL, NULL);
    627     ASSERT_EQ(NumTableFilesAtLevel(0), 1);
    628     ASSERT_EQ(NumTableFilesAtLevel(1), 0);
    629     ASSERT_EQ(NumTableFilesAtLevel(2), 1);
    630 
    631     // Step 3: read a bunch of times
    632     for (int i = 0; i < 1000; i++) {
    633       ASSERT_EQ("NOT_FOUND", Get("missing"));
    634     }
    635 
    636     // Step 4: Wait for compaction to finish
    637     DelayMilliseconds(1000);
    638 
    639     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    640   } while (ChangeOptions());
    641 }
    642 
    643 TEST(DBTest, IterEmpty) {
    644   Iterator* iter = db_->NewIterator(ReadOptions());
    645 
    646   iter->SeekToFirst();
    647   ASSERT_EQ(IterStatus(iter), "(invalid)");
    648 
    649   iter->SeekToLast();
    650   ASSERT_EQ(IterStatus(iter), "(invalid)");
    651 
    652   iter->Seek("foo");
    653   ASSERT_EQ(IterStatus(iter), "(invalid)");
    654 
    655   delete iter;
    656 }
    657 
    658 TEST(DBTest, IterSingle) {
    659   ASSERT_OK(Put("a", "va"));
    660   Iterator* iter = db_->NewIterator(ReadOptions());
    661 
    662   iter->SeekToFirst();
    663   ASSERT_EQ(IterStatus(iter), "a->va");
    664   iter->Next();
    665   ASSERT_EQ(IterStatus(iter), "(invalid)");
    666   iter->SeekToFirst();
    667   ASSERT_EQ(IterStatus(iter), "a->va");
    668   iter->Prev();
    669   ASSERT_EQ(IterStatus(iter), "(invalid)");
    670 
    671   iter->SeekToLast();
    672   ASSERT_EQ(IterStatus(iter), "a->va");
    673   iter->Next();
    674   ASSERT_EQ(IterStatus(iter), "(invalid)");
    675   iter->SeekToLast();
    676   ASSERT_EQ(IterStatus(iter), "a->va");
    677   iter->Prev();
    678   ASSERT_EQ(IterStatus(iter), "(invalid)");
    679 
    680   iter->Seek("");
    681   ASSERT_EQ(IterStatus(iter), "a->va");
    682   iter->Next();
    683   ASSERT_EQ(IterStatus(iter), "(invalid)");
    684 
    685   iter->Seek("a");
    686   ASSERT_EQ(IterStatus(iter), "a->va");
    687   iter->Next();
    688   ASSERT_EQ(IterStatus(iter), "(invalid)");
    689 
    690   iter->Seek("b");
    691   ASSERT_EQ(IterStatus(iter), "(invalid)");
    692 
    693   delete iter;
    694 }
    695 
    696 TEST(DBTest, IterMulti) {
    697   ASSERT_OK(Put("a", "va"));
    698   ASSERT_OK(Put("b", "vb"));
    699   ASSERT_OK(Put("c", "vc"));
    700   Iterator* iter = db_->NewIterator(ReadOptions());
    701 
    702   iter->SeekToFirst();
    703   ASSERT_EQ(IterStatus(iter), "a->va");
    704   iter->Next();
    705   ASSERT_EQ(IterStatus(iter), "b->vb");
    706   iter->Next();
    707   ASSERT_EQ(IterStatus(iter), "c->vc");
    708   iter->Next();
    709   ASSERT_EQ(IterStatus(iter), "(invalid)");
    710   iter->SeekToFirst();
    711   ASSERT_EQ(IterStatus(iter), "a->va");
    712   iter->Prev();
    713   ASSERT_EQ(IterStatus(iter), "(invalid)");
    714 
    715   iter->SeekToLast();
    716   ASSERT_EQ(IterStatus(iter), "c->vc");
    717   iter->Prev();
    718   ASSERT_EQ(IterStatus(iter), "b->vb");
    719   iter->Prev();
    720   ASSERT_EQ(IterStatus(iter), "a->va");
    721   iter->Prev();
    722   ASSERT_EQ(IterStatus(iter), "(invalid)");
    723   iter->SeekToLast();
    724   ASSERT_EQ(IterStatus(iter), "c->vc");
    725   iter->Next();
    726   ASSERT_EQ(IterStatus(iter), "(invalid)");
    727 
    728   iter->Seek("");
    729   ASSERT_EQ(IterStatus(iter), "a->va");
    730   iter->Seek("a");
    731   ASSERT_EQ(IterStatus(iter), "a->va");
    732   iter->Seek("ax");
    733   ASSERT_EQ(IterStatus(iter), "b->vb");
    734   iter->Seek("b");
    735   ASSERT_EQ(IterStatus(iter), "b->vb");
    736   iter->Seek("z");
    737   ASSERT_EQ(IterStatus(iter), "(invalid)");
    738 
    739   // Switch from reverse to forward
    740   iter->SeekToLast();
    741   iter->Prev();
    742   iter->Prev();
    743   iter->Next();
    744   ASSERT_EQ(IterStatus(iter), "b->vb");
    745 
    746   // Switch from forward to reverse
    747   iter->SeekToFirst();
    748   iter->Next();
    749   iter->Next();
    750   iter->Prev();
    751   ASSERT_EQ(IterStatus(iter), "b->vb");
    752 
    753   // Make sure iter stays at snapshot
    754   ASSERT_OK(Put("a",  "va2"));
    755   ASSERT_OK(Put("a2", "va3"));
    756   ASSERT_OK(Put("b",  "vb2"));
    757   ASSERT_OK(Put("c",  "vc2"));
    758   ASSERT_OK(Delete("b"));
    759   iter->SeekToFirst();
    760   ASSERT_EQ(IterStatus(iter), "a->va");
    761   iter->Next();
    762   ASSERT_EQ(IterStatus(iter), "b->vb");
    763   iter->Next();
    764   ASSERT_EQ(IterStatus(iter), "c->vc");
    765   iter->Next();
    766   ASSERT_EQ(IterStatus(iter), "(invalid)");
    767   iter->SeekToLast();
    768   ASSERT_EQ(IterStatus(iter), "c->vc");
    769   iter->Prev();
    770   ASSERT_EQ(IterStatus(iter), "b->vb");
    771   iter->Prev();
    772   ASSERT_EQ(IterStatus(iter), "a->va");
    773   iter->Prev();
    774   ASSERT_EQ(IterStatus(iter), "(invalid)");
    775 
    776   delete iter;
    777 }
    778 
    779 TEST(DBTest, IterSmallAndLargeMix) {
    780   ASSERT_OK(Put("a", "va"));
    781   ASSERT_OK(Put("b", std::string(100000, 'b')));
    782   ASSERT_OK(Put("c", "vc"));
    783   ASSERT_OK(Put("d", std::string(100000, 'd')));
    784   ASSERT_OK(Put("e", std::string(100000, 'e')));
    785 
    786   Iterator* iter = db_->NewIterator(ReadOptions());
    787 
    788   iter->SeekToFirst();
    789   ASSERT_EQ(IterStatus(iter), "a->va");
    790   iter->Next();
    791   ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
    792   iter->Next();
    793   ASSERT_EQ(IterStatus(iter), "c->vc");
    794   iter->Next();
    795   ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
    796   iter->Next();
    797   ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
    798   iter->Next();
    799   ASSERT_EQ(IterStatus(iter), "(invalid)");
    800 
    801   iter->SeekToLast();
    802   ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
    803   iter->Prev();
    804   ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
    805   iter->Prev();
    806   ASSERT_EQ(IterStatus(iter), "c->vc");
    807   iter->Prev();
    808   ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
    809   iter->Prev();
    810   ASSERT_EQ(IterStatus(iter), "a->va");
    811   iter->Prev();
    812   ASSERT_EQ(IterStatus(iter), "(invalid)");
    813 
    814   delete iter;
    815 }
    816 
    817 TEST(DBTest, IterMultiWithDelete) {
    818   do {
    819     ASSERT_OK(Put("a", "va"));
    820     ASSERT_OK(Put("b", "vb"));
    821     ASSERT_OK(Put("c", "vc"));
    822     ASSERT_OK(Delete("b"));
    823     ASSERT_EQ("NOT_FOUND", Get("b"));
    824 
    825     Iterator* iter = db_->NewIterator(ReadOptions());
    826     iter->Seek("c");
    827     ASSERT_EQ(IterStatus(iter), "c->vc");
    828     iter->Prev();
    829     ASSERT_EQ(IterStatus(iter), "a->va");
    830     delete iter;
    831   } while (ChangeOptions());
    832 }
    833 
    834 TEST(DBTest, Recover) {
    835   do {
    836     ASSERT_OK(Put("foo", "v1"));
    837     ASSERT_OK(Put("baz", "v5"));
    838 
    839     Reopen();
    840     ASSERT_EQ("v1", Get("foo"));
    841 
    842     ASSERT_EQ("v1", Get("foo"));
    843     ASSERT_EQ("v5", Get("baz"));
    844     ASSERT_OK(Put("bar", "v2"));
    845     ASSERT_OK(Put("foo", "v3"));
    846 
    847     Reopen();
    848     ASSERT_EQ("v3", Get("foo"));
    849     ASSERT_OK(Put("foo", "v4"));
    850     ASSERT_EQ("v4", Get("foo"));
    851     ASSERT_EQ("v2", Get("bar"));
    852     ASSERT_EQ("v5", Get("baz"));
    853   } while (ChangeOptions());
    854 }
    855 
    856 TEST(DBTest, RecoveryWithEmptyLog) {
    857   do {
    858     ASSERT_OK(Put("foo", "v1"));
    859     ASSERT_OK(Put("foo", "v2"));
    860     Reopen();
    861     Reopen();
    862     ASSERT_OK(Put("foo", "v3"));
    863     Reopen();
    864     ASSERT_EQ("v3", Get("foo"));
    865   } while (ChangeOptions());
    866 }
    867 
    868 // Check that writes done during a memtable compaction are recovered
    869 // if the database is shutdown during the memtable compaction.
    870 TEST(DBTest, RecoverDuringMemtableCompaction) {
    871   do {
    872     Options options = CurrentOptions();
    873     options.env = env_;
    874     options.write_buffer_size = 1000000;
    875     Reopen(&options);
    876 
    877     // Trigger a long memtable compaction and reopen the database during it
    878     ASSERT_OK(Put("foo", "v1"));                         // Goes to 1st log file
    879     ASSERT_OK(Put("big1", std::string(10000000, 'x')));  // Fills memtable
    880     ASSERT_OK(Put("big2", std::string(1000, 'y')));      // Triggers compaction
    881     ASSERT_OK(Put("bar", "v2"));                         // Goes to new log file
    882 
    883     Reopen(&options);
    884     ASSERT_EQ("v1", Get("foo"));
    885     ASSERT_EQ("v2", Get("bar"));
    886     ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
    887     ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
    888   } while (ChangeOptions());
    889 }
    890 
    891 static std::string Key(int i) {
    892   char buf[100];
    893   snprintf(buf, sizeof(buf), "key%06d", i);
    894   return std::string(buf);
    895 }
    896 
    897 TEST(DBTest, MinorCompactionsHappen) {
    898   Options options = CurrentOptions();
    899   options.write_buffer_size = 10000;
    900   Reopen(&options);
    901 
    902   const int N = 500;
    903 
    904   int starting_num_tables = TotalTableFiles();
    905   for (int i = 0; i < N; i++) {
    906     ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
    907   }
    908   int ending_num_tables = TotalTableFiles();
    909   ASSERT_GT(ending_num_tables, starting_num_tables);
    910 
    911   for (int i = 0; i < N; i++) {
    912     ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
    913   }
    914 
    915   Reopen();
    916 
    917   for (int i = 0; i < N; i++) {
    918     ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
    919   }
    920 }
    921 
    922 TEST(DBTest, RecoverWithLargeLog) {
    923   {
    924     Options options = CurrentOptions();
    925     Reopen(&options);
    926     ASSERT_OK(Put("big1", std::string(200000, '1')));
    927     ASSERT_OK(Put("big2", std::string(200000, '2')));
    928     ASSERT_OK(Put("small3", std::string(10, '3')));
    929     ASSERT_OK(Put("small4", std::string(10, '4')));
    930     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    931   }
    932 
    933   // Make sure that if we re-open with a small write buffer size that
    934   // we flush table files in the middle of a large log file.
    935   Options options = CurrentOptions();
    936   options.write_buffer_size = 100000;
    937   Reopen(&options);
    938   ASSERT_EQ(NumTableFilesAtLevel(0), 3);
    939   ASSERT_EQ(std::string(200000, '1'), Get("big1"));
    940   ASSERT_EQ(std::string(200000, '2'), Get("big2"));
    941   ASSERT_EQ(std::string(10, '3'), Get("small3"));
    942   ASSERT_EQ(std::string(10, '4'), Get("small4"));
    943   ASSERT_GT(NumTableFilesAtLevel(0), 1);
    944 }
    945 
    946 TEST(DBTest, CompactionsGenerateMultipleFiles) {
    947   Options options = CurrentOptions();
    948   options.write_buffer_size = 100000000;        // Large write buffer
    949   Reopen(&options);
    950 
    951   Random rnd(301);
    952 
    953   // Write 8MB (80 values, each 100K)
    954   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    955   std::vector<std::string> values;
    956   for (int i = 0; i < 80; i++) {
    957     values.push_back(RandomString(&rnd, 100000));
    958     ASSERT_OK(Put(Key(i), values[i]));
    959   }
    960 
    961   // Reopening moves updates to level-0
    962   Reopen(&options);
    963   dbfull()->TEST_CompactRange(0, NULL, NULL);
    964 
    965   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    966   ASSERT_GT(NumTableFilesAtLevel(1), 1);
    967   for (int i = 0; i < 80; i++) {
    968     ASSERT_EQ(Get(Key(i)), values[i]);
    969   }
    970 }
    971 
    972 TEST(DBTest, RepeatedWritesToSameKey) {
    973   Options options = CurrentOptions();
    974   options.env = env_;
    975   options.write_buffer_size = 100000;  // Small write buffer
    976   Reopen(&options);
    977 
    978   // We must have at most one file per level except for level-0,
    979   // which may have up to kL0_StopWritesTrigger files.
    980   const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
    981 
    982   Random rnd(301);
    983   std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
    984   for (int i = 0; i < 5 * kMaxFiles; i++) {
    985     Put("key", value);
    986     ASSERT_LE(TotalTableFiles(), kMaxFiles);
    987     fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
    988   }
    989 }
    990 
    991 TEST(DBTest, SparseMerge) {
    992   Options options = CurrentOptions();
    993   options.compression = kNoCompression;
    994   Reopen(&options);
    995 
    996   FillLevels("A", "Z");
    997 
    998   // Suppose there is:
    999   //    small amount of data with prefix A
   1000   //    large amount of data with prefix B
   1001   //    small amount of data with prefix C
   1002   // and that recent updates have made small changes to all three prefixes.
   1003   // Check that we do not do a compaction that merges all of B in one shot.
   1004   const std::string value(1000, 'x');
   1005   Put("A", "va");
   1006   // Write approximately 100MB of "B" values
   1007   for (int i = 0; i < 100000; i++) {
   1008     char key[100];
   1009     snprintf(key, sizeof(key), "B%010d", i);
   1010     Put(key, value);
   1011   }
   1012   Put("C", "vc");
   1013   dbfull()->TEST_CompactMemTable();
   1014   dbfull()->TEST_CompactRange(0, NULL, NULL);
   1015 
   1016   // Make sparse update
   1017   Put("A",    "va2");
   1018   Put("B100", "bvalue2");
   1019   Put("C",    "vc2");
   1020   dbfull()->TEST_CompactMemTable();
   1021 
   1022   // Compactions should not cause us to create a situation where
   1023   // a file overlaps too much data at the next level.
   1024   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
   1025   dbfull()->TEST_CompactRange(0, NULL, NULL);
   1026   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
   1027   dbfull()->TEST_CompactRange(1, NULL, NULL);
   1028   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
   1029 }
   1030 
   1031 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
   1032   bool result = (val >= low) && (val <= high);
   1033   if (!result) {
   1034     fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
   1035             (unsigned long long)(val),
   1036             (unsigned long long)(low),
   1037             (unsigned long long)(high));
   1038   }
   1039   return result;
   1040 }
   1041 
   1042 TEST(DBTest, ApproximateSizes) {
   1043   do {
   1044     Options options = CurrentOptions();
   1045     options.write_buffer_size = 100000000;        // Large write buffer
   1046     options.compression = kNoCompression;
   1047     DestroyAndReopen();
   1048 
   1049     ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
   1050     Reopen(&options);
   1051     ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
   1052 
   1053     // Write 8MB (80 values, each 100K)
   1054     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
   1055     const int N = 80;
   1056     static const int S1 = 100000;
   1057     static const int S2 = 105000;  // Allow some expansion from metadata
   1058     Random rnd(301);
   1059     for (int i = 0; i < N; i++) {
   1060       ASSERT_OK(Put(Key(i), RandomString(&rnd, S1)));
   1061     }
   1062 
   1063     // 0 because GetApproximateSizes() does not account for memtable space
   1064     ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
   1065 
   1066     // Check sizes across recovery by reopening a few times
   1067     for (int run = 0; run < 3; run++) {
   1068       Reopen(&options);
   1069 
   1070       for (int compact_start = 0; compact_start < N; compact_start += 10) {
   1071         for (int i = 0; i < N; i += 10) {
   1072           ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i));
   1073           ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1)));
   1074           ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), S1*10, S2*10));
   1075         }
   1076         ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50));
   1077         ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50));
   1078 
   1079         std::string cstart_str = Key(compact_start);
   1080         std::string cend_str = Key(compact_start + 9);
   1081         Slice cstart = cstart_str;
   1082         Slice cend = cend_str;
   1083         dbfull()->TEST_CompactRange(0, &cstart, &cend);
   1084       }
   1085 
   1086       ASSERT_EQ(NumTableFilesAtLevel(0), 0);
   1087       ASSERT_GT(NumTableFilesAtLevel(1), 0);
   1088     }
   1089   } while (ChangeOptions());
   1090 }
   1091 
   1092 TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
   1093   do {
   1094     Options options = CurrentOptions();
   1095     options.compression = kNoCompression;
   1096     Reopen();
   1097 
   1098     Random rnd(301);
   1099     std::string big1 = RandomString(&rnd, 100000);
   1100     ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000)));
   1101     ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000)));
   1102     ASSERT_OK(Put(Key(2), big1));
   1103     ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000)));
   1104     ASSERT_OK(Put(Key(4), big1));
   1105     ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000)));
   1106     ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
   1107     ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
   1108 
   1109     // Check sizes across recovery by reopening a few times
   1110     for (int run = 0; run < 3; run++) {
   1111       Reopen(&options);
   1112 
   1113       ASSERT_TRUE(Between(Size("", Key(0)), 0, 0));
   1114       ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000));
   1115       ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000));
   1116       ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000));
   1117       ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000));
   1118       ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000));
   1119       ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000));
   1120       ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000));
   1121       ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000));
   1122 
   1123       ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
   1124 
   1125       dbfull()->TEST_CompactRange(0, NULL, NULL);
   1126     }
   1127   } while (ChangeOptions());
   1128 }
   1129 
   1130 TEST(DBTest, IteratorPinsRef) {
   1131   Put("foo", "hello");
   1132 
   1133   // Get iterator that will yield the current contents of the DB.
   1134   Iterator* iter = db_->NewIterator(ReadOptions());
   1135 
   1136   // Write to force compactions
   1137   Put("foo", "newvalue1");
   1138   for (int i = 0; i < 100; i++) {
   1139     ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
   1140   }
   1141   Put("foo", "newvalue2");
   1142 
   1143   iter->SeekToFirst();
   1144   ASSERT_TRUE(iter->Valid());
   1145   ASSERT_EQ("foo", iter->key().ToString());
   1146   ASSERT_EQ("hello", iter->value().ToString());
   1147   iter->Next();
   1148   ASSERT_TRUE(!iter->Valid());
   1149   delete iter;
   1150 }
   1151 
   1152 TEST(DBTest, Snapshot) {
   1153   do {
   1154     Put("foo", "v1");
   1155     const Snapshot* s1 = db_->GetSnapshot();
   1156     Put("foo", "v2");
   1157     const Snapshot* s2 = db_->GetSnapshot();
   1158     Put("foo", "v3");
   1159     const Snapshot* s3 = db_->GetSnapshot();
   1160 
   1161     Put("foo", "v4");
   1162     ASSERT_EQ("v1", Get("foo", s1));
   1163     ASSERT_EQ("v2", Get("foo", s2));
   1164     ASSERT_EQ("v3", Get("foo", s3));
   1165     ASSERT_EQ("v4", Get("foo"));
   1166 
   1167     db_->ReleaseSnapshot(s3);
   1168     ASSERT_EQ("v1", Get("foo", s1));
   1169     ASSERT_EQ("v2", Get("foo", s2));
   1170     ASSERT_EQ("v4", Get("foo"));
   1171 
   1172     db_->ReleaseSnapshot(s1);
   1173     ASSERT_EQ("v2", Get("foo", s2));
   1174     ASSERT_EQ("v4", Get("foo"));
   1175 
   1176     db_->ReleaseSnapshot(s2);
   1177     ASSERT_EQ("v4", Get("foo"));
   1178   } while (ChangeOptions());
   1179 }
   1180 
   1181 TEST(DBTest, HiddenValuesAreRemoved) {
   1182   do {
   1183     Random rnd(301);
   1184     FillLevels("a", "z");
   1185 
   1186     std::string big = RandomString(&rnd, 50000);
   1187     Put("foo", big);
   1188     Put("pastfoo", "v");
   1189     const Snapshot* snapshot = db_->GetSnapshot();
   1190     Put("foo", "tiny");
   1191     Put("pastfoo2", "v2");        // Advance sequence number one more
   1192 
   1193     ASSERT_OK(dbfull()->TEST_CompactMemTable());
   1194     ASSERT_GT(NumTableFilesAtLevel(0), 0);
   1195 
   1196     ASSERT_EQ(big, Get("foo", snapshot));
   1197     ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000));
   1198     db_->ReleaseSnapshot(snapshot);
   1199     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
   1200     Slice x("x");
   1201     dbfull()->TEST_CompactRange(0, NULL, &x);
   1202     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
   1203     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
   1204     ASSERT_GE(NumTableFilesAtLevel(1), 1);
   1205     dbfull()->TEST_CompactRange(1, NULL, &x);
   1206     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
   1207 
   1208     ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
   1209   } while (ChangeOptions());
   1210 }
   1211 
   1212 TEST(DBTest, DeletionMarkers1) {
   1213   Put("foo", "v1");
   1214   ASSERT_OK(dbfull()->TEST_CompactMemTable());
   1215   const int last = config::kMaxMemCompactLevel;
   1216   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
   1217 
   1218   // Place a table at level last-1 to prevent merging with preceding mutation
   1219   Put("a", "begin");
   1220   Put("z", "end");
   1221   dbfull()->TEST_CompactMemTable();
   1222   ASSERT_EQ(NumTableFilesAtLevel(last), 1);
   1223   ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
   1224 
   1225   Delete("foo");
   1226   Put("foo", "v2");
   1227   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
   1228   ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
   1229   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
   1230   Slice z("z");
   1231   dbfull()->TEST_CompactRange(last-2, NULL, &z);
   1232   // DEL eliminated, but v1 remains because we aren't compacting that level
   1233   // (DEL can be eliminated because v2 hides v1).
   1234   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
   1235   dbfull()->TEST_CompactRange(last-1, NULL, NULL);
   1236   // Merging last-1 w/ last, so we are the base level for "foo", so
   1237   // DEL is removed.  (as is v1).
   1238   ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
   1239 }
   1240 
   1241 TEST(DBTest, DeletionMarkers2) {
   1242   Put("foo", "v1");
   1243   ASSERT_OK(dbfull()->TEST_CompactMemTable());
   1244   const int last = config::kMaxMemCompactLevel;
   1245   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
   1246 
   1247   // Place a table at level last-1 to prevent merging with preceding mutation
   1248   Put("a", "begin");
   1249   Put("z", "end");
   1250   dbfull()->TEST_CompactMemTable();
   1251   ASSERT_EQ(NumTableFilesAtLevel(last), 1);
   1252   ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
   1253 
   1254   Delete("foo");
   1255   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
   1256   ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
   1257   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
   1258   dbfull()->TEST_CompactRange(last-2, NULL, NULL);
   1259   // DEL kept: "last" file overlaps
   1260   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
   1261   dbfull()->TEST_CompactRange(last-1, NULL, NULL);
   1262   // Merging last-1 w/ last, so we are the base level for "foo", so
   1263   // DEL is removed.  (as is v1).
   1264   ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
   1265 }
   1266 
   1267 TEST(DBTest, OverlapInLevel0) {
   1268   do {
   1269     ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config";
   1270 
   1271     // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
   1272     ASSERT_OK(Put("100", "v100"));
   1273     ASSERT_OK(Put("999", "v999"));
   1274     dbfull()->TEST_CompactMemTable();
   1275     ASSERT_OK(Delete("100"));
   1276     ASSERT_OK(Delete("999"));
   1277     dbfull()->TEST_CompactMemTable();
   1278     ASSERT_EQ("0,1,1", FilesPerLevel());
   1279 
   1280     // Make files spanning the following ranges in level-0:
   1281     //  files[0]  200 .. 900
   1282     //  files[1]  300 .. 500
   1283     // Note that files are sorted by smallest key.
   1284     ASSERT_OK(Put("300", "v300"));
   1285     ASSERT_OK(Put("500", "v500"));
   1286     dbfull()->TEST_CompactMemTable();
   1287     ASSERT_OK(Put("200", "v200"));
   1288     ASSERT_OK(Put("600", "v600"));
   1289     ASSERT_OK(Put("900", "v900"));
   1290     dbfull()->TEST_CompactMemTable();
   1291     ASSERT_EQ("2,1,1", FilesPerLevel());
   1292 
   1293     // Compact away the placeholder files we created initially
   1294     dbfull()->TEST_CompactRange(1, NULL, NULL);
   1295     dbfull()->TEST_CompactRange(2, NULL, NULL);
   1296     ASSERT_EQ("2", FilesPerLevel());
   1297 
   1298     // Do a memtable compaction.  Before bug-fix, the compaction would
   1299     // not detect the overlap with level-0 files and would incorrectly place
   1300     // the deletion in a deeper level.
   1301     ASSERT_OK(Delete("600"));
   1302     dbfull()->TEST_CompactMemTable();
   1303     ASSERT_EQ("3", FilesPerLevel());
   1304     ASSERT_EQ("NOT_FOUND", Get("600"));
   1305   } while (ChangeOptions());
   1306 }
   1307 
   1308 TEST(DBTest, L0_CompactionBug_Issue44_a) {
   1309   Reopen();
   1310   ASSERT_OK(Put("b", "v"));
   1311   Reopen();
   1312   ASSERT_OK(Delete("b"));
   1313   ASSERT_OK(Delete("a"));
   1314   Reopen();
   1315   ASSERT_OK(Delete("a"));
   1316   Reopen();
   1317   ASSERT_OK(Put("a", "v"));
   1318   Reopen();
   1319   Reopen();
   1320   ASSERT_EQ("(a->v)", Contents());
   1321   DelayMilliseconds(1000);  // Wait for compaction to finish
   1322   ASSERT_EQ("(a->v)", Contents());
   1323 }
   1324 
   1325 TEST(DBTest, L0_CompactionBug_Issue44_b) {
   1326   Reopen();
   1327   Put("","");
   1328   Reopen();
   1329   Delete("e");
   1330   Put("","");
   1331   Reopen();
   1332   Put("c", "cv");
   1333   Reopen();
   1334   Put("","");
   1335   Reopen();
   1336   Put("","");
   1337   DelayMilliseconds(1000);  // Wait for compaction to finish
   1338   Reopen();
   1339   Put("d","dv");
   1340   Reopen();
   1341   Put("","");
   1342   Reopen();
   1343   Delete("d");
   1344   Delete("b");
   1345   Reopen();
   1346   ASSERT_EQ("(->)(c->cv)", Contents());
   1347   DelayMilliseconds(1000);  // Wait for compaction to finish
   1348   ASSERT_EQ("(->)(c->cv)", Contents());
   1349 }
   1350 
   1351 TEST(DBTest, ComparatorCheck) {
   1352   class NewComparator : public Comparator {
   1353    public:
   1354     virtual const char* Name() const { return "leveldb.NewComparator"; }
   1355     virtual int Compare(const Slice& a, const Slice& b) const {
   1356       return BytewiseComparator()->Compare(a, b);
   1357     }
   1358     virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
   1359       BytewiseComparator()->FindShortestSeparator(s, l);
   1360     }
   1361     virtual void FindShortSuccessor(std::string* key) const {
   1362       BytewiseComparator()->FindShortSuccessor(key);
   1363     }
   1364   };
   1365   NewComparator cmp;
   1366   Options new_options = CurrentOptions();
   1367   new_options.comparator = &cmp;
   1368   Status s = TryReopen(&new_options);
   1369   ASSERT_TRUE(!s.ok());
   1370   ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
   1371       << s.ToString();
   1372 }
   1373 
   1374 TEST(DBTest, CustomComparator) {
   1375   class NumberComparator : public Comparator {
   1376    public:
   1377     virtual const char* Name() const { return "test.NumberComparator"; }
   1378     virtual int Compare(const Slice& a, const Slice& b) const {
   1379       return ToNumber(a) - ToNumber(b);
   1380     }
   1381     virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
   1382       ToNumber(*s);     // Check format
   1383       ToNumber(l);      // Check format
   1384     }
   1385     virtual void FindShortSuccessor(std::string* key) const {
   1386       ToNumber(*key);   // Check format
   1387     }
   1388    private:
   1389     static int ToNumber(const Slice& x) {
   1390       // Check that there are no extra characters.
   1391       ASSERT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size()-1] == ']')
   1392           << EscapeString(x);
   1393       int val;
   1394       char ignored;
   1395       ASSERT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
   1396           << EscapeString(x);
   1397       return val;
   1398     }
   1399   };
   1400   NumberComparator cmp;
   1401   Options new_options = CurrentOptions();
   1402   new_options.create_if_missing = true;
   1403   new_options.comparator = &cmp;
   1404   new_options.filter_policy = NULL;     // Cannot use bloom filters
   1405   new_options.write_buffer_size = 1000;  // Compact more often
   1406   DestroyAndReopen(&new_options);
   1407   ASSERT_OK(Put("[10]", "ten"));
   1408   ASSERT_OK(Put("[0x14]", "twenty"));
   1409   for (int i = 0; i < 2; i++) {
   1410     ASSERT_EQ("ten", Get("[10]"));
   1411     ASSERT_EQ("ten", Get("[0xa]"));
   1412     ASSERT_EQ("twenty", Get("[20]"));
   1413     ASSERT_EQ("twenty", Get("[0x14]"));
   1414     ASSERT_EQ("NOT_FOUND", Get("[15]"));
   1415     ASSERT_EQ("NOT_FOUND", Get("[0xf]"));
   1416     Compact("[0]", "[9999]");
   1417   }
   1418 
   1419   for (int run = 0; run < 2; run++) {
   1420     for (int i = 0; i < 1000; i++) {
   1421       char buf[100];
   1422       snprintf(buf, sizeof(buf), "[%d]", i*10);
   1423       ASSERT_OK(Put(buf, buf));
   1424     }
   1425     Compact("[0]", "[1000000]");
   1426   }
   1427 }
   1428 
   1429 TEST(DBTest, ManualCompaction) {
   1430   ASSERT_EQ(config::kMaxMemCompactLevel, 2)
   1431       << "Need to update this test to match kMaxMemCompactLevel";
   1432 
   1433   MakeTables(3, "p", "q");
   1434   ASSERT_EQ("1,1,1", FilesPerLevel());
   1435 
   1436   // Compaction range falls before files
   1437   Compact("", "c");
   1438   ASSERT_EQ("1,1,1", FilesPerLevel());
   1439 
   1440   // Compaction range falls after files
   1441   Compact("r", "z");
   1442   ASSERT_EQ("1,1,1", FilesPerLevel());
   1443 
   1444   // Compaction range overlaps files
   1445   Compact("p1", "p9");
   1446   ASSERT_EQ("0,0,1", FilesPerLevel());
   1447 
   1448   // Populate a different range
   1449   MakeTables(3, "c", "e");
   1450   ASSERT_EQ("1,1,2", FilesPerLevel());
   1451 
   1452   // Compact just the new range
   1453   Compact("b", "f");
   1454   ASSERT_EQ("0,0,2", FilesPerLevel());
   1455 
   1456   // Compact all
   1457   MakeTables(1, "a", "z");
   1458   ASSERT_EQ("0,1,2", FilesPerLevel());
   1459   db_->CompactRange(NULL, NULL);
   1460   ASSERT_EQ("0,0,1", FilesPerLevel());
   1461 }
   1462 
   1463 TEST(DBTest, DBOpen_Options) {
   1464   std::string dbname = test::TmpDir() + "/db_options_test";
   1465   DestroyDB(dbname, Options());
   1466 
   1467   // Does not exist, and create_if_missing == false: error
   1468   DB* db = NULL;
   1469   Options opts;
   1470   opts.create_if_missing = false;
   1471   Status s = DB::Open(opts, dbname, &db);
   1472   ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != NULL);
   1473   ASSERT_TRUE(db == NULL);
   1474 
   1475   // Does not exist, and create_if_missing == true: OK
   1476   opts.create_if_missing = true;
   1477   s = DB::Open(opts, dbname, &db);
   1478   ASSERT_OK(s);
   1479   ASSERT_TRUE(db != NULL);
   1480 
   1481   delete db;
   1482   db = NULL;
   1483 
   1484   // Does exist, and error_if_exists == true: error
   1485   opts.create_if_missing = false;
   1486   opts.error_if_exists = true;
   1487   s = DB::Open(opts, dbname, &db);
   1488   ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != NULL);
   1489   ASSERT_TRUE(db == NULL);
   1490 
   1491   // Does exist, and error_if_exists == false: OK
   1492   opts.create_if_missing = true;
   1493   opts.error_if_exists = false;
   1494   s = DB::Open(opts, dbname, &db);
   1495   ASSERT_OK(s);
   1496   ASSERT_TRUE(db != NULL);
   1497 
   1498   delete db;
   1499   db = NULL;
   1500 }
   1501 
   1502 TEST(DBTest, Locking) {
   1503   DB* db2 = NULL;
   1504   Status s = DB::Open(CurrentOptions(), dbname_, &db2);
   1505   ASSERT_TRUE(!s.ok()) << "Locking did not prevent re-opening db";
   1506 }
   1507 
   1508 // Check that number of files does not grow when we are out of space
   1509 TEST(DBTest, NoSpace) {
   1510   Options options = CurrentOptions();
   1511   options.env = env_;
   1512   Reopen(&options);
   1513 
   1514   ASSERT_OK(Put("foo", "v1"));
   1515   ASSERT_EQ("v1", Get("foo"));
   1516   Compact("a", "z");
   1517   const int num_files = CountFiles();
   1518   env_->no_space_.Release_Store(env_);   // Force out-of-space errors
   1519   env_->sleep_counter_.Reset();
   1520   for (int i = 0; i < 5; i++) {
   1521     for (int level = 0; level < config::kNumLevels-1; level++) {
   1522       dbfull()->TEST_CompactRange(level, NULL, NULL);
   1523     }
   1524   }
   1525   env_->no_space_.Release_Store(NULL);
   1526   ASSERT_LT(CountFiles(), num_files + 3);
   1527 
   1528   // Check that compaction attempts slept after errors
   1529   ASSERT_GE(env_->sleep_counter_.Read(), 5);
   1530 }
   1531 
   1532 TEST(DBTest, ExponentialBackoff) {
   1533   Options options = CurrentOptions();
   1534   options.env = env_;
   1535   Reopen(&options);
   1536 
   1537   ASSERT_OK(Put("foo", "v1"));
   1538   ASSERT_EQ("v1", Get("foo"));
   1539   Compact("a", "z");
   1540   env_->non_writable_.Release_Store(env_);  // Force errors for new files
   1541   env_->sleep_counter_.Reset();
   1542   env_->sleep_time_counter_.Reset();
   1543   for (int i = 0; i < 5; i++) {
   1544     dbfull()->TEST_CompactRange(2, NULL, NULL);
   1545   }
   1546   env_->non_writable_.Release_Store(NULL);
   1547 
   1548   // Wait for compaction to finish
   1549   DelayMilliseconds(1000);
   1550 
   1551   ASSERT_GE(env_->sleep_counter_.Read(), 5);
   1552   ASSERT_LT(env_->sleep_counter_.Read(), 10);
   1553   ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
   1554 }
   1555 
   1556 TEST(DBTest, NonWritableFileSystem) {
   1557   Options options = CurrentOptions();
   1558   options.write_buffer_size = 1000;
   1559   options.env = env_;
   1560   Reopen(&options);
   1561   ASSERT_OK(Put("foo", "v1"));
   1562   env_->non_writable_.Release_Store(env_);  // Force errors for new files
   1563   std::string big(100000, 'x');
   1564   int errors = 0;
   1565   for (int i = 0; i < 20; i++) {
   1566     fprintf(stderr, "iter %d; errors %d\n", i, errors);
   1567     if (!Put("foo", big).ok()) {
   1568       errors++;
   1569       DelayMilliseconds(100);
   1570     }
   1571   }
   1572   ASSERT_GT(errors, 0);
   1573   env_->non_writable_.Release_Store(NULL);
   1574 }
   1575 
   1576 TEST(DBTest, ManifestWriteError) {
   1577   // Test for the following problem:
   1578   // (a) Compaction produces file F
   1579   // (b) Log record containing F is written to MANIFEST file, but Sync() fails
   1580   // (c) GC deletes F
   1581   // (d) After reopening DB, reads fail since deleted F is named in log record
   1582 
   1583   // We iterate twice.  In the second iteration, everything is the
   1584   // same except the log record never makes it to the MANIFEST file.
   1585   for (int iter = 0; iter < 2; iter++) {
   1586     port::AtomicPointer* error_type = (iter == 0)
   1587         ? &env_->manifest_sync_error_
   1588         : &env_->manifest_write_error_;
   1589 
   1590     // Insert foo=>bar mapping
   1591     Options options = CurrentOptions();
   1592     options.env = env_;
   1593     options.create_if_missing = true;
   1594     options.error_if_exists = false;
   1595     DestroyAndReopen(&options);
   1596     ASSERT_OK(Put("foo", "bar"));
   1597     ASSERT_EQ("bar", Get("foo"));
   1598 
   1599     // Memtable compaction (will succeed)
   1600     dbfull()->TEST_CompactMemTable();
   1601     ASSERT_EQ("bar", Get("foo"));
   1602     const int last = config::kMaxMemCompactLevel;
   1603     ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo=>bar is now in last level
   1604 
   1605     // Merging compaction (will fail)
   1606     error_type->Release_Store(env_);
   1607     dbfull()->TEST_CompactRange(last, NULL, NULL);  // Should fail
   1608     ASSERT_EQ("bar", Get("foo"));
   1609 
   1610     // Recovery: should not lose data
   1611     error_type->Release_Store(NULL);
   1612     Reopen(&options);
   1613     ASSERT_EQ("bar", Get("foo"));
   1614   }
   1615 }
   1616 
   1617 TEST(DBTest, MissingSSTFile) {
   1618   ASSERT_OK(Put("foo", "bar"));
   1619   ASSERT_EQ("bar", Get("foo"));
   1620 
   1621   // Dump the memtable to disk.
   1622   dbfull()->TEST_CompactMemTable();
   1623   ASSERT_EQ("bar", Get("foo"));
   1624 
   1625   Close();
   1626   ASSERT_TRUE(DeleteAnSSTFile());
   1627   Options options = CurrentOptions();
   1628   options.paranoid_checks = true;
   1629   Status s = TryReopen(&options);
   1630   ASSERT_TRUE(!s.ok());
   1631   ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
   1632       << s.ToString();
   1633 }
   1634 
   1635 TEST(DBTest, FilesDeletedAfterCompaction) {
   1636   ASSERT_OK(Put("foo", "v2"));
   1637   Compact("a", "z");
   1638   const int num_files = CountFiles();
   1639   for (int i = 0; i < 10; i++) {
   1640     ASSERT_OK(Put("foo", "v2"));
   1641     Compact("a", "z");
   1642   }
   1643   ASSERT_EQ(CountFiles(), num_files);
   1644 }
   1645 
   1646 TEST(DBTest, BloomFilter) {
   1647   env_->count_random_reads_ = true;
   1648   Options options = CurrentOptions();
   1649   options.env = env_;
   1650   options.block_cache = NewLRUCache(0);  // Prevent cache hits
   1651   options.filter_policy = NewBloomFilterPolicy(10);
   1652   Reopen(&options);
   1653 
   1654   // Populate multiple layers
   1655   const int N = 10000;
   1656   for (int i = 0; i < N; i++) {
   1657     ASSERT_OK(Put(Key(i), Key(i)));
   1658   }
   1659   Compact("a", "z");
   1660   for (int i = 0; i < N; i += 100) {
   1661     ASSERT_OK(Put(Key(i), Key(i)));
   1662   }
   1663   dbfull()->TEST_CompactMemTable();
   1664 
   1665   // Prevent auto compactions triggered by seeks
   1666   env_->delay_sstable_sync_.Release_Store(env_);
   1667 
   1668   // Lookup present keys.  Should rarely read from small sstable.
   1669   env_->random_read_counter_.Reset();
   1670   for (int i = 0; i < N; i++) {
   1671     ASSERT_EQ(Key(i), Get(Key(i)));
   1672   }
   1673   int reads = env_->random_read_counter_.Read();
   1674   fprintf(stderr, "%d present => %d reads\n", N, reads);
   1675   ASSERT_GE(reads, N);
   1676   ASSERT_LE(reads, N + 2*N/100);
   1677 
   1678   // Lookup present keys.  Should rarely read from either sstable.
   1679   env_->random_read_counter_.Reset();
   1680   for (int i = 0; i < N; i++) {
   1681     ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing"));
   1682   }
   1683   reads = env_->random_read_counter_.Read();
   1684   fprintf(stderr, "%d missing => %d reads\n", N, reads);
   1685   ASSERT_LE(reads, 3*N/100);
   1686 
   1687   env_->delay_sstable_sync_.Release_Store(NULL);
   1688   Close();
   1689   delete options.block_cache;
   1690   delete options.filter_policy;
   1691 }
   1692 
   1693 // Multi-threaded test:
   1694 namespace {
   1695 
   1696 static const int kNumThreads = 4;
   1697 static const int kTestSeconds = 10;
   1698 static const int kNumKeys = 1000;
   1699 
   1700 struct MTState {
   1701   DBTest* test;
   1702   port::AtomicPointer stop;
   1703   port::AtomicPointer counter[kNumThreads];
   1704   port::AtomicPointer thread_done[kNumThreads];
   1705 };
   1706 
   1707 struct MTThread {
   1708   MTState* state;
   1709   int id;
   1710 };
   1711 
   1712 static void MTThreadBody(void* arg) {
   1713   MTThread* t = reinterpret_cast<MTThread*>(arg);
   1714   int id = t->id;
   1715   DB* db = t->state->test->db_;
   1716   uintptr_t counter = 0;
   1717   fprintf(stderr, "... starting thread %d\n", id);
   1718   Random rnd(1000 + id);
   1719   std::string value;
   1720   char valbuf[1500];
   1721   while (t->state->stop.Acquire_Load() == NULL) {
   1722     t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
   1723 
   1724     int key = rnd.Uniform(kNumKeys);
   1725     char keybuf[20];
   1726     snprintf(keybuf, sizeof(keybuf), "%016d", key);
   1727 
   1728     if (rnd.OneIn(2)) {
   1729       // Write values of the form <key, my id, counter>.
   1730       // We add some padding for force compactions.
   1731       snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
   1732                key, id, static_cast<int>(counter));
   1733       ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
   1734     } else {
   1735       // Read a value and verify that it matches the pattern written above.
   1736       Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
   1737       if (s.IsNotFound()) {
   1738         // Key has not yet been written
   1739       } else {
   1740         // Check that the writer thread counter is >= the counter in the value
   1741         ASSERT_OK(s);
   1742         int k, w, c;
   1743         ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
   1744         ASSERT_EQ(k, key);
   1745         ASSERT_GE(w, 0);
   1746         ASSERT_LT(w, kNumThreads);
   1747         ASSERT_LE(c, reinterpret_cast<uintptr_t>(
   1748             t->state->counter[w].Acquire_Load()));
   1749       }
   1750     }
   1751     counter++;
   1752   }
   1753   t->state->thread_done[id].Release_Store(t);
   1754   fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
   1755 }
   1756 
   1757 }  // namespace
   1758 
   1759 TEST(DBTest, MultiThreaded) {
   1760   do {
   1761     // Initialize state
   1762     MTState mt;
   1763     mt.test = this;
   1764     mt.stop.Release_Store(0);
   1765     for (int id = 0; id < kNumThreads; id++) {
   1766       mt.counter[id].Release_Store(0);
   1767       mt.thread_done[id].Release_Store(0);
   1768     }
   1769 
   1770     // Start threads
   1771     MTThread thread[kNumThreads];
   1772     for (int id = 0; id < kNumThreads; id++) {
   1773       thread[id].state = &mt;
   1774       thread[id].id = id;
   1775       env_->StartThread(MTThreadBody, &thread[id]);
   1776     }
   1777 
   1778     // Let them run for a while
   1779     DelayMilliseconds(kTestSeconds * 1000);
   1780 
   1781     // Stop the threads and wait for them to finish
   1782     mt.stop.Release_Store(&mt);
   1783     for (int id = 0; id < kNumThreads; id++) {
   1784       while (mt.thread_done[id].Acquire_Load() == NULL) {
   1785         DelayMilliseconds(100);
   1786       }
   1787     }
   1788   } while (ChangeOptions());
   1789 }
   1790 
   1791 namespace {
   1792 typedef std::map<std::string, std::string> KVMap;
   1793 }
   1794 
   1795 class ModelDB: public DB {
   1796  public:
   1797   class ModelSnapshot : public Snapshot {
   1798    public:
   1799     KVMap map_;
   1800   };
   1801 
   1802   explicit ModelDB(const Options& options): options_(options) { }
   1803   ~ModelDB() { }
   1804   virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
   1805     return DB::Put(o, k, v);
   1806   }
   1807   virtual Status Delete(const WriteOptions& o, const Slice& key) {
   1808     return DB::Delete(o, key);
   1809   }
   1810   virtual Status Get(const ReadOptions& options,
   1811                      const Slice& key, std::string* value) {
   1812     assert(false);      // Not implemented
   1813     return Status::NotFound(key);
   1814   }
   1815   virtual Iterator* NewIterator(const ReadOptions& options) {
   1816     if (options.snapshot == NULL) {
   1817       KVMap* saved = new KVMap;
   1818       *saved = map_;
   1819       return new ModelIter(saved, true);
   1820     } else {
   1821       const KVMap* snapshot_state =
   1822           &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
   1823       return new ModelIter(snapshot_state, false);
   1824     }
   1825   }
   1826   virtual const Snapshot* GetSnapshot() {
   1827     ModelSnapshot* snapshot = new ModelSnapshot;
   1828     snapshot->map_ = map_;
   1829     return snapshot;
   1830   }
   1831 
   1832   virtual void ReleaseSnapshot(const Snapshot* snapshot) {
   1833     delete reinterpret_cast<const ModelSnapshot*>(snapshot);
   1834   }
   1835   virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
   1836     class Handler : public WriteBatch::Handler {
   1837      public:
   1838       KVMap* map_;
   1839       virtual void Put(const Slice& key, const Slice& value) {
   1840         (*map_)[key.ToString()] = value.ToString();
   1841       }
   1842       virtual void Delete(const Slice& key) {
   1843         map_->erase(key.ToString());
   1844       }
   1845     };
   1846     Handler handler;
   1847     handler.map_ = &map_;
   1848     return batch->Iterate(&handler);
   1849   }
   1850 
   1851   virtual bool GetProperty(const Slice& property, std::string* value) {
   1852     return false;
   1853   }
   1854   virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
   1855     for (int i = 0; i < n; i++) {
   1856       sizes[i] = 0;
   1857     }
   1858   }
   1859   virtual void CompactRange(const Slice* start, const Slice* end) {
   1860   }
   1861 
   1862  private:
   1863   class ModelIter: public Iterator {
   1864    public:
   1865     ModelIter(const KVMap* map, bool owned)
   1866         : map_(map), owned_(owned), iter_(map_->end()) {
   1867     }
   1868     ~ModelIter() {
   1869       if (owned_) delete map_;
   1870     }
   1871     virtual bool Valid() const { return iter_ != map_->end(); }
   1872     virtual void SeekToFirst() { iter_ = map_->begin(); }
   1873     virtual void SeekToLast() {
   1874       if (map_->empty()) {
   1875         iter_ = map_->end();
   1876       } else {
   1877         iter_ = map_->find(map_->rbegin()->first);
   1878       }
   1879     }
   1880     virtual void Seek(const Slice& k) {
   1881       iter_ = map_->lower_bound(k.ToString());
   1882     }
   1883     virtual void Next() { ++iter_; }
   1884     virtual void Prev() { --iter_; }
   1885     virtual Slice key() const { return iter_->first; }
   1886     virtual Slice value() const { return iter_->second; }
   1887     virtual Status status() const { return Status::OK(); }
   1888    private:
   1889     const KVMap* const map_;
   1890     const bool owned_;  // Do we own map_
   1891     KVMap::const_iterator iter_;
   1892   };
   1893   const Options options_;
   1894   KVMap map_;
   1895 };
   1896 
   1897 static std::string RandomKey(Random* rnd) {
   1898   int len = (rnd->OneIn(3)
   1899              ? 1                // Short sometimes to encourage collisions
   1900              : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
   1901   return test::RandomKey(rnd, len);
   1902 }
   1903 
   1904 static bool CompareIterators(int step,
   1905                              DB* model,
   1906                              DB* db,
   1907                              const Snapshot* model_snap,
   1908                              const Snapshot* db_snap) {
   1909   ReadOptions options;
   1910   options.snapshot = model_snap;
   1911   Iterator* miter = model->NewIterator(options);
   1912   options.snapshot = db_snap;
   1913   Iterator* dbiter = db->NewIterator(options);
   1914   bool ok = true;
   1915   int count = 0;
   1916   for (miter->SeekToFirst(), dbiter->SeekToFirst();
   1917        ok && miter->Valid() && dbiter->Valid();
   1918        miter->Next(), dbiter->Next()) {
   1919     count++;
   1920     if (miter->key().compare(dbiter->key()) != 0) {
   1921       fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
   1922               step,
   1923               EscapeString(miter->key()).c_str(),
   1924               EscapeString(dbiter->key()).c_str());
   1925       ok = false;
   1926       break;
   1927     }
   1928 
   1929     if (miter->value().compare(dbiter->value()) != 0) {
   1930       fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
   1931               step,
   1932               EscapeString(miter->key()).c_str(),
   1933               EscapeString(miter->value()).c_str(),
   1934               EscapeString(miter->value()).c_str());
   1935       ok = false;
   1936     }
   1937   }
   1938 
   1939   if (ok) {
   1940     if (miter->Valid() != dbiter->Valid()) {
   1941       fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
   1942               step, miter->Valid(), dbiter->Valid());
   1943       ok = false;
   1944     }
   1945   }
   1946   fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
   1947   delete miter;
   1948   delete dbiter;
   1949   return ok;
   1950 }
   1951 
   1952 TEST(DBTest, Randomized) {
   1953   Random rnd(test::RandomSeed());
   1954   do {
   1955     ModelDB model(CurrentOptions());
   1956     const int N = 10000;
   1957     const Snapshot* model_snap = NULL;
   1958     const Snapshot* db_snap = NULL;
   1959     std::string k, v;
   1960     for (int step = 0; step < N; step++) {
   1961       if (step % 100 == 0) {
   1962         fprintf(stderr, "Step %d of %d\n", step, N);
   1963       }
   1964       // TODO(sanjay): Test Get() works
   1965       int p = rnd.Uniform(100);
   1966       if (p < 45) {                               // Put
   1967         k = RandomKey(&rnd);
   1968         v = RandomString(&rnd,
   1969                          rnd.OneIn(20)
   1970                          ? 100 + rnd.Uniform(100)
   1971                          : rnd.Uniform(8));
   1972         ASSERT_OK(model.Put(WriteOptions(), k, v));
   1973         ASSERT_OK(db_->Put(WriteOptions(), k, v));
   1974 
   1975       } else if (p < 90) {                        // Delete
   1976         k = RandomKey(&rnd);
   1977         ASSERT_OK(model.Delete(WriteOptions(), k));
   1978         ASSERT_OK(db_->Delete(WriteOptions(), k));
   1979 
   1980 
   1981       } else {                                    // Multi-element batch
   1982         WriteBatch b;
   1983         const int num = rnd.Uniform(8);
   1984         for (int i = 0; i < num; i++) {
   1985           if (i == 0 || !rnd.OneIn(10)) {
   1986             k = RandomKey(&rnd);
   1987           } else {
   1988             // Periodically re-use the same key from the previous iter, so
   1989             // we have multiple entries in the write batch for the same key
   1990           }
   1991           if (rnd.OneIn(2)) {
   1992             v = RandomString(&rnd, rnd.Uniform(10));
   1993             b.Put(k, v);
   1994           } else {
   1995             b.Delete(k);
   1996           }
   1997         }
   1998         ASSERT_OK(model.Write(WriteOptions(), &b));
   1999         ASSERT_OK(db_->Write(WriteOptions(), &b));
   2000       }
   2001 
   2002       if ((step % 100) == 0) {
   2003         ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
   2004         ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
   2005         // Save a snapshot from each DB this time that we'll use next
   2006         // time we compare things, to make sure the current state is
   2007         // preserved with the snapshot
   2008         if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
   2009         if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
   2010 
   2011         Reopen();
   2012         ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
   2013 
   2014         model_snap = model.GetSnapshot();
   2015         db_snap = db_->GetSnapshot();
   2016       }
   2017     }
   2018     if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
   2019     if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
   2020   } while (ChangeOptions());
   2021 }
   2022 
   2023 std::string MakeKey(unsigned int num) {
   2024   char buf[30];
   2025   snprintf(buf, sizeof(buf), "%016u", num);
   2026   return std::string(buf);
   2027 }
   2028 
   2029 void BM_LogAndApply(int iters, int num_base_files) {
   2030   std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
   2031   DestroyDB(dbname, Options());
   2032 
   2033   DB* db = NULL;
   2034   Options opts;
   2035   opts.create_if_missing = true;
   2036   Status s = DB::Open(opts, dbname, &db);
   2037   ASSERT_OK(s);
   2038   ASSERT_TRUE(db != NULL);
   2039 
   2040   delete db;
   2041   db = NULL;
   2042 
   2043   Env* env = Env::Default();
   2044 
   2045   port::Mutex mu;
   2046   MutexLock l(&mu);
   2047 
   2048   InternalKeyComparator cmp(BytewiseComparator());
   2049   Options options;
   2050   VersionSet vset(dbname, &options, NULL, &cmp);
   2051   ASSERT_OK(vset.Recover());
   2052   VersionEdit vbase;
   2053   uint64_t fnum = 1;
   2054   for (int i = 0; i < num_base_files; i++) {
   2055     InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
   2056     InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
   2057     vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
   2058   }
   2059   ASSERT_OK(vset.LogAndApply(&vbase, &mu));
   2060 
   2061   uint64_t start_micros = env->NowMicros();
   2062 
   2063   for (int i = 0; i < iters; i++) {
   2064     VersionEdit vedit;
   2065     vedit.DeleteFile(2, fnum);
   2066     InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
   2067     InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
   2068     vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
   2069     vset.LogAndApply(&vedit, &mu);
   2070   }
   2071   uint64_t stop_micros = env->NowMicros();
   2072   unsigned int us = stop_micros - start_micros;
   2073   char buf[16];
   2074   snprintf(buf, sizeof(buf), "%d", num_base_files);
   2075   fprintf(stderr,
   2076           "BM_LogAndApply/%-6s   %8d iters : %9u us (%7.0f us / iter)\n",
   2077           buf, iters, us, ((float)us) / iters);
   2078 }
   2079 
   2080 }  // namespace leveldb
   2081 
   2082 int main(int argc, char** argv) {
   2083   if (argc > 1 && std::string(argv[1]) == "--benchmark") {
   2084     leveldb::BM_LogAndApply(1000, 1);
   2085     leveldb::BM_LogAndApply(1000, 100);
   2086     leveldb::BM_LogAndApply(1000, 10000);
   2087     leveldb::BM_LogAndApply(100, 100000);
   2088     return 0;
   2089   }
   2090 
   2091   return leveldb::test::RunAllTests();
   2092 }
   2093