1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "db/log_reader.h" 6 7 #include <stdio.h> 8 #include "leveldb/env.h" 9 #include "util/coding.h" 10 #include "util/crc32c.h" 11 12 namespace leveldb { 13 namespace log { 14 15 Reader::Reporter::~Reporter() { 16 } 17 18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum, 19 uint64_t initial_offset) 20 : file_(file), 21 reporter_(reporter), 22 checksum_(checksum), 23 backing_store_(new char[kBlockSize]), 24 buffer_(), 25 eof_(false), 26 last_record_offset_(0), 27 end_of_buffer_offset_(0), 28 initial_offset_(initial_offset) { 29 } 30 31 Reader::~Reader() { 32 delete[] backing_store_; 33 } 34 35 bool Reader::SkipToInitialBlock() { 36 size_t offset_in_block = initial_offset_ % kBlockSize; 37 uint64_t block_start_location = initial_offset_ - offset_in_block; 38 39 // Don't search a block if we'd be in the trailer 40 if (offset_in_block > kBlockSize - 6) { 41 offset_in_block = 0; 42 block_start_location += kBlockSize; 43 } 44 45 end_of_buffer_offset_ = block_start_location; 46 47 // Skip to start of first block that can contain the initial record 48 if (block_start_location > 0) { 49 Status skip_status = file_->Skip(block_start_location); 50 if (!skip_status.ok()) { 51 ReportDrop(block_start_location, skip_status); 52 return false; 53 } 54 } 55 56 return true; 57 } 58 59 bool Reader::ReadRecord(Slice* record, std::string* scratch) { 60 if (last_record_offset_ < initial_offset_) { 61 if (!SkipToInitialBlock()) { 62 return false; 63 } 64 } 65 66 scratch->clear(); 67 record->clear(); 68 bool in_fragmented_record = false; 69 // Record offset of the logical record that we're reading 70 // 0 is a dummy value to make compilers happy 71 uint64_t prospective_record_offset = 0; 72 73 Slice fragment; 74 while (true) { 75 uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); 76 const unsigned int record_type = ReadPhysicalRecord(&fragment); 77 switch (record_type) { 78 case kFullType: 79 if (in_fragmented_record) { 80 // Handle bug in earlier versions of log::Writer where 81 // it could emit an empty kFirstType record at the tail end 82 // of a block followed by a kFullType or kFirstType record 83 // at the beginning of the next block. 84 if (scratch->empty()) { 85 in_fragmented_record = false; 86 } else { 87 ReportCorruption(scratch->size(), "partial record without end(1)"); 88 } 89 } 90 prospective_record_offset = physical_record_offset; 91 scratch->clear(); 92 *record = fragment; 93 last_record_offset_ = prospective_record_offset; 94 return true; 95 96 case kFirstType: 97 if (in_fragmented_record) { 98 // Handle bug in earlier versions of log::Writer where 99 // it could emit an empty kFirstType record at the tail end 100 // of a block followed by a kFullType or kFirstType record 101 // at the beginning of the next block. 102 if (scratch->empty()) { 103 in_fragmented_record = false; 104 } else { 105 ReportCorruption(scratch->size(), "partial record without end(2)"); 106 } 107 } 108 prospective_record_offset = physical_record_offset; 109 scratch->assign(fragment.data(), fragment.size()); 110 in_fragmented_record = true; 111 break; 112 113 case kMiddleType: 114 if (!in_fragmented_record) { 115 ReportCorruption(fragment.size(), 116 "missing start of fragmented record(1)"); 117 } else { 118 scratch->append(fragment.data(), fragment.size()); 119 } 120 break; 121 122 case kLastType: 123 if (!in_fragmented_record) { 124 ReportCorruption(fragment.size(), 125 "missing start of fragmented record(2)"); 126 } else { 127 scratch->append(fragment.data(), fragment.size()); 128 *record = Slice(*scratch); 129 last_record_offset_ = prospective_record_offset; 130 return true; 131 } 132 break; 133 134 case kEof: 135 if (in_fragmented_record) { 136 // This can be caused by the writer dying immediately after 137 // writing a physical record but before completing the next; don't 138 // treat it as a corruption, just ignore the entire logical record. 139 scratch->clear(); 140 } 141 return false; 142 143 case kBadRecord: 144 if (in_fragmented_record) { 145 ReportCorruption(scratch->size(), "error in middle of record"); 146 in_fragmented_record = false; 147 scratch->clear(); 148 } 149 break; 150 151 default: { 152 char buf[40]; 153 snprintf(buf, sizeof(buf), "unknown record type %u", record_type); 154 ReportCorruption( 155 (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), 156 buf); 157 in_fragmented_record = false; 158 scratch->clear(); 159 break; 160 } 161 } 162 } 163 return false; 164 } 165 166 uint64_t Reader::LastRecordOffset() { 167 return last_record_offset_; 168 } 169 170 void Reader::ReportCorruption(size_t bytes, const char* reason) { 171 ReportDrop(bytes, Status::Corruption(reason)); 172 } 173 174 void Reader::ReportDrop(size_t bytes, const Status& reason) { 175 if (reporter_ != NULL && 176 end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) { 177 reporter_->Corruption(bytes, reason); 178 } 179 } 180 181 unsigned int Reader::ReadPhysicalRecord(Slice* result) { 182 while (true) { 183 if (buffer_.size() < kHeaderSize) { 184 if (!eof_) { 185 // Last read was a full read, so this is a trailer to skip 186 buffer_.clear(); 187 Status status = file_->Read(kBlockSize, &buffer_, backing_store_); 188 end_of_buffer_offset_ += buffer_.size(); 189 if (!status.ok()) { 190 buffer_.clear(); 191 ReportDrop(kBlockSize, status); 192 eof_ = true; 193 return kEof; 194 } else if (buffer_.size() < kBlockSize) { 195 eof_ = true; 196 } 197 continue; 198 } else { 199 // Note that if buffer_ is non-empty, we have a truncated header at the 200 // end of the file, which can be caused by the writer crashing in the 201 // middle of writing the header. Instead of considering this an error, 202 // just report EOF. 203 buffer_.clear(); 204 return kEof; 205 } 206 } 207 208 // Parse the header 209 const char* header = buffer_.data(); 210 const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; 211 const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; 212 const unsigned int type = header[6]; 213 const uint32_t length = a | (b << 8); 214 if (kHeaderSize + length > buffer_.size()) { 215 size_t drop_size = buffer_.size(); 216 buffer_.clear(); 217 if (!eof_) { 218 ReportCorruption(drop_size, "bad record length"); 219 return kBadRecord; 220 } 221 // If the end of the file has been reached without reading |length| bytes 222 // of payload, assume the writer died in the middle of writing the record. 223 // Don't report a corruption. 224 return kEof; 225 } 226 227 if (type == kZeroType && length == 0) { 228 // Skip zero length record without reporting any drops since 229 // such records are produced by the mmap based writing code in 230 // env_posix.cc that preallocates file regions. 231 buffer_.clear(); 232 return kBadRecord; 233 } 234 235 // Check crc 236 if (checksum_) { 237 uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); 238 uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); 239 if (actual_crc != expected_crc) { 240 // Drop the rest of the buffer since "length" itself may have 241 // been corrupted and if we trust it, we could find some 242 // fragment of a real log record that just happens to look 243 // like a valid log record. 244 size_t drop_size = buffer_.size(); 245 buffer_.clear(); 246 ReportCorruption(drop_size, "checksum mismatch"); 247 return kBadRecord; 248 } 249 } 250 251 buffer_.remove_prefix(kHeaderSize + length); 252 253 // Skip physical record that started before initial_offset_ 254 if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < 255 initial_offset_) { 256 result->clear(); 257 return kBadRecord; 258 } 259 260 *result = Slice(header + kHeaderSize, length); 261 return type; 262 } 263 } 264 265 } // namespace log 266 } // namespace leveldb 267