Home | History | Annotate | Download | only in util
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include <deque>
      6 #include <set>
      7 #include <dirent.h>
      8 #include <errno.h>
      9 #include <fcntl.h>
     10 #include <pthread.h>
     11 #include <stdio.h>
     12 #include <stdlib.h>
     13 #include <string.h>
     14 #include <sys/mman.h>
     15 #include <sys/stat.h>
     16 #include <sys/time.h>
     17 #include <sys/types.h>
     18 #include <time.h>
     19 #include <unistd.h>
     20 #if defined(LEVELDB_PLATFORM_ANDROID)
     21 #include <sys/stat.h>
     22 #endif
     23 #include "leveldb/env.h"
     24 #include "leveldb/slice.h"
     25 #include "port/port.h"
     26 #include "util/logging.h"
     27 #include "util/mutexlock.h"
     28 #include "util/posix_logger.h"
     29 
     30 namespace leveldb {
     31 
     32 namespace {
     33 
     34 static Status IOError(const std::string& context, int err_number) {
     35   return Status::IOError(context, strerror(err_number));
     36 }
     37 
     38 class PosixSequentialFile: public SequentialFile {
     39  private:
     40   std::string filename_;
     41   FILE* file_;
     42 
     43  public:
     44   PosixSequentialFile(const std::string& fname, FILE* f)
     45       : filename_(fname), file_(f) { }
     46   virtual ~PosixSequentialFile() { fclose(file_); }
     47 
     48   virtual Status Read(size_t n, Slice* result, char* scratch) {
     49     Status s;
     50     size_t r = fread_unlocked(scratch, 1, n, file_);
     51     *result = Slice(scratch, r);
     52     if (r < n) {
     53       if (feof(file_)) {
     54         // We leave status as ok if we hit the end of the file
     55       } else {
     56         // A partial read with an error: return a non-ok status
     57         s = IOError(filename_, errno);
     58       }
     59     }
     60     return s;
     61   }
     62 
     63   virtual Status Skip(uint64_t n) {
     64     if (fseek(file_, n, SEEK_CUR)) {
     65       return IOError(filename_, errno);
     66     }
     67     return Status::OK();
     68   }
     69 };
     70 
     71 // pread() based random-access
     72 class PosixRandomAccessFile: public RandomAccessFile {
     73  private:
     74   std::string filename_;
     75   int fd_;
     76 
     77  public:
     78   PosixRandomAccessFile(const std::string& fname, int fd)
     79       : filename_(fname), fd_(fd) { }
     80   virtual ~PosixRandomAccessFile() { close(fd_); }
     81 
     82   virtual Status Read(uint64_t offset, size_t n, Slice* result,
     83                       char* scratch) const {
     84     Status s;
     85     ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
     86     *result = Slice(scratch, (r < 0) ? 0 : r);
     87     if (r < 0) {
     88       // An error: return a non-ok status
     89       s = IOError(filename_, errno);
     90     }
     91     return s;
     92   }
     93 };
     94 
// Helper class to limit mmap file usage so that we do not end up
// running out of virtual memory or running into kernel performance
// problems for very large databases.
     98 class MmapLimiter {
     99  public:
    100   // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
    101   MmapLimiter() {
    102     SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
    103   }
    104 
    105   // If another mmap slot is available, acquire it and return true.
    106   // Else return false.
    107   bool Acquire() {
    108     if (GetAllowed() <= 0) {
    109       return false;
    110     }
    111     MutexLock l(&mu_);
    112     intptr_t x = GetAllowed();
    113     if (x <= 0) {
    114       return false;
    115     } else {
    116       SetAllowed(x - 1);
    117       return true;
    118     }
    119   }
    120 
    121   // Release a slot acquired by a previous call to Acquire() that returned true.
    122   void Release() {
    123     MutexLock l(&mu_);
    124     SetAllowed(GetAllowed() + 1);
    125   }
    126 
    127  private:
    128   port::Mutex mu_;
    129   port::AtomicPointer allowed_;
    130 
    131   intptr_t GetAllowed() const {
    132     return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
    133   }
    134 
    135   // REQUIRES: mu_ must be held
    136   void SetAllowed(intptr_t v) {
    137     allowed_.Release_Store(reinterpret_cast<void*>(v));
    138   }
    139 
    140   MmapLimiter(const MmapLimiter&);
    141   void operator=(const MmapLimiter&);
    142 };
    143 
    144 // mmap() based random-access
    145 class PosixMmapReadableFile: public RandomAccessFile {
    146  private:
    147   std::string filename_;
    148   void* mmapped_region_;
    149   size_t length_;
    150   MmapLimiter* limiter_;
    151 
    152  public:
    153   // base[0,length-1] contains the mmapped contents of the file.
    154   PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
    155                         MmapLimiter* limiter)
    156       : filename_(fname), mmapped_region_(base), length_(length),
    157         limiter_(limiter) {
    158   }
    159 
    160   virtual ~PosixMmapReadableFile() {
    161     munmap(mmapped_region_, length_);
    162     limiter_->Release();
    163   }
    164 
    165   virtual Status Read(uint64_t offset, size_t n, Slice* result,
    166                       char* scratch) const {
    167     Status s;
    168     if (offset + n > length_) {
    169       *result = Slice();
    170       s = IOError(filename_, EINVAL);
    171     } else {
    172       *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
    173     }
    174     return s;
    175   }
    176 };
    177 
    178 class PosixWritableFile : public WritableFile {
    179  private:
    180   std::string filename_;
    181   FILE* file_;
    182 
    183  public:
    184   PosixWritableFile(const std::string& fname, FILE* f)
    185       : filename_(fname), file_(f) { }
    186 
    187   ~PosixWritableFile() {
    188     if (file_ != NULL) {
    189       // Ignoring any potential errors
    190       fclose(file_);
    191     }
    192   }
    193 
    194   virtual Status Append(const Slice& data) {
    195     size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
    196     if (r != data.size()) {
    197       return IOError(filename_, errno);
    198     }
    199     return Status::OK();
    200   }
    201 
    202   virtual Status Close() {
    203     Status result;
    204     if (fclose(file_) != 0) {
    205       result = IOError(filename_, errno);
    206     }
    207     file_ = NULL;
    208     return result;
    209   }
    210 
    211   virtual Status Flush() {
    212     if (fflush_unlocked(file_) != 0) {
    213       return IOError(filename_, errno);
    214     }
    215     return Status::OK();
    216   }
    217 
    218   Status SyncDirIfManifest() {
    219     const char* f = filename_.c_str();
    220     const char* sep = strrchr(f, '/');
    221     Slice basename;
    222     std::string dir;
    223     if (sep == NULL) {
    224       dir = ".";
    225       basename = f;
    226     } else {
    227       dir = std::string(f, sep - f);
    228       basename = sep + 1;
    229     }
    230     Status s;
    231     if (basename.starts_with("MANIFEST")) {
    232       int fd = open(dir.c_str(), O_RDONLY);
    233       if (fd < 0) {
    234         s = IOError(dir, errno);
    235       } else {
    236         if (fsync(fd) < 0) {
    237           s = IOError(dir, errno);
    238         }
    239         close(fd);
    240       }
    241     }
    242     return s;
    243   }
    244 
    245   virtual Status Sync() {
    246     // Ensure new files referred to by the manifest are in the filesystem.
    247     Status s = SyncDirIfManifest();
    248     if (!s.ok()) {
    249       return s;
    250     }
    251     if (fflush_unlocked(file_) != 0 ||
    252         fdatasync(fileno(file_)) != 0) {
    253       s = Status::IOError(filename_, strerror(errno));
    254     }
    255     return s;
    256   }
    257 };
    258 
    259 static int LockOrUnlock(int fd, bool lock) {
    260   errno = 0;
    261   struct flock f;
    262   memset(&f, 0, sizeof(f));
    263   f.l_type = (lock ? F_WRLCK : F_UNLCK);
    264   f.l_whence = SEEK_SET;
    265   f.l_start = 0;
    266   f.l_len = 0;        // Lock/unlock entire file
    267   return fcntl(fd, F_SETLK, &f);
    268 }
    269 
    270 class PosixFileLock : public FileLock {
    271  public:
    272   int fd_;
    273   std::string name_;
    274 };
    275 
// Set of locked files.  We keep a separate set instead of just
// relying on fcntl(F_SETLK) since fcntl(F_SETLK) does not provide
// any protection against multiple uses from the same process.
    279 class PosixLockTable {
    280  private:
    281   port::Mutex mu_;
    282   std::set<std::string> locked_files_;
    283  public:
    284   bool Insert(const std::string& fname) {
    285     MutexLock l(&mu_);
    286     return locked_files_.insert(fname).second;
    287   }
    288   void Remove(const std::string& fname) {
    289     MutexLock l(&mu_);
    290     locked_files_.erase(fname);
    291   }
    292 };
    293 
    294 class PosixEnv : public Env {
    295  public:
    296   PosixEnv();
    297   virtual ~PosixEnv() {
    298     fprintf(stderr, "Destroying Env::Default()\n");
    299     abort();
    300   }
    301 
    302   virtual Status NewSequentialFile(const std::string& fname,
    303                                    SequentialFile** result) {
    304     FILE* f = fopen(fname.c_str(), "r");
    305     if (f == NULL) {
    306       *result = NULL;
    307       return IOError(fname, errno);
    308     } else {
    309       *result = new PosixSequentialFile(fname, f);
    310       return Status::OK();
    311     }
    312   }
    313 
    314   virtual Status NewRandomAccessFile(const std::string& fname,
    315                                      RandomAccessFile** result) {
    316     *result = NULL;
    317     Status s;
    318     int fd = open(fname.c_str(), O_RDONLY);
    319     if (fd < 0) {
    320       s = IOError(fname, errno);
    321     } else if (mmap_limit_.Acquire()) {
    322       uint64_t size;
    323       s = GetFileSize(fname, &size);
    324       if (s.ok()) {
    325         void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
    326         if (base != MAP_FAILED) {
    327           *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
    328         } else {
    329           s = IOError(fname, errno);
    330         }
    331       }
    332       close(fd);
    333       if (!s.ok()) {
    334         mmap_limit_.Release();
    335       }
    336     } else {
    337       *result = new PosixRandomAccessFile(fname, fd);
    338     }
    339     return s;
    340   }
    341 
    342   virtual Status NewWritableFile(const std::string& fname,
    343                                  WritableFile** result) {
    344     Status s;
    345     FILE* f = fopen(fname.c_str(), "w");
    346     if (f == NULL) {
    347       *result = NULL;
    348       s = IOError(fname, errno);
    349     } else {
    350       *result = new PosixWritableFile(fname, f);
    351     }
    352     return s;
    353   }
    354 
    355   virtual bool FileExists(const std::string& fname) {
    356     return access(fname.c_str(), F_OK) == 0;
    357   }
    358 
    359   virtual Status GetChildren(const std::string& dir,
    360                              std::vector<std::string>* result) {
    361     result->clear();
    362     DIR* d = opendir(dir.c_str());
    363     if (d == NULL) {
    364       return IOError(dir, errno);
    365     }
    366     struct dirent* entry;
    367     while ((entry = readdir(d)) != NULL) {
    368       result->push_back(entry->d_name);
    369     }
    370     closedir(d);
    371     return Status::OK();
    372   }
    373 
    374   virtual Status DeleteFile(const std::string& fname) {
    375     Status result;
    376     if (unlink(fname.c_str()) != 0) {
    377       result = IOError(fname, errno);
    378     }
    379     return result;
    380   }
    381 
    382   virtual Status CreateDir(const std::string& name) {
    383     Status result;
    384     if (mkdir(name.c_str(), 0755) != 0) {
    385       result = IOError(name, errno);
    386     }
    387     return result;
    388   }
    389 
    390   virtual Status DeleteDir(const std::string& name) {
    391     Status result;
    392     if (rmdir(name.c_str()) != 0) {
    393       result = IOError(name, errno);
    394     }
    395     return result;
    396   }
    397 
    398   virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
    399     Status s;
    400     struct stat sbuf;
    401     if (stat(fname.c_str(), &sbuf) != 0) {
    402       *size = 0;
    403       s = IOError(fname, errno);
    404     } else {
    405       *size = sbuf.st_size;
    406     }
    407     return s;
    408   }
    409 
    410   virtual Status RenameFile(const std::string& src, const std::string& target) {
    411     Status result;
    412     if (rename(src.c_str(), target.c_str()) != 0) {
    413       result = IOError(src, errno);
    414     }
    415     return result;
    416   }
    417 
    418   virtual Status LockFile(const std::string& fname, FileLock** lock) {
    419     *lock = NULL;
    420     Status result;
    421     int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
    422     if (fd < 0) {
    423       result = IOError(fname, errno);
    424     } else if (!locks_.Insert(fname)) {
    425       close(fd);
    426       result = Status::IOError("lock " + fname, "already held by process");
    427     } else if (LockOrUnlock(fd, true) == -1) {
    428       result = IOError("lock " + fname, errno);
    429       close(fd);
    430       locks_.Remove(fname);
    431     } else {
    432       PosixFileLock* my_lock = new PosixFileLock;
    433       my_lock->fd_ = fd;
    434       my_lock->name_ = fname;
    435       *lock = my_lock;
    436     }
    437     return result;
    438   }
    439 
    440   virtual Status UnlockFile(FileLock* lock) {
    441     PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
    442     Status result;
    443     if (LockOrUnlock(my_lock->fd_, false) == -1) {
    444       result = IOError("unlock", errno);
    445     }
    446     locks_.Remove(my_lock->name_);
    447     close(my_lock->fd_);
    448     delete my_lock;
    449     return result;
    450   }
    451 
    452   virtual void Schedule(void (*function)(void*), void* arg);
    453 
    454   virtual void StartThread(void (*function)(void* arg), void* arg);
    455 
    456   virtual Status GetTestDirectory(std::string* result) {
    457     const char* env = getenv("TEST_TMPDIR");
    458     if (env && env[0] != '\0') {
    459       *result = env;
    460     } else {
    461       char buf[100];
    462       snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
    463       *result = buf;
    464     }
    465     // Directory may already exist
    466     CreateDir(*result);
    467     return Status::OK();
    468   }
    469 
    470   static uint64_t gettid() {
    471     pthread_t tid = pthread_self();
    472     uint64_t thread_id = 0;
    473     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
    474     return thread_id;
    475   }
    476 
    477   virtual Status NewLogger(const std::string& fname, Logger** result) {
    478     FILE* f = fopen(fname.c_str(), "w");
    479     if (f == NULL) {
    480       *result = NULL;
    481       return IOError(fname, errno);
    482     } else {
    483       *result = new PosixLogger(f, &PosixEnv::gettid);
    484       return Status::OK();
    485     }
    486   }
    487 
    488   virtual uint64_t NowMicros() {
    489     struct timeval tv;
    490     gettimeofday(&tv, NULL);
    491     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
    492   }
    493 
    494   virtual void SleepForMicroseconds(int micros) {
    495     usleep(micros);
    496   }
    497 
    498  private:
    499   void PthreadCall(const char* label, int result) {
    500     if (result != 0) {
    501       fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    502       abort();
    503     }
    504   }
    505 
    506   // BGThread() is the body of the background thread
    507   void BGThread();
    508   static void* BGThreadWrapper(void* arg) {
    509     reinterpret_cast<PosixEnv*>(arg)->BGThread();
    510     return NULL;
    511   }
    512 
    513   pthread_mutex_t mu_;
    514   pthread_cond_t bgsignal_;
    515   pthread_t bgthread_;
    516   bool started_bgthread_;
    517 
    518   // Entry per Schedule() call
    519   struct BGItem { void* arg; void (*function)(void*); };
    520   typedef std::deque<BGItem> BGQueue;
    521   BGQueue queue_;
    522 
    523   PosixLockTable locks_;
    524   MmapLimiter mmap_limit_;
    525 };
    526 
    527 PosixEnv::PosixEnv() : started_bgthread_(false) {
    528   PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
    529   PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
    530 }
    531 
    532 void PosixEnv::Schedule(void (*function)(void*), void* arg) {
    533   PthreadCall("lock", pthread_mutex_lock(&mu_));
    534 
    535   // Start background thread if necessary
    536   if (!started_bgthread_) {
    537     started_bgthread_ = true;
    538     PthreadCall(
    539         "create thread",
    540         pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
    541   }
    542 
    543   // If the queue is currently empty, the background thread may currently be
    544   // waiting.
    545   if (queue_.empty()) {
    546     PthreadCall("signal", pthread_cond_signal(&bgsignal_));
    547   }
    548 
    549   // Add to priority queue
    550   queue_.push_back(BGItem());
    551   queue_.back().function = function;
    552   queue_.back().arg = arg;
    553 
    554   PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    555 }
    556 
    557 void PosixEnv::BGThread() {
    558   while (true) {
    559     // Wait until there is an item that is ready to run
    560     PthreadCall("lock", pthread_mutex_lock(&mu_));
    561     while (queue_.empty()) {
    562       PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    563     }
    564 
    565     void (*function)(void*) = queue_.front().function;
    566     void* arg = queue_.front().arg;
    567     queue_.pop_front();
    568 
    569     PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    570     (*function)(arg);
    571   }
    572 }
    573 
    574 namespace {
    575 struct StartThreadState {
    576   void (*user_function)(void*);
    577   void* arg;
    578 };
    579 }
    580 static void* StartThreadWrapper(void* arg) {
    581   StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
    582   state->user_function(state->arg);
    583   delete state;
    584   return NULL;
    585 }
    586 
    587 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
    588   pthread_t t;
    589   StartThreadState* state = new StartThreadState;
    590   state->user_function = function;
    591   state->arg = arg;
    592   PthreadCall("start thread",
    593               pthread_create(&t, NULL,  &StartThreadWrapper, state));
    594 }
    595 
    596 }  // namespace
    597 
    598 static pthread_once_t once = PTHREAD_ONCE_INIT;
    599 static Env* default_env;
    600 static void InitDefaultEnv() { default_env = new PosixEnv; }
    601 
    602 Env* Env::Default() {
    603   pthread_once(&once, InitDefaultEnv);
    604   return default_env;
    605 }
    606 
    607 }  // namespace leveldb
    608