Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "leveldb/db.h"
      6 
      7 #include <errno.h>
      8 #include <fcntl.h>
      9 #include <sys/stat.h>
     10 #include <sys/types.h>
     11 #include "leveldb/cache.h"
     12 #include "leveldb/env.h"
     13 #include "leveldb/table.h"
     14 #include "leveldb/write_batch.h"
     15 #include "db/db_impl.h"
     16 #include "db/filename.h"
     17 #include "db/log_format.h"
     18 #include "db/version_set.h"
     19 #include "util/logging.h"
     20 #include "util/testharness.h"
     21 #include "util/testutil.h"
     22 
     23 namespace leveldb {
     24 
     25 static const int kValueSize = 1000;
     26 
     27 class CorruptionTest {
     28  public:
     29   test::ErrorEnv env_;
     30   std::string dbname_;
     31   Cache* tiny_cache_;
     32   Options options_;
     33   DB* db_;
     34 
     35   CorruptionTest() {
     36     tiny_cache_ = NewLRUCache(100);
     37     options_.env = &env_;
     38     dbname_ = test::TmpDir() + "/db_test";
     39     DestroyDB(dbname_, options_);
     40 
     41     db_ = NULL;
     42     options_.create_if_missing = true;
     43     Reopen();
     44     options_.create_if_missing = false;
     45   }
     46 
     47   ~CorruptionTest() {
     48      delete db_;
     49      DestroyDB(dbname_, Options());
     50      delete tiny_cache_;
     51   }
     52 
     53   Status TryReopen(Options* options = NULL) {
     54     delete db_;
     55     db_ = NULL;
     56     Options opt = (options ? *options : options_);
     57     opt.env = &env_;
     58     opt.block_cache = tiny_cache_;
     59     return DB::Open(opt, dbname_, &db_);
     60   }
     61 
     62   void Reopen(Options* options = NULL) {
     63     ASSERT_OK(TryReopen(options));
     64   }
     65 
     66   void RepairDB() {
     67     delete db_;
     68     db_ = NULL;
     69     ASSERT_OK(::leveldb::RepairDB(dbname_, options_));
     70   }
     71 
     72   void Build(int n) {
     73     std::string key_space, value_space;
     74     WriteBatch batch;
     75     for (int i = 0; i < n; i++) {
     76       //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
     77       Slice key = Key(i, &key_space);
     78       batch.Clear();
     79       batch.Put(key, Value(i, &value_space));
     80       ASSERT_OK(db_->Write(WriteOptions(), &batch));
     81     }
     82   }
     83 
     84   void Check(int min_expected, int max_expected) {
     85     int next_expected = 0;
     86     int missed = 0;
     87     int bad_keys = 0;
     88     int bad_values = 0;
     89     int correct = 0;
     90     std::string value_space;
     91     Iterator* iter = db_->NewIterator(ReadOptions());
     92     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     93       uint64_t key;
     94       Slice in(iter->key());
     95       if (!ConsumeDecimalNumber(&in, &key) ||
     96           !in.empty() ||
     97           key < next_expected) {
     98         bad_keys++;
     99         continue;
    100       }
    101       missed += (key - next_expected);
    102       next_expected = key + 1;
    103       if (iter->value() != Value(key, &value_space)) {
    104         bad_values++;
    105       } else {
    106         correct++;
    107       }
    108     }
    109     delete iter;
    110 
    111     fprintf(stderr,
    112             "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%d\n",
    113             min_expected, max_expected, correct, bad_keys, bad_values, missed);
    114     ASSERT_LE(min_expected, correct);
    115     ASSERT_GE(max_expected, correct);
    116   }
    117 
    118   void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
    119     // Pick file to corrupt
    120     std::vector<std::string> filenames;
    121     ASSERT_OK(env_.GetChildren(dbname_, &filenames));
    122     uint64_t number;
    123     FileType type;
    124     std::string fname;
    125     int picked_number = -1;
    126     for (int i = 0; i < filenames.size(); i++) {
    127       if (ParseFileName(filenames[i], &number, &type) &&
    128           type == filetype &&
    129           int(number) > picked_number) {  // Pick latest file
    130         fname = dbname_ + "/" + filenames[i];
    131         picked_number = number;
    132       }
    133     }
    134     ASSERT_TRUE(!fname.empty()) << filetype;
    135 
    136     struct stat sbuf;
    137     if (stat(fname.c_str(), &sbuf) != 0) {
    138       const char* msg = strerror(errno);
    139       ASSERT_TRUE(false) << fname << ": " << msg;
    140     }
    141 
    142     if (offset < 0) {
    143       // Relative to end of file; make it absolute
    144       if (-offset > sbuf.st_size) {
    145         offset = 0;
    146       } else {
    147         offset = sbuf.st_size + offset;
    148       }
    149     }
    150     if (offset > sbuf.st_size) {
    151       offset = sbuf.st_size;
    152     }
    153     if (offset + bytes_to_corrupt > sbuf.st_size) {
    154       bytes_to_corrupt = sbuf.st_size - offset;
    155     }
    156 
    157     // Do it
    158     std::string contents;
    159     Status s = ReadFileToString(Env::Default(), fname, &contents);
    160     ASSERT_TRUE(s.ok()) << s.ToString();
    161     for (int i = 0; i < bytes_to_corrupt; i++) {
    162       contents[i + offset] ^= 0x80;
    163     }
    164     s = WriteStringToFile(Env::Default(), contents, fname);
    165     ASSERT_TRUE(s.ok()) << s.ToString();
    166   }
    167 
    168   int Property(const std::string& name) {
    169     std::string property;
    170     int result;
    171     if (db_->GetProperty(name, &property) &&
    172         sscanf(property.c_str(), "%d", &result) == 1) {
    173       return result;
    174     } else {
    175       return -1;
    176     }
    177   }
    178 
    179   // Return the ith key
    180   Slice Key(int i, std::string* storage) {
    181     char buf[100];
    182     snprintf(buf, sizeof(buf), "%016d", i);
    183     storage->assign(buf, strlen(buf));
    184     return Slice(*storage);
    185   }
    186 
    187   // Return the value to associate with the specified key
    188   Slice Value(int k, std::string* storage) {
    189     Random r(k);
    190     return test::RandomString(&r, kValueSize, storage);
    191   }
    192 };
    193 
    194 TEST(CorruptionTest, Recovery) {
    195   Build(100);
    196   Check(100, 100);
    197   Corrupt(kLogFile, 19, 1);      // WriteBatch tag for first record
    198   Corrupt(kLogFile, log::kBlockSize + 1000, 1);  // Somewhere in second block
    199   Reopen();
    200 
    201   // The 64 records in the first two log blocks are completely lost.
    202   Check(36, 36);
    203 }
    204 
    205 TEST(CorruptionTest, RecoverWriteError) {
    206   env_.writable_file_error_ = true;
    207   Status s = TryReopen();
    208   ASSERT_TRUE(!s.ok());
    209 }
    210 
    211 TEST(CorruptionTest, NewFileErrorDuringWrite) {
    212   // Do enough writing to force minor compaction
    213   env_.writable_file_error_ = true;
    214   const int num = 3 + (Options().write_buffer_size / kValueSize);
    215   std::string value_storage;
    216   Status s;
    217   for (int i = 0; s.ok() && i < num; i++) {
    218     WriteBatch batch;
    219     batch.Put("a", Value(100, &value_storage));
    220     s = db_->Write(WriteOptions(), &batch);
    221   }
    222   ASSERT_TRUE(!s.ok());
    223   ASSERT_GE(env_.num_writable_file_errors_, 1);
    224   env_.writable_file_error_ = false;
    225   Reopen();
    226 }
    227 
    228 TEST(CorruptionTest, TableFile) {
    229   Build(100);
    230   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    231   dbi->TEST_CompactMemTable();
    232   dbi->TEST_CompactRange(0, NULL, NULL);
    233   dbi->TEST_CompactRange(1, NULL, NULL);
    234 
    235   Corrupt(kTableFile, 100, 1);
    236   Check(99, 99);
    237 }
    238 
    239 TEST(CorruptionTest, TableFileIndexData) {
    240   Build(10000);  // Enough to build multiple Tables
    241   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    242   dbi->TEST_CompactMemTable();
    243 
    244   Corrupt(kTableFile, -2000, 500);
    245   Reopen();
    246   Check(5000, 9999);
    247 }
    248 
    249 TEST(CorruptionTest, MissingDescriptor) {
    250   Build(1000);
    251   RepairDB();
    252   Reopen();
    253   Check(1000, 1000);
    254 }
    255 
    256 TEST(CorruptionTest, SequenceNumberRecovery) {
    257   ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
    258   ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
    259   ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3"));
    260   ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4"));
    261   ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5"));
    262   RepairDB();
    263   Reopen();
    264   std::string v;
    265   ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
    266   ASSERT_EQ("v5", v);
    267   // Write something.  If sequence number was not recovered properly,
    268   // it will be hidden by an earlier write.
    269   ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
    270   ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
    271   ASSERT_EQ("v6", v);
    272   Reopen();
    273   ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
    274   ASSERT_EQ("v6", v);
    275 }
    276 
    277 TEST(CorruptionTest, CorruptedDescriptor) {
    278   ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
    279   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    280   dbi->TEST_CompactMemTable();
    281   dbi->TEST_CompactRange(0, NULL, NULL);
    282 
    283   Corrupt(kDescriptorFile, 0, 1000);
    284   Status s = TryReopen();
    285   ASSERT_TRUE(!s.ok());
    286 
    287   RepairDB();
    288   Reopen();
    289   std::string v;
    290   ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
    291   ASSERT_EQ("hello", v);
    292 }
    293 
    294 TEST(CorruptionTest, CompactionInputError) {
    295   Build(10);
    296   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    297   dbi->TEST_CompactMemTable();
    298   const int last = config::kMaxMemCompactLevel;
    299   ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
    300 
    301   Corrupt(kTableFile, 100, 1);
    302   Check(9, 9);
    303 
    304   // Force compactions by writing lots of values
    305   Build(10000);
    306   Check(10000, 10000);
    307 }
    308 
    309 TEST(CorruptionTest, CompactionInputErrorParanoid) {
    310   Options options;
    311   options.paranoid_checks = true;
    312   options.write_buffer_size = 1048576;
    313   Reopen(&options);
    314   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    315 
    316   // Fill levels >= 1 so memtable compaction outputs to level 1
    317   for (int level = 1; level < config::kNumLevels; level++) {
    318     dbi->Put(WriteOptions(), "", "begin");
    319     dbi->Put(WriteOptions(), "~", "end");
    320     dbi->TEST_CompactMemTable();
    321   }
    322 
    323   Build(10);
    324   dbi->TEST_CompactMemTable();
    325   ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
    326 
    327   Corrupt(kTableFile, 100, 1);
    328   Check(9, 9);
    329 
    330   // Write must eventually fail because of corrupted table
    331   Status s;
    332   std::string tmp1, tmp2;
    333   for (int i = 0; i < 10000 && s.ok(); i++) {
    334     s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
    335   }
    336   ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
    337 }
    338 
    339 TEST(CorruptionTest, UnrelatedKeys) {
    340   Build(10);
    341   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    342   dbi->TEST_CompactMemTable();
    343   Corrupt(kTableFile, 100, 1);
    344 
    345   std::string tmp1, tmp2;
    346   ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
    347   std::string v;
    348   ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
    349   ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
    350   dbi->TEST_CompactMemTable();
    351   ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
    352   ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
    353 }
    354 
    355 }  // namespace leveldb
    356 
    357 int main(int argc, char** argv) {
    358   return leveldb::test::RunAllTests();
    359 }
    360