Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "db/db_impl.h"
      6 
      7 #include <algorithm>
      8 #include <set>
      9 #include <string>
     10 #include <stdint.h>
     11 #include <stdio.h>
     12 #include <vector>
     13 #include "db/builder.h"
     14 #include "db/db_iter.h"
     15 #include "db/dbformat.h"
     16 #include "db/filename.h"
     17 #include "db/log_reader.h"
     18 #include "db/log_writer.h"
     19 #include "db/memtable.h"
     20 #include "db/table_cache.h"
     21 #include "db/version_set.h"
     22 #include "db/write_batch_internal.h"
     23 #include "leveldb/db.h"
     24 #include "leveldb/env.h"
     25 #include "leveldb/status.h"
     26 #include "leveldb/table.h"
     27 #include "leveldb/table_builder.h"
     28 #include "port/port.h"
     29 #include "table/block.h"
     30 #include "table/merger.h"
     31 #include "table/two_level_iterator.h"
     32 #include "util/coding.h"
     33 #include "util/logging.h"
     34 #include "util/mutexlock.h"
     35 
     36 namespace leveldb {
     37 
     // Number of open-file slots held back from the table cache.  It is
     // subtracted from max_open_files when the TableCache is sized, and added
     // to the lower bound used when max_open_files is clamped.
     38 const int kNumNonTableCacheFiles = 10;
     39 
     40 // Information kept for every waiting writer
     41 struct DBImpl::Writer {
     42   Status status;
     43   WriteBatch* batch;
     44   bool sync;
     45   bool done;
     46   port::CondVar cv;
     47 
     48   explicit Writer(port::Mutex* mu) : cv(mu) { }
     49 };
     50 
     51 struct DBImpl::CompactionState {
     52   Compaction* const compaction;
     53 
     54   // Sequence numbers < smallest_snapshot are not significant since we
     55   // will never have to service a snapshot below smallest_snapshot.
     56   // Therefore if we have seen a sequence number S <= smallest_snapshot,
     57   // we can drop all entries for the same key with sequence numbers < S.
     58   SequenceNumber smallest_snapshot;
     59 
     60   // Files produced by compaction
     61   struct Output {
     62     uint64_t number;
     63     uint64_t file_size;
     64     InternalKey smallest, largest;
     65   };
     66   std::vector<Output> outputs;
     67 
     68   // State kept for output being generated
     69   WritableFile* outfile;
     70   TableBuilder* builder;
     71 
     72   uint64_t total_bytes;
     73 
     74   Output* current_output() { return &outputs[outputs.size()-1]; }
     75 
     76   explicit CompactionState(Compaction* c)
     77       : compaction(c),
     78         outfile(NULL),
     79         builder(NULL),
     80         total_bytes(0) {
     81   }
     82 };
     83 
     84 // Fix user-supplied options to be reasonable
     85 template <class T,class V>
     86 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
     87   if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
     88   if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
     89 }
     90 Options SanitizeOptions(const std::string& dbname,
     91                         const InternalKeyComparator* icmp,
     92                         const InternalFilterPolicy* ipolicy,
     93                         const Options& src) {
     94   Options result = src;
     95   result.comparator = icmp;
     96   result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
     97   ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
     98   ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
     99   ClipToRange(&result.block_size,        1<<10,                       4<<20);
    100   if (result.info_log == NULL) {
    101     // Open a log file in the same directory as the db
    102     src.env->CreateDir(dbname);  // In case it does not exist
    103     src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    104     Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
    105     if (!s.ok()) {
    106       // No place suitable for logging
    107       result.info_log = NULL;
    108     }
    109   }
    110   if (result.block_cache == NULL) {
    111     result.block_cache = NewLRUCache(8 << 20);
    112   }
    113   return result;
    114 }
    115 
    116 DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    117     : env_(raw_options.env),
    118       internal_comparator_(raw_options.comparator),
    119       internal_filter_policy_(raw_options.filter_policy),
    120       options_(SanitizeOptions(dbname, &internal_comparator_,
    121                                &internal_filter_policy_, raw_options)),
    122       owns_info_log_(options_.info_log != raw_options.info_log),
    123       owns_cache_(options_.block_cache != raw_options.block_cache),
    124       dbname_(dbname),
    125       db_lock_(NULL),
    126       shutting_down_(NULL),
    127       bg_cv_(&mutex_),
    128       mem_(new MemTable(internal_comparator_)),
    129       imm_(NULL),
    130       logfile_(NULL),
    131       logfile_number_(0),
    132       log_(NULL),
    133       seed_(0),
    134       tmp_batch_(new WriteBatch),
    135       bg_compaction_scheduled_(false),
    136       manual_compaction_(NULL) {
    137   mem_->Ref();
    138   has_imm_.Release_Store(NULL);
    139 
    140   // Reserve ten files or so for other uses and give the rest to TableCache.
    141   const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
    142   table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
    143 
    144   versions_ = new VersionSet(dbname_, &options_, table_cache_,
    145                              &internal_comparator_);
    146 }
    147 
    148 DBImpl::~DBImpl() {
    149   // Wait for background work to finish
    150   mutex_.Lock();
    151   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
    152   while (bg_compaction_scheduled_) {
    153     bg_cv_.Wait();
    154   }
    155   mutex_.Unlock();
    156 
    157   if (db_lock_ != NULL) {
    158     env_->UnlockFile(db_lock_);
    159   }
    160 
    161   delete versions_;
    162   if (mem_ != NULL) mem_->Unref();
    163   if (imm_ != NULL) imm_->Unref();
    164   delete tmp_batch_;
    165   delete log_;
    166   delete logfile_;
    167   delete table_cache_;
    168 
    169   if (owns_info_log_) {
    170     delete options_.info_log;
    171   }
    172   if (owns_cache_) {
    173     delete options_.block_cache;
    174   }
    175 }
    176 
    177 Status DBImpl::NewDB() {
    178   VersionEdit new_db;
    179   new_db.SetComparatorName(user_comparator()->Name());
    180   new_db.SetLogNumber(0);
    181   new_db.SetNextFile(2);
    182   new_db.SetLastSequence(0);
    183 
    184   const std::string manifest = DescriptorFileName(dbname_, 1);
    185   WritableFile* file;
    186   Status s = env_->NewWritableFile(manifest, &file);
    187   if (!s.ok()) {
    188     return s;
    189   }
    190   {
    191     log::Writer log(file);
    192     std::string record;
    193     new_db.EncodeTo(&record);
    194     s = log.AddRecord(record);
    195     if (s.ok()) {
    196       s = file->Close();
    197     }
    198   }
    199   delete file;
    200   if (s.ok()) {
    201     // Make "CURRENT" file that points to the new manifest file.
    202     s = SetCurrentFile(env_, dbname_, 1);
    203   } else {
    204     env_->DeleteFile(manifest);
    205   }
    206   return s;
    207 }
    208 
    209 void DBImpl::MaybeIgnoreError(Status* s) const {
    210   if (s->ok() || options_.paranoid_checks) {
    211     // No change needed
    212   } else {
    213     Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    214     *s = Status::OK();
    215   }
    216 }
    217 
    218 void DBImpl::DeleteObsoleteFiles() {
    219   if (!bg_error_.ok()) {
    220     // After a background error, we don't know whether a new version may
    221     // or may not have been committed, so we cannot safely garbage collect.
    222     return;
    223   }
    224 
    225   // Make a set of all of the live files
    226   std::set<uint64_t> live = pending_outputs_;
    227   versions_->AddLiveFiles(&live);
    228 
    229   std::vector<std::string> filenames;
    230   env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
    231   uint64_t number;
    232   FileType type;
    233   for (size_t i = 0; i < filenames.size(); i++) {
    234     if (ParseFileName(filenames[i], &number, &type)) {
    235       bool keep = true;
    236       switch (type) {
    237         case kLogFile:
    238           keep = ((number >= versions_->LogNumber()) ||
    239                   (number == versions_->PrevLogNumber()));
    240           break;
    241         case kDescriptorFile:
    242           // Keep my manifest file, and any newer incarnations'
    243           // (in case there is a race that allows other incarnations)
    244           keep = (number >= versions_->ManifestFileNumber());
    245           break;
    246         case kTableFile:
    247           keep = (live.find(number) != live.end());
    248           break;
    249         case kTempFile:
    250           // Any temp files that are currently being written to must
    251           // be recorded in pending_outputs_, which is inserted into "live"
    252           keep = (live.find(number) != live.end());
    253           break;
    254         case kCurrentFile:
    255         case kDBLockFile:
    256         case kInfoLogFile:
    257           keep = true;
    258           break;
    259       }
    260 
    261       if (!keep) {
    262         if (type == kTableFile) {
    263           table_cache_->Evict(number);
    264         }
    265         Log(options_.info_log, "Delete type=%d #%lld\n",
    266             int(type),
    267             static_cast<unsigned long long>(number));
    268         env_->DeleteFile(dbname_ + "/" + filenames[i]);
    269       }
    270     }
    271   }
    272 }
    273 
    274 Status DBImpl::Recover(VersionEdit* edit) {
    275   mutex_.AssertHeld();
    276 
    277   // Ignore error from CreateDir since the creation of the DB is
    278   // committed only when the descriptor is created, and this directory
    279   // may already exist from a previous failed creation attempt.
    280   env_->CreateDir(dbname_);
    281   assert(db_lock_ == NULL);
    282   Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    283   if (!s.ok()) {
    284     return s;
    285   }
    286 
    287   if (!env_->FileExists(CurrentFileName(dbname_))) {
    288     if (options_.create_if_missing) {
    289       s = NewDB();
    290       if (!s.ok()) {
    291         return s;
    292       }
    293     } else {
    294       return Status::InvalidArgument(
    295           dbname_, "does not exist (create_if_missing is false)");
    296     }
    297   } else {
    298     if (options_.error_if_exists) {
    299       return Status::InvalidArgument(
    300           dbname_, "exists (error_if_exists is true)");
    301     }
    302   }
    303 
    304   s = versions_->Recover();
    305   if (s.ok()) {
    306     SequenceNumber max_sequence(0);
    307 
    308     // Recover from all newer log files than the ones named in the
    309     // descriptor (new log files may have been added by the previous
    310     // incarnation without registering them in the descriptor).
    311     //
    312     // Note that PrevLogNumber() is no longer used, but we pay
    313     // attention to it in case we are recovering a database
    314     // produced by an older version of leveldb.
    315     const uint64_t min_log = versions_->LogNumber();
    316     const uint64_t prev_log = versions_->PrevLogNumber();
    317     std::vector<std::string> filenames;
    318     s = env_->GetChildren(dbname_, &filenames);
    319     if (!s.ok()) {
    320       return s;
    321     }
    322     std::set<uint64_t> expected;
    323     versions_->AddLiveFiles(&expected);
    324     uint64_t number;
    325     FileType type;
    326     std::vector<uint64_t> logs;
    327     for (size_t i = 0; i < filenames.size(); i++) {
    328       if (ParseFileName(filenames[i], &number, &type)) {
    329         expected.erase(number);
    330         if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
    331           logs.push_back(number);
    332       }
    333     }
    334     if (!expected.empty()) {
    335       char buf[50];
    336       snprintf(buf, sizeof(buf), "%d missing files; e.g.",
    337                static_cast<int>(expected.size()));
    338       return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
    339     }
    340 
    341     // Recover in the order in which the logs were generated
    342     std::sort(logs.begin(), logs.end());
    343     for (size_t i = 0; i < logs.size(); i++) {
    344       s = RecoverLogFile(logs[i], edit, &max_sequence);
    345 
    346       // The previous incarnation may not have written any MANIFEST
    347       // records after allocating this log number.  So we manually
    348       // update the file number allocation counter in VersionSet.
    349       versions_->MarkFileNumberUsed(logs[i]);
    350     }
    351 
    352     if (s.ok()) {
    353       if (versions_->LastSequence() < max_sequence) {
    354         versions_->SetLastSequence(max_sequence);
    355       }
    356     }
    357   }
    358 
    359   return s;
    360 }
    361 
    // Replays the records of log file "log_number" into a fresh memtable,
    // writing the memtable out through WriteLevel0Table() whenever it grows
    // past options_.write_buffer_size and once more at the end.  Table
    // additions are accumulated in "*edit"; "*max_sequence" is raised to the
    // largest sequence number seen in the replayed batches.
    //
    // Returns the first error that is not suppressed by MaybeIgnoreError();
    // read corruptions reported by the log reader are only turned into an
    // error when options_.paranoid_checks is set.
    //
    // REQUIRES: mutex_ is held.
    362 Status DBImpl::RecoverLogFile(uint64_t log_number,
    363                               VersionEdit* edit,
    364                               SequenceNumber* max_sequence) {
          // Receives corruption callbacks from log::Reader.  Every report is
          // logged; it is also stored into *status (first one wins) when a
          // status pointer was supplied.
    365   struct LogReporter : public log::Reader::Reporter {
    366     Env* env;
    367     Logger* info_log;
    368     const char* fname;
    369     Status* status;  // NULL if options_.paranoid_checks==false
    370     virtual void Corruption(size_t bytes, const Status& s) {
    371       Log(info_log, "%s%s: dropping %d bytes; %s",
    372           (this->status == NULL ? "(ignoring error) " : ""),
    373           fname, static_cast<int>(bytes), s.ToString().c_str());
    374       if (this->status != NULL && this->status->ok()) *this->status = s;
    375     }
    376   };
    377 
    378   mutex_.AssertHeld();
    379 
    380   // Open the log file
    381   std::string fname = LogFileName(dbname_, log_number);
    382   SequentialFile* file;
    383   Status status = env_->NewSequentialFile(fname, &file);
    384   if (!status.ok()) {
            // With paranoid_checks off this turns the failure into OK, i.e.
            // an unreadable log file is skipped.
    385     MaybeIgnoreError(&status);
    386     return status;
    387   }
    388 
    389   // Create the log reader.
    390   LogReporter reporter;
    391   reporter.env = env_;
    392   reporter.info_log = options_.info_log;
          // fname outlives the reporter, so holding its c_str() is safe here.
    393   reporter.fname = fname.c_str();
    394   reporter.status = (options_.paranoid_checks ? &status : NULL);
    395   // We intentionally make log::Reader do checksumming even if
    396   // paranoid_checks==false so that corruptions cause entire commits
    397   // to be skipped instead of propagating bad information (like overly
    398   // large sequence numbers).
    399   log::Reader reader(file, &reporter, true/*checksum*/,
    400                      0/*initial_offset*/);
    401   Log(options_.info_log, "Recovering log #%llu",
    402       (unsigned long long) log_number);
    403 
    404   // Read all the records and add to a memtable
    405   std::string scratch;
    406   Slice record;
    407   WriteBatch batch;
    408   MemTable* mem = NULL;
          // ReadRecord() is evaluated before status.ok(): the read itself may
          // report a corruption through the reporter and thereby set status.
    409   while (reader.ReadRecord(&record, &scratch) &&
    410          status.ok()) {
            // NOTE(review): 12 is presumably the size of the WriteBatch header
            // (sequence number + count) -- confirm against the WriteBatch code.
    411     if (record.size() < 12) {
    412       reporter.Corruption(
    413           record.size(), Status::Corruption("log record too small"));
    414       continue;
    415     }
    416     WriteBatchInternal::SetContents(&batch, record);
    417 
            // The memtable is created lazily so that an empty log produces no
            // table at all.
    418     if (mem == NULL) {
    419       mem = new MemTable(internal_comparator_);
    420       mem->Ref();
    421     }
    422     status = WriteBatchInternal::InsertInto(&batch, mem);
    423     MaybeIgnoreError(&status);
    424     if (!status.ok()) {
    425       break;
    426     }
            // Sequence number of the last entry in this batch.
    427     const SequenceNumber last_seq =
    428         WriteBatchInternal::Sequence(&batch) +
    429         WriteBatchInternal::Count(&batch) - 1;
    430     if (last_seq > *max_sequence) {
    431       *max_sequence = last_seq;
    432     }
    433 
    434     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
    435       status = WriteLevel0Table(mem, edit, NULL);
    436       if (!status.ok()) {
    437         // Reflect errors immediately so that conditions like full
    438         // file-systems cause the DB::Open() to fail.
    439         break;
    440       }
    441       mem->Unref();
    442       mem = NULL;
    443     }
    444   }
    445 
          // Flush whatever is left in the last memtable.
    446   if (status.ok() && mem != NULL) {
    447     status = WriteLevel0Table(mem, edit, NULL);
    448     // Reflect errors immediately so that conditions like full
    449     // file-systems cause the DB::Open() to fail.
    450   }
    451 
    452   if (mem != NULL) mem->Unref();
    453   delete file;
    454   return status;
    455 }
    456 
    457 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
    458                                 Version* base) {
    459   mutex_.AssertHeld();
    460   const uint64_t start_micros = env_->NowMicros();
    461   FileMetaData meta;
    462   meta.number = versions_->NewFileNumber();
    463   pending_outputs_.insert(meta.number);
    464   Iterator* iter = mem->NewIterator();
    465   Log(options_.info_log, "Level-0 table #%llu: started",
    466       (unsigned long long) meta.number);
    467 
    468   Status s;
    469   {
    470     mutex_.Unlock();
    471     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    472     mutex_.Lock();
    473   }
    474 
    475   Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
    476       (unsigned long long) meta.number,
    477       (unsigned long long) meta.file_size,
    478       s.ToString().c_str());
    479   delete iter;
    480   pending_outputs_.erase(meta.number);
    481 
    482 
    483   // Note that if file_size is zero, the file has been deleted and
    484   // should not be added to the manifest.
    485   int level = 0;
    486   if (s.ok() && meta.file_size > 0) {
    487     const Slice min_user_key = meta.smallest.user_key();
    488     const Slice max_user_key = meta.largest.user_key();
    489     if (base != NULL) {
    490       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    491     }
    492     edit->AddFile(level, meta.number, meta.file_size,
    493                   meta.smallest, meta.largest);
    494   }
    495 
    496   CompactionStats stats;
    497   stats.micros = env_->NowMicros() - start_micros;
    498   stats.bytes_written = meta.file_size;
    499   stats_[level].Add(stats);
    500   return s;
    501 }
    502 
    503 void DBImpl::CompactMemTable() {
    504   mutex_.AssertHeld();
    505   assert(imm_ != NULL);
    506 
    507   // Save the contents of the memtable as a new Table
    508   VersionEdit edit;
    509   Version* base = versions_->current();
    510   base->Ref();
    511   Status s = WriteLevel0Table(imm_, &edit, base);
    512   base->Unref();
    513 
    514   if (s.ok() && shutting_down_.Acquire_Load()) {
    515     s = Status::IOError("Deleting DB during memtable compaction");
    516   }
    517 
    518   // Replace immutable memtable with the generated Table
    519   if (s.ok()) {
    520     edit.SetPrevLogNumber(0);
    521     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    522     s = versions_->LogAndApply(&edit, &mutex_);
    523   }
    524 
    525   if (s.ok()) {
    526     // Commit to the new state
    527     imm_->Unref();
    528     imm_ = NULL;
    529     has_imm_.Release_Store(NULL);
    530     DeleteObsoleteFiles();
    531   } else {
    532     RecordBackgroundError(s);
    533   }
    534 }
    535 
    536 void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
    537   int max_level_with_files = 1;
    538   {
    539     MutexLock l(&mutex_);
    540     Version* base = versions_->current();
    541     for (int level = 1; level < config::kNumLevels; level++) {
    542       if (base->OverlapInLevel(level, begin, end)) {
    543         max_level_with_files = level;
    544       }
    545     }
    546   }
    547   TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
    548   for (int level = 0; level < max_level_with_files; level++) {
    549     TEST_CompactRange(level, begin, end);
    550   }
    551 }
    552 
    553 void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
    554   assert(level >= 0);
    555   assert(level + 1 < config::kNumLevels);
    556 
    557   InternalKey begin_storage, end_storage;
    558 
    559   ManualCompaction manual;
    560   manual.level = level;
    561   manual.done = false;
    562   if (begin == NULL) {
    563     manual.begin = NULL;
    564   } else {
    565     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    566     manual.begin = &begin_storage;
    567   }
    568   if (end == NULL) {
    569     manual.end = NULL;
    570   } else {
    571     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    572     manual.end = &end_storage;
    573   }
    574 
    575   MutexLock l(&mutex_);
    576   while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
    577     if (manual_compaction_ == NULL) {  // Idle
    578       manual_compaction_ = &manual;
    579       MaybeScheduleCompaction();
    580     } else {  // Running either my compaction or another compaction.
    581       bg_cv_.Wait();
    582     }
    583   }
    584   if (manual_compaction_ == &manual) {
    585     // Cancel my manual compaction since we aborted early for some reason.
    586     manual_compaction_ = NULL;
    587   }
    588 }
    589 
    590 Status DBImpl::TEST_CompactMemTable() {
    591   // NULL batch means just wait for earlier writes to be done
    592   Status s = Write(WriteOptions(), NULL);
    593   if (s.ok()) {
    594     // Wait until the compaction completes
    595     MutexLock l(&mutex_);
    596     while (imm_ != NULL && bg_error_.ok()) {
    597       bg_cv_.Wait();
    598     }
    599     if (imm_ != NULL) {
    600       s = bg_error_;
    601     }
    602   }
    603   return s;
    604 }
    605 
    606 void DBImpl::RecordBackgroundError(const Status& s) {
    607   mutex_.AssertHeld();
    608   if (bg_error_.ok()) {
    609     bg_error_ = s;
    610     bg_cv_.SignalAll();
    611   }
    612 }
    613 
    614 void DBImpl::MaybeScheduleCompaction() {
    615   mutex_.AssertHeld();
    616   if (bg_compaction_scheduled_) {
    617     // Already scheduled
    618   } else if (shutting_down_.Acquire_Load()) {
    619     // DB is being deleted; no more background compactions
    620   } else if (!bg_error_.ok()) {
    621     // Already got an error; no more changes
    622   } else if (imm_ == NULL &&
    623              manual_compaction_ == NULL &&
    624              !versions_->NeedsCompaction()) {
    625     // No work to be done
    626   } else {
    627     bg_compaction_scheduled_ = true;
    628     env_->Schedule(&DBImpl::BGWork, this);
    629   }
    630 }
    631 
    632 void DBImpl::BGWork(void* db) {
    633   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
    634 }
    635 
    636 void DBImpl::BackgroundCall() {
    637   MutexLock l(&mutex_);
    638   assert(bg_compaction_scheduled_);
    639   if (shutting_down_.Acquire_Load()) {
    640     // No more background work when shutting down.
    641   } else if (!bg_error_.ok()) {
    642     // No more background work after a background error.
    643   } else {
    644     BackgroundCompaction();
    645   }
    646 
    647   bg_compaction_scheduled_ = false;
    648 
    649   // Previous compaction may have produced too many files in a level,
    650   // so reschedule another compaction if needed.
    651   MaybeScheduleCompaction();
    652   bg_cv_.SignalAll();
    653 }
    654 
    // Performs one unit of background work.
    //
    // An immutable memtable, if present, is flushed and nothing else is done
    // in this call.  Otherwise a compaction is chosen -- from the pending
    // manual request if there is one, else from versions_->PickCompaction()
    // -- and either applied as a trivial file move or executed through
    // DoCompactionWork().  Failures are recorded via RecordBackgroundError().
    // For a manual request, the request is updated with the range that is
    // still left to compact and manual_compaction_ is cleared.
    //
    // REQUIRES: mutex_ is held.
    655 void DBImpl::BackgroundCompaction() {
    656   mutex_.AssertHeld();
    657 
    658   if (imm_ != NULL) {
    659     CompactMemTable();
    660     return;
    661   }
    662 
    663   Compaction* c;
    664   bool is_manual = (manual_compaction_ != NULL);
          // Largest key of the level-"m->level" inputs picked for this round;
          // only meaningful for a manual compaction that returned work.
    665   InternalKey manual_end;
    666   if (is_manual) {
    667     ManualCompaction* m = manual_compaction_;
    668     c = versions_->CompactRange(m->level, m->begin, m->end);
            // No compaction returned means nothing is left in the range.
    669     m->done = (c == NULL);
    670     if (c != NULL) {
    671       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    672     }
    673     Log(options_.info_log,
    674         "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
    675         m->level,
    676         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
    677         (m->end ? m->end->DebugString().c_str() : "(end)"),
    678         (m->done ? "(end)" : manual_end.DebugString().c_str()));
    679   } else {
    680     c = versions_->PickCompaction();
    681   }
    682 
    683   Status status;
    684   if (c == NULL) {
    685     // Nothing to do
    686   } else if (!is_manual && c->IsTrivialMove()) {
    687     // Move file to next level
            // Only the version edit changes; no table data is rewritten.
    688     assert(c->num_input_files(0) == 1);
    689     FileMetaData* f = c->input(0, 0);
    690     c->edit()->DeleteFile(c->level(), f->number);
    691     c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
    692                        f->smallest, f->largest);
    693     status = versions_->LogAndApply(c->edit(), &mutex_);
    694     if (!status.ok()) {
    695       RecordBackgroundError(status);
    696     }
    697     VersionSet::LevelSummaryStorage tmp;
    698     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
    699         static_cast<unsigned long long>(f->number),
    700         c->level() + 1,
    701         static_cast<unsigned long long>(f->file_size),
    702         status.ToString().c_str(),
    703         versions_->LevelSummary(&tmp));
    704   } else {
            // "compact" is deleted by CleanupCompaction() below.
    705     CompactionState* compact = new CompactionState(c);
    706     status = DoCompactionWork(compact);
    707     if (!status.ok()) {
    708       RecordBackgroundError(status);
    709     }
    710     CleanupCompaction(compact);
    711     c->ReleaseInputs();
    712     DeleteObsoleteFiles();
    713   }
    714   delete c;
    715 
    716   if (status.ok()) {
    717     // Done
    718   } else if (shutting_down_.Acquire_Load()) {
    719     // Ignore compaction errors found during shutting down
    720   } else {
    721     Log(options_.info_log,
    722         "Compaction error: %s", status.ToString().c_str());
    723   }
    724 
    725   if (is_manual) {
    726     ManualCompaction* m = manual_compaction_;
    727     if (!status.ok()) {
              // Give up on the rest of the range after a failure.
    728       m->done = true;
    729     }
    730     if (!m->done) {
    731       // We only compacted part of the requested range.  Update *m
    732       // to the range that is left to be compacted.
    733       m->tmp_storage = manual_end;
    734       m->begin = &m->tmp_storage;
    735     }
            // Clearing the pointer lets the waiting caller re-submit the
            // request (or notice that it is done).
    736     manual_compaction_ = NULL;
    737   }
    738 }
    739 
    740 void DBImpl::CleanupCompaction(CompactionState* compact) {
    741   mutex_.AssertHeld();
    742   if (compact->builder != NULL) {
    743     // May happen if we get a shutdown call in the middle of compaction
    744     compact->builder->Abandon();
    745     delete compact->builder;
    746   } else {
    747     assert(compact->outfile == NULL);
    748   }
    749   delete compact->outfile;
    750   for (size_t i = 0; i < compact->outputs.size(); i++) {
    751     const CompactionState::Output& out = compact->outputs[i];
    752     pending_outputs_.erase(out.number);
    753   }
    754   delete compact;
    755 }
    756 
    757 Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
    758   assert(compact != NULL);
    759   assert(compact->builder == NULL);
    760   uint64_t file_number;
    761   {
    762     mutex_.Lock();
    763     file_number = versions_->NewFileNumber();
    764     pending_outputs_.insert(file_number);
    765     CompactionState::Output out;
    766     out.number = file_number;
    767     out.smallest.Clear();
    768     out.largest.Clear();
    769     compact->outputs.push_back(out);
    770     mutex_.Unlock();
    771   }
    772 
    773   // Make the output file
    774   std::string fname = TableFileName(dbname_, file_number);
    775   Status s = env_->NewWritableFile(fname, &compact->outfile);
    776   if (s.ok()) {
    777     compact->builder = new TableBuilder(options_, compact->outfile);
    778   }
    779   return s;
    780 }
    781 
    782 Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
    783                                           Iterator* input) {
    784   assert(compact != NULL);
    785   assert(compact->outfile != NULL);
    786   assert(compact->builder != NULL);
    787 
    788   const uint64_t output_number = compact->current_output()->number;
    789   assert(output_number != 0);
    790 
    791   // Check for iterator errors
    792   Status s = input->status();
    793   const uint64_t current_entries = compact->builder->NumEntries();
    794   if (s.ok()) {
    795     s = compact->builder->Finish();
    796   } else {
    797     compact->builder->Abandon();
    798   }
    799   const uint64_t current_bytes = compact->builder->FileSize();
    800   compact->current_output()->file_size = current_bytes;
    801   compact->total_bytes += current_bytes;
    802   delete compact->builder;
    803   compact->builder = NULL;
    804 
    805   // Finish and check for file errors
    806   if (s.ok()) {
    807     s = compact->outfile->Sync();
    808   }
    809   if (s.ok()) {
    810     s = compact->outfile->Close();
    811   }
    812   delete compact->outfile;
    813   compact->outfile = NULL;
    814 
    815   if (s.ok() && current_entries > 0) {
    816     // Verify that the table is usable
    817     Iterator* iter = table_cache_->NewIterator(ReadOptions(),
    818                                                output_number,
    819                                                current_bytes);
    820     s = iter->status();
    821     delete iter;
    822     if (s.ok()) {
    823       Log(options_.info_log,
    824           "Generated table #%llu: %lld keys, %lld bytes",
    825           (unsigned long long) output_number,
    826           (unsigned long long) current_entries,
    827           (unsigned long long) current_bytes);
    828     }
    829   }
    830   return s;
    831 }
    832 
    833 
    834 Status DBImpl::InstallCompactionResults(CompactionState* compact) {
    835   mutex_.AssertHeld();
    836   Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
    837       compact->compaction->num_input_files(0),
    838       compact->compaction->level(),
    839       compact->compaction->num_input_files(1),
    840       compact->compaction->level() + 1,
    841       static_cast<long long>(compact->total_bytes));
    842 
    843   // Add compaction outputs
    844   compact->compaction->AddInputDeletions(compact->compaction->edit());
    845   const int level = compact->compaction->level();
    846   for (size_t i = 0; i < compact->outputs.size(); i++) {
    847     const CompactionState::Output& out = compact->outputs[i];
    848     compact->compaction->edit()->AddFile(
    849         level + 1,
    850         out.number, out.file_size, out.smallest, out.largest);
    851   }
    852   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
    853 }
    854 
    855 Status DBImpl::DoCompactionWork(CompactionState* compact) {
    856   const uint64_t start_micros = env_->NowMicros();
    857   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
    858 
    859   Log(options_.info_log,  "Compacting %d@%d + %d@%d files",
    860       compact->compaction->num_input_files(0),
    861       compact->compaction->level(),
    862       compact->compaction->num_input_files(1),
    863       compact->compaction->level() + 1);
    864 
    865   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
    866   assert(compact->builder == NULL);
    867   assert(compact->outfile == NULL);
    868   if (snapshots_.empty()) {
    869     compact->smallest_snapshot = versions_->LastSequence();
    870   } else {
    871     compact->smallest_snapshot = snapshots_.oldest()->number_;
    872   }
    873 
    874   // Release mutex while we're actually doing the compaction work
    875   mutex_.Unlock();
    876 
    877   Iterator* input = versions_->MakeInputIterator(compact->compaction);
    878   input->SeekToFirst();
    879   Status status;
    880   ParsedInternalKey ikey;
    881   std::string current_user_key;
    882   bool has_current_user_key = false;
    883   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
    884   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    885     // Prioritize immutable compaction work
    886     if (has_imm_.NoBarrier_Load() != NULL) {
    887       const uint64_t imm_start = env_->NowMicros();
    888       mutex_.Lock();
    889       if (imm_ != NULL) {
    890         CompactMemTable();
    891         bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
    892       }
    893       mutex_.Unlock();
    894       imm_micros += (env_->NowMicros() - imm_start);
    895     }
    896 
    897     Slice key = input->key();
    898     if (compact->compaction->ShouldStopBefore(key) &&
    899         compact->builder != NULL) {
    900       status = FinishCompactionOutputFile(compact, input);
    901       if (!status.ok()) {
    902         break;
    903       }
    904     }
    905 
    906     // Handle key/value, add to state, etc.
    907     bool drop = false;
    908     if (!ParseInternalKey(key, &ikey)) {
    909       // Do not hide error keys
    910       current_user_key.clear();
    911       has_current_user_key = false;
    912       last_sequence_for_key = kMaxSequenceNumber;
    913     } else {
    914       if (!has_current_user_key ||
    915           user_comparator()->Compare(ikey.user_key,
    916                                      Slice(current_user_key)) != 0) {
    917         // First occurrence of this user key
    918         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
    919         has_current_user_key = true;
    920         last_sequence_for_key = kMaxSequenceNumber;
    921       }
    922 
    923       if (last_sequence_for_key <= compact->smallest_snapshot) {
    924         // Hidden by an newer entry for same user key
    925         drop = true;    // (A)
    926       } else if (ikey.type == kTypeDeletion &&
    927                  ikey.sequence <= compact->smallest_snapshot &&
    928                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
    929         // For this user key:
    930         // (1) there is no data in higher levels
    931         // (2) data in lower levels will have larger sequence numbers
    932         // (3) data in layers that are being compacted here and have
    933         //     smaller sequence numbers will be dropped in the next
    934         //     few iterations of this loop (by rule (A) above).
    935         // Therefore this deletion marker is obsolete and can be dropped.
    936         drop = true;
    937       }
    938 
    939       last_sequence_for_key = ikey.sequence;
    940     }
    941 #if 0
    942     Log(options_.info_log,
    943         "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
    944         "%d smallest_snapshot: %d",
    945         ikey.user_key.ToString().c_str(),
    946         (int)ikey.sequence, ikey.type, kTypeValue, drop,
    947         compact->compaction->IsBaseLevelForKey(ikey.user_key),
    948         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
    949 #endif
    950 
    951     if (!drop) {
    952       // Open output file if necessary
    953       if (compact->builder == NULL) {
    954         status = OpenCompactionOutputFile(compact);
    955         if (!status.ok()) {
    956           break;
    957         }
    958       }
    959       if (compact->builder->NumEntries() == 0) {
    960         compact->current_output()->smallest.DecodeFrom(key);
    961       }
    962       compact->current_output()->largest.DecodeFrom(key);
    963       compact->builder->Add(key, input->value());
    964 
    965       // Close output file if it is big enough
    966       if (compact->builder->FileSize() >=
    967           compact->compaction->MaxOutputFileSize()) {
    968         status = FinishCompactionOutputFile(compact, input);
    969         if (!status.ok()) {
    970           break;
    971         }
    972       }
    973     }
    974 
    975     input->Next();
    976   }
    977 
    978   if (status.ok() && shutting_down_.Acquire_Load()) {
    979     status = Status::IOError("Deleting DB during compaction");
    980   }
    981   if (status.ok() && compact->builder != NULL) {
    982     status = FinishCompactionOutputFile(compact, input);
    983   }
    984   if (status.ok()) {
    985     status = input->status();
    986   }
    987   delete input;
    988   input = NULL;
    989 
    990   CompactionStats stats;
    991   stats.micros = env_->NowMicros() - start_micros - imm_micros;
    992   for (int which = 0; which < 2; which++) {
    993     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
    994       stats.bytes_read += compact->compaction->input(which, i)->file_size;
    995     }
    996   }
    997   for (size_t i = 0; i < compact->outputs.size(); i++) {
    998     stats.bytes_written += compact->outputs[i].file_size;
    999   }
   1000 
   1001   mutex_.Lock();
   1002   stats_[compact->compaction->level() + 1].Add(stats);
   1003 
   1004   if (status.ok()) {
   1005     status = InstallCompactionResults(compact);
   1006   }
   1007   if (!status.ok()) {
   1008     RecordBackgroundError(status);
   1009   }
   1010   VersionSet::LevelSummaryStorage tmp;
   1011   Log(options_.info_log,
   1012       "compacted to: %s", versions_->LevelSummary(&tmp));
   1013   return status;
   1014 }
   1015 
   1016 namespace {
   1017 struct IterState {
   1018   port::Mutex* mu;
   1019   Version* version;
   1020   MemTable* mem;
   1021   MemTable* imm;
   1022 };
   1023 
   1024 static void CleanupIteratorState(void* arg1, void* arg2) {
   1025   IterState* state = reinterpret_cast<IterState*>(arg1);
   1026   state->mu->Lock();
   1027   state->mem->Unref();
   1028   if (state->imm != NULL) state->imm->Unref();
   1029   state->version->Unref();
   1030   state->mu->Unlock();
   1031   delete state;
   1032 }
   1033 }  // namespace
   1034 
   1035 Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
   1036                                       SequenceNumber* latest_snapshot,
   1037                                       uint32_t* seed) {
   1038   IterState* cleanup = new IterState;
   1039   mutex_.Lock();
   1040   *latest_snapshot = versions_->LastSequence();
   1041 
   1042   // Collect together all needed child iterators
   1043   std::vector<Iterator*> list;
   1044   list.push_back(mem_->NewIterator());
   1045   mem_->Ref();
   1046   if (imm_ != NULL) {
   1047     list.push_back(imm_->NewIterator());
   1048     imm_->Ref();
   1049   }
   1050   versions_->current()->AddIterators(options, &list);
   1051   Iterator* internal_iter =
   1052       NewMergingIterator(&internal_comparator_, &list[0], list.size());
   1053   versions_->current()->Ref();
   1054 
   1055   cleanup->mu = &mutex_;
   1056   cleanup->mem = mem_;
   1057   cleanup->imm = imm_;
   1058   cleanup->version = versions_->current();
   1059   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
   1060 
   1061   *seed = ++seed_;
   1062   mutex_.Unlock();
   1063   return internal_iter;
   1064 }
   1065 
   1066 Iterator* DBImpl::TEST_NewInternalIterator() {
   1067   SequenceNumber ignored;
   1068   uint32_t ignored_seed;
   1069   return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
   1070 }
   1071 
   1072 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
   1073   MutexLock l(&mutex_);
   1074   return versions_->MaxNextLevelOverlappingBytes();
   1075 }
   1076 
   1077 Status DBImpl::Get(const ReadOptions& options,
   1078                    const Slice& key,
   1079                    std::string* value) {
   1080   Status s;
   1081   MutexLock l(&mutex_);
   1082   SequenceNumber snapshot;
   1083   if (options.snapshot != NULL) {
   1084     snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
   1085   } else {
   1086     snapshot = versions_->LastSequence();
   1087   }
   1088 
   1089   MemTable* mem = mem_;
   1090   MemTable* imm = imm_;
   1091   Version* current = versions_->current();
   1092   mem->Ref();
   1093   if (imm != NULL) imm->Ref();
   1094   current->Ref();
   1095 
   1096   bool have_stat_update = false;
   1097   Version::GetStats stats;
   1098 
   1099   // Unlock while reading from files and memtables
   1100   {
   1101     mutex_.Unlock();
   1102     // First look in the memtable, then in the immutable memtable (if any).
   1103     LookupKey lkey(key, snapshot);
   1104     if (mem->Get(lkey, value, &s)) {
   1105       // Done
   1106     } else if (imm != NULL && imm->Get(lkey, value, &s)) {
   1107       // Done
   1108     } else {
   1109       s = current->Get(options, lkey, value, &stats);
   1110       have_stat_update = true;
   1111     }
   1112     mutex_.Lock();
   1113   }
   1114 
   1115   if (have_stat_update && current->UpdateStats(stats)) {
   1116     MaybeScheduleCompaction();
   1117   }
   1118   mem->Unref();
   1119   if (imm != NULL) imm->Unref();
   1120   current->Unref();
   1121   return s;
   1122 }
   1123 
   1124 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   1125   SequenceNumber latest_snapshot;
   1126   uint32_t seed;
   1127   Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
   1128   return NewDBIterator(
   1129       this, user_comparator(), iter,
   1130       (options.snapshot != NULL
   1131        ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
   1132        : latest_snapshot),
   1133       seed);
   1134 }
   1135 
   1136 void DBImpl::RecordReadSample(Slice key) {
   1137   MutexLock l(&mutex_);
   1138   if (versions_->current()->RecordReadSample(key)) {
   1139     MaybeScheduleCompaction();
   1140   }
   1141 }
   1142 
   1143 const Snapshot* DBImpl::GetSnapshot() {
   1144   MutexLock l(&mutex_);
   1145   return snapshots_.New(versions_->LastSequence());
   1146 }
   1147 
   1148 void DBImpl::ReleaseSnapshot(const Snapshot* s) {
   1149   MutexLock l(&mutex_);
   1150   snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
   1151 }
   1152 
   1153 // Convenience methods
   1154 Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
   1155   return DB::Put(o, key, val);
   1156 }
   1157 
   1158 Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
   1159   return DB::Delete(options, key);
   1160 }
   1161 
   1162 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   1163   Writer w(&mutex_);
   1164   w.batch = my_batch;
   1165   w.sync = options.sync;
   1166   w.done = false;
   1167 
   1168   MutexLock l(&mutex_);
   1169   writers_.push_back(&w);
   1170   while (!w.done && &w != writers_.front()) {
   1171     w.cv.Wait();
   1172   }
   1173   if (w.done) {
   1174     return w.status;
   1175   }
   1176 
   1177   // May temporarily unlock and wait.
   1178   Status status = MakeRoomForWrite(my_batch == NULL);
   1179   uint64_t last_sequence = versions_->LastSequence();
   1180   Writer* last_writer = &w;
   1181   if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
   1182     WriteBatch* updates = BuildBatchGroup(&last_writer);
   1183     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
   1184     last_sequence += WriteBatchInternal::Count(updates);
   1185 
   1186     // Add to log and apply to memtable.  We can release the lock
   1187     // during this phase since &w is currently responsible for logging
   1188     // and protects against concurrent loggers and concurrent writes
   1189     // into mem_.
   1190     {
   1191       mutex_.Unlock();
   1192       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
   1193       bool sync_error = false;
   1194       if (status.ok() && options.sync) {
   1195         status = logfile_->Sync();
   1196         if (!status.ok()) {
   1197           sync_error = true;
   1198         }
   1199       }
   1200       if (status.ok()) {
   1201         status = WriteBatchInternal::InsertInto(updates, mem_);
   1202       }
   1203       mutex_.Lock();
   1204       if (sync_error) {
   1205         // The state of the log file is indeterminate: the log record we
   1206         // just added may or may not show up when the DB is re-opened.
   1207         // So we force the DB into a mode where all future writes fail.
   1208         RecordBackgroundError(status);
   1209       }
   1210     }
   1211     if (updates == tmp_batch_) tmp_batch_->Clear();
   1212 
   1213     versions_->SetLastSequence(last_sequence);
   1214   }
   1215 
   1216   while (true) {
   1217     Writer* ready = writers_.front();
   1218     writers_.pop_front();
   1219     if (ready != &w) {
   1220       ready->status = status;
   1221       ready->done = true;
   1222       ready->cv.Signal();
   1223     }
   1224     if (ready == last_writer) break;
   1225   }
   1226 
   1227   // Notify new head of write queue
   1228   if (!writers_.empty()) {
   1229     writers_.front()->cv.Signal();
   1230   }
   1231 
   1232   return status;
   1233 }
   1234 
   1235 // REQUIRES: Writer list must be non-empty
   1236 // REQUIRES: First writer must have a non-NULL batch
   1237 WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
   1238   assert(!writers_.empty());
   1239   Writer* first = writers_.front();
   1240   WriteBatch* result = first->batch;
   1241   assert(result != NULL);
   1242 
   1243   size_t size = WriteBatchInternal::ByteSize(first->batch);
   1244 
   1245   // Allow the group to grow up to a maximum size, but if the
   1246   // original write is small, limit the growth so we do not slow
   1247   // down the small write too much.
   1248   size_t max_size = 1 << 20;
   1249   if (size <= (128<<10)) {
   1250     max_size = size + (128<<10);
   1251   }
   1252 
   1253   *last_writer = first;
   1254   std::deque<Writer*>::iterator iter = writers_.begin();
   1255   ++iter;  // Advance past "first"
   1256   for (; iter != writers_.end(); ++iter) {
   1257     Writer* w = *iter;
   1258     if (w->sync && !first->sync) {
   1259       // Do not include a sync write into a batch handled by a non-sync write.
   1260       break;
   1261     }
   1262 
   1263     if (w->batch != NULL) {
   1264       size += WriteBatchInternal::ByteSize(w->batch);
   1265       if (size > max_size) {
   1266         // Do not make batch too big
   1267         break;
   1268       }
   1269 
   1270       // Append to *reuslt
   1271       if (result == first->batch) {
   1272         // Switch to temporary batch instead of disturbing caller's batch
   1273         result = tmp_batch_;
   1274         assert(WriteBatchInternal::Count(result) == 0);
   1275         WriteBatchInternal::Append(result, first->batch);
   1276       }
   1277       WriteBatchInternal::Append(result, w->batch);
   1278     }
   1279     *last_writer = w;
   1280   }
   1281   return result;
   1282 }
   1283 
   1284 // REQUIRES: mutex_ is held
   1285 // REQUIRES: this thread is currently at the front of the writer queue
   1286 Status DBImpl::MakeRoomForWrite(bool force) {
   1287   mutex_.AssertHeld();
   1288   assert(!writers_.empty());
   1289   bool allow_delay = !force;
   1290   Status s;
   1291   while (true) {
   1292     if (!bg_error_.ok()) {
   1293       // Yield previous error
   1294       s = bg_error_;
   1295       break;
   1296     } else if (
   1297         allow_delay &&
   1298         versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
   1299       // We are getting close to hitting a hard limit on the number of
   1300       // L0 files.  Rather than delaying a single write by several
   1301       // seconds when we hit the hard limit, start delaying each
   1302       // individual write by 1ms to reduce latency variance.  Also,
   1303       // this delay hands over some CPU to the compaction thread in
   1304       // case it is sharing the same core as the writer.
   1305       mutex_.Unlock();
   1306       env_->SleepForMicroseconds(1000);
   1307       allow_delay = false;  // Do not delay a single write more than once
   1308       mutex_.Lock();
   1309     } else if (!force &&
   1310                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
   1311       // There is room in current memtable
   1312       break;
   1313     } else if (imm_ != NULL) {
   1314       // We have filled up the current memtable, but the previous
   1315       // one is still being compacted, so we wait.
   1316       Log(options_.info_log, "Current memtable full; waiting...\n");
   1317       bg_cv_.Wait();
   1318     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
   1319       // There are too many level-0 files.
   1320       Log(options_.info_log, "Too many L0 files; waiting...\n");
   1321       bg_cv_.Wait();
   1322     } else {
   1323       // Attempt to switch to a new memtable and trigger compaction of old
   1324       assert(versions_->PrevLogNumber() == 0);
   1325       uint64_t new_log_number = versions_->NewFileNumber();
   1326       WritableFile* lfile = NULL;
   1327       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
   1328       if (!s.ok()) {
   1329         // Avoid chewing through file number space in a tight loop.
   1330         versions_->ReuseFileNumber(new_log_number);
   1331         break;
   1332       }
   1333       delete log_;
   1334       delete logfile_;
   1335       logfile_ = lfile;
   1336       logfile_number_ = new_log_number;
   1337       log_ = new log::Writer(lfile);
   1338       imm_ = mem_;
   1339       has_imm_.Release_Store(imm_);
   1340       mem_ = new MemTable(internal_comparator_);
   1341       mem_->Ref();
   1342       force = false;   // Do not force another compaction if have room
   1343       MaybeScheduleCompaction();
   1344     }
   1345   }
   1346   return s;
   1347 }
   1348 
   1349 bool DBImpl::GetProperty(const Slice& property, std::string* value) {
   1350   value->clear();
   1351 
   1352   MutexLock l(&mutex_);
   1353   Slice in = property;
   1354   Slice prefix("leveldb.");
   1355   if (!in.starts_with(prefix)) return false;
   1356   in.remove_prefix(prefix.size());
   1357 
   1358   if (in.starts_with("num-files-at-level")) {
   1359     in.remove_prefix(strlen("num-files-at-level"));
   1360     uint64_t level;
   1361     bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
   1362     if (!ok || level >= config::kNumLevels) {
   1363       return false;
   1364     } else {
   1365       char buf[100];
   1366       snprintf(buf, sizeof(buf), "%d",
   1367                versions_->NumLevelFiles(static_cast<int>(level)));
   1368       *value = buf;
   1369       return true;
   1370     }
   1371   } else if (in == "stats") {
   1372     char buf[200];
   1373     snprintf(buf, sizeof(buf),
   1374              "                               Compactions\n"
   1375              "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
   1376              "--------------------------------------------------\n"
   1377              );
   1378     value->append(buf);
   1379     for (int level = 0; level < config::kNumLevels; level++) {
   1380       int files = versions_->NumLevelFiles(level);
   1381       if (stats_[level].micros > 0 || files > 0) {
   1382         snprintf(
   1383             buf, sizeof(buf),
   1384             "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
   1385             level,
   1386             files,
   1387             versions_->NumLevelBytes(level) / 1048576.0,
   1388             stats_[level].micros / 1e6,
   1389             stats_[level].bytes_read / 1048576.0,
   1390             stats_[level].bytes_written / 1048576.0);
   1391         value->append(buf);
   1392       }
   1393     }
   1394     return true;
   1395   } else if (in == "sstables") {
   1396     *value = versions_->current()->DebugString();
   1397     return true;
   1398   }
   1399 
   1400   return false;
   1401 }
   1402 
   1403 void DBImpl::GetApproximateSizes(
   1404     const Range* range, int n,
   1405     uint64_t* sizes) {
   1406   // TODO(opt): better implementation
   1407   Version* v;
   1408   {
   1409     MutexLock l(&mutex_);
   1410     versions_->current()->Ref();
   1411     v = versions_->current();
   1412   }
   1413 
   1414   for (int i = 0; i < n; i++) {
   1415     // Convert user_key into a corresponding internal key.
   1416     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
   1417     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
   1418     uint64_t start = versions_->ApproximateOffsetOf(v, k1);
   1419     uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
   1420     sizes[i] = (limit >= start ? limit - start : 0);
   1421   }
   1422 
   1423   {
   1424     MutexLock l(&mutex_);
   1425     v->Unref();
   1426   }
   1427 }
   1428 
   1429 // Default implementations of convenience methods that subclasses of DB
   1430 // can call if they wish
   1431 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
   1432   WriteBatch batch;
   1433   batch.Put(key, value);
   1434   return Write(opt, &batch);
   1435 }
   1436 
   1437 Status DB::Delete(const WriteOptions& opt, const Slice& key) {
   1438   WriteBatch batch;
   1439   batch.Delete(key);
   1440   return Write(opt, &batch);
   1441 }
   1442 
   1443 DB::~DB() { }
   1444 
   1445 Status DB::Open(const Options& options, const std::string& dbname,
   1446                 DB** dbptr) {
   1447   *dbptr = NULL;
   1448 
   1449   DBImpl* impl = new DBImpl(options, dbname);
   1450   impl->mutex_.Lock();
   1451   VersionEdit edit;
   1452   Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
   1453   if (s.ok()) {
   1454     uint64_t new_log_number = impl->versions_->NewFileNumber();
   1455     WritableFile* lfile;
   1456     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
   1457                                      &lfile);
   1458     if (s.ok()) {
   1459       edit.SetLogNumber(new_log_number);
   1460       impl->logfile_ = lfile;
   1461       impl->logfile_number_ = new_log_number;
   1462       impl->log_ = new log::Writer(lfile);
   1463       s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
   1464     }
   1465     if (s.ok()) {
   1466       impl->DeleteObsoleteFiles();
   1467       impl->MaybeScheduleCompaction();
   1468     }
   1469   }
   1470   impl->mutex_.Unlock();
   1471   if (s.ok()) {
   1472     *dbptr = impl;
   1473   } else {
   1474     delete impl;
   1475   }
   1476   return s;
   1477 }
   1478 
   1479 Snapshot::~Snapshot() {
   1480 }
   1481 
   1482 Status DestroyDB(const std::string& dbname, const Options& options) {
   1483   Env* env = options.env;
   1484   std::vector<std::string> filenames;
   1485   // Ignore error in case directory does not exist
   1486   env->GetChildren(dbname, &filenames);
   1487   if (filenames.empty()) {
   1488     return Status::OK();
   1489   }
   1490 
   1491   FileLock* lock;
   1492   const std::string lockname = LockFileName(dbname);
   1493   Status result = env->LockFile(lockname, &lock);
   1494   if (result.ok()) {
   1495     uint64_t number;
   1496     FileType type;
   1497     for (size_t i = 0; i < filenames.size(); i++) {
   1498       if (ParseFileName(filenames[i], &number, &type) &&
   1499           type != kDBLockFile) {  // Lock file will be deleted at end
   1500         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
   1501         if (result.ok() && !del.ok()) {
   1502           result = del;
   1503         }
   1504       }
   1505     }
   1506     env->UnlockFile(lock);  // Ignore error since state is already gone
   1507     env->DeleteFile(lockname);
   1508     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
   1509   }
   1510   return result;
   1511 }
   1512 
   1513 }  // namespace leveldb
   1514