Home | History | Annotate | Download | only in table
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "leveldb/table_builder.h"
      6 
      7 #include <assert.h>
      8 #include "leveldb/comparator.h"
      9 #include "leveldb/env.h"
     10 #include "leveldb/filter_policy.h"
     11 #include "leveldb/options.h"
     12 #include "table/block_builder.h"
     13 #include "table/filter_block.h"
     14 #include "table/format.h"
     15 #include "util/coding.h"
     16 #include "util/crc32c.h"
     17 
     18 namespace leveldb {
     19 
     20 struct TableBuilder::Rep {
     21   Options options;
     22   Options index_block_options;
     23   WritableFile* file;
     24   uint64_t offset;
     25   Status status;
     26   BlockBuilder data_block;
     27   BlockBuilder index_block;
     28   std::string last_key;
     29   int64_t num_entries;
     30   bool closed;          // Either Finish() or Abandon() has been called.
     31   FilterBlockBuilder* filter_block;
     32 
     33   // We do not emit the index entry for a block until we have seen the
     34   // first key for the next data block.  This allows us to use shorter
     35   // keys in the index block.  For example, consider a block boundary
     36   // between the keys "the quick brown fox" and "the who".  We can use
     37   // "the r" as the key for the index block entry since it is >= all
     38   // entries in the first block and < all entries in subsequent
     39   // blocks.
     40   //
     41   // Invariant: r->pending_index_entry is true only if data_block is empty.
     42   bool pending_index_entry;
     43   BlockHandle pending_handle;  // Handle to add to index block
     44 
     45   std::string compressed_output;
     46 
     47   Rep(const Options& opt, WritableFile* f)
     48       : options(opt),
     49         index_block_options(opt),
     50         file(f),
     51         offset(0),
     52         data_block(&options),
     53         index_block(&index_block_options),
     54         num_entries(0),
     55         closed(false),
     56         filter_block(opt.filter_policy == NULL ? NULL
     57                      : new FilterBlockBuilder(opt.filter_policy)),
     58         pending_index_entry(false) {
     59     index_block_options.block_restart_interval = 1;
     60   }
     61 };
     62 
     63 TableBuilder::TableBuilder(const Options& options, WritableFile* file)
     64     : rep_(new Rep(options, file)) {
     65   if (rep_->filter_block != NULL) {
     66     rep_->filter_block->StartBlock(0);
     67   }
     68 }
     69 
     70 TableBuilder::~TableBuilder() {
     71   assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
     72   delete rep_->filter_block;
     73   delete rep_;
     74 }
     75 
     76 Status TableBuilder::ChangeOptions(const Options& options) {
     77   // Note: if more fields are added to Options, update
     78   // this function to catch changes that should not be allowed to
     79   // change in the middle of building a Table.
     80   if (options.comparator != rep_->options.comparator) {
     81     return Status::InvalidArgument("changing comparator while building table");
     82   }
     83 
     84   // Note that any live BlockBuilders point to rep_->options and therefore
     85   // will automatically pick up the updated options.
     86   rep_->options = options;
     87   rep_->index_block_options = options;
     88   rep_->index_block_options.block_restart_interval = 1;
     89   return Status::OK();
     90 }
     91 
     92 void TableBuilder::Add(const Slice& key, const Slice& value) {
     93   Rep* r = rep_;
     94   assert(!r->closed);
     95   if (!ok()) return;
     96   if (r->num_entries > 0) {
     97     assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
     98   }
     99 
    100   if (r->pending_index_entry) {
    101     assert(r->data_block.empty());
    102     r->options.comparator->FindShortestSeparator(&r->last_key, key);
    103     std::string handle_encoding;
    104     r->pending_handle.EncodeTo(&handle_encoding);
    105     r->index_block.Add(r->last_key, Slice(handle_encoding));
    106     r->pending_index_entry = false;
    107   }
    108 
    109   if (r->filter_block != NULL) {
    110     r->filter_block->AddKey(key);
    111   }
    112 
    113   r->last_key.assign(key.data(), key.size());
    114   r->num_entries++;
    115   r->data_block.Add(key, value);
    116 
    117   const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
    118   if (estimated_block_size >= r->options.block_size) {
    119     Flush();
    120   }
    121 }
    122 
    123 void TableBuilder::Flush() {
    124   Rep* r = rep_;
    125   assert(!r->closed);
    126   if (!ok()) return;
    127   if (r->data_block.empty()) return;
    128   assert(!r->pending_index_entry);
    129   WriteBlock(&r->data_block, &r->pending_handle);
    130   if (ok()) {
    131     r->pending_index_entry = true;
    132     r->status = r->file->Flush();
    133   }
    134   if (r->filter_block != NULL) {
    135     r->filter_block->StartBlock(r->offset);
    136   }
    137 }
    138 
    139 void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
    140   // File format contains a sequence of blocks where each block has:
    141   //    block_data: uint8[n]
    142   //    type: uint8
    143   //    crc: uint32
    144   assert(ok());
    145   Rep* r = rep_;
    146   Slice raw = block->Finish();
    147 
    148   Slice block_contents;
    149   CompressionType type = r->options.compression;
    150   // TODO(postrelease): Support more compression options: zlib?
    151   switch (type) {
    152     case kNoCompression:
    153       block_contents = raw;
    154       break;
    155 
    156     case kSnappyCompression: {
    157       std::string* compressed = &r->compressed_output;
    158       if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
    159           compressed->size() < raw.size() - (raw.size() / 8u)) {
    160         block_contents = *compressed;
    161       } else {
    162         // Snappy not supported, or compressed less than 12.5%, so just
    163         // store uncompressed form
    164         block_contents = raw;
    165         type = kNoCompression;
    166       }
    167       break;
    168     }
    169   }
    170   WriteRawBlock(block_contents, type, handle);
    171   r->compressed_output.clear();
    172   block->Reset();
    173 }
    174 
    175 void TableBuilder::WriteRawBlock(const Slice& block_contents,
    176                                  CompressionType type,
    177                                  BlockHandle* handle) {
    178   Rep* r = rep_;
    179   handle->set_offset(r->offset);
    180   handle->set_size(block_contents.size());
    181   r->status = r->file->Append(block_contents);
    182   if (r->status.ok()) {
    183     char trailer[kBlockTrailerSize];
    184     trailer[0] = type;
    185     uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
    186     crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
    187     EncodeFixed32(trailer+1, crc32c::Mask(crc));
    188     r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    189     if (r->status.ok()) {
    190       r->offset += block_contents.size() + kBlockTrailerSize;
    191     }
    192   }
    193 }
    194 
    195 Status TableBuilder::status() const {
    196   return rep_->status;
    197 }
    198 
    199 Status TableBuilder::Finish() {
    200   Rep* r = rep_;
    201   Flush();
    202   assert(!r->closed);
    203   r->closed = true;
    204 
    205   BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
    206 
    207   // Write filter block
    208   if (ok() && r->filter_block != NULL) {
    209     WriteRawBlock(r->filter_block->Finish(), kNoCompression,
    210                   &filter_block_handle);
    211   }
    212 
    213   // Write metaindex block
    214   if (ok()) {
    215     BlockBuilder meta_index_block(&r->options);
    216     if (r->filter_block != NULL) {
    217       // Add mapping from "filter.Name" to location of filter data
    218       std::string key = "filter.";
    219       key.append(r->options.filter_policy->Name());
    220       std::string handle_encoding;
    221       filter_block_handle.EncodeTo(&handle_encoding);
    222       meta_index_block.Add(key, handle_encoding);
    223     }
    224 
    225     // TODO(postrelease): Add stats and other meta blocks
    226     WriteBlock(&meta_index_block, &metaindex_block_handle);
    227   }
    228 
    229   // Write index block
    230   if (ok()) {
    231     if (r->pending_index_entry) {
    232       r->options.comparator->FindShortSuccessor(&r->last_key);
    233       std::string handle_encoding;
    234       r->pending_handle.EncodeTo(&handle_encoding);
    235       r->index_block.Add(r->last_key, Slice(handle_encoding));
    236       r->pending_index_entry = false;
    237     }
    238     WriteBlock(&r->index_block, &index_block_handle);
    239   }
    240 
    241   // Write footer
    242   if (ok()) {
    243     Footer footer;
    244     footer.set_metaindex_handle(metaindex_block_handle);
    245     footer.set_index_handle(index_block_handle);
    246     std::string footer_encoding;
    247     footer.EncodeTo(&footer_encoding);
    248     r->status = r->file->Append(footer_encoding);
    249     if (r->status.ok()) {
    250       r->offset += footer_encoding.size();
    251     }
    252   }
    253   return r->status;
    254 }
    255 
    256 void TableBuilder::Abandon() {
    257   Rep* r = rep_;
    258   assert(!r->closed);
    259   r->closed = true;
    260 }
    261 
    262 uint64_t TableBuilder::NumEntries() const {
    263   return rep_->num_entries;
    264 }
    265 
    266 uint64_t TableBuilder::FileSize() const {
    267   return rep_->offset;
    268 }
    269 
    270 }  // namespace leveldb
    271