// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include "db/db_impl.h"
#include "db/version_set.h"
#include "leveldb/cache.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/testutil.h"

// Comma-separated list of operations to run in the specified order
//   Actual benchmarks:
//      fillseq          -- write N values in sequential key order in async mode
//      fillrandom       -- write N values in random key order in async mode
//      overwrite        -- overwrite N values in random key order in async mode
//      fillsync         -- write N/1000 values in random key order in sync mode
//      fill100K         -- write N/1000 100K values in random order in async mode
//      fillbatch        -- write N values in sequential key order in 1000-entry batches
//      deleteseq        -- delete N keys in sequential order
//      deleterandom     -- delete N keys in random order
//      readseq          -- read N times sequentially
//      readreverse      -- read N times in reverse order
//      readrandom       -- read N times in random order
//      readrandomsmall  -- read N/1000 times in random order
//      readmissing      -- read N missing keys in random order
//      readhot          -- read N times in random order from 1% section of DB
//      readwhilewriting -- read randomly while one extra thread writes
//      seekrandom       -- N random seeks
//      crc32c           -- repeated crc32c of 4K of data
//      acquireload      -- 100K ops, each doing 1000 atomic loads
//      snappycomp       -- snappy-compress 4K blocks until 1GB of input is processed
//      snappyuncomp     -- snappy-uncompress 4K blocks until 1GB of input is processed
//   Meta operations:
//      compact     -- Compact the entire DB
//      stats       -- Print DB stats
//      sstables    -- Print sstable info
//      heapprofile -- Dump a heap profile (if supported by this port)
static const char* FLAGS_benchmarks =
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    "readrandom,"
    "readrandom,"  // Extra run to allow previous compactions to quiesce
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,"
    "acquireload,"
    ;

// Number of key/values to place in database
static int FLAGS_num = 1000000;

// Number of read operations to do.  If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;

// Number of concurrent threads to run.
static int FLAGS_threads = 1;

// Size of each value
static int FLAGS_value_size = 100;

// Arrange to generate values that shrink to this fraction of
// their original size after compression
static double FLAGS_compression_ratio = 0.5;

// Print histogram of operation timings
static bool FLAGS_histogram = false;

// Number of bytes to buffer in memtable before compacting
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size = -1;

// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;

// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;

// If true, do not destroy the existing database.  If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;
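// Example invocation (a sketch; assumes the benchmark binary is built as
// "db_bench", the makefile's target name for this file):
//
//   ./db_bench --benchmarks=fillseq,readrandom --num=100000 --value_size=256
//
// Flags that are not given on the command line keep the defaults below.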
// Use the db with the following name.
static const char* FLAGS_db = NULL;

namespace leveldb {

namespace {

// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;
  int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < 1048576) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  Slice Generate(size_t len) {
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};

static Slice TrimSpace(Slice s) {
  size_t start = 0;
  while (start < s.size() && isspace(s[start])) {
    start++;
  }
  size_t limit = s.size();
  while (limit > start && isspace(s[limit-1])) {
    limit--;
  }
  return Slice(s.data() + start, limit - start);
}

static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}

class Stats {
 private:
  double start_;
  double finish_;
  double seconds_;
  int done_;
  int next_report_;
  int64_t bytes_;
  double last_op_finish_;
  Histogram hist_;
  std::string message_;

 public:
  Stats() { Start(); }

  void Start() {
    // Set start_ first: on the initial call from the constructor it would
    // otherwise be read uninitialized by the last_op_finish_ assignment.
    start_ = Env::Default()->NowMicros();
    next_report_ = 100;
    last_op_finish_ = start_;
    hist_.Clear();
    done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    finish_ = start_;
    message_.clear();
  }

  void Merge(const Stats& other) {
    hist_.Merge(other.hist_);
    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
    finish_ = Env::Default()->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }
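  // Record completion of one operation.  Updates the optional latency
  // histogram and prints a progress line whose update interval grows as
  // more ops finish (every 100 ops at first, every 100000 ops eventually).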
  void FinishedSingleOp() {
    if (FLAGS_histogram) {
      double now = Env::Default()->NowMicros();
      double micros = now - last_op_finish_;
      hist_.Add(micros);
      if (micros > 20000) {
        fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_++;
    if (done_ >= next_report_) {
      if      (next_report_ < 1000)   next_report_ += 100;
      else if (next_report_ < 5000)   next_report_ += 500;
      else if (next_report_ < 10000)  next_report_ += 1000;
      else if (next_report_ < 50000)  next_report_ += 5000;
      else if (next_report_ < 100000) next_report_ += 10000;
      else if (next_report_ < 500000) next_report_ += 50000;
      else                            next_report_ += 100000;
      fprintf(stderr, "... finished %d ops%30s\r", done_, "");
      fflush(stderr);
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedSingleOp().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);

    fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
            name.ToString().c_str(),
            seconds_ * 1e6 / done_,
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    }
    fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  int num_initialized;
  int num_done;
  bool start;

  SharedState() : cv(&mu) { }
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
  Random rand;         // Has different seeds for different threads
  Stats stats;
  SharedState* shared;

  ThreadState(int index)
      : tid(index),
        rand(1000 + index) {
  }
};

}  // namespace
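// Owns the database plus its cache and filter policy, and runs each
// benchmark named in FLAGS_benchmarks on FLAGS_threads worker threads,
// merging the per-thread Stats into one report line per benchmark.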
class Benchmark {
 private:
  Cache* cache_;
  const FilterPolicy* filter_policy_;
  DB* db_;
  int num_;
  int value_size_;
  int entries_per_batch_;
  WriteOptions write_options_;
  int reads_;
  int heap_counter_;

  void PrintHeader() {
    const int kKeySize = 16;
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries:    %d\n", num_);
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
            ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
             / 1048576.0));
    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // See if snappy is working by attempting to compress a compressible string
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
      fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(text)) {
      fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }

  void PrintEnvironment() {
    fprintf(stderr, "LevelDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(NULL);
    fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != NULL) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
        const char* sep = strchr(line, ':');
        if (sep == NULL) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

 public:
  Benchmark()
  : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
    filter_policy_(FLAGS_bloom_bits >= 0
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                   : NULL),
    db_(NULL),
    num_(FLAGS_num),
    value_size_(FLAGS_value_size),
    entries_per_batch_(1),
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
    heap_counter_(0) {
    std::vector<std::string> files;
    Env::Default()->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }

  ~Benchmark() {
    delete db_;
    delete cache_;
    delete filter_policy_;
  }
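  // Parses FLAGS_benchmarks as a comma-separated list and dispatches each
  // name to the matching method.  Benchmarks that need a fresh database
  // (the fill* family) first destroy and reopen the DB, unless
  // --use_existing_db is set, in which case they are skipped.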
  void Run() {
    PrintHeader();
    Open();

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != NULL) {
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == NULL) {
        name = benchmarks;
        benchmarks = NULL;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Reset parameters that may be overridden below
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();

      void (Benchmark::*method)(ThreadState*) = NULL;
      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("acquireload")) {
        method = &Benchmark::AcquireLoad;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
      } else {
        if (name != Slice()) {  // No error message for empty name
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
          method = NULL;
        } else {
          delete db_;
          db_ = NULL;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != NULL) {
        RunBenchmark(num_threads, name, method);
      }
    }
  }

 private:
  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };
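  // Per-thread entry point.  Each worker checks in under the shared mutex,
  // waits for the coordinator to set shared->start, runs the benchmark body
  // between Stats::Start()/Stop(), and then checks out the same way.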
  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    thread->stats.Start();
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->shared = &shared;
      Env::Default()->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    for (int i = 1; i < n; i++) {
      arg[0].thread->stats.Merge(arg[i].thread->stats);
    }
    arg[0].thread->stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }

  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp();
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }
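  // Measures the cost of an acquire-ordered atomic load; each reported op
  // covers 1000 Acquire_Load() calls on a dummy pointer.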
  void AcquireLoad(ThreadState* thread) {
    int dummy;
    port::AtomicPointer ap(&dummy);
    int count = 0;
    void *ptr = NULL;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.Acquire_Load();
      }
      count++;
      thread->stats.FinishedSingleOp();
    }
    if (ptr == NULL) exit(1);  // Disable unused variable warning.
  }

  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  void Open() {
    assert(db_ == NULL);
    Options options;
    options.create_if_missing = !FLAGS_use_existing_db;
    options.block_cache = cache_;
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_open_files = FLAGS_open_files;
    options.filter_policy = filter_policy_;
    Status s = DB::Open(options, FLAGS_db, &db_);
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, true);
  }

  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, false);
  }
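  // Keys are 16-digit zero-padded decimal numbers ("%016d"), so that
  // lexicographic key order matches numeric order: 42 becomes
  // "0000000000000042".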
  void DoWrite(ThreadState* thread, bool seq) {
    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%d ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Put(key, gen.Generate(value_size_));
        bytes += value_size_ + strlen(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  void ReadSequential(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      if (db_->Get(options, key, &value).ok()) {
        found++;
      }
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }
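  // Looks up keys with a trailing '.' appended; stored keys are exactly 16
  // digits, so these lookups always miss, exercising the negative-lookup
  // path (and the bloom filter, when --bloom_bits is set).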
  void ReadMissing(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d.", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }

  void ReadHot(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    const int range = (FLAGS_num + 99) / 100;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % range;
      snprintf(key, sizeof(key), "%016d", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }

  void SeekRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      Iterator* iter = db_->NewIterator(options);
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      iter->Seek(key);
      if (iter->Valid() && iter->key() == key) found++;
      delete iter;
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }

  void DoDelete(ThreadState* thread, bool seq) {
    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Delete(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }

  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        const int k = thread->rand.Next() % FLAGS_num;
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
      }

      // Do not count any of the preceding work/delay in stats.
      thread->stats.Start();
    }
  }

  void Compact(ThreadState* thread) {
    db_->CompactRange(NULL, NULL);
  }

  void PrintStats(const char* key) {
    std::string stats;
    if (!db_->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }

  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }

  void HeapProfile() {
    char fname[100];
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
    WritableFile* file;
    Status s = Env::Default()->NewWritableFile(fname, &file);
    if (!s.ok()) {
      fprintf(stderr, "%s\n", s.ToString().c_str());
      return;
    }
    bool ok = port::GetHeapProfile(WriteToFile, file);
    delete file;
    if (!ok) {
      fprintf(stderr, "heap profiling not supported\n");
      Env::Default()->DeleteFile(fname);
    }
  }
};

}  // namespace leveldb
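// Flag parsing below relies on a sscanf idiom: the trailing %c matches only
// if there is trailing garbage after the number, so a return value of
// exactly 1 means the value was a clean number ("--num=5x" assigns both n
// and junk, returns 2, and is rejected).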
int main(int argc, char** argv) {
  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
  FLAGS_open_files = leveldb::Options().max_open_files;
  std::string default_db_path;

  for (int i = 1; i < argc; i++) {
    double d;
    int n;
    char junk;
    if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
      FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
    } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
      FLAGS_compression_ratio = d;
    } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_histogram = n;
    } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_use_existing_db = n;
    } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
      FLAGS_num = n;
    } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
      FLAGS_reads = n;
    } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
      FLAGS_threads = n;
    } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
      FLAGS_value_size = n;
    } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
      FLAGS_write_buffer_size = n;
    } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
      FLAGS_cache_size = n;
    } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
      FLAGS_bloom_bits = n;
    } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
      FLAGS_open_files = n;
    } else if (strncmp(argv[i], "--db=", 5) == 0) {
      FLAGS_db = argv[i] + 5;
    } else {
      fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
      exit(1);
    }
  }

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db == NULL) {
    leveldb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path.c_str();
  }

  leveldb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}