Home | History | Annotate | Download | only in util
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
#include <algorithm>
#include <deque>
#include <set>
#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/posix_logger.h"
     29 
     30 namespace leveldb {
     31 
     32 namespace {
     33 
     34 static Status IOError(const std::string& context, int err_number) {
     35   return Status::IOError(context, strerror(err_number));
     36 }
     37 
     38 class PosixSequentialFile: public SequentialFile {
     39  private:
     40   std::string filename_;
     41   FILE* file_;
     42 
     43  public:
     44   PosixSequentialFile(const std::string& fname, FILE* f)
     45       : filename_(fname), file_(f) { }
     46   virtual ~PosixSequentialFile() { fclose(file_); }
     47 
     48   virtual Status Read(size_t n, Slice* result, char* scratch) {
     49     Status s;
     50     size_t r = fread_unlocked(scratch, 1, n, file_);
     51     *result = Slice(scratch, r);
     52     if (r < n) {
     53       if (feof(file_)) {
     54         // We leave status as ok if we hit the end of the file
     55       } else {
     56         // A partial read with an error: return a non-ok status
     57         s = IOError(filename_, errno);
     58       }
     59     }
     60     return s;
     61   }
     62 
     63   virtual Status Skip(uint64_t n) {
     64     if (fseek(file_, n, SEEK_CUR)) {
     65       return IOError(filename_, errno);
     66     }
     67     return Status::OK();
     68   }
     69 };
     70 
     71 // pread() based random-access
     72 class PosixRandomAccessFile: public RandomAccessFile {
     73  private:
     74   std::string filename_;
     75   int fd_;
     76 
     77  public:
     78   PosixRandomAccessFile(const std::string& fname, int fd)
     79       : filename_(fname), fd_(fd) { }
     80   virtual ~PosixRandomAccessFile() { close(fd_); }
     81 
     82   virtual Status Read(uint64_t offset, size_t n, Slice* result,
     83                       char* scratch) const {
     84     Status s;
     85     ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
     86     *result = Slice(scratch, (r < 0) ? 0 : r);
     87     if (r < 0) {
     88       // An error: return a non-ok status
     89       s = IOError(filename_, errno);
     90     }
     91     return s;
     92   }
     93 };
     94 
     95 // Helper class to limit mmap file usage so that we do not end up
     96 // running out virtual memory or running into kernel performance
     97 // problems for very large databases.
     98 class MmapLimiter {
     99  public:
    100   // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
    101   MmapLimiter() {
    102     SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
    103   }
    104 
    105   // If another mmap slot is available, acquire it and return true.
    106   // Else return false.
    107   bool Acquire() {
    108     if (GetAllowed() <= 0) {
    109       return false;
    110     }
    111     MutexLock l(&mu_);
    112     intptr_t x = GetAllowed();
    113     if (x <= 0) {
    114       return false;
    115     } else {
    116       SetAllowed(x - 1);
    117       return true;
    118     }
    119   }
    120 
    121   // Release a slot acquired by a previous call to Acquire() that returned true.
    122   void Release() {
    123     MutexLock l(&mu_);
    124     SetAllowed(GetAllowed() + 1);
    125   }
    126 
    127  private:
    128   port::Mutex mu_;
    129   port::AtomicPointer allowed_;
    130 
    131   intptr_t GetAllowed() const {
    132     return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
    133   }
    134 
    135   // REQUIRES: mu_ must be held
    136   void SetAllowed(intptr_t v) {
    137     allowed_.Release_Store(reinterpret_cast<void*>(v));
    138   }
    139 
    140   MmapLimiter(const MmapLimiter&);
    141   void operator=(const MmapLimiter&);
    142 };
    143 
    144 // mmap() based random-access
    145 class PosixMmapReadableFile: public RandomAccessFile {
    146  private:
    147   std::string filename_;
    148   void* mmapped_region_;
    149   size_t length_;
    150   MmapLimiter* limiter_;
    151 
    152  public:
    153   // base[0,length-1] contains the mmapped contents of the file.
    154   PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
    155                         MmapLimiter* limiter)
    156       : filename_(fname), mmapped_region_(base), length_(length),
    157         limiter_(limiter) {
    158   }
    159 
    160   virtual ~PosixMmapReadableFile() {
    161     munmap(mmapped_region_, length_);
    162     limiter_->Release();
    163   }
    164 
    165   virtual Status Read(uint64_t offset, size_t n, Slice* result,
    166                       char* scratch) const {
    167     Status s;
    168     if (offset + n > length_) {
    169       *result = Slice();
    170       s = IOError(filename_, EINVAL);
    171     } else {
    172       *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
    173     }
    174     return s;
    175   }
    176 };
    177 
// Writable file that appends by memcpy() into a window of the file mapped
// with mmap(PROT_READ | PROT_WRITE, MAP_SHARED).
//
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file.  This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
//
// Window life cycle:
// - MapNewRegion() extends the file with ftruncate() and maps map_size_
//   bytes at file_offset_.
// - Append() fills the window.
// - When the window is full, UnmapCurrentRegion() unmaps it, advances
//   file_offset_, and doubles map_size_ while it is below 1MB.
// - Close() trims the unused tail of the last window with ftruncate().
class PosixMmapFile : public WritableFile {
 private:
  std::string filename_;
  int fd_;
  size_t page_size_;
  size_t map_size_;       // How much extra memory to map at a time
  char* base_;            // The mapped region
  char* limit_;           // Limit of the mapped region
  char* dst_;             // Where to write next  (in range [base_,limit_])
  char* last_sync_;       // Where have we synced up to
  uint64_t file_offset_;  // Offset of base_ in file

  // Have we done an munmap of unsynced data?
  bool pending_sync_;

  // Roundup x to a multiple of y
  static size_t Roundup(size_t x, size_t y) {
    return ((x + y - 1) / y) * y;
  }

  // Rounds s down to a multiple of page_size_. The bit mask only works
  // because page_size_ is a power of two, which the constructor asserts.
  size_t TruncateToPageBoundary(size_t s) {
    s -= (s & (page_size_ - 1));
    assert((s % page_size_) == 0);
    return s;
  }

  // Unmaps the current window, if any, and advances file_offset_ past it.
  // Returns false if munmap() fails, and callers then report errno. The
  // window pointers are reset and map_size_ grows whether or not munmap()
  // succeeded.
  bool UnmapCurrentRegion() {
    bool result = true;
    if (base_ != NULL) {
      if (last_sync_ < limit_) {
        // Defer syncing this data until next Sync() call, if any
        pending_sync_ = true;
      }
      if (munmap(base_, limit_ - base_) != 0) {
        result = false;
      }
      file_offset_ += limit_ - base_;
      base_ = NULL;
      limit_ = NULL;
      last_sync_ = NULL;
      dst_ = NULL;

      // Increase the amount we map the next time, but capped at 1MB
      if (map_size_ < (1<<20)) {
        map_size_ *= 2;
      }
    }
    return result;
  }

  // Grows the file so it covers map_size_ bytes past file_offset_, then maps
  // that range as the new write window. Returns false if ftruncate() or
  // mmap() fails, with errno left as set by the failing call.
  // REQUIRES: no window is currently mapped.
  bool MapNewRegion() {
    assert(base_ == NULL);
    if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
      return false;
    }
    void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
                     fd_, file_offset_);
    if (ptr == MAP_FAILED) {
      return false;
    }
    base_ = reinterpret_cast<char*>(ptr);
    limit_ = base_ + map_size_;
    dst_ = base_;
    last_sync_ = base_;
    return true;
  }

 public:
  // Takes ownership of fd, which must be open for reading and writing.
  // page_size must be a power of two. The first window is 64KB rounded up
  // to a whole number of pages.
  PosixMmapFile(const std::string& fname, int fd, size_t page_size)
      : filename_(fname),
        fd_(fd),
        page_size_(page_size),
        map_size_(Roundup(65536, page_size)),
        base_(NULL),
        limit_(NULL),
        dst_(NULL),
        last_sync_(NULL),
        file_offset_(0),
        pending_sync_(false) {
    assert((page_size & (page_size - 1)) == 0);
  }


  // Closes the file if Close() has not been called yet. Any error from the
  // implicit Close() is discarded.
  ~PosixMmapFile() {
    if (fd_ >= 0) {
      PosixMmapFile::Close();
    }
  }

  // Copies data into the mapped window and maps a fresh window whenever the
  // current one is full. If remapping fails, returns an IOError built from
  // errno; bytes copied before the failure stay in the file.
  virtual Status Append(const Slice& data) {
    const char* src = data.data();
    size_t left = data.size();
    while (left > 0) {
      assert(base_ <= dst_);
      assert(dst_ <= limit_);
      size_t avail = limit_ - dst_;
      if (avail == 0) {
        if (!UnmapCurrentRegion() ||
            !MapNewRegion()) {
          return IOError(filename_, errno);
        }
      }

      // When a new window was just mapped, avail is still 0. This pass
      // copies nothing, and the next iteration recomputes avail.
      size_t n = (left <= avail) ? left : avail;
      memcpy(dst_, src, n);
      dst_ += n;
      src += n;
      left -= n;
    }
    return Status::OK();
  }

  // Unmaps the last window and trims the file to the bytes actually written
  // (the trim is skipped if the unmap failed), then closes fd_.
  // Returns the first error encountered. fd_ is set to -1 in all cases.
  // NOTE(review): a second explicit Close() calls close(-1) and therefore
  // reports an error.
  virtual Status Close() {
    Status s;
    size_t unused = limit_ - dst_;
    if (!UnmapCurrentRegion()) {
      s = IOError(filename_, errno);
    } else if (unused > 0) {
      // Trim the extra space at the end of the file
      if (ftruncate(fd_, file_offset_ - unused) < 0) {
        s = IOError(filename_, errno);
      }
    }

    if (close(fd_) < 0) {
      if (s.ok()) {
        s = IOError(filename_, errno);
      }
    }

    fd_ = -1;
    base_ = NULL;
    limit_ = NULL;
    return s;
  }

  // Appended bytes go straight into the shared mapping, so there is no
  // user-space buffer to flush.
  virtual Status Flush() {
    return Status::OK();
  }

  // Makes written data durable:
  // 1. If an earlier window was unmapped while holding unsynced data,
  //    fdatasync() the whole file.
  // 2. msync() the pages of the current window between last_sync_ and dst_.
  // If both steps fail, the msync() error is the one returned.
  virtual Status Sync() {
    Status s;

    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fdatasync(fd_) < 0) {
        s = IOError(filename_, errno);
      }
    }

    if (dst_ > last_sync_) {
      // Find the beginnings of the pages that contain the first and last
      // bytes to be synced.
      size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
      size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
      last_sync_ = dst_;
      if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
        s = IOError(filename_, errno);
      }
    }

    return s;
  }
};
    347 
    348 static int LockOrUnlock(int fd, bool lock) {
    349   errno = 0;
    350   struct flock f;
    351   memset(&f, 0, sizeof(f));
    352   f.l_type = (lock ? F_WRLCK : F_UNLCK);
    353   f.l_whence = SEEK_SET;
    354   f.l_start = 0;
    355   f.l_len = 0;        // Lock/unlock entire file
    356   return fcntl(fd, F_SETLK, &f);
    357 }
    358 
    359 class PosixFileLock : public FileLock {
    360  public:
    361   int fd_;
    362   std::string name_;
    363 };
    364 
    365 // Set of locked files.  We keep a separate set instead of just
    366 // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
    367 // any protection against multiple uses from the same process.
    368 class PosixLockTable {
    369  private:
    370   port::Mutex mu_;
    371   std::set<std::string> locked_files_;
    372  public:
    373   bool Insert(const std::string& fname) {
    374     MutexLock l(&mu_);
    375     return locked_files_.insert(fname).second;
    376   }
    377   void Remove(const std::string& fname) {
    378     MutexLock l(&mu_);
    379     locked_files_.erase(fname);
    380   }
    381 };
    382 
    383 class PosixEnv : public Env {
    384  public:
    385   PosixEnv();
    386   virtual ~PosixEnv() {
    387     fprintf(stderr, "Destroying Env::Default()\n");
    388     abort();
    389   }
    390 
    391   virtual Status NewSequentialFile(const std::string& fname,
    392                                    SequentialFile** result) {
    393     FILE* f = fopen(fname.c_str(), "r");
    394     if (f == NULL) {
    395       *result = NULL;
    396       return IOError(fname, errno);
    397     } else {
    398       *result = new PosixSequentialFile(fname, f);
    399       return Status::OK();
    400     }
    401   }
    402 
    403   virtual Status NewRandomAccessFile(const std::string& fname,
    404                                      RandomAccessFile** result) {
    405     *result = NULL;
    406     Status s;
    407     int fd = open(fname.c_str(), O_RDONLY);
    408     if (fd < 0) {
    409       s = IOError(fname, errno);
    410     } else if (mmap_limit_.Acquire()) {
    411       uint64_t size;
    412       s = GetFileSize(fname, &size);
    413       if (s.ok()) {
    414         void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
    415         if (base != MAP_FAILED) {
    416           *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
    417         } else {
    418           s = IOError(fname, errno);
    419         }
    420       }
    421       close(fd);
    422       if (!s.ok()) {
    423         mmap_limit_.Release();
    424       }
    425     } else {
    426       *result = new PosixRandomAccessFile(fname, fd);
    427     }
    428     return s;
    429   }
    430 
    431   virtual Status NewWritableFile(const std::string& fname,
    432                                  WritableFile** result) {
    433     Status s;
    434     const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
    435     if (fd < 0) {
    436       *result = NULL;
    437       s = IOError(fname, errno);
    438     } else {
    439       *result = new PosixMmapFile(fname, fd, page_size_);
    440     }
    441     return s;
    442   }
    443 
    444   virtual bool FileExists(const std::string& fname) {
    445     return access(fname.c_str(), F_OK) == 0;
    446   }
    447 
    448   virtual Status GetChildren(const std::string& dir,
    449                              std::vector<std::string>* result) {
    450     result->clear();
    451     DIR* d = opendir(dir.c_str());
    452     if (d == NULL) {
    453       return IOError(dir, errno);
    454     }
    455     struct dirent* entry;
    456     while ((entry = readdir(d)) != NULL) {
    457       result->push_back(entry->d_name);
    458     }
    459     closedir(d);
    460     return Status::OK();
    461   }
    462 
    463   virtual Status DeleteFile(const std::string& fname) {
    464     Status result;
    465     if (unlink(fname.c_str()) != 0) {
    466       result = IOError(fname, errno);
    467     }
    468     return result;
    469   }
    470 
    471   virtual Status CreateDir(const std::string& name) {
    472     Status result;
    473     if (mkdir(name.c_str(), 0755) != 0) {
    474       result = IOError(name, errno);
    475     }
    476     return result;
    477   }
    478 
    479   virtual Status DeleteDir(const std::string& name) {
    480     Status result;
    481     if (rmdir(name.c_str()) != 0) {
    482       result = IOError(name, errno);
    483     }
    484     return result;
    485   }
    486 
    487   virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
    488     Status s;
    489     struct stat sbuf;
    490     if (stat(fname.c_str(), &sbuf) != 0) {
    491       *size = 0;
    492       s = IOError(fname, errno);
    493     } else {
    494       *size = sbuf.st_size;
    495     }
    496     return s;
    497   }
    498 
    499   virtual Status RenameFile(const std::string& src, const std::string& target) {
    500     Status result;
    501     if (rename(src.c_str(), target.c_str()) != 0) {
    502       result = IOError(src, errno);
    503     }
    504     return result;
    505   }
    506 
    507   virtual Status LockFile(const std::string& fname, FileLock** lock) {
    508     *lock = NULL;
    509     Status result;
    510     int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
    511     if (fd < 0) {
    512       result = IOError(fname, errno);
    513     } else if (!locks_.Insert(fname)) {
    514       close(fd);
    515       result = Status::IOError("lock " + fname, "already held by process");
    516     } else if (LockOrUnlock(fd, true) == -1) {
    517       result = IOError("lock " + fname, errno);
    518       close(fd);
    519       locks_.Remove(fname);
    520     } else {
    521       PosixFileLock* my_lock = new PosixFileLock;
    522       my_lock->fd_ = fd;
    523       my_lock->name_ = fname;
    524       *lock = my_lock;
    525     }
    526     return result;
    527   }
    528 
    529   virtual Status UnlockFile(FileLock* lock) {
    530     PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
    531     Status result;
    532     if (LockOrUnlock(my_lock->fd_, false) == -1) {
    533       result = IOError("unlock", errno);
    534     }
    535     locks_.Remove(my_lock->name_);
    536     close(my_lock->fd_);
    537     delete my_lock;
    538     return result;
    539   }
    540 
    541   virtual void Schedule(void (*function)(void*), void* arg);
    542 
    543   virtual void StartThread(void (*function)(void* arg), void* arg);
    544 
    545   virtual Status GetTestDirectory(std::string* result) {
    546     const char* env = getenv("TEST_TMPDIR");
    547     if (env && env[0] != '\0') {
    548       *result = env;
    549     } else {
    550       char buf[100];
    551       snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
    552       *result = buf;
    553     }
    554     // Directory may already exist
    555     CreateDir(*result);
    556     return Status::OK();
    557   }
    558 
    559   static uint64_t gettid() {
    560     pthread_t tid = pthread_self();
    561     uint64_t thread_id = 0;
    562     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
    563     return thread_id;
    564   }
    565 
    566   virtual Status NewLogger(const std::string& fname, Logger** result) {
    567     FILE* f = fopen(fname.c_str(), "w");
    568     if (f == NULL) {
    569       *result = NULL;
    570       return IOError(fname, errno);
    571     } else {
    572       *result = new PosixLogger(f, &PosixEnv::gettid);
    573       return Status::OK();
    574     }
    575   }
    576 
    577   virtual uint64_t NowMicros() {
    578     struct timeval tv;
    579     gettimeofday(&tv, NULL);
    580     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
    581   }
    582 
    583   virtual void SleepForMicroseconds(int micros) {
    584     usleep(micros);
    585   }
    586 
    587  private:
    588   void PthreadCall(const char* label, int result) {
    589     if (result != 0) {
    590       fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    591       abort();
    592     }
    593   }
    594 
    595   // BGThread() is the body of the background thread
    596   void BGThread();
    597   static void* BGThreadWrapper(void* arg) {
    598     reinterpret_cast<PosixEnv*>(arg)->BGThread();
    599     return NULL;
    600   }
    601 
    602   size_t page_size_;
    603   pthread_mutex_t mu_;
    604   pthread_cond_t bgsignal_;
    605   pthread_t bgthread_;
    606   bool started_bgthread_;
    607 
    608   // Entry per Schedule() call
    609   struct BGItem { void* arg; void (*function)(void*); };
    610   typedef std::deque<BGItem> BGQueue;
    611   BGQueue queue_;
    612 
    613   PosixLockTable locks_;
    614   MmapLimiter mmap_limit_;
    615 };
    616 
    617 PosixEnv::PosixEnv() : page_size_(getpagesize()),
    618                        started_bgthread_(false) {
    619   PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
    620   PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
    621 }
    622 
    623 void PosixEnv::Schedule(void (*function)(void*), void* arg) {
    624   PthreadCall("lock", pthread_mutex_lock(&mu_));
    625 
    626   // Start background thread if necessary
    627   if (!started_bgthread_) {
    628     started_bgthread_ = true;
    629     PthreadCall(
    630         "create thread",
    631         pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
    632   }
    633 
    634   // If the queue is currently empty, the background thread may currently be
    635   // waiting.
    636   if (queue_.empty()) {
    637     PthreadCall("signal", pthread_cond_signal(&bgsignal_));
    638   }
    639 
    640   // Add to priority queue
    641   queue_.push_back(BGItem());
    642   queue_.back().function = function;
    643   queue_.back().arg = arg;
    644 
    645   PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    646 }
    647 
    648 void PosixEnv::BGThread() {
    649   while (true) {
    650     // Wait until there is an item that is ready to run
    651     PthreadCall("lock", pthread_mutex_lock(&mu_));
    652     while (queue_.empty()) {
    653       PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    654     }
    655 
    656     void (*function)(void*) = queue_.front().function;
    657     void* arg = queue_.front().arg;
    658     queue_.pop_front();
    659 
    660     PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    661     (*function)(arg);
    662   }
    663 }
    664 
    665 namespace {
    666 struct StartThreadState {
    667   void (*user_function)(void*);
    668   void* arg;
    669 };
    670 }
    671 static void* StartThreadWrapper(void* arg) {
    672   StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
    673   state->user_function(state->arg);
    674   delete state;
    675   return NULL;
    676 }
    677 
    678 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
    679   pthread_t t;
    680   StartThreadState* state = new StartThreadState;
    681   state->user_function = function;
    682   state->arg = arg;
    683   PthreadCall("start thread",
    684               pthread_create(&t, NULL,  &StartThreadWrapper, state));
    685 }
    686 
    687 }  // namespace
    688 
    689 static pthread_once_t once = PTHREAD_ONCE_INIT;
    690 static Env* default_env;
    691 static void InitDefaultEnv() { default_env = new PosixEnv; }
    692 
    693 Env* Env::Default() {
    694   pthread_once(&once, InitDefaultEnv);
    695   return default_env;
    696 }
    697 
    698 }  // namespace leveldb
    699