Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "db/log_reader.h"
      6 
      7 #include <stdio.h>
      8 #include "leveldb/env.h"
      9 #include "util/coding.h"
     10 #include "util/crc32c.h"
     11 
     12 namespace leveldb {
     13 namespace log {
     14 
     15 Reader::Reporter::~Reporter() {
     16 }
     17 
     18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
     19                uint64_t initial_offset)
     20     : file_(file),
     21       reporter_(reporter),
     22       checksum_(checksum),
     23       backing_store_(new char[kBlockSize]),
     24       buffer_(),
     25       eof_(false),
     26       last_record_offset_(0),
     27       end_of_buffer_offset_(0),
     28       initial_offset_(initial_offset) {
     29 }
     30 
     31 Reader::~Reader() {
     32   delete[] backing_store_;
     33 }
     34 
     35 bool Reader::SkipToInitialBlock() {
     36   size_t offset_in_block = initial_offset_ % kBlockSize;
     37   uint64_t block_start_location = initial_offset_ - offset_in_block;
     38 
     39   // Don't search a block if we'd be in the trailer
     40   if (offset_in_block > kBlockSize - 6) {
     41     offset_in_block = 0;
     42     block_start_location += kBlockSize;
     43   }
     44 
     45   end_of_buffer_offset_ = block_start_location;
     46 
     47   // Skip to start of first block that can contain the initial record
     48   if (block_start_location > 0) {
     49     Status skip_status = file_->Skip(block_start_location);
     50     if (!skip_status.ok()) {
     51       ReportDrop(block_start_location, skip_status);
     52       return false;
     53     }
     54   }
     55 
     56   return true;
     57 }
     58 
     59 bool Reader::ReadRecord(Slice* record, std::string* scratch) {
     60   if (last_record_offset_ < initial_offset_) {
     61     if (!SkipToInitialBlock()) {
     62       return false;
     63     }
     64   }
     65 
     66   scratch->clear();
     67   record->clear();
     68   bool in_fragmented_record = false;
     69   // Record offset of the logical record that we're reading
     70   // 0 is a dummy value to make compilers happy
     71   uint64_t prospective_record_offset = 0;
     72 
     73   Slice fragment;
     74   while (true) {
     75     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
     76     const unsigned int record_type = ReadPhysicalRecord(&fragment);
     77     switch (record_type) {
     78       case kFullType:
     79         if (in_fragmented_record) {
     80           // Handle bug in earlier versions of log::Writer where
     81           // it could emit an empty kFirstType record at the tail end
     82           // of a block followed by a kFullType or kFirstType record
     83           // at the beginning of the next block.
     84           if (scratch->empty()) {
     85             in_fragmented_record = false;
     86           } else {
     87             ReportCorruption(scratch->size(), "partial record without end(1)");
     88           }
     89         }
     90         prospective_record_offset = physical_record_offset;
     91         scratch->clear();
     92         *record = fragment;
     93         last_record_offset_ = prospective_record_offset;
     94         return true;
     95 
     96       case kFirstType:
     97         if (in_fragmented_record) {
     98           // Handle bug in earlier versions of log::Writer where
     99           // it could emit an empty kFirstType record at the tail end
    100           // of a block followed by a kFullType or kFirstType record
    101           // at the beginning of the next block.
    102           if (scratch->empty()) {
    103             in_fragmented_record = false;
    104           } else {
    105             ReportCorruption(scratch->size(), "partial record without end(2)");
    106           }
    107         }
    108         prospective_record_offset = physical_record_offset;
    109         scratch->assign(fragment.data(), fragment.size());
    110         in_fragmented_record = true;
    111         break;
    112 
    113       case kMiddleType:
    114         if (!in_fragmented_record) {
    115           ReportCorruption(fragment.size(),
    116                            "missing start of fragmented record(1)");
    117         } else {
    118           scratch->append(fragment.data(), fragment.size());
    119         }
    120         break;
    121 
    122       case kLastType:
    123         if (!in_fragmented_record) {
    124           ReportCorruption(fragment.size(),
    125                            "missing start of fragmented record(2)");
    126         } else {
    127           scratch->append(fragment.data(), fragment.size());
    128           *record = Slice(*scratch);
    129           last_record_offset_ = prospective_record_offset;
    130           return true;
    131         }
    132         break;
    133 
    134       case kEof:
    135         if (in_fragmented_record) {
    136           // This can be caused by the writer dying immediately after
    137           // writing a physical record but before completing the next; don't
    138           // treat it as a corruption, just ignore the entire logical record.
    139           scratch->clear();
    140         }
    141         return false;
    142 
    143       case kBadRecord:
    144         if (in_fragmented_record) {
    145           ReportCorruption(scratch->size(), "error in middle of record");
    146           in_fragmented_record = false;
    147           scratch->clear();
    148         }
    149         break;
    150 
    151       default: {
    152         char buf[40];
    153         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
    154         ReportCorruption(
    155             (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
    156             buf);
    157         in_fragmented_record = false;
    158         scratch->clear();
    159         break;
    160       }
    161     }
    162   }
    163   return false;
    164 }
    165 
    166 uint64_t Reader::LastRecordOffset() {
    167   return last_record_offset_;
    168 }
    169 
    170 void Reader::ReportCorruption(size_t bytes, const char* reason) {
    171   ReportDrop(bytes, Status::Corruption(reason));
    172 }
    173 
    174 void Reader::ReportDrop(size_t bytes, const Status& reason) {
    175   if (reporter_ != NULL &&
    176       end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
    177     reporter_->Corruption(bytes, reason);
    178   }
    179 }
    180 
    181 unsigned int Reader::ReadPhysicalRecord(Slice* result) {
    182   while (true) {
    183     if (buffer_.size() < kHeaderSize) {
    184       if (!eof_) {
    185         // Last read was a full read, so this is a trailer to skip
    186         buffer_.clear();
    187         Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    188         end_of_buffer_offset_ += buffer_.size();
    189         if (!status.ok()) {
    190           buffer_.clear();
    191           ReportDrop(kBlockSize, status);
    192           eof_ = true;
    193           return kEof;
    194         } else if (buffer_.size() < kBlockSize) {
    195           eof_ = true;
    196         }
    197         continue;
    198       } else {
    199         // Note that if buffer_ is non-empty, we have a truncated header at the
    200         // end of the file, which can be caused by the writer crashing in the
    201         // middle of writing the header. Instead of considering this an error,
    202         // just report EOF.
    203         buffer_.clear();
    204         return kEof;
    205       }
    206     }
    207 
    208     // Parse the header
    209     const char* header = buffer_.data();
    210     const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    211     const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    212     const unsigned int type = header[6];
    213     const uint32_t length = a | (b << 8);
    214     if (kHeaderSize + length > buffer_.size()) {
    215       size_t drop_size = buffer_.size();
    216       buffer_.clear();
    217       if (!eof_) {
    218         ReportCorruption(drop_size, "bad record length");
    219         return kBadRecord;
    220       }
    221       // If the end of the file has been reached without reading |length| bytes
    222       // of payload, assume the writer died in the middle of writing the record.
    223       // Don't report a corruption.
    224       return kEof;
    225     }
    226 
    227     if (type == kZeroType && length == 0) {
    228       // Skip zero length record without reporting any drops since
    229       // such records are produced by the mmap based writing code in
    230       // env_posix.cc that preallocates file regions.
    231       buffer_.clear();
    232       return kBadRecord;
    233     }
    234 
    235     // Check crc
    236     if (checksum_) {
    237       uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    238       uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
    239       if (actual_crc != expected_crc) {
    240         // Drop the rest of the buffer since "length" itself may have
    241         // been corrupted and if we trust it, we could find some
    242         // fragment of a real log record that just happens to look
    243         // like a valid log record.
    244         size_t drop_size = buffer_.size();
    245         buffer_.clear();
    246         ReportCorruption(drop_size, "checksum mismatch");
    247         return kBadRecord;
    248       }
    249     }
    250 
    251     buffer_.remove_prefix(kHeaderSize + length);
    252 
    253     // Skip physical record that started before initial_offset_
    254     if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
    255         initial_offset_) {
    256       result->clear();
    257       return kBadRecord;
    258     }
    259 
    260     *result = Slice(header + kHeaderSize, length);
    261     return type;
    262   }
    263 }
    264 
    265 }  // namespace log
    266 }  // namespace leveldb
    267