/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/platform/hadoop/hadoop_file_system.h"

#include <errno.h>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/posix/error.h"
#include "third_party/hadoop/hdfs.h"

namespace tensorflow {

// Looks up `name` in the dynamically loaded library `handle` and binds the
// resolved symbol to `func`.
template <typename R, typename... Args>
Status BindFunc(void* handle, const char* name,
                std::function<R(Args...)>* func) {
  void* symbol_ptr = nullptr;
  TF_RETURN_IF_ERROR(
      Env::Default()->GetSymbolFromLibrary(handle, name, &symbol_ptr));
  *func = reinterpret_cast<R (*)(Args...)>(symbol_ptr);
  return Status::OK();
}
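
// A minimal usage sketch (illustrative only; the handle and function below
// are hypothetical). In this file, BindFunc is always invoked through the
// BIND_HDFS_FUNC macro in LibHDFS::LoadAndBind:
//
//   void* handle = nullptr;
//   TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary("libhdfs.so", &handle));
//   std::function<hdfsBuilder*()> new_builder;
//   TF_RETURN_IF_ERROR(BindFunc(handle, "hdfsNewBuilder", &new_builder));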

class LibHDFS {
 public:
  static LibHDFS* Load() {
    static LibHDFS* lib = []() -> LibHDFS* {
      LibHDFS* lib = new LibHDFS;
      lib->LoadAndBind();
      return lib;
    }();

    return lib;
  }

  // The status, if any, from failure to load.
  Status status() { return status_; }

  std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect;
  std::function<hdfsBuilder*()> hdfsNewBuilder;
  std::function<void(hdfsBuilder*, const char*)> hdfsBuilderSetNameNode;
  std::function<int(const char*, char**)> hdfsConfGetStr;
  std::function<void(hdfsBuilder*, const char* kerbTicketCachePath)>
      hdfsBuilderSetKerbTicketCachePath;
  std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
  std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
  std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
  std::function<int(hdfsFS, hdfsFile)> hdfsHFlush;
  std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
  std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
      hdfsOpenFile;
  std::function<int(hdfsFS, const char*)> hdfsExists;
  std::function<hdfsFileInfo*(hdfsFS, const char*, int*)> hdfsListDirectory;
  std::function<void(hdfsFileInfo*, int)> hdfsFreeFileInfo;
  std::function<int(hdfsFS, const char*, int recursive)> hdfsDelete;
  std::function<int(hdfsFS, const char*)> hdfsCreateDirectory;
  std::function<hdfsFileInfo*(hdfsFS, const char*)> hdfsGetPathInfo;
  std::function<int(hdfsFS, const char*, const char*)> hdfsRename;

 private:
  void LoadAndBind() {
    auto TryLoadAndBind = [this](const char* name, void** handle) -> Status {
      TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary(name, handle));
#define BIND_HDFS_FUNC(function) \
  TF_RETURN_IF_ERROR(BindFunc(*handle, #function, &function));

      BIND_HDFS_FUNC(hdfsBuilderConnect);
      BIND_HDFS_FUNC(hdfsNewBuilder);
      BIND_HDFS_FUNC(hdfsBuilderSetNameNode);
      BIND_HDFS_FUNC(hdfsConfGetStr);
      BIND_HDFS_FUNC(hdfsBuilderSetKerbTicketCachePath);
      BIND_HDFS_FUNC(hdfsCloseFile);
      BIND_HDFS_FUNC(hdfsPread);
      BIND_HDFS_FUNC(hdfsWrite);
      BIND_HDFS_FUNC(hdfsHFlush);
      BIND_HDFS_FUNC(hdfsHSync);
      BIND_HDFS_FUNC(hdfsOpenFile);
      BIND_HDFS_FUNC(hdfsExists);
      BIND_HDFS_FUNC(hdfsListDirectory);
      BIND_HDFS_FUNC(hdfsFreeFileInfo);
      BIND_HDFS_FUNC(hdfsDelete);
      BIND_HDFS_FUNC(hdfsCreateDirectory);
      BIND_HDFS_FUNC(hdfsGetPathInfo);
      BIND_HDFS_FUNC(hdfsRename);
#undef BIND_HDFS_FUNC
      return Status::OK();
    };

// libhdfs.so is typically not in the standard library search paths. Try the
// location specified in the libhdfs documentation first.
#if defined(PLATFORM_WINDOWS)
    const char* kLibHdfsDso = "hdfs.dll";
#else
    const char* kLibHdfsDso = "libhdfs.so";
#endif
    char* hdfs_home = getenv("HADOOP_HDFS_HOME");
    if (hdfs_home == nullptr) {
      status_ = errors::FailedPrecondition(
          "Environment variable HADOOP_HDFS_HOME not set");
      return;
    }
    string path = io::JoinPath(hdfs_home, "lib", "native", kLibHdfsDso);
    status_ = TryLoadAndBind(path.c_str(), &handle_);
    if (!status_.ok()) {
      // Fall back to the dynamic loader's search path, in case libhdfs.so is
      // installed in a non-standard location.
      status_ = TryLoadAndBind(kLibHdfsDso, &handle_);
    }
  }

  Status status_;
  void* handle_ = nullptr;
};
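
// The resulting search order for the library is therefore:
//   1. $HADOOP_HDFS_HOME/lib/native/libhdfs.so (hdfs.dll on Windows)
//   2. plain libhdfs.so, resolved through the dynamic loader's search path
//      (e.g. LD_LIBRARY_PATH)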

HadoopFileSystem::HadoopFileSystem() : hdfs_(LibHDFS::Load()) {}

HadoopFileSystem::~HadoopFileSystem() {}

// We rely on HDFS connection caching here. The HDFS client calls
// org.apache.hadoop.fs.FileSystem.get(), which caches the connection
// internally.
Status HadoopFileSystem::Connect(StringPiece fname, hdfsFS* fs) {
  TF_RETURN_IF_ERROR(hdfs_->status());

  StringPiece scheme, namenode, path;
  io::ParseURI(fname, &scheme, &namenode, &path);
  const string nn = namenode.ToString();

  hdfsBuilder* builder = hdfs_->hdfsNewBuilder();
  if (scheme == "file") {
    hdfs_->hdfsBuilderSetNameNode(builder, nullptr);
  } else if (scheme == "viewfs") {
    char* defaultFS = nullptr;
    hdfs_->hdfsConfGetStr("fs.defaultFS", &defaultFS);
    StringPiece defaultScheme, defaultCluster, defaultPath;
    io::ParseURI(defaultFS, &defaultScheme, &defaultCluster, &defaultPath);

    if (scheme != defaultScheme || namenode != defaultCluster) {
      return errors::Unimplemented(
          "viewfs is only supported as a fs.defaultFS.");
    }
    // The default NameNode configuration will be used (from the XML
    // configuration files). See:
    // https://github.com/tensorflow/tensorflow/blob/v1.0.0/third_party/hadoop/hdfs.h#L259
    hdfs_->hdfsBuilderSetNameNode(builder, "default");
  } else {
    hdfs_->hdfsBuilderSetNameNode(builder, nn.c_str());
  }
  // KERB_TICKET_CACHE_PATH will be removed in the future: KRB5CCNAME is
  // Kerberos's built-in environment variable for the ticket cache, which
  // makes KERB_TICKET_CACHE_PATH and the related code unnecessary.
  char* ticket_cache_path = getenv("KERB_TICKET_CACHE_PATH");
  if (ticket_cache_path != nullptr) {
    hdfs_->hdfsBuilderSetKerbTicketCachePath(builder, ticket_cache_path);
  }
  *fs = hdfs_->hdfsBuilderConnect(builder);
  if (*fs == nullptr) {
    return errors::NotFound(strerror(errno));
  }
  return Status::OK();
}
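
// Illustrative URI forms handled above (hosts and paths are hypothetical):
//   hdfs://namenode:8020/path   connects to the given namenode
//   viewfs://cluster/path       only if fs.defaultFS names the same cluster
//   file:///path                local filesystem through libhdfs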

string HadoopFileSystem::TranslateName(const string& name) const {
  StringPiece scheme, namenode, path;
  io::ParseURI(name, &scheme, &namenode, &path);
  return path.ToString();
}
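
// For example, TranslateName("hdfs://namenode:8020/a/b") returns "/a/b"
// (the URI is hypothetical).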

class HDFSRandomAccessFile : public RandomAccessFile {
 public:
  HDFSRandomAccessFile(const string& filename, const string& hdfs_filename,
                       LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
      : filename_(filename),
        hdfs_filename_(hdfs_filename),
        hdfs_(hdfs),
        fs_(fs),
        file_(file) {}

  ~HDFSRandomAccessFile() override {
    if (file_ != nullptr) {
      mutex_lock lock(mu_);
      hdfs_->hdfsCloseFile(fs_, file_);
    }
  }

  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    Status s;
    char* dst = scratch;
    bool eof_retried = false;
    while (n > 0 && s.ok()) {
      // We lock inside the loop rather than outside so we don't block other
      // concurrent readers.
      mutex_lock lock(mu_);
      tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst,
                                 static_cast<tSize>(n));
      if (r > 0) {
        dst += r;
        n -= r;
        offset += r;
      } else if (!eof_retried && r == 0) {
        // Always reopen the file upon reaching EOF to see if there's more
        // data. If writers are streaming contents while others are
        // concurrently reading, HDFS requires that we reopen the file to see
        // updated contents.
        //
        // Fixes #5438
        if (file_ != nullptr && hdfs_->hdfsCloseFile(fs_, file_) != 0) {
          return IOError(filename_, errno);
        }
        file_ =
            hdfs_->hdfsOpenFile(fs_, hdfs_filename_.c_str(), O_RDONLY, 0, 0, 0);
        if (file_ == nullptr) {
          return IOError(filename_, errno);
        }
        eof_retried = true;
      } else if (eof_retried && r == 0) {
        s = Status(error::OUT_OF_RANGE, "Read fewer bytes than requested");
      } else if (errno == EINTR || errno == EAGAIN) {
        // hdfsPread may return EINTR; just retry.
      } else {
        s = IOError(filename_, errno);
      }
    }
    *result = StringPiece(scratch, dst - scratch);
    return s;
  }

 private:
  string filename_;
  string hdfs_filename_;
  LibHDFS* hdfs_;
  hdfsFS fs_;

  mutable mutex mu_;
  mutable hdfsFile file_ GUARDED_BY(mu_);
};
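
// Design note: file_ is mutable and GUARDED_BY(mu_) because Read(), a const
// member, may close and reopen the handle during the EOF retry above; taking
// the lock per pread keeps concurrent readers from being serialized for an
// entire request.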

Status HadoopFileSystem::NewRandomAccessFile(
    const string& fname, std::unique_ptr<RandomAccessFile>* result) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFile file =
      hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_RDONLY, 0, 0, 0);
  if (file == nullptr) {
    return IOError(fname, errno);
  }
  result->reset(
      new HDFSRandomAccessFile(fname, TranslateName(fname), hdfs_, fs, file));
  return Status::OK();
}
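
// A minimal usage sketch through the generic Env API (the path is
// hypothetical, and a file shorter than the request would surface
// OUT_OF_RANGE from Read):
//
//   std::unique_ptr<RandomAccessFile> file;
//   TF_RETURN_IF_ERROR(Env::Default()->NewRandomAccessFile(
//       "hdfs://namenode:8020/tmp/data", &file));
//   char scratch[4096];
//   StringPiece result;
//   TF_RETURN_IF_ERROR(
//       file->Read(/*offset=*/0, sizeof(scratch), &result, scratch));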

class HDFSWritableFile : public WritableFile {
 public:
  HDFSWritableFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
      : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}

  ~HDFSWritableFile() override {
    if (file_ != nullptr) {
      Close().IgnoreError();
    }
  }

  Status Append(const StringPiece& data) override {
    if (hdfs_->hdfsWrite(fs_, file_, data.data(),
                         static_cast<tSize>(data.size())) == -1) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  Status Close() override {
    Status result;
    if (hdfs_->hdfsCloseFile(fs_, file_) != 0) {
      result = IOError(filename_, errno);
    }
    hdfs_ = nullptr;
    fs_ = nullptr;
    file_ = nullptr;
    return result;
  }

  Status Flush() override {
    if (hdfs_->hdfsHFlush(fs_, file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  Status Sync() override {
    if (hdfs_->hdfsHSync(fs_, file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  string filename_;
  LibHDFS* hdfs_;
  hdfsFS fs_;
  hdfsFile file_;
};
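
// Note: hdfsHFlush makes the written bytes visible to new readers, while
// hdfsHSync additionally asks the datanodes to persist them to disk, closer
// to a POSIX fsync.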
    322 
    323 Status HadoopFileSystem::NewWritableFile(
    324     const string& fname, std::unique_ptr<WritableFile>* result) {
    325   hdfsFS fs = nullptr;
    326   TF_RETURN_IF_ERROR(Connect(fname, &fs));
    327 
    328   hdfsFile file =
    329       hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_WRONLY, 0, 0, 0);
    330   if (file == nullptr) {
    331     return IOError(fname, errno);
    332   }
    333   result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
    334   return Status::OK();
    335 }
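
// A minimal usage sketch (the path is hypothetical):
//
//   std::unique_ptr<WritableFile> file;
//   TF_RETURN_IF_ERROR(Env::Default()->NewWritableFile(
//       "hdfs://namenode:8020/tmp/out", &file));
//   TF_RETURN_IF_ERROR(file->Append("hello"));
//   TF_RETURN_IF_ERROR(file->Close());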

Status HadoopFileSystem::NewAppendableFile(
    const string& fname, std::unique_ptr<WritableFile>* result) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFile file = hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(),
                                      O_WRONLY | O_APPEND, 0, 0, 0);
  if (file == nullptr) {
    return IOError(fname, errno);
  }
  result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
  return Status::OK();
}

Status HadoopFileSystem::NewReadOnlyMemoryRegionFromFile(
    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
  // hadoopReadZero() technically supports this call with the following
  // caveats:
  // - It only works up to 2 GB. We'd have to Stat() the file to ensure that
  //   it fits.
  // - If not on the local filesystem, the entire file will be read, making
  //   it inefficient for callers that assume typical mmap() behavior.
  return errors::Unimplemented("HDFS does not support ReadOnlyMemoryRegion");
}

Status HadoopFileSystem::FileExists(const string& fname) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));
  if (hdfs_->hdfsExists(fs, TranslateName(fname).c_str()) == 0) {
    return Status::OK();
  }
  return errors::NotFound(fname, " not found.");
}

Status HadoopFileSystem::GetChildren(const string& dir,
                                     std::vector<string>* result) {
  result->clear();
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  // hdfsListDirectory returns nullptr if the directory is empty. Do a separate
  // check to verify the directory exists first.
  FileStatistics stat;
  TF_RETURN_IF_ERROR(Stat(dir, &stat));

  int entries = 0;
  hdfsFileInfo* info =
      hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
  if (info == nullptr) {
    if (stat.is_directory) {
      // Assume it's an empty directory.
      return Status::OK();
    }
    return IOError(dir, errno);
  }
  for (int i = 0; i < entries; i++) {
    result->push_back(io::Basename(info[i].mName).ToString());
  }
  hdfs_->hdfsFreeFileInfo(info, entries);
  return Status::OK();
}
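
// For example, listing "hdfs://namenode:8020/dir" containing /dir/a and
// /dir/b yields {"a", "b"}: only basenames are returned, not full paths
// (the URI is hypothetical).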

Status HadoopFileSystem::DeleteFile(const string& fname) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  if (hdfs_->hdfsDelete(fs, TranslateName(fname).c_str(),
                        /*recursive=*/0) != 0) {
    return IOError(fname, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::CreateDir(const string& dir) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  if (hdfs_->hdfsCreateDirectory(fs, TranslateName(dir).c_str()) != 0) {
    return IOError(dir, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::DeleteDir(const string& dir) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  // Count the number of entries in the directory, and only delete if it's
  // empty. This is consistent with the interface, but note that there's a
  // race condition where a file may be added after this check, in which
  // case the directory will still be deleted.
  int entries = 0;
  hdfsFileInfo* info =
      hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
  if (info != nullptr) {
    hdfs_->hdfsFreeFileInfo(info, entries);
  }
  // Due to HDFS bug HDFS-8407, we can't distinguish between an error and an
  // empty folder, especially for Kerberos-enabled setups: EAGAIN is quite
  // common even when the call is actually successful. Check again via Stat().
  if (info == nullptr && errno != 0) {
    FileStatistics stat;
    TF_RETURN_IF_ERROR(Stat(dir, &stat));
  }

  if (entries > 0) {
    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
  }
  if (hdfs_->hdfsDelete(fs, TranslateName(dir).c_str(),
                        /*recursive=*/1) != 0) {
    return IOError(dir, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::GetFileSize(const string& fname, uint64* size) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
  if (info == nullptr) {
    return IOError(fname, errno);
  }
  *size = static_cast<uint64>(info->mSize);
  hdfs_->hdfsFreeFileInfo(info, 1);
  return Status::OK();
}

Status HadoopFileSystem::RenameFile(const string& src, const string& target) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(src, &fs));

  // hdfsRename fails if the target already exists, so remove it first to get
  // overwrite semantics.
  if (hdfs_->hdfsExists(fs, TranslateName(target).c_str()) == 0 &&
      hdfs_->hdfsDelete(fs, TranslateName(target).c_str(),
                        /*recursive=*/0) != 0) {
    return IOError(target, errno);
  }

  if (hdfs_->hdfsRename(fs, TranslateName(src).c_str(),
                        TranslateName(target).c_str()) != 0) {
    return IOError(src, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::Stat(const string& fname, FileStatistics* stats) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
  if (info == nullptr) {
    return IOError(fname, errno);
  }
  stats->length = static_cast<int64>(info->mSize);
  // mLastMod is in seconds; FileStatistics expects nanoseconds.
  stats->mtime_nsec = static_cast<int64>(info->mLastMod) * 1e9;
  stats->is_directory = info->mKind == kObjectKindDirectory;
  hdfs_->hdfsFreeFileInfo(info, 1);
  return Status::OK();
}

REGISTER_FILE_SYSTEM("hdfs", HadoopFileSystem);
REGISTER_FILE_SYSTEM("viewfs", HadoopFileSystem);

}  // namespace tensorflow