Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 //
      5 // WriteBatch::rep_ :=
      6 //    sequence: fixed64
      7 //    count: fixed32
      8 //    data: record[count]
      9 // record :=
     10 //    kTypeValue varstring varstring         |
     11 //    kTypeDeletion varstring
     12 // varstring :=
     13 //    len: varint32
     14 //    data: uint8[len]
     15 
     16 #include "leveldb/write_batch.h"
     17 
     18 #include "leveldb/db.h"
     19 #include "db/dbformat.h"
     20 #include "db/memtable.h"
     21 #include "db/write_batch_internal.h"
     22 #include "util/coding.h"
     23 
     24 namespace leveldb {
     25 
     26 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
     27 static const size_t kHeader = 12;
     28 
     29 WriteBatch::WriteBatch() {
     30   Clear();
     31 }
     32 
     33 WriteBatch::~WriteBatch() { }
     34 
     35 WriteBatch::Handler::~Handler() { }
     36 
     37 void WriteBatch::Clear() {
     38   rep_.clear();
     39   rep_.resize(kHeader);
     40 }
     41 
     42 Status WriteBatch::Iterate(Handler* handler) const {
     43   Slice input(rep_);
     44   if (input.size() < kHeader) {
     45     return Status::Corruption("malformed WriteBatch (too small)");
     46   }
     47 
     48   input.remove_prefix(kHeader);
     49   Slice key, value;
     50   int found = 0;
     51   while (!input.empty()) {
     52     found++;
     53     char tag = input[0];
     54     input.remove_prefix(1);
     55     switch (tag) {
     56       case kTypeValue:
     57         if (GetLengthPrefixedSlice(&input, &key) &&
     58             GetLengthPrefixedSlice(&input, &value)) {
     59           handler->Put(key, value);
     60         } else {
     61           return Status::Corruption("bad WriteBatch Put");
     62         }
     63         break;
     64       case kTypeDeletion:
     65         if (GetLengthPrefixedSlice(&input, &key)) {
     66           handler->Delete(key);
     67         } else {
     68           return Status::Corruption("bad WriteBatch Delete");
     69         }
     70         break;
     71       default:
     72         return Status::Corruption("unknown WriteBatch tag");
     73     }
     74   }
     75   if (found != WriteBatchInternal::Count(this)) {
     76     return Status::Corruption("WriteBatch has wrong count");
     77   } else {
     78     return Status::OK();
     79   }
     80 }
     81 
     82 int WriteBatchInternal::Count(const WriteBatch* b) {
     83   return DecodeFixed32(b->rep_.data() + 8);
     84 }
     85 
     86 void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
     87   EncodeFixed32(&b->rep_[8], n);
     88 }
     89 
     90 SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
     91   return SequenceNumber(DecodeFixed64(b->rep_.data()));
     92 }
     93 
     94 void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
     95   EncodeFixed64(&b->rep_[0], seq);
     96 }
     97 
     98 void WriteBatch::Put(const Slice& key, const Slice& value) {
     99   WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
    100   rep_.push_back(static_cast<char>(kTypeValue));
    101   PutLengthPrefixedSlice(&rep_, key);
    102   PutLengthPrefixedSlice(&rep_, value);
    103 }
    104 
    105 void WriteBatch::Delete(const Slice& key) {
    106   WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
    107   rep_.push_back(static_cast<char>(kTypeDeletion));
    108   PutLengthPrefixedSlice(&rep_, key);
    109 }
    110 
    111 namespace {
    112 class MemTableInserter : public WriteBatch::Handler {
    113  public:
    114   SequenceNumber sequence_;
    115   MemTable* mem_;
    116 
    117   virtual void Put(const Slice& key, const Slice& value) {
    118     mem_->Add(sequence_, kTypeValue, key, value);
    119     sequence_++;
    120   }
    121   virtual void Delete(const Slice& key) {
    122     mem_->Add(sequence_, kTypeDeletion, key, Slice());
    123     sequence_++;
    124   }
    125 };
    126 }  // namespace
    127 
    128 Status WriteBatchInternal::InsertInto(const WriteBatch* b,
    129                                       MemTable* memtable) {
    130   MemTableInserter inserter;
    131   inserter.sequence_ = WriteBatchInternal::Sequence(b);
    132   inserter.mem_ = memtable;
    133   return b->Iterate(&inserter);
    134 }
    135 
    136 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
    137   assert(contents.size() >= kHeader);
    138   b->rep_.assign(contents.data(), contents.size());
    139 }
    140 
    141 void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
    142   SetCount(dst, Count(dst) + Count(src));
    143   assert(src->rep_.size() >= kHeader);
    144   dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
    145 }
    146 
    147 }  // namespace leveldb
    148