Home | History | Annotate | Download | only in db
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "db/log_reader.h"
      6 #include "db/log_writer.h"
      7 #include "leveldb/env.h"
      8 #include "util/coding.h"
      9 #include "util/crc32c.h"
     10 #include "util/random.h"
     11 #include "util/testharness.h"
     12 
     13 namespace leveldb {
     14 namespace log {
     15 
     16 // Construct a string of the specified length made out of the supplied
     17 // partial string.
     18 static std::string BigString(const std::string& partial_string, size_t n) {
     19   std::string result;
     20   while (result.size() < n) {
     21     result.append(partial_string);
     22   }
     23   result.resize(n);
     24   return result;
     25 }
     26 
     27 // Construct a string from a number
     28 static std::string NumberString(int n) {
     29   char buf[50];
     30   snprintf(buf, sizeof(buf), "%d.", n);
     31   return std::string(buf);
     32 }
     33 
     34 // Return a skewed potentially long string
     35 static std::string RandomSkewedString(int i, Random* rnd) {
     36   return BigString(NumberString(i), rnd->Skewed(17));
     37 }
     38 
     39 class LogTest {
     40  private:
     41   class StringDest : public WritableFile {
     42    public:
     43     std::string contents_;
     44 
     45     virtual Status Close() { return Status::OK(); }
     46     virtual Status Flush() { return Status::OK(); }
     47     virtual Status Sync() { return Status::OK(); }
     48     virtual Status Append(const Slice& slice) {
     49       contents_.append(slice.data(), slice.size());
     50       return Status::OK();
     51     }
     52   };
     53 
     54   class StringSource : public SequentialFile {
     55    public:
     56     Slice contents_;
     57     bool force_error_;
     58     bool returned_partial_;
     59     StringSource() : force_error_(false), returned_partial_(false) { }
     60 
     61     virtual Status Read(size_t n, Slice* result, char* scratch) {
     62       ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
     63 
     64       if (force_error_) {
     65         force_error_ = false;
     66         returned_partial_ = true;
     67         return Status::Corruption("read error");
     68       }
     69 
     70       if (contents_.size() < n) {
     71         n = contents_.size();
     72         returned_partial_ = true;
     73       }
     74       *result = Slice(contents_.data(), n);
     75       contents_.remove_prefix(n);
     76       return Status::OK();
     77     }
     78 
     79     virtual Status Skip(uint64_t n) {
     80       if (n > contents_.size()) {
     81         contents_.clear();
     82         return Status::NotFound("in-memory file skipepd past end");
     83       }
     84 
     85       contents_.remove_prefix(n);
     86 
     87       return Status::OK();
     88     }
     89   };
     90 
     91   class ReportCollector : public Reader::Reporter {
     92    public:
     93     size_t dropped_bytes_;
     94     std::string message_;
     95 
     96     ReportCollector() : dropped_bytes_(0) { }
     97     virtual void Corruption(size_t bytes, const Status& status) {
     98       dropped_bytes_ += bytes;
     99       message_.append(status.ToString());
    100     }
    101   };
    102 
    103   StringDest dest_;
    104   StringSource source_;
    105   ReportCollector report_;
    106   bool reading_;
    107   Writer writer_;
    108   Reader reader_;
    109 
    110   // Record metadata for testing initial offset functionality
    111   static size_t initial_offset_record_sizes_[];
    112   static uint64_t initial_offset_last_record_offsets_[];
    113 
    114  public:
    115   LogTest() : reading_(false),
    116               writer_(&dest_),
    117               reader_(&source_, &report_, true/*checksum*/,
    118                       0/*initial_offset*/) {
    119   }
    120 
    121   void Write(const std::string& msg) {
    122     ASSERT_TRUE(!reading_) << "Write() after starting to read";
    123     writer_.AddRecord(Slice(msg));
    124   }
    125 
    126   size_t WrittenBytes() const {
    127     return dest_.contents_.size();
    128   }
    129 
    130   std::string Read() {
    131     if (!reading_) {
    132       reading_ = true;
    133       source_.contents_ = Slice(dest_.contents_);
    134     }
    135     std::string scratch;
    136     Slice record;
    137     if (reader_.ReadRecord(&record, &scratch)) {
    138       return record.ToString();
    139     } else {
    140       return "EOF";
    141     }
    142   }
    143 
    144   void IncrementByte(int offset, int delta) {
    145     dest_.contents_[offset] += delta;
    146   }
    147 
    148   void SetByte(int offset, char new_byte) {
    149     dest_.contents_[offset] = new_byte;
    150   }
    151 
    152   void ShrinkSize(int bytes) {
    153     dest_.contents_.resize(dest_.contents_.size() - bytes);
    154   }
    155 
    156   void FixChecksum(int header_offset, int len) {
    157     // Compute crc of type/len/data
    158     uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
    159     crc = crc32c::Mask(crc);
    160     EncodeFixed32(&dest_.contents_[header_offset], crc);
    161   }
    162 
    163   void ForceError() {
    164     source_.force_error_ = true;
    165   }
    166 
    167   size_t DroppedBytes() const {
    168     return report_.dropped_bytes_;
    169   }
    170 
    171   std::string ReportMessage() const {
    172     return report_.message_;
    173   }
    174 
    175   // Returns OK iff recorded error message contains "msg"
    176   std::string MatchError(const std::string& msg) const {
    177     if (report_.message_.find(msg) == std::string::npos) {
    178       return report_.message_;
    179     } else {
    180       return "OK";
    181     }
    182   }
    183 
    184   void WriteInitialOffsetLog() {
    185     for (int i = 0; i < 4; i++) {
    186       std::string record(initial_offset_record_sizes_[i],
    187                          static_cast<char>('a' + i));
    188       Write(record);
    189     }
    190   }
    191 
    192   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
    193     WriteInitialOffsetLog();
    194     reading_ = true;
    195     source_.contents_ = Slice(dest_.contents_);
    196     Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
    197                                        WrittenBytes() + offset_past_end);
    198     Slice record;
    199     std::string scratch;
    200     ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
    201     delete offset_reader;
    202   }
    203 
    204   void CheckInitialOffsetRecord(uint64_t initial_offset,
    205                                 int expected_record_offset) {
    206     WriteInitialOffsetLog();
    207     reading_ = true;
    208     source_.contents_ = Slice(dest_.contents_);
    209     Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
    210                                        initial_offset);
    211     Slice record;
    212     std::string scratch;
    213     ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
    214     ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
    215               record.size());
    216     ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
    217               offset_reader->LastRecordOffset());
    218     ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
    219     delete offset_reader;
    220   }
    221 
    222 };
    223 
    224 size_t LogTest::initial_offset_record_sizes_[] =
    225     {10000,  // Two sizable records in first block
    226      10000,
    227      2 * log::kBlockSize - 1000,  // Span three blocks
    228      1};
    229 
    230 uint64_t LogTest::initial_offset_last_record_offsets_[] =
    231     {0,
    232      kHeaderSize + 10000,
    233      2 * (kHeaderSize + 10000),
    234      2 * (kHeaderSize + 10000) +
    235          (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
    236 
    237 
    238 TEST(LogTest, Empty) {
    239   ASSERT_EQ("EOF", Read());
    240 }
    241 
    242 TEST(LogTest, ReadWrite) {
    243   Write("foo");
    244   Write("bar");
    245   Write("");
    246   Write("xxxx");
    247   ASSERT_EQ("foo", Read());
    248   ASSERT_EQ("bar", Read());
    249   ASSERT_EQ("", Read());
    250   ASSERT_EQ("xxxx", Read());
    251   ASSERT_EQ("EOF", Read());
    252   ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
    253 }
    254 
    255 TEST(LogTest, ManyBlocks) {
    256   for (int i = 0; i < 100000; i++) {
    257     Write(NumberString(i));
    258   }
    259   for (int i = 0; i < 100000; i++) {
    260     ASSERT_EQ(NumberString(i), Read());
    261   }
    262   ASSERT_EQ("EOF", Read());
    263 }
    264 
    265 TEST(LogTest, Fragmentation) {
    266   Write("small");
    267   Write(BigString("medium", 50000));
    268   Write(BigString("large", 100000));
    269   ASSERT_EQ("small", Read());
    270   ASSERT_EQ(BigString("medium", 50000), Read());
    271   ASSERT_EQ(BigString("large", 100000), Read());
    272   ASSERT_EQ("EOF", Read());
    273 }
    274 
    275 TEST(LogTest, MarginalTrailer) {
    276   // Make a trailer that is exactly the same length as an empty record.
    277   const int n = kBlockSize - 2*kHeaderSize;
    278   Write(BigString("foo", n));
    279   ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
    280   Write("");
    281   Write("bar");
    282   ASSERT_EQ(BigString("foo", n), Read());
    283   ASSERT_EQ("", Read());
    284   ASSERT_EQ("bar", Read());
    285   ASSERT_EQ("EOF", Read());
    286 }
    287 
    288 TEST(LogTest, MarginalTrailer2) {
    289   // Make a trailer that is exactly the same length as an empty record.
    290   const int n = kBlockSize - 2*kHeaderSize;
    291   Write(BigString("foo", n));
    292   ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
    293   Write("bar");
    294   ASSERT_EQ(BigString("foo", n), Read());
    295   ASSERT_EQ("bar", Read());
    296   ASSERT_EQ("EOF", Read());
    297   ASSERT_EQ(0, DroppedBytes());
    298   ASSERT_EQ("", ReportMessage());
    299 }
    300 
    301 TEST(LogTest, ShortTrailer) {
    302   const int n = kBlockSize - 2*kHeaderSize + 4;
    303   Write(BigString("foo", n));
    304   ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
    305   Write("");
    306   Write("bar");
    307   ASSERT_EQ(BigString("foo", n), Read());
    308   ASSERT_EQ("", Read());
    309   ASSERT_EQ("bar", Read());
    310   ASSERT_EQ("EOF", Read());
    311 }
    312 
    313 TEST(LogTest, AlignedEof) {
    314   const int n = kBlockSize - 2*kHeaderSize + 4;
    315   Write(BigString("foo", n));
    316   ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
    317   ASSERT_EQ(BigString("foo", n), Read());
    318   ASSERT_EQ("EOF", Read());
    319 }
    320 
    321 TEST(LogTest, RandomRead) {
    322   const int N = 500;
    323   Random write_rnd(301);
    324   for (int i = 0; i < N; i++) {
    325     Write(RandomSkewedString(i, &write_rnd));
    326   }
    327   Random read_rnd(301);
    328   for (int i = 0; i < N; i++) {
    329     ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
    330   }
    331   ASSERT_EQ("EOF", Read());
    332 }
    333 
    334 // Tests of all the error paths in log_reader.cc follow:
    335 
    336 TEST(LogTest, ReadError) {
    337   Write("foo");
    338   ForceError();
    339   ASSERT_EQ("EOF", Read());
    340   ASSERT_EQ(kBlockSize, DroppedBytes());
    341   ASSERT_EQ("OK", MatchError("read error"));
    342 }
    343 
    344 TEST(LogTest, BadRecordType) {
    345   Write("foo");
    346   // Type is stored in header[6]
    347   IncrementByte(6, 100);
    348   FixChecksum(0, 3);
    349   ASSERT_EQ("EOF", Read());
    350   ASSERT_EQ(3, DroppedBytes());
    351   ASSERT_EQ("OK", MatchError("unknown record type"));
    352 }
    353 
    354 TEST(LogTest, TruncatedTrailingRecord) {
    355   Write("foo");
    356   ShrinkSize(4);   // Drop all payload as well as a header byte
    357   ASSERT_EQ("EOF", Read());
    358   ASSERT_EQ(kHeaderSize - 1, DroppedBytes());
    359   ASSERT_EQ("OK", MatchError("truncated record at end of file"));
    360 }
    361 
    362 TEST(LogTest, BadLength) {
    363   Write("foo");
    364   ShrinkSize(1);
    365   ASSERT_EQ("EOF", Read());
    366   ASSERT_EQ(kHeaderSize + 2, DroppedBytes());
    367   ASSERT_EQ("OK", MatchError("bad record length"));
    368 }
    369 
    370 TEST(LogTest, ChecksumMismatch) {
    371   Write("foo");
    372   IncrementByte(0, 10);
    373   ASSERT_EQ("EOF", Read());
    374   ASSERT_EQ(10, DroppedBytes());
    375   ASSERT_EQ("OK", MatchError("checksum mismatch"));
    376 }
    377 
    378 TEST(LogTest, UnexpectedMiddleType) {
    379   Write("foo");
    380   SetByte(6, kMiddleType);
    381   FixChecksum(0, 3);
    382   ASSERT_EQ("EOF", Read());
    383   ASSERT_EQ(3, DroppedBytes());
    384   ASSERT_EQ("OK", MatchError("missing start"));
    385 }
    386 
    387 TEST(LogTest, UnexpectedLastType) {
    388   Write("foo");
    389   SetByte(6, kLastType);
    390   FixChecksum(0, 3);
    391   ASSERT_EQ("EOF", Read());
    392   ASSERT_EQ(3, DroppedBytes());
    393   ASSERT_EQ("OK", MatchError("missing start"));
    394 }
    395 
    396 TEST(LogTest, UnexpectedFullType) {
    397   Write("foo");
    398   Write("bar");
    399   SetByte(6, kFirstType);
    400   FixChecksum(0, 3);
    401   ASSERT_EQ("bar", Read());
    402   ASSERT_EQ("EOF", Read());
    403   ASSERT_EQ(3, DroppedBytes());
    404   ASSERT_EQ("OK", MatchError("partial record without end"));
    405 }
    406 
    407 TEST(LogTest, UnexpectedFirstType) {
    408   Write("foo");
    409   Write(BigString("bar", 100000));
    410   SetByte(6, kFirstType);
    411   FixChecksum(0, 3);
    412   ASSERT_EQ(BigString("bar", 100000), Read());
    413   ASSERT_EQ("EOF", Read());
    414   ASSERT_EQ(3, DroppedBytes());
    415   ASSERT_EQ("OK", MatchError("partial record without end"));
    416 }
    417 
    418 TEST(LogTest, ErrorJoinsRecords) {
    419   // Consider two fragmented records:
    420   //    first(R1) last(R1) first(R2) last(R2)
    421   // where the middle two fragments disappear.  We do not want
    422   // first(R1),last(R2) to get joined and returned as a valid record.
    423 
    424   // Write records that span two blocks
    425   Write(BigString("foo", kBlockSize));
    426   Write(BigString("bar", kBlockSize));
    427   Write("correct");
    428 
    429   // Wipe the middle block
    430   for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
    431     SetByte(offset, 'x');
    432   }
    433 
    434   ASSERT_EQ("correct", Read());
    435   ASSERT_EQ("EOF", Read());
    436   const int dropped = DroppedBytes();
    437   ASSERT_LE(dropped, 2*kBlockSize + 100);
    438   ASSERT_GE(dropped, 2*kBlockSize);
    439 }
    440 
    441 TEST(LogTest, ReadStart) {
    442   CheckInitialOffsetRecord(0, 0);
    443 }
    444 
    445 TEST(LogTest, ReadSecondOneOff) {
    446   CheckInitialOffsetRecord(1, 1);
    447 }
    448 
    449 TEST(LogTest, ReadSecondTenThousand) {
    450   CheckInitialOffsetRecord(10000, 1);
    451 }
    452 
    453 TEST(LogTest, ReadSecondStart) {
    454   CheckInitialOffsetRecord(10007, 1);
    455 }
    456 
    457 TEST(LogTest, ReadThirdOneOff) {
    458   CheckInitialOffsetRecord(10008, 2);
    459 }
    460 
    461 TEST(LogTest, ReadThirdStart) {
    462   CheckInitialOffsetRecord(20014, 2);
    463 }
    464 
    465 TEST(LogTest, ReadFourthOneOff) {
    466   CheckInitialOffsetRecord(20015, 3);
    467 }
    468 
    469 TEST(LogTest, ReadFourthFirstBlockTrailer) {
    470   CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
    471 }
    472 
    473 TEST(LogTest, ReadFourthMiddleBlock) {
    474   CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
    475 }
    476 
    477 TEST(LogTest, ReadFourthLastBlock) {
    478   CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
    479 }
    480 
    481 TEST(LogTest, ReadFourthStart) {
    482   CheckInitialOffsetRecord(
    483       2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
    484       3);
    485 }
    486 
    487 TEST(LogTest, ReadEnd) {
    488   CheckOffsetPastEndReturnsNoRecords(0);
    489 }
    490 
    491 TEST(LogTest, ReadPastEnd) {
    492   CheckOffsetPastEndReturnsNoRecords(5);
    493 }
    494 
    495 }  // namespace log
    496 }  // namespace leveldb
    497 
    498 int main(int argc, char** argv) {
    499   return leveldb::test::RunAllTests();
    500 }
    501