1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include <deque> 6 #include <set> 7 #include <dirent.h> 8 #include <errno.h> 9 #include <fcntl.h> 10 #include <pthread.h> 11 #include <stdio.h> 12 #include <stdlib.h> 13 #include <string.h> 14 #include <sys/mman.h> 15 #include <sys/stat.h> 16 #include <sys/time.h> 17 #include <sys/types.h> 18 #include <time.h> 19 #include <unistd.h> 20 #if defined(LEVELDB_PLATFORM_ANDROID) 21 #include <sys/stat.h> 22 #endif 23 #include "leveldb/env.h" 24 #include "leveldb/slice.h" 25 #include "port/port.h" 26 #include "util/logging.h" 27 #include "util/mutexlock.h" 28 #include "util/posix_logger.h" 29 30 namespace leveldb { 31 32 namespace { 33 34 static Status IOError(const std::string& context, int err_number) { 35 return Status::IOError(context, strerror(err_number)); 36 } 37 38 class PosixSequentialFile: public SequentialFile { 39 private: 40 std::string filename_; 41 FILE* file_; 42 43 public: 44 PosixSequentialFile(const std::string& fname, FILE* f) 45 : filename_(fname), file_(f) { } 46 virtual ~PosixSequentialFile() { fclose(file_); } 47 48 virtual Status Read(size_t n, Slice* result, char* scratch) { 49 Status s; 50 size_t r = fread_unlocked(scratch, 1, n, file_); 51 *result = Slice(scratch, r); 52 if (r < n) { 53 if (feof(file_)) { 54 // We leave status as ok if we hit the end of the file 55 } else { 56 // A partial read with an error: return a non-ok status 57 s = IOError(filename_, errno); 58 } 59 } 60 return s; 61 } 62 63 virtual Status Skip(uint64_t n) { 64 if (fseek(file_, n, SEEK_CUR)) { 65 return IOError(filename_, errno); 66 } 67 return Status::OK(); 68 } 69 }; 70 71 // pread() based random-access 72 class PosixRandomAccessFile: public RandomAccessFile { 73 private: 74 std::string filename_; 75 int fd_; 76 77 public: 78 PosixRandomAccessFile(const std::string& fname, int fd) 79 : filename_(fname), fd_(fd) { } 80 virtual ~PosixRandomAccessFile() { close(fd_); } 81 82 virtual Status Read(uint64_t offset, size_t n, Slice* result, 83 char* scratch) const { 84 Status s; 85 ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset)); 86 *result = Slice(scratch, (r < 0) ? 0 : r); 87 if (r < 0) { 88 // An error: return a non-ok status 89 s = IOError(filename_, errno); 90 } 91 return s; 92 } 93 }; 94 95 // Helper class to limit mmap file usage so that we do not end up 96 // running out virtual memory or running into kernel performance 97 // problems for very large databases. 98 class MmapLimiter { 99 public: 100 // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes. 101 MmapLimiter() { 102 SetAllowed(sizeof(void*) >= 8 ? 1000 : 0); 103 } 104 105 // If another mmap slot is available, acquire it and return true. 106 // Else return false. 107 bool Acquire() { 108 if (GetAllowed() <= 0) { 109 return false; 110 } 111 MutexLock l(&mu_); 112 intptr_t x = GetAllowed(); 113 if (x <= 0) { 114 return false; 115 } else { 116 SetAllowed(x - 1); 117 return true; 118 } 119 } 120 121 // Release a slot acquired by a previous call to Acquire() that returned true. 122 void Release() { 123 MutexLock l(&mu_); 124 SetAllowed(GetAllowed() + 1); 125 } 126 127 private: 128 port::Mutex mu_; 129 port::AtomicPointer allowed_; 130 131 intptr_t GetAllowed() const { 132 return reinterpret_cast<intptr_t>(allowed_.Acquire_Load()); 133 } 134 135 // REQUIRES: mu_ must be held 136 void SetAllowed(intptr_t v) { 137 allowed_.Release_Store(reinterpret_cast<void*>(v)); 138 } 139 140 MmapLimiter(const MmapLimiter&); 141 void operator=(const MmapLimiter&); 142 }; 143 144 // mmap() based random-access 145 class PosixMmapReadableFile: public RandomAccessFile { 146 private: 147 std::string filename_; 148 void* mmapped_region_; 149 size_t length_; 150 MmapLimiter* limiter_; 151 152 public: 153 // base[0,length-1] contains the mmapped contents of the file. 154 PosixMmapReadableFile(const std::string& fname, void* base, size_t length, 155 MmapLimiter* limiter) 156 : filename_(fname), mmapped_region_(base), length_(length), 157 limiter_(limiter) { 158 } 159 160 virtual ~PosixMmapReadableFile() { 161 munmap(mmapped_region_, length_); 162 limiter_->Release(); 163 } 164 165 virtual Status Read(uint64_t offset, size_t n, Slice* result, 166 char* scratch) const { 167 Status s; 168 if (offset + n > length_) { 169 *result = Slice(); 170 s = IOError(filename_, EINVAL); 171 } else { 172 *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n); 173 } 174 return s; 175 } 176 }; 177 178 // We preallocate up to an extra megabyte and use memcpy to append new 179 // data to the file. This is safe since we either properly close the 180 // file before reading from it, or for log files, the reading code 181 // knows enough to skip zero suffixes. 182 class PosixMmapFile : public WritableFile { 183 private: 184 std::string filename_; 185 int fd_; 186 size_t page_size_; 187 size_t map_size_; // How much extra memory to map at a time 188 char* base_; // The mapped region 189 char* limit_; // Limit of the mapped region 190 char* dst_; // Where to write next (in range [base_,limit_]) 191 char* last_sync_; // Where have we synced up to 192 uint64_t file_offset_; // Offset of base_ in file 193 194 // Have we done an munmap of unsynced data? 195 bool pending_sync_; 196 197 // Roundup x to a multiple of y 198 static size_t Roundup(size_t x, size_t y) { 199 return ((x + y - 1) / y) * y; 200 } 201 202 size_t TruncateToPageBoundary(size_t s) { 203 s -= (s & (page_size_ - 1)); 204 assert((s % page_size_) == 0); 205 return s; 206 } 207 208 bool UnmapCurrentRegion() { 209 bool result = true; 210 if (base_ != NULL) { 211 if (last_sync_ < limit_) { 212 // Defer syncing this data until next Sync() call, if any 213 pending_sync_ = true; 214 } 215 if (munmap(base_, limit_ - base_) != 0) { 216 result = false; 217 } 218 file_offset_ += limit_ - base_; 219 base_ = NULL; 220 limit_ = NULL; 221 last_sync_ = NULL; 222 dst_ = NULL; 223 224 // Increase the amount we map the next time, but capped at 1MB 225 if (map_size_ < (1<<20)) { 226 map_size_ *= 2; 227 } 228 } 229 return result; 230 } 231 232 bool MapNewRegion() { 233 assert(base_ == NULL); 234 if (ftruncate(fd_, file_offset_ + map_size_) < 0) { 235 return false; 236 } 237 void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, 238 fd_, file_offset_); 239 if (ptr == MAP_FAILED) { 240 return false; 241 } 242 base_ = reinterpret_cast<char*>(ptr); 243 limit_ = base_ + map_size_; 244 dst_ = base_; 245 last_sync_ = base_; 246 return true; 247 } 248 249 public: 250 PosixMmapFile(const std::string& fname, int fd, size_t page_size) 251 : filename_(fname), 252 fd_(fd), 253 page_size_(page_size), 254 map_size_(Roundup(65536, page_size)), 255 base_(NULL), 256 limit_(NULL), 257 dst_(NULL), 258 last_sync_(NULL), 259 file_offset_(0), 260 pending_sync_(false) { 261 assert((page_size & (page_size - 1)) == 0); 262 } 263 264 265 ~PosixMmapFile() { 266 if (fd_ >= 0) { 267 PosixMmapFile::Close(); 268 } 269 } 270 271 virtual Status Append(const Slice& data) { 272 const char* src = data.data(); 273 size_t left = data.size(); 274 while (left > 0) { 275 assert(base_ <= dst_); 276 assert(dst_ <= limit_); 277 size_t avail = limit_ - dst_; 278 if (avail == 0) { 279 if (!UnmapCurrentRegion() || 280 !MapNewRegion()) { 281 return IOError(filename_, errno); 282 } 283 } 284 285 size_t n = (left <= avail) ? left : avail; 286 memcpy(dst_, src, n); 287 dst_ += n; 288 src += n; 289 left -= n; 290 } 291 return Status::OK(); 292 } 293 294 virtual Status Close() { 295 Status s; 296 size_t unused = limit_ - dst_; 297 if (!UnmapCurrentRegion()) { 298 s = IOError(filename_, errno); 299 } else if (unused > 0) { 300 // Trim the extra space at the end of the file 301 if (ftruncate(fd_, file_offset_ - unused) < 0) { 302 s = IOError(filename_, errno); 303 } 304 } 305 306 if (close(fd_) < 0) { 307 if (s.ok()) { 308 s = IOError(filename_, errno); 309 } 310 } 311 312 fd_ = -1; 313 base_ = NULL; 314 limit_ = NULL; 315 return s; 316 } 317 318 virtual Status Flush() { 319 return Status::OK(); 320 } 321 322 virtual Status Sync() { 323 Status s; 324 325 if (pending_sync_) { 326 // Some unmapped data was not synced 327 pending_sync_ = false; 328 if (fdatasync(fd_) < 0) { 329 s = IOError(filename_, errno); 330 } 331 } 332 333 if (dst_ > last_sync_) { 334 // Find the beginnings of the pages that contain the first and last 335 // bytes to be synced. 336 size_t p1 = TruncateToPageBoundary(last_sync_ - base_); 337 size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1); 338 last_sync_ = dst_; 339 if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) { 340 s = IOError(filename_, errno); 341 } 342 } 343 344 return s; 345 } 346 }; 347 348 static int LockOrUnlock(int fd, bool lock) { 349 errno = 0; 350 struct flock f; 351 memset(&f, 0, sizeof(f)); 352 f.l_type = (lock ? F_WRLCK : F_UNLCK); 353 f.l_whence = SEEK_SET; 354 f.l_start = 0; 355 f.l_len = 0; // Lock/unlock entire file 356 return fcntl(fd, F_SETLK, &f); 357 } 358 359 class PosixFileLock : public FileLock { 360 public: 361 int fd_; 362 std::string name_; 363 }; 364 365 // Set of locked files. We keep a separate set instead of just 366 // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide 367 // any protection against multiple uses from the same process. 368 class PosixLockTable { 369 private: 370 port::Mutex mu_; 371 std::set<std::string> locked_files_; 372 public: 373 bool Insert(const std::string& fname) { 374 MutexLock l(&mu_); 375 return locked_files_.insert(fname).second; 376 } 377 void Remove(const std::string& fname) { 378 MutexLock l(&mu_); 379 locked_files_.erase(fname); 380 } 381 }; 382 383 class PosixEnv : public Env { 384 public: 385 PosixEnv(); 386 virtual ~PosixEnv() { 387 fprintf(stderr, "Destroying Env::Default()\n"); 388 abort(); 389 } 390 391 virtual Status NewSequentialFile(const std::string& fname, 392 SequentialFile** result) { 393 FILE* f = fopen(fname.c_str(), "r"); 394 if (f == NULL) { 395 *result = NULL; 396 return IOError(fname, errno); 397 } else { 398 *result = new PosixSequentialFile(fname, f); 399 return Status::OK(); 400 } 401 } 402 403 virtual Status NewRandomAccessFile(const std::string& fname, 404 RandomAccessFile** result) { 405 *result = NULL; 406 Status s; 407 int fd = open(fname.c_str(), O_RDONLY); 408 if (fd < 0) { 409 s = IOError(fname, errno); 410 } else if (mmap_limit_.Acquire()) { 411 uint64_t size; 412 s = GetFileSize(fname, &size); 413 if (s.ok()) { 414 void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0); 415 if (base != MAP_FAILED) { 416 *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_); 417 } else { 418 s = IOError(fname, errno); 419 } 420 } 421 close(fd); 422 if (!s.ok()) { 423 mmap_limit_.Release(); 424 } 425 } else { 426 *result = new PosixRandomAccessFile(fname, fd); 427 } 428 return s; 429 } 430 431 virtual Status NewWritableFile(const std::string& fname, 432 WritableFile** result) { 433 Status s; 434 const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); 435 if (fd < 0) { 436 *result = NULL; 437 s = IOError(fname, errno); 438 } else { 439 *result = new PosixMmapFile(fname, fd, page_size_); 440 } 441 return s; 442 } 443 444 virtual bool FileExists(const std::string& fname) { 445 return access(fname.c_str(), F_OK) == 0; 446 } 447 448 virtual Status GetChildren(const std::string& dir, 449 std::vector<std::string>* result) { 450 result->clear(); 451 DIR* d = opendir(dir.c_str()); 452 if (d == NULL) { 453 return IOError(dir, errno); 454 } 455 struct dirent* entry; 456 while ((entry = readdir(d)) != NULL) { 457 result->push_back(entry->d_name); 458 } 459 closedir(d); 460 return Status::OK(); 461 } 462 463 virtual Status DeleteFile(const std::string& fname) { 464 Status result; 465 if (unlink(fname.c_str()) != 0) { 466 result = IOError(fname, errno); 467 } 468 return result; 469 } 470 471 virtual Status CreateDir(const std::string& name) { 472 Status result; 473 if (mkdir(name.c_str(), 0755) != 0) { 474 result = IOError(name, errno); 475 } 476 return result; 477 } 478 479 virtual Status DeleteDir(const std::string& name) { 480 Status result; 481 if (rmdir(name.c_str()) != 0) { 482 result = IOError(name, errno); 483 } 484 return result; 485 } 486 487 virtual Status GetFileSize(const std::string& fname, uint64_t* size) { 488 Status s; 489 struct stat sbuf; 490 if (stat(fname.c_str(), &sbuf) != 0) { 491 *size = 0; 492 s = IOError(fname, errno); 493 } else { 494 *size = sbuf.st_size; 495 } 496 return s; 497 } 498 499 virtual Status RenameFile(const std::string& src, const std::string& target) { 500 Status result; 501 if (rename(src.c_str(), target.c_str()) != 0) { 502 result = IOError(src, errno); 503 } 504 return result; 505 } 506 507 virtual Status LockFile(const std::string& fname, FileLock** lock) { 508 *lock = NULL; 509 Status result; 510 int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); 511 if (fd < 0) { 512 result = IOError(fname, errno); 513 } else if (!locks_.Insert(fname)) { 514 close(fd); 515 result = Status::IOError("lock " + fname, "already held by process"); 516 } else if (LockOrUnlock(fd, true) == -1) { 517 result = IOError("lock " + fname, errno); 518 close(fd); 519 locks_.Remove(fname); 520 } else { 521 PosixFileLock* my_lock = new PosixFileLock; 522 my_lock->fd_ = fd; 523 my_lock->name_ = fname; 524 *lock = my_lock; 525 } 526 return result; 527 } 528 529 virtual Status UnlockFile(FileLock* lock) { 530 PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock); 531 Status result; 532 if (LockOrUnlock(my_lock->fd_, false) == -1) { 533 result = IOError("unlock", errno); 534 } 535 locks_.Remove(my_lock->name_); 536 close(my_lock->fd_); 537 delete my_lock; 538 return result; 539 } 540 541 virtual void Schedule(void (*function)(void*), void* arg); 542 543 virtual void StartThread(void (*function)(void* arg), void* arg); 544 545 virtual Status GetTestDirectory(std::string* result) { 546 const char* env = getenv("TEST_TMPDIR"); 547 if (env && env[0] != '\0') { 548 *result = env; 549 } else { 550 char buf[100]; 551 snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid())); 552 *result = buf; 553 } 554 // Directory may already exist 555 CreateDir(*result); 556 return Status::OK(); 557 } 558 559 static uint64_t gettid() { 560 pthread_t tid = pthread_self(); 561 uint64_t thread_id = 0; 562 memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); 563 return thread_id; 564 } 565 566 virtual Status NewLogger(const std::string& fname, Logger** result) { 567 FILE* f = fopen(fname.c_str(), "w"); 568 if (f == NULL) { 569 *result = NULL; 570 return IOError(fname, errno); 571 } else { 572 *result = new PosixLogger(f, &PosixEnv::gettid); 573 return Status::OK(); 574 } 575 } 576 577 virtual uint64_t NowMicros() { 578 struct timeval tv; 579 gettimeofday(&tv, NULL); 580 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; 581 } 582 583 virtual void SleepForMicroseconds(int micros) { 584 usleep(micros); 585 } 586 587 private: 588 void PthreadCall(const char* label, int result) { 589 if (result != 0) { 590 fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); 591 abort(); 592 } 593 } 594 595 // BGThread() is the body of the background thread 596 void BGThread(); 597 static void* BGThreadWrapper(void* arg) { 598 reinterpret_cast<PosixEnv*>(arg)->BGThread(); 599 return NULL; 600 } 601 602 size_t page_size_; 603 pthread_mutex_t mu_; 604 pthread_cond_t bgsignal_; 605 pthread_t bgthread_; 606 bool started_bgthread_; 607 608 // Entry per Schedule() call 609 struct BGItem { void* arg; void (*function)(void*); }; 610 typedef std::deque<BGItem> BGQueue; 611 BGQueue queue_; 612 613 PosixLockTable locks_; 614 MmapLimiter mmap_limit_; 615 }; 616 617 PosixEnv::PosixEnv() : page_size_(getpagesize()), 618 started_bgthread_(false) { 619 PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); 620 PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); 621 } 622 623 void PosixEnv::Schedule(void (*function)(void*), void* arg) { 624 PthreadCall("lock", pthread_mutex_lock(&mu_)); 625 626 // Start background thread if necessary 627 if (!started_bgthread_) { 628 started_bgthread_ = true; 629 PthreadCall( 630 "create thread", 631 pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this)); 632 } 633 634 // If the queue is currently empty, the background thread may currently be 635 // waiting. 636 if (queue_.empty()) { 637 PthreadCall("signal", pthread_cond_signal(&bgsignal_)); 638 } 639 640 // Add to priority queue 641 queue_.push_back(BGItem()); 642 queue_.back().function = function; 643 queue_.back().arg = arg; 644 645 PthreadCall("unlock", pthread_mutex_unlock(&mu_)); 646 } 647 648 void PosixEnv::BGThread() { 649 while (true) { 650 // Wait until there is an item that is ready to run 651 PthreadCall("lock", pthread_mutex_lock(&mu_)); 652 while (queue_.empty()) { 653 PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); 654 } 655 656 void (*function)(void*) = queue_.front().function; 657 void* arg = queue_.front().arg; 658 queue_.pop_front(); 659 660 PthreadCall("unlock", pthread_mutex_unlock(&mu_)); 661 (*function)(arg); 662 } 663 } 664 665 namespace { 666 struct StartThreadState { 667 void (*user_function)(void*); 668 void* arg; 669 }; 670 } 671 static void* StartThreadWrapper(void* arg) { 672 StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); 673 state->user_function(state->arg); 674 delete state; 675 return NULL; 676 } 677 678 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { 679 pthread_t t; 680 StartThreadState* state = new StartThreadState; 681 state->user_function = function; 682 state->arg = arg; 683 PthreadCall("start thread", 684 pthread_create(&t, NULL, &StartThreadWrapper, state)); 685 } 686 687 } // namespace 688 689 static pthread_once_t once = PTHREAD_ONCE_INIT; 690 static Env* default_env; 691 static void InitDefaultEnv() { default_env = new PosixEnv; } 692 693 Env* Env::Default() { 694 pthread_once(&once, InitDefaultEnv); 695 return default_env; 696 } 697 698 } // namespace leveldb 699