Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "db/log_reader.h"
      6 
      7 #include <stdio.h>
      8 #include "leveldb/env.h"
      9 #include "util/coding.h"
     10 #include "util/crc32c.h"
     11 
     12 namespace leveldb {
     13 namespace log {
     14 
     15 Reader::Reporter::~Reporter() {
     16 }
     17 
     18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
     19                uint64_t initial_offset)
     20     : file_(file),
     21       reporter_(reporter),
     22       checksum_(checksum),
     23       backing_store_(new char[kBlockSize]),
     24       buffer_(),
     25       eof_(false),
     26       last_record_offset_(0),
     27       end_of_buffer_offset_(0),
     28       initial_offset_(initial_offset) {
     29 }
     30 
     31 Reader::~Reader() {
     32   delete[] backing_store_;
     33 }
     34 
     35 bool Reader::SkipToInitialBlock() {
     36   size_t offset_in_block = initial_offset_ % kBlockSize;
     37   uint64_t block_start_location = initial_offset_ - offset_in_block;
     38 
     39   // Don't search a block if we'd be in the trailer
     40   if (offset_in_block > kBlockSize - 6) {
     41     offset_in_block = 0;
     42     block_start_location += kBlockSize;
     43   }
     44 
     45   end_of_buffer_offset_ = block_start_location;
     46 
     47   // Skip to start of first block that can contain the initial record
     48   if (block_start_location > 0) {
     49     Status skip_status = file_->Skip(block_start_location);
     50     if (!skip_status.ok()) {
     51       ReportDrop(block_start_location, skip_status);
     52       return false;
     53     }
     54   }
     55 
     56   return true;
     57 }
     58 
     59 bool Reader::ReadRecord(Slice* record, std::string* scratch) {
     60   if (last_record_offset_ < initial_offset_) {
     61     if (!SkipToInitialBlock()) {
     62       return false;
     63     }
     64   }
     65 
     66   scratch->clear();
     67   record->clear();
     68   bool in_fragmented_record = false;
     69   // Record offset of the logical record that we're reading
     70   // 0 is a dummy value to make compilers happy
     71   uint64_t prospective_record_offset = 0;
     72 
     73   Slice fragment;
     74   while (true) {
     75     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
     76     const unsigned int record_type = ReadPhysicalRecord(&fragment);
     77     switch (record_type) {
     78       case kFullType:
     79         if (in_fragmented_record) {
     80           // Handle bug in earlier versions of log::Writer where
     81           // it could emit an empty kFirstType record at the tail end
     82           // of a block followed by a kFullType or kFirstType record
     83           // at the beginning of the next block.
     84           if (scratch->empty()) {
     85             in_fragmented_record = false;
     86           } else {
     87             ReportCorruption(scratch->size(), "partial record without end(1)");
     88           }
     89         }
     90         prospective_record_offset = physical_record_offset;
     91         scratch->clear();
     92         *record = fragment;
     93         last_record_offset_ = prospective_record_offset;
     94         return true;
     95 
     96       case kFirstType:
     97         if (in_fragmented_record) {
     98           // Handle bug in earlier versions of log::Writer where
     99           // it could emit an empty kFirstType record at the tail end
    100           // of a block followed by a kFullType or kFirstType record
    101           // at the beginning of the next block.
    102           if (scratch->empty()) {
    103             in_fragmented_record = false;
    104           } else {
    105             ReportCorruption(scratch->size(), "partial record without end(2)");
    106           }
    107         }
    108         prospective_record_offset = physical_record_offset;
    109         scratch->assign(fragment.data(), fragment.size());
    110         in_fragmented_record = true;
    111         break;
    112 
    113       case kMiddleType:
    114         if (!in_fragmented_record) {
    115           ReportCorruption(fragment.size(),
    116                            "missing start of fragmented record(1)");
    117         } else {
    118           scratch->append(fragment.data(), fragment.size());
    119         }
    120         break;
    121 
    122       case kLastType:
    123         if (!in_fragmented_record) {
    124           ReportCorruption(fragment.size(),
    125                            "missing start of fragmented record(2)");
    126         } else {
    127           scratch->append(fragment.data(), fragment.size());
    128           *record = Slice(*scratch);
    129           last_record_offset_ = prospective_record_offset;
    130           return true;
    131         }
    132         break;
    133 
    134       case kEof:
    135         if (in_fragmented_record) {
    136           ReportCorruption(scratch->size(), "partial record without end(3)");
    137           scratch->clear();
    138         }
    139         return false;
    140 
    141       case kBadRecord:
    142         if (in_fragmented_record) {
    143           ReportCorruption(scratch->size(), "error in middle of record");
    144           in_fragmented_record = false;
    145           scratch->clear();
    146         }
    147         break;
    148 
    149       default: {
    150         char buf[40];
    151         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
    152         ReportCorruption(
    153             (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
    154             buf);
    155         in_fragmented_record = false;
    156         scratch->clear();
    157         break;
    158       }
    159     }
    160   }
    161   return false;
    162 }
    163 
    164 uint64_t Reader::LastRecordOffset() {
    165   return last_record_offset_;
    166 }
    167 
    168 void Reader::ReportCorruption(size_t bytes, const char* reason) {
    169   ReportDrop(bytes, Status::Corruption(reason));
    170 }
    171 
    172 void Reader::ReportDrop(size_t bytes, const Status& reason) {
    173   if (reporter_ != NULL &&
    174       end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
    175     reporter_->Corruption(bytes, reason);
    176   }
    177 }
    178 
    179 unsigned int Reader::ReadPhysicalRecord(Slice* result) {
    180   while (true) {
    181     if (buffer_.size() < kHeaderSize) {
    182       if (!eof_) {
    183         // Last read was a full read, so this is a trailer to skip
    184         buffer_.clear();
    185         Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    186         end_of_buffer_offset_ += buffer_.size();
    187         if (!status.ok()) {
    188           buffer_.clear();
    189           ReportDrop(kBlockSize, status);
    190           eof_ = true;
    191           return kEof;
    192         } else if (buffer_.size() < kBlockSize) {
    193           eof_ = true;
    194         }
    195         continue;
    196       } else if (buffer_.size() == 0) {
    197         // End of file
    198         return kEof;
    199       } else {
    200         size_t drop_size = buffer_.size();
    201         buffer_.clear();
    202         ReportCorruption(drop_size, "truncated record at end of file");
    203         return kEof;
    204       }
    205     }
    206 
    207     // Parse the header
    208     const char* header = buffer_.data();
    209     const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    210     const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    211     const unsigned int type = header[6];
    212     const uint32_t length = a | (b << 8);
    213     if (kHeaderSize + length > buffer_.size()) {
    214       size_t drop_size = buffer_.size();
    215       buffer_.clear();
    216       ReportCorruption(drop_size, "bad record length");
    217       return kBadRecord;
    218     }
    219 
    220     if (type == kZeroType && length == 0) {
    221       // Skip zero length record without reporting any drops since
    222       // such records are produced by the mmap based writing code in
    223       // env_posix.cc that preallocates file regions.
    224       buffer_.clear();
    225       return kBadRecord;
    226     }
    227 
    228     // Check crc
    229     if (checksum_) {
    230       uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    231       uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
    232       if (actual_crc != expected_crc) {
    233         // Drop the rest of the buffer since "length" itself may have
    234         // been corrupted and if we trust it, we could find some
    235         // fragment of a real log record that just happens to look
    236         // like a valid log record.
    237         size_t drop_size = buffer_.size();
    238         buffer_.clear();
    239         ReportCorruption(drop_size, "checksum mismatch");
    240         return kBadRecord;
    241       }
    242     }
    243 
    244     buffer_.remove_prefix(kHeaderSize + length);
    245 
    246     // Skip physical record that started before initial_offset_
    247     if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
    248         initial_offset_) {
    249       result->clear();
    250       return kBadRecord;
    251     }
    252 
    253     *result = Slice(header + kHeaderSize, length);
    254     return type;
    255   }
    256 }
    257 
    258 }  // namespace log
    259 }  // namespace leveldb
    260