1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "db/log_reader.h" 6 #include "db/log_writer.h" 7 #include "leveldb/env.h" 8 #include "util/coding.h" 9 #include "util/crc32c.h" 10 #include "util/random.h" 11 #include "util/testharness.h" 12 13 namespace leveldb { 14 namespace log { 15 16 // Construct a string of the specified length made out of the supplied 17 // partial string. 18 static std::string BigString(const std::string& partial_string, size_t n) { 19 std::string result; 20 while (result.size() < n) { 21 result.append(partial_string); 22 } 23 result.resize(n); 24 return result; 25 } 26 27 // Construct a string from a number 28 static std::string NumberString(int n) { 29 char buf[50]; 30 snprintf(buf, sizeof(buf), "%d.", n); 31 return std::string(buf); 32 } 33 34 // Return a skewed potentially long string 35 static std::string RandomSkewedString(int i, Random* rnd) { 36 return BigString(NumberString(i), rnd->Skewed(17)); 37 } 38 39 class LogTest { 40 private: 41 class StringDest : public WritableFile { 42 public: 43 std::string contents_; 44 45 virtual Status Close() { return Status::OK(); } 46 virtual Status Flush() { return Status::OK(); } 47 virtual Status Sync() { return Status::OK(); } 48 virtual Status Append(const Slice& slice) { 49 contents_.append(slice.data(), slice.size()); 50 return Status::OK(); 51 } 52 }; 53 54 class StringSource : public SequentialFile { 55 public: 56 Slice contents_; 57 bool force_error_; 58 bool returned_partial_; 59 StringSource() : force_error_(false), returned_partial_(false) { } 60 61 virtual Status Read(size_t n, Slice* result, char* scratch) { 62 ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error"; 63 64 if (force_error_) { 65 force_error_ = false; 66 returned_partial_ = true; 67 return Status::Corruption("read error"); 68 } 69 70 if (contents_.size() < n) { 71 n = contents_.size(); 72 returned_partial_ = true; 73 } 74 *result = Slice(contents_.data(), n); 75 contents_.remove_prefix(n); 76 return Status::OK(); 77 } 78 79 virtual Status Skip(uint64_t n) { 80 if (n > contents_.size()) { 81 contents_.clear(); 82 return Status::NotFound("in-memory file skipepd past end"); 83 } 84 85 contents_.remove_prefix(n); 86 87 return Status::OK(); 88 } 89 }; 90 91 class ReportCollector : public Reader::Reporter { 92 public: 93 size_t dropped_bytes_; 94 std::string message_; 95 96 ReportCollector() : dropped_bytes_(0) { } 97 virtual void Corruption(size_t bytes, const Status& status) { 98 dropped_bytes_ += bytes; 99 message_.append(status.ToString()); 100 } 101 }; 102 103 StringDest dest_; 104 StringSource source_; 105 ReportCollector report_; 106 bool reading_; 107 Writer writer_; 108 Reader reader_; 109 110 // Record metadata for testing initial offset functionality 111 static size_t initial_offset_record_sizes_[]; 112 static uint64_t initial_offset_last_record_offsets_[]; 113 114 public: 115 LogTest() : reading_(false), 116 writer_(&dest_), 117 reader_(&source_, &report_, true/*checksum*/, 118 0/*initial_offset*/) { 119 } 120 121 void Write(const std::string& msg) { 122 ASSERT_TRUE(!reading_) << "Write() after starting to read"; 123 writer_.AddRecord(Slice(msg)); 124 } 125 126 size_t WrittenBytes() const { 127 return dest_.contents_.size(); 128 } 129 130 std::string Read() { 131 if (!reading_) { 132 reading_ = true; 133 source_.contents_ = Slice(dest_.contents_); 134 } 135 std::string scratch; 136 Slice record; 137 if (reader_.ReadRecord(&record, &scratch)) { 138 return record.ToString(); 139 } else { 140 return "EOF"; 141 } 142 } 143 144 void IncrementByte(int offset, int delta) { 145 dest_.contents_[offset] += delta; 146 } 147 148 void SetByte(int offset, char new_byte) { 149 dest_.contents_[offset] = new_byte; 150 } 151 152 void ShrinkSize(int bytes) { 153 dest_.contents_.resize(dest_.contents_.size() - bytes); 154 } 155 156 void FixChecksum(int header_offset, int len) { 157 // Compute crc of type/len/data 158 uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len); 159 crc = crc32c::Mask(crc); 160 EncodeFixed32(&dest_.contents_[header_offset], crc); 161 } 162 163 void ForceError() { 164 source_.force_error_ = true; 165 } 166 167 size_t DroppedBytes() const { 168 return report_.dropped_bytes_; 169 } 170 171 std::string ReportMessage() const { 172 return report_.message_; 173 } 174 175 // Returns OK iff recorded error message contains "msg" 176 std::string MatchError(const std::string& msg) const { 177 if (report_.message_.find(msg) == std::string::npos) { 178 return report_.message_; 179 } else { 180 return "OK"; 181 } 182 } 183 184 void WriteInitialOffsetLog() { 185 for (int i = 0; i < 4; i++) { 186 std::string record(initial_offset_record_sizes_[i], 187 static_cast<char>('a' + i)); 188 Write(record); 189 } 190 } 191 192 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { 193 WriteInitialOffsetLog(); 194 reading_ = true; 195 source_.contents_ = Slice(dest_.contents_); 196 Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/, 197 WrittenBytes() + offset_past_end); 198 Slice record; 199 std::string scratch; 200 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch)); 201 delete offset_reader; 202 } 203 204 void CheckInitialOffsetRecord(uint64_t initial_offset, 205 int expected_record_offset) { 206 WriteInitialOffsetLog(); 207 reading_ = true; 208 source_.contents_ = Slice(dest_.contents_); 209 Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/, 210 initial_offset); 211 Slice record; 212 std::string scratch; 213 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch)); 214 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset], 215 record.size()); 216 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset], 217 offset_reader->LastRecordOffset()); 218 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]); 219 delete offset_reader; 220 } 221 222 }; 223 224 size_t LogTest::initial_offset_record_sizes_[] = 225 {10000, // Two sizable records in first block 226 10000, 227 2 * log::kBlockSize - 1000, // Span three blocks 228 1}; 229 230 uint64_t LogTest::initial_offset_last_record_offsets_[] = 231 {0, 232 kHeaderSize + 10000, 233 2 * (kHeaderSize + 10000), 234 2 * (kHeaderSize + 10000) + 235 (2 * log::kBlockSize - 1000) + 3 * kHeaderSize}; 236 237 238 TEST(LogTest, Empty) { 239 ASSERT_EQ("EOF", Read()); 240 } 241 242 TEST(LogTest, ReadWrite) { 243 Write("foo"); 244 Write("bar"); 245 Write(""); 246 Write("xxxx"); 247 ASSERT_EQ("foo", Read()); 248 ASSERT_EQ("bar", Read()); 249 ASSERT_EQ("", Read()); 250 ASSERT_EQ("xxxx", Read()); 251 ASSERT_EQ("EOF", Read()); 252 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work 253 } 254 255 TEST(LogTest, ManyBlocks) { 256 for (int i = 0; i < 100000; i++) { 257 Write(NumberString(i)); 258 } 259 for (int i = 0; i < 100000; i++) { 260 ASSERT_EQ(NumberString(i), Read()); 261 } 262 ASSERT_EQ("EOF", Read()); 263 } 264 265 TEST(LogTest, Fragmentation) { 266 Write("small"); 267 Write(BigString("medium", 50000)); 268 Write(BigString("large", 100000)); 269 ASSERT_EQ("small", Read()); 270 ASSERT_EQ(BigString("medium", 50000), Read()); 271 ASSERT_EQ(BigString("large", 100000), Read()); 272 ASSERT_EQ("EOF", Read()); 273 } 274 275 TEST(LogTest, MarginalTrailer) { 276 // Make a trailer that is exactly the same length as an empty record. 277 const int n = kBlockSize - 2*kHeaderSize; 278 Write(BigString("foo", n)); 279 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); 280 Write(""); 281 Write("bar"); 282 ASSERT_EQ(BigString("foo", n), Read()); 283 ASSERT_EQ("", Read()); 284 ASSERT_EQ("bar", Read()); 285 ASSERT_EQ("EOF", Read()); 286 } 287 288 TEST(LogTest, MarginalTrailer2) { 289 // Make a trailer that is exactly the same length as an empty record. 290 const int n = kBlockSize - 2*kHeaderSize; 291 Write(BigString("foo", n)); 292 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); 293 Write("bar"); 294 ASSERT_EQ(BigString("foo", n), Read()); 295 ASSERT_EQ("bar", Read()); 296 ASSERT_EQ("EOF", Read()); 297 ASSERT_EQ(0, DroppedBytes()); 298 ASSERT_EQ("", ReportMessage()); 299 } 300 301 TEST(LogTest, ShortTrailer) { 302 const int n = kBlockSize - 2*kHeaderSize + 4; 303 Write(BigString("foo", n)); 304 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes()); 305 Write(""); 306 Write("bar"); 307 ASSERT_EQ(BigString("foo", n), Read()); 308 ASSERT_EQ("", Read()); 309 ASSERT_EQ("bar", Read()); 310 ASSERT_EQ("EOF", Read()); 311 } 312 313 TEST(LogTest, AlignedEof) { 314 const int n = kBlockSize - 2*kHeaderSize + 4; 315 Write(BigString("foo", n)); 316 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes()); 317 ASSERT_EQ(BigString("foo", n), Read()); 318 ASSERT_EQ("EOF", Read()); 319 } 320 321 TEST(LogTest, RandomRead) { 322 const int N = 500; 323 Random write_rnd(301); 324 for (int i = 0; i < N; i++) { 325 Write(RandomSkewedString(i, &write_rnd)); 326 } 327 Random read_rnd(301); 328 for (int i = 0; i < N; i++) { 329 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read()); 330 } 331 ASSERT_EQ("EOF", Read()); 332 } 333 334 // Tests of all the error paths in log_reader.cc follow: 335 336 TEST(LogTest, ReadError) { 337 Write("foo"); 338 ForceError(); 339 ASSERT_EQ("EOF", Read()); 340 ASSERT_EQ(kBlockSize, DroppedBytes()); 341 ASSERT_EQ("OK", MatchError("read error")); 342 } 343 344 TEST(LogTest, BadRecordType) { 345 Write("foo"); 346 // Type is stored in header[6] 347 IncrementByte(6, 100); 348 FixChecksum(0, 3); 349 ASSERT_EQ("EOF", Read()); 350 ASSERT_EQ(3, DroppedBytes()); 351 ASSERT_EQ("OK", MatchError("unknown record type")); 352 } 353 354 TEST(LogTest, TruncatedTrailingRecord) { 355 Write("foo"); 356 ShrinkSize(4); // Drop all payload as well as a header byte 357 ASSERT_EQ("EOF", Read()); 358 ASSERT_EQ(kHeaderSize - 1, DroppedBytes()); 359 ASSERT_EQ("OK", MatchError("truncated record at end of file")); 360 } 361 362 TEST(LogTest, BadLength) { 363 Write("foo"); 364 ShrinkSize(1); 365 ASSERT_EQ("EOF", Read()); 366 ASSERT_EQ(kHeaderSize + 2, DroppedBytes()); 367 ASSERT_EQ("OK", MatchError("bad record length")); 368 } 369 370 TEST(LogTest, ChecksumMismatch) { 371 Write("foo"); 372 IncrementByte(0, 10); 373 ASSERT_EQ("EOF", Read()); 374 ASSERT_EQ(10, DroppedBytes()); 375 ASSERT_EQ("OK", MatchError("checksum mismatch")); 376 } 377 378 TEST(LogTest, UnexpectedMiddleType) { 379 Write("foo"); 380 SetByte(6, kMiddleType); 381 FixChecksum(0, 3); 382 ASSERT_EQ("EOF", Read()); 383 ASSERT_EQ(3, DroppedBytes()); 384 ASSERT_EQ("OK", MatchError("missing start")); 385 } 386 387 TEST(LogTest, UnexpectedLastType) { 388 Write("foo"); 389 SetByte(6, kLastType); 390 FixChecksum(0, 3); 391 ASSERT_EQ("EOF", Read()); 392 ASSERT_EQ(3, DroppedBytes()); 393 ASSERT_EQ("OK", MatchError("missing start")); 394 } 395 396 TEST(LogTest, UnexpectedFullType) { 397 Write("foo"); 398 Write("bar"); 399 SetByte(6, kFirstType); 400 FixChecksum(0, 3); 401 ASSERT_EQ("bar", Read()); 402 ASSERT_EQ("EOF", Read()); 403 ASSERT_EQ(3, DroppedBytes()); 404 ASSERT_EQ("OK", MatchError("partial record without end")); 405 } 406 407 TEST(LogTest, UnexpectedFirstType) { 408 Write("foo"); 409 Write(BigString("bar", 100000)); 410 SetByte(6, kFirstType); 411 FixChecksum(0, 3); 412 ASSERT_EQ(BigString("bar", 100000), Read()); 413 ASSERT_EQ("EOF", Read()); 414 ASSERT_EQ(3, DroppedBytes()); 415 ASSERT_EQ("OK", MatchError("partial record without end")); 416 } 417 418 TEST(LogTest, ErrorJoinsRecords) { 419 // Consider two fragmented records: 420 // first(R1) last(R1) first(R2) last(R2) 421 // where the middle two fragments disappear. We do not want 422 // first(R1),last(R2) to get joined and returned as a valid record. 423 424 // Write records that span two blocks 425 Write(BigString("foo", kBlockSize)); 426 Write(BigString("bar", kBlockSize)); 427 Write("correct"); 428 429 // Wipe the middle block 430 for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) { 431 SetByte(offset, 'x'); 432 } 433 434 ASSERT_EQ("correct", Read()); 435 ASSERT_EQ("EOF", Read()); 436 const int dropped = DroppedBytes(); 437 ASSERT_LE(dropped, 2*kBlockSize + 100); 438 ASSERT_GE(dropped, 2*kBlockSize); 439 } 440 441 TEST(LogTest, ReadStart) { 442 CheckInitialOffsetRecord(0, 0); 443 } 444 445 TEST(LogTest, ReadSecondOneOff) { 446 CheckInitialOffsetRecord(1, 1); 447 } 448 449 TEST(LogTest, ReadSecondTenThousand) { 450 CheckInitialOffsetRecord(10000, 1); 451 } 452 453 TEST(LogTest, ReadSecondStart) { 454 CheckInitialOffsetRecord(10007, 1); 455 } 456 457 TEST(LogTest, ReadThirdOneOff) { 458 CheckInitialOffsetRecord(10008, 2); 459 } 460 461 TEST(LogTest, ReadThirdStart) { 462 CheckInitialOffsetRecord(20014, 2); 463 } 464 465 TEST(LogTest, ReadFourthOneOff) { 466 CheckInitialOffsetRecord(20015, 3); 467 } 468 469 TEST(LogTest, ReadFourthFirstBlockTrailer) { 470 CheckInitialOffsetRecord(log::kBlockSize - 4, 3); 471 } 472 473 TEST(LogTest, ReadFourthMiddleBlock) { 474 CheckInitialOffsetRecord(log::kBlockSize + 1, 3); 475 } 476 477 TEST(LogTest, ReadFourthLastBlock) { 478 CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3); 479 } 480 481 TEST(LogTest, ReadFourthStart) { 482 CheckInitialOffsetRecord( 483 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize, 484 3); 485 } 486 487 TEST(LogTest, ReadEnd) { 488 CheckOffsetPastEndReturnsNoRecords(0); 489 } 490 491 TEST(LogTest, ReadPastEnd) { 492 CheckOffsetPastEndReturnsNoRecords(5); 493 } 494 495 } // namespace log 496 } // namespace leveldb 497 498 int main(int argc, char** argv) { 499 return leveldb::test::RunAllTests(); 500 } 501