Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include <sys/types.h>
      6 #include <stdio.h>
      7 #include <stdlib.h>
      8 #include "db/db_impl.h"
      9 #include "db/version_set.h"
     10 #include "leveldb/cache.h"
     11 #include "leveldb/db.h"
     12 #include "leveldb/env.h"
     13 #include "leveldb/write_batch.h"
     14 #include "port/port.h"
     15 #include "util/crc32c.h"
     16 #include "util/histogram.h"
     17 #include "util/mutexlock.h"
     18 #include "util/random.h"
     19 #include "util/testutil.h"
     20 
     21 // Comma-separated list of operations to run in the specified order
     22 //   Actual benchmarks:
     23 //      fillseq       -- write N values in sequential key order in async mode
     24 //      fillrandom    -- write N values in random key order in async mode
     25 //      overwrite     -- overwrite N values in random key order in async mode
     26 //      fillsync      -- write N/1000 values in random key order in sync mode
     27 //      fill100K      -- write N/1000 100K values in random order in async mode
     28 //      deleteseq     -- delete N keys in sequential order
     29 //      deleterandom  -- delete N keys in random order
     30 //      readseq       -- read N times sequentially
     31 //      readreverse   -- read N times in reverse order
     32 //      readrandom    -- read N times in random order
     33 //      readmissing   -- read N missing keys in random order
     34 //      readhot       -- read N times in random order from 1% section of DB
     35 //      seekrandom    -- N random seeks
     36 //      crc32c        -- repeated crc32c of 4K of data
     37 //      acquireload   -- load N*1000 times
     38 //   Meta operations:
     39 //      compact     -- Compact the entire DB
     40 //      stats       -- Print DB stats
     41 //      sstables    -- Print sstable info
     42 //      heapprofile -- Dump a heap profile (if supported by this port)
     43 static const char* FLAGS_benchmarks =
     44     "fillseq,"
     45     "fillsync,"
     46     "fillrandom,"
     47     "overwrite,"
     48     "readrandom,"
     49     "readrandom,"  // Extra run to allow previous compactions to quiesce
     50     "readseq,"
     51     "readreverse,"
     52     "compact,"
     53     "readrandom,"
     54     "readseq,"
     55     "readreverse,"
     56     "fill100K,"
     57     "crc32c,"
     58     "snappycomp,"
     59     "snappyuncomp,"
     60     "acquireload,"
     61     ;
     62 
     63 // Number of key/values to place in database
     64 static int FLAGS_num = 1000000;
     65 
     66 // Number of read operations to do.  If negative, do FLAGS_num reads.
     67 static int FLAGS_reads = -1;
     68 
     69 // Number of concurrent threads to run.
     70 static int FLAGS_threads = 1;
     71 
     72 // Size of each value
     73 static int FLAGS_value_size = 100;
     74 
     75 // Arrange to generate values that shrink to this fraction of
     76 // their original size after compression
     77 static double FLAGS_compression_ratio = 0.5;
     78 
     79 // Print histogram of operation timings
     80 static bool FLAGS_histogram = false;
     81 
     82 // Number of bytes to buffer in memtable before compacting
     83 // (initialized to default value by "main")
     84 static int FLAGS_write_buffer_size = 0;
     85 
     86 // Number of bytes to use as a cache of uncompressed data.
     87 // Negative means use default settings.
     88 static int FLAGS_cache_size = -1;
     89 
     90 // Maximum number of files to keep open at the same time (use default if == 0)
     91 static int FLAGS_open_files = 0;
     92 
     93 // Bloom filter bits per key.
     94 // Negative means use default settings.
     95 static int FLAGS_bloom_bits = -1;
     96 
     97 // If true, do not destroy the existing database.  If you set this
     98 // flag and also specify a benchmark that wants a fresh database, that
     99 // benchmark will fail.
    100 static bool FLAGS_use_existing_db = false;
    101 
    102 // Use the db with the following name.
    103 static const char* FLAGS_db = NULL;
    104 
    105 namespace leveldb {
    106 
    107 namespace {
    108 
    109 // Helper for quickly generating random data.
    110 class RandomGenerator {
    111  private:
    112   std::string data_;
    113   int pos_;
    114 
    115  public:
    116   RandomGenerator() {
    117     // We use a limited amount of data over and over again and ensure
    118     // that it is larger than the compression window (32KB), and also
    119     // large enough to serve all typical value sizes we want to write.
    120     Random rnd(301);
    121     std::string piece;
    122     while (data_.size() < 1048576) {
    123       // Add a short fragment that is as compressible as specified
    124       // by FLAGS_compression_ratio.
    125       test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
    126       data_.append(piece);
    127     }
    128     pos_ = 0;
    129   }
    130 
    131   Slice Generate(size_t len) {
    132     if (pos_ + len > data_.size()) {
    133       pos_ = 0;
    134       assert(len < data_.size());
    135     }
    136     pos_ += len;
    137     return Slice(data_.data() + pos_ - len, len);
    138   }
    139 };
    140 
    141 static Slice TrimSpace(Slice s) {
    142   size_t start = 0;
    143   while (start < s.size() && isspace(s[start])) {
    144     start++;
    145   }
    146   size_t limit = s.size();
    147   while (limit > start && isspace(s[limit-1])) {
    148     limit--;
    149   }
    150   return Slice(s.data() + start, limit - start);
    151 }
    152 
    153 static void AppendWithSpace(std::string* str, Slice msg) {
    154   if (msg.empty()) return;
    155   if (!str->empty()) {
    156     str->push_back(' ');
    157   }
    158   str->append(msg.data(), msg.size());
    159 }
    160 
    161 class Stats {
    162  private:
    163   double start_;
    164   double finish_;
    165   double seconds_;
    166   int done_;
    167   int next_report_;
    168   int64_t bytes_;
    169   double last_op_finish_;
    170   Histogram hist_;
    171   std::string message_;
    172 
    173  public:
    174   Stats() { Start(); }
    175 
    176   void Start() {
    177     next_report_ = 100;
    178     last_op_finish_ = start_;
    179     hist_.Clear();
    180     done_ = 0;
    181     bytes_ = 0;
    182     seconds_ = 0;
    183     start_ = Env::Default()->NowMicros();
    184     finish_ = start_;
    185     message_.clear();
    186   }
    187 
    188   void Merge(const Stats& other) {
    189     hist_.Merge(other.hist_);
    190     done_ += other.done_;
    191     bytes_ += other.bytes_;
    192     seconds_ += other.seconds_;
    193     if (other.start_ < start_) start_ = other.start_;
    194     if (other.finish_ > finish_) finish_ = other.finish_;
    195 
    196     // Just keep the messages from one thread
    197     if (message_.empty()) message_ = other.message_;
    198   }
    199 
    200   void Stop() {
    201     finish_ = Env::Default()->NowMicros();
    202     seconds_ = (finish_ - start_) * 1e-6;
    203   }
    204 
    205   void AddMessage(Slice msg) {
    206     AppendWithSpace(&message_, msg);
    207   }
    208 
    209   void FinishedSingleOp() {
    210     if (FLAGS_histogram) {
    211       double now = Env::Default()->NowMicros();
    212       double micros = now - last_op_finish_;
    213       hist_.Add(micros);
    214       if (micros > 20000) {
    215         fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
    216         fflush(stderr);
    217       }
    218       last_op_finish_ = now;
    219     }
    220 
    221     done_++;
    222     if (done_ >= next_report_) {
    223       if      (next_report_ < 1000)   next_report_ += 100;
    224       else if (next_report_ < 5000)   next_report_ += 500;
    225       else if (next_report_ < 10000)  next_report_ += 1000;
    226       else if (next_report_ < 50000)  next_report_ += 5000;
    227       else if (next_report_ < 100000) next_report_ += 10000;
    228       else if (next_report_ < 500000) next_report_ += 50000;
    229       else                            next_report_ += 100000;
    230       fprintf(stderr, "... finished %d ops%30s\r", done_, "");
    231       fflush(stderr);
    232     }
    233   }
    234 
    235   void AddBytes(int64_t n) {
    236     bytes_ += n;
    237   }
    238 
    239   void Report(const Slice& name) {
    240     // Pretend at least one op was done in case we are running a benchmark
    241     // that does not call FinishedSingleOp().
    242     if (done_ < 1) done_ = 1;
    243 
    244     std::string extra;
    245     if (bytes_ > 0) {
    246       // Rate is computed on actual elapsed time, not the sum of per-thread
    247       // elapsed times.
    248       double elapsed = (finish_ - start_) * 1e-6;
    249       char rate[100];
    250       snprintf(rate, sizeof(rate), "%6.1f MB/s",
    251                (bytes_ / 1048576.0) / elapsed);
    252       extra = rate;
    253     }
    254     AppendWithSpace(&extra, message_);
    255 
    256     fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
    257             name.ToString().c_str(),
    258             seconds_ * 1e6 / done_,
    259             (extra.empty() ? "" : " "),
    260             extra.c_str());
    261     if (FLAGS_histogram) {
    262       fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    263     }
    264     fflush(stdout);
    265   }
    266 };
    267 
    268 // State shared by all concurrent executions of the same benchmark.
    269 struct SharedState {
    270   port::Mutex mu;
    271   port::CondVar cv;
    272   int total;
    273 
    274   // Each thread goes through the following states:
    275   //    (1) initializing
    276   //    (2) waiting for others to be initialized
    277   //    (3) running
    278   //    (4) done
    279 
    280   int num_initialized;
    281   int num_done;
    282   bool start;
    283 
    284   SharedState() : cv(&mu) { }
    285 };
    286 
    287 // Per-thread state for concurrent executions of the same benchmark.
    288 struct ThreadState {
    289   int tid;             // 0..n-1 when running in n threads
    290   Random rand;         // Has different seeds for different threads
    291   Stats stats;
    292   SharedState* shared;
    293 
    294   ThreadState(int index)
    295       : tid(index),
    296         rand(1000 + index) {
    297   }
    298 };
    299 
    300 }  // namespace
    301 
    302 class Benchmark {
    303  private:
    304   Cache* cache_;
    305   const FilterPolicy* filter_policy_;
    306   DB* db_;
    307   int num_;
    308   int value_size_;
    309   int entries_per_batch_;
    310   WriteOptions write_options_;
    311   int reads_;
    312   int heap_counter_;
    313 
    314   void PrintHeader() {
    315     const int kKeySize = 16;
    316     PrintEnvironment();
    317     fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
    318     fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
    319             FLAGS_value_size,
    320             static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    321     fprintf(stdout, "Entries:    %d\n", num_);
    322     fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
    323             ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
    324              / 1048576.0));
    325     fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
    326             (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
    327              / 1048576.0));
    328     PrintWarnings();
    329     fprintf(stdout, "------------------------------------------------\n");
    330   }
    331 
    332   void PrintWarnings() {
    333 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
    334     fprintf(stdout,
    335             "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
    336             );
    337 #endif
    338 #ifndef NDEBUG
    339     fprintf(stdout,
    340             "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
    341 #endif
    342 
    343     // See if snappy is working by attempting to compress a compressible string
    344     const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    345     std::string compressed;
    346     if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
    347       fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    348     } else if (compressed.size() >= sizeof(text)) {
    349       fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    350     }
    351   }
    352 
    353   void PrintEnvironment() {
    354     fprintf(stderr, "LevelDB:    version %d.%d\n",
    355             kMajorVersion, kMinorVersion);
    356 
    357 #if defined(__linux)
    358     time_t now = time(NULL);
    359     fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline
    360 
    361     FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    362     if (cpuinfo != NULL) {
    363       char line[1000];
    364       int num_cpus = 0;
    365       std::string cpu_type;
    366       std::string cache_size;
    367       while (fgets(line, sizeof(line), cpuinfo) != NULL) {
    368         const char* sep = strchr(line, ':');
    369         if (sep == NULL) {
    370           continue;
    371         }
    372         Slice key = TrimSpace(Slice(line, sep - 1 - line));
    373         Slice val = TrimSpace(Slice(sep + 1));
    374         if (key == "model name") {
    375           ++num_cpus;
    376           cpu_type = val.ToString();
    377         } else if (key == "cache size") {
    378           cache_size = val.ToString();
    379         }
    380       }
    381       fclose(cpuinfo);
    382       fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
    383       fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    384     }
    385 #endif
    386   }
    387 
    388  public:
    389   Benchmark()
    390   : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
    391     filter_policy_(FLAGS_bloom_bits >= 0
    392                    ? NewBloomFilterPolicy(FLAGS_bloom_bits)
    393                    : NULL),
    394     db_(NULL),
    395     num_(FLAGS_num),
    396     value_size_(FLAGS_value_size),
    397     entries_per_batch_(1),
    398     reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
    399     heap_counter_(0) {
    400     std::vector<std::string> files;
    401     Env::Default()->GetChildren(FLAGS_db, &files);
    402     for (size_t i = 0; i < files.size(); i++) {
    403       if (Slice(files[i]).starts_with("heap-")) {
    404         Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
    405       }
    406     }
    407     if (!FLAGS_use_existing_db) {
    408       DestroyDB(FLAGS_db, Options());
    409     }
    410   }
    411 
    412   ~Benchmark() {
    413     delete db_;
    414     delete cache_;
    415     delete filter_policy_;
    416   }
    417 
    418   void Run() {
    419     PrintHeader();
    420     Open();
    421 
    422     const char* benchmarks = FLAGS_benchmarks;
    423     while (benchmarks != NULL) {
    424       const char* sep = strchr(benchmarks, ',');
    425       Slice name;
    426       if (sep == NULL) {
    427         name = benchmarks;
    428         benchmarks = NULL;
    429       } else {
    430         name = Slice(benchmarks, sep - benchmarks);
    431         benchmarks = sep + 1;
    432       }
    433 
    434       // Reset parameters that may be overriddden bwlow
    435       num_ = FLAGS_num;
    436       reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
    437       value_size_ = FLAGS_value_size;
    438       entries_per_batch_ = 1;
    439       write_options_ = WriteOptions();
    440 
    441       void (Benchmark::*method)(ThreadState*) = NULL;
    442       bool fresh_db = false;
    443       int num_threads = FLAGS_threads;
    444 
    445       if (name == Slice("fillseq")) {
    446         fresh_db = true;
    447         method = &Benchmark::WriteSeq;
    448       } else if (name == Slice("fillbatch")) {
    449         fresh_db = true;
    450         entries_per_batch_ = 1000;
    451         method = &Benchmark::WriteSeq;
    452       } else if (name == Slice("fillrandom")) {
    453         fresh_db = true;
    454         method = &Benchmark::WriteRandom;
    455       } else if (name == Slice("overwrite")) {
    456         fresh_db = false;
    457         method = &Benchmark::WriteRandom;
    458       } else if (name == Slice("fillsync")) {
    459         fresh_db = true;
    460         num_ /= 1000;
    461         write_options_.sync = true;
    462         method = &Benchmark::WriteRandom;
    463       } else if (name == Slice("fill100K")) {
    464         fresh_db = true;
    465         num_ /= 1000;
    466         value_size_ = 100 * 1000;
    467         method = &Benchmark::WriteRandom;
    468       } else if (name == Slice("readseq")) {
    469         method = &Benchmark::ReadSequential;
    470       } else if (name == Slice("readreverse")) {
    471         method = &Benchmark::ReadReverse;
    472       } else if (name == Slice("readrandom")) {
    473         method = &Benchmark::ReadRandom;
    474       } else if (name == Slice("readmissing")) {
    475         method = &Benchmark::ReadMissing;
    476       } else if (name == Slice("seekrandom")) {
    477         method = &Benchmark::SeekRandom;
    478       } else if (name == Slice("readhot")) {
    479         method = &Benchmark::ReadHot;
    480       } else if (name == Slice("readrandomsmall")) {
    481         reads_ /= 1000;
    482         method = &Benchmark::ReadRandom;
    483       } else if (name == Slice("deleteseq")) {
    484         method = &Benchmark::DeleteSeq;
    485       } else if (name == Slice("deleterandom")) {
    486         method = &Benchmark::DeleteRandom;
    487       } else if (name == Slice("readwhilewriting")) {
    488         num_threads++;  // Add extra thread for writing
    489         method = &Benchmark::ReadWhileWriting;
    490       } else if (name == Slice("compact")) {
    491         method = &Benchmark::Compact;
    492       } else if (name == Slice("crc32c")) {
    493         method = &Benchmark::Crc32c;
    494       } else if (name == Slice("acquireload")) {
    495         method = &Benchmark::AcquireLoad;
    496       } else if (name == Slice("snappycomp")) {
    497         method = &Benchmark::SnappyCompress;
    498       } else if (name == Slice("snappyuncomp")) {
    499         method = &Benchmark::SnappyUncompress;
    500       } else if (name == Slice("heapprofile")) {
    501         HeapProfile();
    502       } else if (name == Slice("stats")) {
    503         PrintStats("leveldb.stats");
    504       } else if (name == Slice("sstables")) {
    505         PrintStats("leveldb.sstables");
    506       } else {
    507         if (name != Slice()) {  // No error message for empty name
    508           fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
    509         }
    510       }
    511 
    512       if (fresh_db) {
    513         if (FLAGS_use_existing_db) {
    514           fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
    515                   name.ToString().c_str());
    516           method = NULL;
    517         } else {
    518           delete db_;
    519           db_ = NULL;
    520           DestroyDB(FLAGS_db, Options());
    521           Open();
    522         }
    523       }
    524 
    525       if (method != NULL) {
    526         RunBenchmark(num_threads, name, method);
    527       }
    528     }
    529   }
    530 
    531  private:
    532   struct ThreadArg {
    533     Benchmark* bm;
    534     SharedState* shared;
    535     ThreadState* thread;
    536     void (Benchmark::*method)(ThreadState*);
    537   };
    538 
    539   static void ThreadBody(void* v) {
    540     ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    541     SharedState* shared = arg->shared;
    542     ThreadState* thread = arg->thread;
    543     {
    544       MutexLock l(&shared->mu);
    545       shared->num_initialized++;
    546       if (shared->num_initialized >= shared->total) {
    547         shared->cv.SignalAll();
    548       }
    549       while (!shared->start) {
    550         shared->cv.Wait();
    551       }
    552     }
    553 
    554     thread->stats.Start();
    555     (arg->bm->*(arg->method))(thread);
    556     thread->stats.Stop();
    557 
    558     {
    559       MutexLock l(&shared->mu);
    560       shared->num_done++;
    561       if (shared->num_done >= shared->total) {
    562         shared->cv.SignalAll();
    563       }
    564     }
    565   }
    566 
    567   void RunBenchmark(int n, Slice name,
    568                     void (Benchmark::*method)(ThreadState*)) {
    569     SharedState shared;
    570     shared.total = n;
    571     shared.num_initialized = 0;
    572     shared.num_done = 0;
    573     shared.start = false;
    574 
    575     ThreadArg* arg = new ThreadArg[n];
    576     for (int i = 0; i < n; i++) {
    577       arg[i].bm = this;
    578       arg[i].method = method;
    579       arg[i].shared = &shared;
    580       arg[i].thread = new ThreadState(i);
    581       arg[i].thread->shared = &shared;
    582       Env::Default()->StartThread(ThreadBody, &arg[i]);
    583     }
    584 
    585     shared.mu.Lock();
    586     while (shared.num_initialized < n) {
    587       shared.cv.Wait();
    588     }
    589 
    590     shared.start = true;
    591     shared.cv.SignalAll();
    592     while (shared.num_done < n) {
    593       shared.cv.Wait();
    594     }
    595     shared.mu.Unlock();
    596 
    597     for (int i = 1; i < n; i++) {
    598       arg[0].thread->stats.Merge(arg[i].thread->stats);
    599     }
    600     arg[0].thread->stats.Report(name);
    601 
    602     for (int i = 0; i < n; i++) {
    603       delete arg[i].thread;
    604     }
    605     delete[] arg;
    606   }
    607 
    608   void Crc32c(ThreadState* thread) {
    609     // Checksum about 500MB of data total
    610     const int size = 4096;
    611     const char* label = "(4K per op)";
    612     std::string data(size, 'x');
    613     int64_t bytes = 0;
    614     uint32_t crc = 0;
    615     while (bytes < 500 * 1048576) {
    616       crc = crc32c::Value(data.data(), size);
    617       thread->stats.FinishedSingleOp();
    618       bytes += size;
    619     }
    620     // Print so result is not dead
    621     fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
    622 
    623     thread->stats.AddBytes(bytes);
    624     thread->stats.AddMessage(label);
    625   }
    626 
    627   void AcquireLoad(ThreadState* thread) {
    628     int dummy;
    629     port::AtomicPointer ap(&dummy);
    630     int count = 0;
    631     void *ptr = NULL;
    632     thread->stats.AddMessage("(each op is 1000 loads)");
    633     while (count < 100000) {
    634       for (int i = 0; i < 1000; i++) {
    635         ptr = ap.Acquire_Load();
    636       }
    637       count++;
    638       thread->stats.FinishedSingleOp();
    639     }
    640     if (ptr == NULL) exit(1); // Disable unused variable warning.
    641   }
    642 
    643   void SnappyCompress(ThreadState* thread) {
    644     RandomGenerator gen;
    645     Slice input = gen.Generate(Options().block_size);
    646     int64_t bytes = 0;
    647     int64_t produced = 0;
    648     bool ok = true;
    649     std::string compressed;
    650     while (ok && bytes < 1024 * 1048576) {  // Compress 1G
    651       ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    652       produced += compressed.size();
    653       bytes += input.size();
    654       thread->stats.FinishedSingleOp();
    655     }
    656 
    657     if (!ok) {
    658       thread->stats.AddMessage("(snappy failure)");
    659     } else {
    660       char buf[100];
    661       snprintf(buf, sizeof(buf), "(output: %.1f%%)",
    662                (produced * 100.0) / bytes);
    663       thread->stats.AddMessage(buf);
    664       thread->stats.AddBytes(bytes);
    665     }
    666   }
    667 
    668   void SnappyUncompress(ThreadState* thread) {
    669     RandomGenerator gen;
    670     Slice input = gen.Generate(Options().block_size);
    671     std::string compressed;
    672     bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    673     int64_t bytes = 0;
    674     char* uncompressed = new char[input.size()];
    675     while (ok && bytes < 1024 * 1048576) {  // Compress 1G
    676       ok =  port::Snappy_Uncompress(compressed.data(), compressed.size(),
    677                                     uncompressed);
    678       bytes += input.size();
    679       thread->stats.FinishedSingleOp();
    680     }
    681     delete[] uncompressed;
    682 
    683     if (!ok) {
    684       thread->stats.AddMessage("(snappy failure)");
    685     } else {
    686       thread->stats.AddBytes(bytes);
    687     }
    688   }
    689 
    690   void Open() {
    691     assert(db_ == NULL);
    692     Options options;
    693     options.create_if_missing = !FLAGS_use_existing_db;
    694     options.block_cache = cache_;
    695     options.write_buffer_size = FLAGS_write_buffer_size;
    696     options.max_open_files = FLAGS_open_files;
    697     options.filter_policy = filter_policy_;
    698     Status s = DB::Open(options, FLAGS_db, &db_);
    699     if (!s.ok()) {
    700       fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    701       exit(1);
    702     }
    703   }
    704 
    705   void WriteSeq(ThreadState* thread) {
    706     DoWrite(thread, true);
    707   }
    708 
    709   void WriteRandom(ThreadState* thread) {
    710     DoWrite(thread, false);
    711   }
    712 
    713   void DoWrite(ThreadState* thread, bool seq) {
    714     if (num_ != FLAGS_num) {
    715       char msg[100];
    716       snprintf(msg, sizeof(msg), "(%d ops)", num_);
    717       thread->stats.AddMessage(msg);
    718     }
    719 
    720     RandomGenerator gen;
    721     WriteBatch batch;
    722     Status s;
    723     int64_t bytes = 0;
    724     for (int i = 0; i < num_; i += entries_per_batch_) {
    725       batch.Clear();
    726       for (int j = 0; j < entries_per_batch_; j++) {
    727         const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
    728         char key[100];
    729         snprintf(key, sizeof(key), "%016d", k);
    730         batch.Put(key, gen.Generate(value_size_));
    731         bytes += value_size_ + strlen(key);
    732         thread->stats.FinishedSingleOp();
    733       }
    734       s = db_->Write(write_options_, &batch);
    735       if (!s.ok()) {
    736         fprintf(stderr, "put error: %s\n", s.ToString().c_str());
    737         exit(1);
    738       }
    739     }
    740     thread->stats.AddBytes(bytes);
    741   }
    742 
    743   void ReadSequential(ThreadState* thread) {
    744     Iterator* iter = db_->NewIterator(ReadOptions());
    745     int i = 0;
    746     int64_t bytes = 0;
    747     for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
    748       bytes += iter->key().size() + iter->value().size();
    749       thread->stats.FinishedSingleOp();
    750       ++i;
    751     }
    752     delete iter;
    753     thread->stats.AddBytes(bytes);
    754   }
    755 
    756   void ReadReverse(ThreadState* thread) {
    757     Iterator* iter = db_->NewIterator(ReadOptions());
    758     int i = 0;
    759     int64_t bytes = 0;
    760     for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
    761       bytes += iter->key().size() + iter->value().size();
    762       thread->stats.FinishedSingleOp();
    763       ++i;
    764     }
    765     delete iter;
    766     thread->stats.AddBytes(bytes);
    767   }
    768 
    769   void ReadRandom(ThreadState* thread) {
    770     ReadOptions options;
    771     std::string value;
    772     int found = 0;
    773     for (int i = 0; i < reads_; i++) {
    774       char key[100];
    775       const int k = thread->rand.Next() % FLAGS_num;
    776       snprintf(key, sizeof(key), "%016d", k);
    777       if (db_->Get(options, key, &value).ok()) {
    778         found++;
    779       }
    780       thread->stats.FinishedSingleOp();
    781     }
    782     char msg[100];
    783     snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    784     thread->stats.AddMessage(msg);
    785   }
    786 
    787   void ReadMissing(ThreadState* thread) {
    788     ReadOptions options;
    789     std::string value;
    790     for (int i = 0; i < reads_; i++) {
    791       char key[100];
    792       const int k = thread->rand.Next() % FLAGS_num;
    793       snprintf(key, sizeof(key), "%016d.", k);
    794       db_->Get(options, key, &value);
    795       thread->stats.FinishedSingleOp();
    796     }
    797   }
    798 
    799   void ReadHot(ThreadState* thread) {
    800     ReadOptions options;
    801     std::string value;
    802     const int range = (FLAGS_num + 99) / 100;
    803     for (int i = 0; i < reads_; i++) {
    804       char key[100];
    805       const int k = thread->rand.Next() % range;
    806       snprintf(key, sizeof(key), "%016d", k);
    807       db_->Get(options, key, &value);
    808       thread->stats.FinishedSingleOp();
    809     }
    810   }
    811 
    812   void SeekRandom(ThreadState* thread) {
    813     ReadOptions options;
    814     std::string value;
    815     int found = 0;
    816     for (int i = 0; i < reads_; i++) {
    817       Iterator* iter = db_->NewIterator(options);
    818       char key[100];
    819       const int k = thread->rand.Next() % FLAGS_num;
    820       snprintf(key, sizeof(key), "%016d", k);
    821       iter->Seek(key);
    822       if (iter->Valid() && iter->key() == key) found++;
    823       delete iter;
    824       thread->stats.FinishedSingleOp();
    825     }
    826     char msg[100];
    827     snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    828     thread->stats.AddMessage(msg);
    829   }
    830 
    831   void DoDelete(ThreadState* thread, bool seq) {
    832     RandomGenerator gen;
    833     WriteBatch batch;
    834     Status s;
    835     for (int i = 0; i < num_; i += entries_per_batch_) {
    836       batch.Clear();
    837       for (int j = 0; j < entries_per_batch_; j++) {
    838         const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
    839         char key[100];
    840         snprintf(key, sizeof(key), "%016d", k);
    841         batch.Delete(key);
    842         thread->stats.FinishedSingleOp();
    843       }
    844       s = db_->Write(write_options_, &batch);
    845       if (!s.ok()) {
    846         fprintf(stderr, "del error: %s\n", s.ToString().c_str());
    847         exit(1);
    848       }
    849     }
    850   }
    851 
    852   void DeleteSeq(ThreadState* thread) {
    853     DoDelete(thread, true);
    854   }
    855 
    856   void DeleteRandom(ThreadState* thread) {
    857     DoDelete(thread, false);
    858   }
    859 
    860   void ReadWhileWriting(ThreadState* thread) {
    861     if (thread->tid > 0) {
    862       ReadRandom(thread);
    863     } else {
    864       // Special thread that keeps writing until other threads are done.
    865       RandomGenerator gen;
    866       while (true) {
    867         {
    868           MutexLock l(&thread->shared->mu);
    869           if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
    870             // Other threads have finished
    871             break;
    872           }
    873         }
    874 
    875         const int k = thread->rand.Next() % FLAGS_num;
    876         char key[100];
    877         snprintf(key, sizeof(key), "%016d", k);
    878         Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
    879         if (!s.ok()) {
    880           fprintf(stderr, "put error: %s\n", s.ToString().c_str());
    881           exit(1);
    882         }
    883       }
    884 
    885       // Do not count any of the preceding work/delay in stats.
    886       thread->stats.Start();
    887     }
    888   }
    889 
    890   void Compact(ThreadState* thread) {
    891     db_->CompactRange(NULL, NULL);
    892   }
    893 
    894   void PrintStats(const char* key) {
    895     std::string stats;
    896     if (!db_->GetProperty(key, &stats)) {
    897       stats = "(failed)";
    898     }
    899     fprintf(stdout, "\n%s\n", stats.c_str());
    900   }
    901 
    902   static void WriteToFile(void* arg, const char* buf, int n) {
    903     reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
    904   }
    905 
    906   void HeapProfile() {
    907     char fname[100];
    908     snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
    909     WritableFile* file;
    910     Status s = Env::Default()->NewWritableFile(fname, &file);
    911     if (!s.ok()) {
    912       fprintf(stderr, "%s\n", s.ToString().c_str());
    913       return;
    914     }
    915     bool ok = port::GetHeapProfile(WriteToFile, file);
    916     delete file;
    917     if (!ok) {
    918       fprintf(stderr, "heap profiling not supported\n");
    919       Env::Default()->DeleteFile(fname);
    920     }
    921   }
    922 };
    923 
    924 }  // namespace leveldb
    925 
    926 int main(int argc, char** argv) {
    927   FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
    928   FLAGS_open_files = leveldb::Options().max_open_files;
    929   std::string default_db_path;
    930 
    931   for (int i = 1; i < argc; i++) {
    932     double d;
    933     int n;
    934     char junk;
    935     if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
    936       FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
    937     } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
    938       FLAGS_compression_ratio = d;
    939     } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
    940                (n == 0 || n == 1)) {
    941       FLAGS_histogram = n;
    942     } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
    943                (n == 0 || n == 1)) {
    944       FLAGS_use_existing_db = n;
    945     } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
    946       FLAGS_num = n;
    947     } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
    948       FLAGS_reads = n;
    949     } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
    950       FLAGS_threads = n;
    951     } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
    952       FLAGS_value_size = n;
    953     } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
    954       FLAGS_write_buffer_size = n;
    955     } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
    956       FLAGS_cache_size = n;
    957     } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
    958       FLAGS_bloom_bits = n;
    959     } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
    960       FLAGS_open_files = n;
    961     } else if (strncmp(argv[i], "--db=", 5) == 0) {
    962       FLAGS_db = argv[i] + 5;
    963     } else {
    964       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
    965       exit(1);
    966     }
    967   }
    968 
    969   // Choose a location for the test database if none given with --db=<path>
    970   if (FLAGS_db == NULL) {
    971       leveldb::Env::Default()->GetTestDirectory(&default_db_path);
    972       default_db_path += "/dbbench";
    973       FLAGS_db = default_db_path.c_str();
    974   }
    975 
    976   leveldb::Benchmark benchmark;
    977   benchmark.Run();
    978   return 0;
    979 }
    980