1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include <deque> 6 #include <set> 7 #include <dirent.h> 8 #include <errno.h> 9 #include <fcntl.h> 10 #include <pthread.h> 11 #include <stdio.h> 12 #include <stdlib.h> 13 #include <string.h> 14 #include <sys/mman.h> 15 #include <sys/stat.h> 16 #include <sys/time.h> 17 #include <sys/types.h> 18 #include <time.h> 19 #include <unistd.h> 20 #if defined(LEVELDB_PLATFORM_ANDROID) 21 #include <sys/stat.h> 22 #endif 23 #include "leveldb/env.h" 24 #include "leveldb/slice.h" 25 #include "port/port.h" 26 #include "util/logging.h" 27 #include "util/mutexlock.h" 28 #include "util/posix_logger.h" 29 30 namespace leveldb { 31 32 namespace { 33 34 static Status IOError(const std::string& context, int err_number) { 35 return Status::IOError(context, strerror(err_number)); 36 } 37 38 class PosixSequentialFile: public SequentialFile { 39 private: 40 std::string filename_; 41 FILE* file_; 42 43 public: 44 PosixSequentialFile(const std::string& fname, FILE* f) 45 : filename_(fname), file_(f) { } 46 virtual ~PosixSequentialFile() { fclose(file_); } 47 48 virtual Status Read(size_t n, Slice* result, char* scratch) { 49 Status s; 50 size_t r = fread_unlocked(scratch, 1, n, file_); 51 *result = Slice(scratch, r); 52 if (r < n) { 53 if (feof(file_)) { 54 // We leave status as ok if we hit the end of the file 55 } else { 56 // A partial read with an error: return a non-ok status 57 s = IOError(filename_, errno); 58 } 59 } 60 return s; 61 } 62 63 virtual Status Skip(uint64_t n) { 64 if (fseek(file_, n, SEEK_CUR)) { 65 return IOError(filename_, errno); 66 } 67 return Status::OK(); 68 } 69 }; 70 71 // pread() based random-access 72 class PosixRandomAccessFile: public RandomAccessFile { 73 private: 74 std::string filename_; 75 int fd_; 76 77 public: 78 PosixRandomAccessFile(const std::string& fname, int fd) 79 : filename_(fname), fd_(fd) { } 80 virtual ~PosixRandomAccessFile() { close(fd_); } 81 82 virtual Status Read(uint64_t offset, size_t n, Slice* result, 83 char* scratch) const { 84 Status s; 85 ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset)); 86 *result = Slice(scratch, (r < 0) ? 0 : r); 87 if (r < 0) { 88 // An error: return a non-ok status 89 s = IOError(filename_, errno); 90 } 91 return s; 92 } 93 }; 94 95 // Helper class to limit mmap file usage so that we do not end up 96 // running out virtual memory or running into kernel performance 97 // problems for very large databases. 98 class MmapLimiter { 99 public: 100 // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes. 101 MmapLimiter() { 102 SetAllowed(sizeof(void*) >= 8 ? 1000 : 0); 103 } 104 105 // If another mmap slot is available, acquire it and return true. 106 // Else return false. 107 bool Acquire() { 108 if (GetAllowed() <= 0) { 109 return false; 110 } 111 MutexLock l(&mu_); 112 intptr_t x = GetAllowed(); 113 if (x <= 0) { 114 return false; 115 } else { 116 SetAllowed(x - 1); 117 return true; 118 } 119 } 120 121 // Release a slot acquired by a previous call to Acquire() that returned true. 122 void Release() { 123 MutexLock l(&mu_); 124 SetAllowed(GetAllowed() + 1); 125 } 126 127 private: 128 port::Mutex mu_; 129 port::AtomicPointer allowed_; 130 131 intptr_t GetAllowed() const { 132 return reinterpret_cast<intptr_t>(allowed_.Acquire_Load()); 133 } 134 135 // REQUIRES: mu_ must be held 136 void SetAllowed(intptr_t v) { 137 allowed_.Release_Store(reinterpret_cast<void*>(v)); 138 } 139 140 MmapLimiter(const MmapLimiter&); 141 void operator=(const MmapLimiter&); 142 }; 143 144 // mmap() based random-access 145 class PosixMmapReadableFile: public RandomAccessFile { 146 private: 147 std::string filename_; 148 void* mmapped_region_; 149 size_t length_; 150 MmapLimiter* limiter_; 151 152 public: 153 // base[0,length-1] contains the mmapped contents of the file. 154 PosixMmapReadableFile(const std::string& fname, void* base, size_t length, 155 MmapLimiter* limiter) 156 : filename_(fname), mmapped_region_(base), length_(length), 157 limiter_(limiter) { 158 } 159 160 virtual ~PosixMmapReadableFile() { 161 munmap(mmapped_region_, length_); 162 limiter_->Release(); 163 } 164 165 virtual Status Read(uint64_t offset, size_t n, Slice* result, 166 char* scratch) const { 167 Status s; 168 if (offset + n > length_) { 169 *result = Slice(); 170 s = IOError(filename_, EINVAL); 171 } else { 172 *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n); 173 } 174 return s; 175 } 176 }; 177 178 class PosixWritableFile : public WritableFile { 179 private: 180 std::string filename_; 181 FILE* file_; 182 183 public: 184 PosixWritableFile(const std::string& fname, FILE* f) 185 : filename_(fname), file_(f) { } 186 187 ~PosixWritableFile() { 188 if (file_ != NULL) { 189 // Ignoring any potential errors 190 fclose(file_); 191 } 192 } 193 194 virtual Status Append(const Slice& data) { 195 size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_); 196 if (r != data.size()) { 197 return IOError(filename_, errno); 198 } 199 return Status::OK(); 200 } 201 202 virtual Status Close() { 203 Status result; 204 if (fclose(file_) != 0) { 205 result = IOError(filename_, errno); 206 } 207 file_ = NULL; 208 return result; 209 } 210 211 virtual Status Flush() { 212 if (fflush_unlocked(file_) != 0) { 213 return IOError(filename_, errno); 214 } 215 return Status::OK(); 216 } 217 218 Status SyncDirIfManifest() { 219 const char* f = filename_.c_str(); 220 const char* sep = strrchr(f, '/'); 221 Slice basename; 222 std::string dir; 223 if (sep == NULL) { 224 dir = "."; 225 basename = f; 226 } else { 227 dir = std::string(f, sep - f); 228 basename = sep + 1; 229 } 230 Status s; 231 if (basename.starts_with("MANIFEST")) { 232 int fd = open(dir.c_str(), O_RDONLY); 233 if (fd < 0) { 234 s = IOError(dir, errno); 235 } else { 236 if (fsync(fd) < 0) { 237 s = IOError(dir, errno); 238 } 239 close(fd); 240 } 241 } 242 return s; 243 } 244 245 virtual Status Sync() { 246 // Ensure new files referred to by the manifest are in the filesystem. 247 Status s = SyncDirIfManifest(); 248 if (!s.ok()) { 249 return s; 250 } 251 if (fflush_unlocked(file_) != 0 || 252 fdatasync(fileno(file_)) != 0) { 253 s = Status::IOError(filename_, strerror(errno)); 254 } 255 return s; 256 } 257 }; 258 259 static int LockOrUnlock(int fd, bool lock) { 260 errno = 0; 261 struct flock f; 262 memset(&f, 0, sizeof(f)); 263 f.l_type = (lock ? F_WRLCK : F_UNLCK); 264 f.l_whence = SEEK_SET; 265 f.l_start = 0; 266 f.l_len = 0; // Lock/unlock entire file 267 return fcntl(fd, F_SETLK, &f); 268 } 269 270 class PosixFileLock : public FileLock { 271 public: 272 int fd_; 273 std::string name_; 274 }; 275 276 // Set of locked files. We keep a separate set instead of just 277 // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide 278 // any protection against multiple uses from the same process. 279 class PosixLockTable { 280 private: 281 port::Mutex mu_; 282 std::set<std::string> locked_files_; 283 public: 284 bool Insert(const std::string& fname) { 285 MutexLock l(&mu_); 286 return locked_files_.insert(fname).second; 287 } 288 void Remove(const std::string& fname) { 289 MutexLock l(&mu_); 290 locked_files_.erase(fname); 291 } 292 }; 293 294 class PosixEnv : public Env { 295 public: 296 PosixEnv(); 297 virtual ~PosixEnv() { 298 fprintf(stderr, "Destroying Env::Default()\n"); 299 abort(); 300 } 301 302 virtual Status NewSequentialFile(const std::string& fname, 303 SequentialFile** result) { 304 FILE* f = fopen(fname.c_str(), "r"); 305 if (f == NULL) { 306 *result = NULL; 307 return IOError(fname, errno); 308 } else { 309 *result = new PosixSequentialFile(fname, f); 310 return Status::OK(); 311 } 312 } 313 314 virtual Status NewRandomAccessFile(const std::string& fname, 315 RandomAccessFile** result) { 316 *result = NULL; 317 Status s; 318 int fd = open(fname.c_str(), O_RDONLY); 319 if (fd < 0) { 320 s = IOError(fname, errno); 321 } else if (mmap_limit_.Acquire()) { 322 uint64_t size; 323 s = GetFileSize(fname, &size); 324 if (s.ok()) { 325 void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0); 326 if (base != MAP_FAILED) { 327 *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_); 328 } else { 329 s = IOError(fname, errno); 330 } 331 } 332 close(fd); 333 if (!s.ok()) { 334 mmap_limit_.Release(); 335 } 336 } else { 337 *result = new PosixRandomAccessFile(fname, fd); 338 } 339 return s; 340 } 341 342 virtual Status NewWritableFile(const std::string& fname, 343 WritableFile** result) { 344 Status s; 345 FILE* f = fopen(fname.c_str(), "w"); 346 if (f == NULL) { 347 *result = NULL; 348 s = IOError(fname, errno); 349 } else { 350 *result = new PosixWritableFile(fname, f); 351 } 352 return s; 353 } 354 355 virtual bool FileExists(const std::string& fname) { 356 return access(fname.c_str(), F_OK) == 0; 357 } 358 359 virtual Status GetChildren(const std::string& dir, 360 std::vector<std::string>* result) { 361 result->clear(); 362 DIR* d = opendir(dir.c_str()); 363 if (d == NULL) { 364 return IOError(dir, errno); 365 } 366 struct dirent* entry; 367 while ((entry = readdir(d)) != NULL) { 368 result->push_back(entry->d_name); 369 } 370 closedir(d); 371 return Status::OK(); 372 } 373 374 virtual Status DeleteFile(const std::string& fname) { 375 Status result; 376 if (unlink(fname.c_str()) != 0) { 377 result = IOError(fname, errno); 378 } 379 return result; 380 } 381 382 virtual Status CreateDir(const std::string& name) { 383 Status result; 384 if (mkdir(name.c_str(), 0755) != 0) { 385 result = IOError(name, errno); 386 } 387 return result; 388 } 389 390 virtual Status DeleteDir(const std::string& name) { 391 Status result; 392 if (rmdir(name.c_str()) != 0) { 393 result = IOError(name, errno); 394 } 395 return result; 396 } 397 398 virtual Status GetFileSize(const std::string& fname, uint64_t* size) { 399 Status s; 400 struct stat sbuf; 401 if (stat(fname.c_str(), &sbuf) != 0) { 402 *size = 0; 403 s = IOError(fname, errno); 404 } else { 405 *size = sbuf.st_size; 406 } 407 return s; 408 } 409 410 virtual Status RenameFile(const std::string& src, const std::string& target) { 411 Status result; 412 if (rename(src.c_str(), target.c_str()) != 0) { 413 result = IOError(src, errno); 414 } 415 return result; 416 } 417 418 virtual Status LockFile(const std::string& fname, FileLock** lock) { 419 *lock = NULL; 420 Status result; 421 int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); 422 if (fd < 0) { 423 result = IOError(fname, errno); 424 } else if (!locks_.Insert(fname)) { 425 close(fd); 426 result = Status::IOError("lock " + fname, "already held by process"); 427 } else if (LockOrUnlock(fd, true) == -1) { 428 result = IOError("lock " + fname, errno); 429 close(fd); 430 locks_.Remove(fname); 431 } else { 432 PosixFileLock* my_lock = new PosixFileLock; 433 my_lock->fd_ = fd; 434 my_lock->name_ = fname; 435 *lock = my_lock; 436 } 437 return result; 438 } 439 440 virtual Status UnlockFile(FileLock* lock) { 441 PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock); 442 Status result; 443 if (LockOrUnlock(my_lock->fd_, false) == -1) { 444 result = IOError("unlock", errno); 445 } 446 locks_.Remove(my_lock->name_); 447 close(my_lock->fd_); 448 delete my_lock; 449 return result; 450 } 451 452 virtual void Schedule(void (*function)(void*), void* arg); 453 454 virtual void StartThread(void (*function)(void* arg), void* arg); 455 456 virtual Status GetTestDirectory(std::string* result) { 457 const char* env = getenv("TEST_TMPDIR"); 458 if (env && env[0] != '\0') { 459 *result = env; 460 } else { 461 char buf[100]; 462 snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid())); 463 *result = buf; 464 } 465 // Directory may already exist 466 CreateDir(*result); 467 return Status::OK(); 468 } 469 470 static uint64_t gettid() { 471 pthread_t tid = pthread_self(); 472 uint64_t thread_id = 0; 473 memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); 474 return thread_id; 475 } 476 477 virtual Status NewLogger(const std::string& fname, Logger** result) { 478 FILE* f = fopen(fname.c_str(), "w"); 479 if (f == NULL) { 480 *result = NULL; 481 return IOError(fname, errno); 482 } else { 483 *result = new PosixLogger(f, &PosixEnv::gettid); 484 return Status::OK(); 485 } 486 } 487 488 virtual uint64_t NowMicros() { 489 struct timeval tv; 490 gettimeofday(&tv, NULL); 491 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; 492 } 493 494 virtual void SleepForMicroseconds(int micros) { 495 usleep(micros); 496 } 497 498 private: 499 void PthreadCall(const char* label, int result) { 500 if (result != 0) { 501 fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); 502 abort(); 503 } 504 } 505 506 // BGThread() is the body of the background thread 507 void BGThread(); 508 static void* BGThreadWrapper(void* arg) { 509 reinterpret_cast<PosixEnv*>(arg)->BGThread(); 510 return NULL; 511 } 512 513 pthread_mutex_t mu_; 514 pthread_cond_t bgsignal_; 515 pthread_t bgthread_; 516 bool started_bgthread_; 517 518 // Entry per Schedule() call 519 struct BGItem { void* arg; void (*function)(void*); }; 520 typedef std::deque<BGItem> BGQueue; 521 BGQueue queue_; 522 523 PosixLockTable locks_; 524 MmapLimiter mmap_limit_; 525 }; 526 527 PosixEnv::PosixEnv() : started_bgthread_(false) { 528 PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); 529 PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); 530 } 531 532 void PosixEnv::Schedule(void (*function)(void*), void* arg) { 533 PthreadCall("lock", pthread_mutex_lock(&mu_)); 534 535 // Start background thread if necessary 536 if (!started_bgthread_) { 537 started_bgthread_ = true; 538 PthreadCall( 539 "create thread", 540 pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this)); 541 } 542 543 // If the queue is currently empty, the background thread may currently be 544 // waiting. 545 if (queue_.empty()) { 546 PthreadCall("signal", pthread_cond_signal(&bgsignal_)); 547 } 548 549 // Add to priority queue 550 queue_.push_back(BGItem()); 551 queue_.back().function = function; 552 queue_.back().arg = arg; 553 554 PthreadCall("unlock", pthread_mutex_unlock(&mu_)); 555 } 556 557 void PosixEnv::BGThread() { 558 while (true) { 559 // Wait until there is an item that is ready to run 560 PthreadCall("lock", pthread_mutex_lock(&mu_)); 561 while (queue_.empty()) { 562 PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); 563 } 564 565 void (*function)(void*) = queue_.front().function; 566 void* arg = queue_.front().arg; 567 queue_.pop_front(); 568 569 PthreadCall("unlock", pthread_mutex_unlock(&mu_)); 570 (*function)(arg); 571 } 572 } 573 574 namespace { 575 struct StartThreadState { 576 void (*user_function)(void*); 577 void* arg; 578 }; 579 } 580 static void* StartThreadWrapper(void* arg) { 581 StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); 582 state->user_function(state->arg); 583 delete state; 584 return NULL; 585 } 586 587 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { 588 pthread_t t; 589 StartThreadState* state = new StartThreadState; 590 state->user_function = function; 591 state->arg = arg; 592 PthreadCall("start thread", 593 pthread_create(&t, NULL, &StartThreadWrapper, state)); 594 } 595 596 } // namespace 597 598 static pthread_once_t once = PTHREAD_ONCE_INIT; 599 static Env* default_env; 600 static void InitDefaultEnv() { default_env = new PosixEnv; } 601 602 Env* Env::Default() { 603 pthread_once(&once, InitDefaultEnv); 604 return default_env; 605 } 606 607 } // namespace leveldb 608