1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "leveldb/db.h" 6 7 #include <errno.h> 8 #include <fcntl.h> 9 #include <sys/stat.h> 10 #include <sys/types.h> 11 #include "leveldb/cache.h" 12 #include "leveldb/env.h" 13 #include "leveldb/table.h" 14 #include "leveldb/write_batch.h" 15 #include "db/db_impl.h" 16 #include "db/filename.h" 17 #include "db/log_format.h" 18 #include "db/version_set.h" 19 #include "util/logging.h" 20 #include "util/testharness.h" 21 #include "util/testutil.h" 22 23 namespace leveldb { 24 25 static const int kValueSize = 1000; 26 27 class CorruptionTest { 28 public: 29 test::ErrorEnv env_; 30 std::string dbname_; 31 Cache* tiny_cache_; 32 Options options_; 33 DB* db_; 34 35 CorruptionTest() { 36 tiny_cache_ = NewLRUCache(100); 37 options_.env = &env_; 38 options_.block_cache = tiny_cache_; 39 dbname_ = test::TmpDir() + "/db_test"; 40 DestroyDB(dbname_, options_); 41 42 db_ = NULL; 43 options_.create_if_missing = true; 44 Reopen(); 45 options_.create_if_missing = false; 46 } 47 48 ~CorruptionTest() { 49 delete db_; 50 DestroyDB(dbname_, Options()); 51 delete tiny_cache_; 52 } 53 54 Status TryReopen() { 55 delete db_; 56 db_ = NULL; 57 return DB::Open(options_, dbname_, &db_); 58 } 59 60 void Reopen() { 61 ASSERT_OK(TryReopen()); 62 } 63 64 void RepairDB() { 65 delete db_; 66 db_ = NULL; 67 ASSERT_OK(::leveldb::RepairDB(dbname_, options_)); 68 } 69 70 void Build(int n) { 71 std::string key_space, value_space; 72 WriteBatch batch; 73 for (int i = 0; i < n; i++) { 74 //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n); 75 Slice key = Key(i, &key_space); 76 batch.Clear(); 77 batch.Put(key, Value(i, &value_space)); 78 WriteOptions options; 79 // Corrupt() doesn't work without this sync on windows; stat reports 0 for 80 // the file size. 81 if (i == n - 1) { 82 options.sync = true; 83 } 84 ASSERT_OK(db_->Write(options, &batch)); 85 } 86 } 87 88 void Check(int min_expected, int max_expected) { 89 int next_expected = 0; 90 int missed = 0; 91 int bad_keys = 0; 92 int bad_values = 0; 93 int correct = 0; 94 std::string value_space; 95 Iterator* iter = db_->NewIterator(ReadOptions()); 96 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { 97 uint64_t key; 98 Slice in(iter->key()); 99 if (in == "" || in == "~") { 100 // Ignore boundary keys. 101 continue; 102 } 103 if (!ConsumeDecimalNumber(&in, &key) || 104 !in.empty() || 105 key < next_expected) { 106 bad_keys++; 107 continue; 108 } 109 missed += (key - next_expected); 110 next_expected = key + 1; 111 if (iter->value() != Value(key, &value_space)) { 112 bad_values++; 113 } else { 114 correct++; 115 } 116 } 117 delete iter; 118 119 fprintf(stderr, 120 "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%d\n", 121 min_expected, max_expected, correct, bad_keys, bad_values, missed); 122 ASSERT_LE(min_expected, correct); 123 ASSERT_GE(max_expected, correct); 124 } 125 126 void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) { 127 // Pick file to corrupt 128 std::vector<std::string> filenames; 129 ASSERT_OK(env_.GetChildren(dbname_, &filenames)); 130 uint64_t number; 131 FileType type; 132 std::string fname; 133 int picked_number = -1; 134 for (size_t i = 0; i < filenames.size(); i++) { 135 if (ParseFileName(filenames[i], &number, &type) && 136 type == filetype && 137 int(number) > picked_number) { // Pick latest file 138 fname = dbname_ + "/" + filenames[i]; 139 picked_number = number; 140 } 141 } 142 ASSERT_TRUE(!fname.empty()) << filetype; 143 144 struct stat sbuf; 145 if (stat(fname.c_str(), &sbuf) != 0) { 146 const char* msg = strerror(errno); 147 ASSERT_TRUE(false) << fname << ": " << msg; 148 } 149 150 if (offset < 0) { 151 // Relative to end of file; make it absolute 152 if (-offset > sbuf.st_size) { 153 offset = 0; 154 } else { 155 offset = sbuf.st_size + offset; 156 } 157 } 158 if (offset > sbuf.st_size) { 159 offset = sbuf.st_size; 160 } 161 if (offset + bytes_to_corrupt > sbuf.st_size) { 162 bytes_to_corrupt = sbuf.st_size - offset; 163 } 164 165 // Do it 166 std::string contents; 167 Status s = ReadFileToString(Env::Default(), fname, &contents); 168 ASSERT_TRUE(s.ok()) << s.ToString(); 169 for (int i = 0; i < bytes_to_corrupt; i++) { 170 contents[i + offset] ^= 0x80; 171 } 172 s = WriteStringToFile(Env::Default(), contents, fname); 173 ASSERT_TRUE(s.ok()) << s.ToString(); 174 } 175 176 int Property(const std::string& name) { 177 std::string property; 178 int result; 179 if (db_->GetProperty(name, &property) && 180 sscanf(property.c_str(), "%d", &result) == 1) { 181 return result; 182 } else { 183 return -1; 184 } 185 } 186 187 // Return the ith key 188 Slice Key(int i, std::string* storage) { 189 char buf[100]; 190 snprintf(buf, sizeof(buf), "%016d", i); 191 storage->assign(buf, strlen(buf)); 192 return Slice(*storage); 193 } 194 195 // Return the value to associate with the specified key 196 Slice Value(int k, std::string* storage) { 197 Random r(k); 198 return test::RandomString(&r, kValueSize, storage); 199 } 200 }; 201 202 TEST(CorruptionTest, Recovery) { 203 Build(100); 204 Check(100, 100); 205 Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record 206 Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block 207 Reopen(); 208 209 // The 64 records in the first two log blocks are completely lost. 210 Check(36, 36); 211 } 212 213 TEST(CorruptionTest, RecoverWriteError) { 214 env_.writable_file_error_ = true; 215 Status s = TryReopen(); 216 ASSERT_TRUE(!s.ok()); 217 } 218 219 TEST(CorruptionTest, NewFileErrorDuringWrite) { 220 // Do enough writing to force minor compaction 221 env_.writable_file_error_ = true; 222 const int num = 3 + (Options().write_buffer_size / kValueSize); 223 std::string value_storage; 224 Status s; 225 for (int i = 0; s.ok() && i < num; i++) { 226 WriteBatch batch; 227 batch.Put("a", Value(100, &value_storage)); 228 s = db_->Write(WriteOptions(), &batch); 229 } 230 ASSERT_TRUE(!s.ok()); 231 ASSERT_GE(env_.num_writable_file_errors_, 1); 232 env_.writable_file_error_ = false; 233 Reopen(); 234 } 235 236 TEST(CorruptionTest, TableFile) { 237 Build(100); 238 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 239 dbi->TEST_CompactMemTable(); 240 dbi->TEST_CompactRange(0, NULL, NULL); 241 dbi->TEST_CompactRange(1, NULL, NULL); 242 243 Corrupt(kTableFile, 100, 1); 244 Check(90, 99); 245 } 246 247 TEST(CorruptionTest, TableFileRepair) { 248 options_.block_size = 2 * kValueSize; // Limit scope of corruption 249 options_.paranoid_checks = true; 250 Reopen(); 251 Build(100); 252 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 253 dbi->TEST_CompactMemTable(); 254 dbi->TEST_CompactRange(0, NULL, NULL); 255 dbi->TEST_CompactRange(1, NULL, NULL); 256 257 Corrupt(kTableFile, 100, 1); 258 RepairDB(); 259 Reopen(); 260 Check(95, 99); 261 } 262 263 TEST(CorruptionTest, TableFileIndexData) { 264 Build(10000); // Enough to build multiple Tables 265 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 266 dbi->TEST_CompactMemTable(); 267 268 Corrupt(kTableFile, -2000, 500); 269 Reopen(); 270 Check(5000, 9999); 271 } 272 273 TEST(CorruptionTest, MissingDescriptor) { 274 Build(1000); 275 RepairDB(); 276 Reopen(); 277 Check(1000, 1000); 278 } 279 280 TEST(CorruptionTest, SequenceNumberRecovery) { 281 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1")); 282 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2")); 283 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3")); 284 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4")); 285 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5")); 286 RepairDB(); 287 Reopen(); 288 std::string v; 289 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); 290 ASSERT_EQ("v5", v); 291 // Write something. If sequence number was not recovered properly, 292 // it will be hidden by an earlier write. 293 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6")); 294 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); 295 ASSERT_EQ("v6", v); 296 Reopen(); 297 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); 298 ASSERT_EQ("v6", v); 299 } 300 301 TEST(CorruptionTest, CorruptedDescriptor) { 302 ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello")); 303 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 304 dbi->TEST_CompactMemTable(); 305 dbi->TEST_CompactRange(0, NULL, NULL); 306 307 Corrupt(kDescriptorFile, 0, 1000); 308 Status s = TryReopen(); 309 ASSERT_TRUE(!s.ok()); 310 311 RepairDB(); 312 Reopen(); 313 std::string v; 314 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); 315 ASSERT_EQ("hello", v); 316 } 317 318 TEST(CorruptionTest, CompactionInputError) { 319 Build(10); 320 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 321 dbi->TEST_CompactMemTable(); 322 const int last = config::kMaxMemCompactLevel; 323 ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); 324 325 Corrupt(kTableFile, 100, 1); 326 Check(5, 9); 327 328 // Force compactions by writing lots of values 329 Build(10000); 330 Check(10000, 10000); 331 } 332 333 TEST(CorruptionTest, CompactionInputErrorParanoid) { 334 options_.paranoid_checks = true; 335 options_.write_buffer_size = 512 << 10; 336 Reopen(); 337 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 338 339 // Make multiple inputs so we need to compact. 340 for (int i = 0; i < 2; i++) { 341 Build(10); 342 dbi->TEST_CompactMemTable(); 343 Corrupt(kTableFile, 100, 1); 344 env_.SleepForMicroseconds(100000); 345 } 346 dbi->CompactRange(NULL, NULL); 347 348 // Write must fail because of corrupted table 349 std::string tmp1, tmp2; 350 Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2)); 351 ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db"; 352 } 353 354 TEST(CorruptionTest, UnrelatedKeys) { 355 Build(10); 356 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); 357 dbi->TEST_CompactMemTable(); 358 Corrupt(kTableFile, 100, 1); 359 360 std::string tmp1, tmp2; 361 ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2))); 362 std::string v; 363 ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v)); 364 ASSERT_EQ(Value(1000, &tmp2).ToString(), v); 365 dbi->TEST_CompactMemTable(); 366 ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v)); 367 ASSERT_EQ(Value(1000, &tmp2).ToString(), v); 368 } 369 370 } // namespace leveldb 371 372 int main(int argc, char** argv) { 373 return leveldb::test::RunAllTests(); 374 } 375