1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 // 5 // WriteBatch::rep_ := 6 // sequence: fixed64 7 // count: fixed32 8 // data: record[count] 9 // record := 10 // kTypeValue varstring varstring | 11 // kTypeDeletion varstring 12 // varstring := 13 // len: varint32 14 // data: uint8[len] 15 16 #include "leveldb/write_batch.h" 17 18 #include "leveldb/db.h" 19 #include "db/dbformat.h" 20 #include "db/memtable.h" 21 #include "db/write_batch_internal.h" 22 #include "util/coding.h" 23 24 namespace leveldb { 25 26 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. 27 static const size_t kHeader = 12; 28 29 WriteBatch::WriteBatch() { 30 Clear(); 31 } 32 33 WriteBatch::~WriteBatch() { } 34 35 WriteBatch::Handler::~Handler() { } 36 37 void WriteBatch::Clear() { 38 rep_.clear(); 39 rep_.resize(kHeader); 40 } 41 42 Status WriteBatch::Iterate(Handler* handler) const { 43 Slice input(rep_); 44 if (input.size() < kHeader) { 45 return Status::Corruption("malformed WriteBatch (too small)"); 46 } 47 48 input.remove_prefix(kHeader); 49 Slice key, value; 50 int found = 0; 51 while (!input.empty()) { 52 found++; 53 char tag = input[0]; 54 input.remove_prefix(1); 55 switch (tag) { 56 case kTypeValue: 57 if (GetLengthPrefixedSlice(&input, &key) && 58 GetLengthPrefixedSlice(&input, &value)) { 59 handler->Put(key, value); 60 } else { 61 return Status::Corruption("bad WriteBatch Put"); 62 } 63 break; 64 case kTypeDeletion: 65 if (GetLengthPrefixedSlice(&input, &key)) { 66 handler->Delete(key); 67 } else { 68 return Status::Corruption("bad WriteBatch Delete"); 69 } 70 break; 71 default: 72 return Status::Corruption("unknown WriteBatch tag"); 73 } 74 } 75 if (found != WriteBatchInternal::Count(this)) { 76 return Status::Corruption("WriteBatch has wrong count"); 77 } else { 78 return Status::OK(); 79 } 80 } 81 82 int WriteBatchInternal::Count(const WriteBatch* b) { 83 return DecodeFixed32(b->rep_.data() + 8); 84 } 85 86 void WriteBatchInternal::SetCount(WriteBatch* b, int n) { 87 EncodeFixed32(&b->rep_[8], n); 88 } 89 90 SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { 91 return SequenceNumber(DecodeFixed64(b->rep_.data())); 92 } 93 94 void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { 95 EncodeFixed64(&b->rep_[0], seq); 96 } 97 98 void WriteBatch::Put(const Slice& key, const Slice& value) { 99 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); 100 rep_.push_back(static_cast<char>(kTypeValue)); 101 PutLengthPrefixedSlice(&rep_, key); 102 PutLengthPrefixedSlice(&rep_, value); 103 } 104 105 void WriteBatch::Delete(const Slice& key) { 106 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); 107 rep_.push_back(static_cast<char>(kTypeDeletion)); 108 PutLengthPrefixedSlice(&rep_, key); 109 } 110 111 namespace { 112 class MemTableInserter : public WriteBatch::Handler { 113 public: 114 SequenceNumber sequence_; 115 MemTable* mem_; 116 117 virtual void Put(const Slice& key, const Slice& value) { 118 mem_->Add(sequence_, kTypeValue, key, value); 119 sequence_++; 120 } 121 virtual void Delete(const Slice& key) { 122 mem_->Add(sequence_, kTypeDeletion, key, Slice()); 123 sequence_++; 124 } 125 }; 126 } // namespace 127 128 Status WriteBatchInternal::InsertInto(const WriteBatch* b, 129 MemTable* memtable) { 130 MemTableInserter inserter; 131 inserter.sequence_ = WriteBatchInternal::Sequence(b); 132 inserter.mem_ = memtable; 133 return b->Iterate(&inserter); 134 } 135 136 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { 137 assert(contents.size() >= kHeader); 138 b->rep_.assign(contents.data(), contents.size()); 139 } 140 141 void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { 142 SetCount(dst, Count(dst) + Count(src)); 143 assert(src->rep_.size() >= kHeader); 144 dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); 145 } 146 147 } // namespace leveldb 148