1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "db/log_reader.h" 6 7 #include <stdio.h> 8 #include "leveldb/env.h" 9 #include "util/coding.h" 10 #include "util/crc32c.h" 11 12 namespace leveldb { 13 namespace log { 14 15 Reader::Reporter::~Reporter() { 16 } 17 18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum, 19 uint64_t initial_offset) 20 : file_(file), 21 reporter_(reporter), 22 checksum_(checksum), 23 backing_store_(new char[kBlockSize]), 24 buffer_(), 25 eof_(false), 26 last_record_offset_(0), 27 end_of_buffer_offset_(0), 28 initial_offset_(initial_offset) { 29 } 30 31 Reader::~Reader() { 32 delete[] backing_store_; 33 } 34 35 bool Reader::SkipToInitialBlock() { 36 size_t offset_in_block = initial_offset_ % kBlockSize; 37 uint64_t block_start_location = initial_offset_ - offset_in_block; 38 39 // Don't search a block if we'd be in the trailer 40 if (offset_in_block > kBlockSize - 6) { 41 offset_in_block = 0; 42 block_start_location += kBlockSize; 43 } 44 45 end_of_buffer_offset_ = block_start_location; 46 47 // Skip to start of first block that can contain the initial record 48 if (block_start_location > 0) { 49 Status skip_status = file_->Skip(block_start_location); 50 if (!skip_status.ok()) { 51 ReportDrop(block_start_location, skip_status); 52 return false; 53 } 54 } 55 56 return true; 57 } 58 59 bool Reader::ReadRecord(Slice* record, std::string* scratch) { 60 if (last_record_offset_ < initial_offset_) { 61 if (!SkipToInitialBlock()) { 62 return false; 63 } 64 } 65 66 scratch->clear(); 67 record->clear(); 68 bool in_fragmented_record = false; 69 // Record offset of the logical record that we're reading 70 // 0 is a dummy value to make compilers happy 71 uint64_t prospective_record_offset = 0; 72 73 Slice fragment; 74 while (true) { 75 uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); 76 const unsigned int record_type = ReadPhysicalRecord(&fragment); 77 switch (record_type) { 78 case kFullType: 79 if (in_fragmented_record) { 80 // Handle bug in earlier versions of log::Writer where 81 // it could emit an empty kFirstType record at the tail end 82 // of a block followed by a kFullType or kFirstType record 83 // at the beginning of the next block. 84 if (scratch->empty()) { 85 in_fragmented_record = false; 86 } else { 87 ReportCorruption(scratch->size(), "partial record without end(1)"); 88 } 89 } 90 prospective_record_offset = physical_record_offset; 91 scratch->clear(); 92 *record = fragment; 93 last_record_offset_ = prospective_record_offset; 94 return true; 95 96 case kFirstType: 97 if (in_fragmented_record) { 98 // Handle bug in earlier versions of log::Writer where 99 // it could emit an empty kFirstType record at the tail end 100 // of a block followed by a kFullType or kFirstType record 101 // at the beginning of the next block. 102 if (scratch->empty()) { 103 in_fragmented_record = false; 104 } else { 105 ReportCorruption(scratch->size(), "partial record without end(2)"); 106 } 107 } 108 prospective_record_offset = physical_record_offset; 109 scratch->assign(fragment.data(), fragment.size()); 110 in_fragmented_record = true; 111 break; 112 113 case kMiddleType: 114 if (!in_fragmented_record) { 115 ReportCorruption(fragment.size(), 116 "missing start of fragmented record(1)"); 117 } else { 118 scratch->append(fragment.data(), fragment.size()); 119 } 120 break; 121 122 case kLastType: 123 if (!in_fragmented_record) { 124 ReportCorruption(fragment.size(), 125 "missing start of fragmented record(2)"); 126 } else { 127 scratch->append(fragment.data(), fragment.size()); 128 *record = Slice(*scratch); 129 last_record_offset_ = prospective_record_offset; 130 return true; 131 } 132 break; 133 134 case kEof: 135 if (in_fragmented_record) { 136 ReportCorruption(scratch->size(), "partial record without end(3)"); 137 scratch->clear(); 138 } 139 return false; 140 141 case kBadRecord: 142 if (in_fragmented_record) { 143 ReportCorruption(scratch->size(), "error in middle of record"); 144 in_fragmented_record = false; 145 scratch->clear(); 146 } 147 break; 148 149 default: { 150 char buf[40]; 151 snprintf(buf, sizeof(buf), "unknown record type %u", record_type); 152 ReportCorruption( 153 (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), 154 buf); 155 in_fragmented_record = false; 156 scratch->clear(); 157 break; 158 } 159 } 160 } 161 return false; 162 } 163 164 uint64_t Reader::LastRecordOffset() { 165 return last_record_offset_; 166 } 167 168 void Reader::ReportCorruption(size_t bytes, const char* reason) { 169 ReportDrop(bytes, Status::Corruption(reason)); 170 } 171 172 void Reader::ReportDrop(size_t bytes, const Status& reason) { 173 if (reporter_ != NULL && 174 end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) { 175 reporter_->Corruption(bytes, reason); 176 } 177 } 178 179 unsigned int Reader::ReadPhysicalRecord(Slice* result) { 180 while (true) { 181 if (buffer_.size() < kHeaderSize) { 182 if (!eof_) { 183 // Last read was a full read, so this is a trailer to skip 184 buffer_.clear(); 185 Status status = file_->Read(kBlockSize, &buffer_, backing_store_); 186 end_of_buffer_offset_ += buffer_.size(); 187 if (!status.ok()) { 188 buffer_.clear(); 189 ReportDrop(kBlockSize, status); 190 eof_ = true; 191 return kEof; 192 } else if (buffer_.size() < kBlockSize) { 193 eof_ = true; 194 } 195 continue; 196 } else if (buffer_.size() == 0) { 197 // End of file 198 return kEof; 199 } else { 200 size_t drop_size = buffer_.size(); 201 buffer_.clear(); 202 ReportCorruption(drop_size, "truncated record at end of file"); 203 return kEof; 204 } 205 } 206 207 // Parse the header 208 const char* header = buffer_.data(); 209 const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; 210 const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; 211 const unsigned int type = header[6]; 212 const uint32_t length = a | (b << 8); 213 if (kHeaderSize + length > buffer_.size()) { 214 size_t drop_size = buffer_.size(); 215 buffer_.clear(); 216 ReportCorruption(drop_size, "bad record length"); 217 return kBadRecord; 218 } 219 220 if (type == kZeroType && length == 0) { 221 // Skip zero length record without reporting any drops since 222 // such records are produced by the mmap based writing code in 223 // env_posix.cc that preallocates file regions. 224 buffer_.clear(); 225 return kBadRecord; 226 } 227 228 // Check crc 229 if (checksum_) { 230 uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); 231 uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); 232 if (actual_crc != expected_crc) { 233 // Drop the rest of the buffer since "length" itself may have 234 // been corrupted and if we trust it, we could find some 235 // fragment of a real log record that just happens to look 236 // like a valid log record. 237 size_t drop_size = buffer_.size(); 238 buffer_.clear(); 239 ReportCorruption(drop_size, "checksum mismatch"); 240 return kBadRecord; 241 } 242 } 243 244 buffer_.remove_prefix(kHeaderSize + length); 245 246 // Skip physical record that started before initial_offset_ 247 if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < 248 initial_offset_) { 249 result->clear(); 250 return kBadRecord; 251 } 252 253 *result = Slice(header + kHeaderSize, length); 254 return type; 255 } 256 } 257 258 } // namespace log 259 } // namespace leveldb 260