/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/platform/hadoop/hadoop_file_system.h"

#include <errno.h>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/posix/error.h"
#include "third_party/hadoop/hdfs.h"

namespace tensorflow {

// Resolves symbol `name` from the already-loaded dynamic library `handle`
// and stores it into `*func`, cast to the std::function signature requested
// by the caller. Returns a non-OK status if the symbol cannot be found.
template <typename R, typename... Args>
Status BindFunc(void* handle, const char* name,
                std::function<R(Args...)>* func) {
  void* symbol_ptr = nullptr;
  TF_RETURN_IF_ERROR(
      Env::Default()->GetSymbolFromLibrary(handle, name, &symbol_ptr));
  *func = reinterpret_cast<R (*)(Args...)>(symbol_ptr);
  return Status::OK();
}

// Lazily loads libhdfs at runtime (rather than linking against it) and
// exposes the subset of the libhdfs C API this file system needs as
// std::function members. A process-wide singleton: the load is attempted
// once and the outcome is cached in status().
class LibHDFS {
 public:
  static LibHDFS* Load() {
    // Thread-safe one-time initialization via a function-local static;
    // the lambda runs exactly once, binding all symbols eagerly.
    static LibHDFS* lib = []() -> LibHDFS* {
      LibHDFS* lib = new LibHDFS;
      lib->LoadAndBind();
      return lib;
    }();

    return lib;
  }

  // The status, if any, from failure to load.
  Status status() { return status_; }

  // Bound libhdfs entry points. Signatures mirror third_party/hadoop/hdfs.h.
  std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect;
  std::function<hdfsBuilder*()> hdfsNewBuilder;
  std::function<void(hdfsBuilder*, const char*)> hdfsBuilderSetNameNode;
  std::function<int(const char*, char**)> hdfsConfGetStr;
  std::function<void(hdfsBuilder*, const char* kerbTicketCachePath)>
      hdfsBuilderSetKerbTicketCachePath;
  std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
  std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
  std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
  std::function<int(hdfsFS, hdfsFile)> hdfsHFlush;
  std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
  std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
      hdfsOpenFile;
  std::function<int(hdfsFS, const char*)> hdfsExists;
  std::function<hdfsFileInfo*(hdfsFS, const char*, int*)> hdfsListDirectory;
  std::function<void(hdfsFileInfo*, int)> hdfsFreeFileInfo;
  std::function<int(hdfsFS, const char*, int recursive)> hdfsDelete;
  std::function<int(hdfsFS, const char*)> hdfsCreateDirectory;
  std::function<hdfsFileInfo*(hdfsFS, const char*)> hdfsGetPathInfo;
  std::function<int(hdfsFS, const char*)> hdfsRename;

 private:
  // Locates the libhdfs shared object, loads it, and binds every symbol
  // listed above. On any failure, status_ records the error and the
  // std::function members are left unusable; callers must check status().
  void LoadAndBind() {
    auto TryLoadAndBind = [this](const char* name, void** handle) -> Status {
      TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary(name, handle));
// Binds the libhdfs symbol whose name matches the member's name exactly.
#define BIND_HDFS_FUNC(function) \
  TF_RETURN_IF_ERROR(BindFunc(*handle, #function, &function));

      BIND_HDFS_FUNC(hdfsBuilderConnect);
      BIND_HDFS_FUNC(hdfsNewBuilder);
      BIND_HDFS_FUNC(hdfsBuilderSetNameNode);
      BIND_HDFS_FUNC(hdfsConfGetStr);
      BIND_HDFS_FUNC(hdfsBuilderSetKerbTicketCachePath);
      BIND_HDFS_FUNC(hdfsCloseFile);
      BIND_HDFS_FUNC(hdfsPread);
      BIND_HDFS_FUNC(hdfsWrite);
      BIND_HDFS_FUNC(hdfsHFlush);
      BIND_HDFS_FUNC(hdfsHSync);
      BIND_HDFS_FUNC(hdfsOpenFile);
      BIND_HDFS_FUNC(hdfsExists);
      BIND_HDFS_FUNC(hdfsListDirectory);
      BIND_HDFS_FUNC(hdfsFreeFileInfo);
      BIND_HDFS_FUNC(hdfsDelete);
      BIND_HDFS_FUNC(hdfsCreateDirectory);
      BIND_HDFS_FUNC(hdfsGetPathInfo);
      BIND_HDFS_FUNC(hdfsRename);
#undef BIND_HDFS_FUNC
      return Status::OK();
    };

    // libhdfs.so won't be in the standard locations. Use the path as specified
    // in the libhdfs documentation.
#if defined(PLATFORM_WINDOWS)
    const char* kLibHdfsDso = "hdfs.dll";
#else
    const char* kLibHdfsDso = "libhdfs.so";
#endif
    char* hdfs_home = getenv("HADOOP_HDFS_HOME");
    if (hdfs_home == nullptr) {
      status_ = errors::FailedPrecondition(
          "Environment variable HADOOP_HDFS_HOME not set");
      return;
    }
    string path = io::JoinPath(hdfs_home, "lib", "native", kLibHdfsDso);
    status_ = TryLoadAndBind(path.c_str(), &handle_);
    if (!status_.ok()) {
      // try load libhdfs.so using dynamic loader's search path in case
      // libhdfs.so is installed in non-standard location
      status_ = TryLoadAndBind(kLibHdfsDso, &handle_);
    }
  }

  Status status_;           // Outcome of the one-time load attempt.
  void* handle_ = nullptr;  // Dynamic-library handle; kept for process lifetime.
};

HadoopFileSystem::HadoopFileSystem() : hdfs_(LibHDFS::Load()) {}

HadoopFileSystem::~HadoopFileSystem() {}

// We rely on HDFS connection caching here. The HDFS client calls
// org.apache.hadoop.fs.FileSystem.get(), which caches the connection
// internally.
140 Status HadoopFileSystem::Connect(StringPiece fname, hdfsFS* fs) { 141 TF_RETURN_IF_ERROR(hdfs_->status()); 142 143 StringPiece scheme, namenode, path; 144 io::ParseURI(fname, &scheme, &namenode, &path); 145 const string nn = namenode.ToString(); 146 147 hdfsBuilder* builder = hdfs_->hdfsNewBuilder(); 148 if (scheme == "file") { 149 hdfs_->hdfsBuilderSetNameNode(builder, nullptr); 150 } else if (scheme == "viewfs") { 151 char* defaultFS = nullptr; 152 hdfs_->hdfsConfGetStr("fs.defaultFS", &defaultFS); 153 StringPiece defaultScheme, defaultCluster, defaultPath; 154 io::ParseURI(defaultFS, &defaultScheme, &defaultCluster, &defaultPath); 155 156 if (scheme != defaultScheme || namenode != defaultCluster) { 157 return errors::Unimplemented( 158 "viewfs is only supported as a fs.defaultFS."); 159 } 160 // The default NameNode configuration will be used (from the XML 161 // configuration files). See: 162 // https://github.com/tensorflow/tensorflow/blob/v1.0.0/third_party/hadoop/hdfs.h#L259 163 hdfs_->hdfsBuilderSetNameNode(builder, "default"); 164 } else { 165 hdfs_->hdfsBuilderSetNameNode(builder, nn.c_str()); 166 } 167 // KERB_TICKET_CACHE_PATH will be deleted in the future, Because KRB5CCNAME is 168 // the build in environment variable of Kerberos, so KERB_TICKET_CACHE_PATH 169 // and related code are unnecessary. 
170 char* ticket_cache_path = getenv("KERB_TICKET_CACHE_PATH"); 171 if (ticket_cache_path != nullptr) { 172 hdfs_->hdfsBuilderSetKerbTicketCachePath(builder, ticket_cache_path); 173 } 174 *fs = hdfs_->hdfsBuilderConnect(builder); 175 if (*fs == nullptr) { 176 return errors::NotFound(strerror(errno)); 177 } 178 return Status::OK(); 179 } 180 181 string HadoopFileSystem::TranslateName(const string& name) const { 182 StringPiece scheme, namenode, path; 183 io::ParseURI(name, &scheme, &namenode, &path); 184 return path.ToString(); 185 } 186 187 class HDFSRandomAccessFile : public RandomAccessFile { 188 public: 189 HDFSRandomAccessFile(const string& filename, const string& hdfs_filename, 190 LibHDFS* hdfs, hdfsFS fs, hdfsFile file) 191 : filename_(filename), 192 hdfs_filename_(hdfs_filename), 193 hdfs_(hdfs), 194 fs_(fs), 195 file_(file) {} 196 197 ~HDFSRandomAccessFile() override { 198 if (file_ != nullptr) { 199 mutex_lock lock(mu_); 200 hdfs_->hdfsCloseFile(fs_, file_); 201 } 202 } 203 204 Status Read(uint64 offset, size_t n, StringPiece* result, 205 char* scratch) const override { 206 Status s; 207 char* dst = scratch; 208 bool eof_retried = false; 209 while (n > 0 && s.ok()) { 210 // We lock inside the loop rather than outside so we don't block other 211 // concurrent readers. 212 mutex_lock lock(mu_); 213 tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst, 214 static_cast<tSize>(n)); 215 if (r > 0) { 216 dst += r; 217 n -= r; 218 offset += r; 219 } else if (!eof_retried && r == 0) { 220 // Always reopen the file upon reaching EOF to see if there's more data. 221 // If writers are streaming contents while others are concurrently 222 // reading, HDFS requires that we reopen the file to see updated 223 // contents. 
224 // 225 // Fixes #5438 226 if (file_ != nullptr && hdfs_->hdfsCloseFile(fs_, file_) != 0) { 227 return IOError(filename_, errno); 228 } 229 file_ = 230 hdfs_->hdfsOpenFile(fs_, hdfs_filename_.c_str(), O_RDONLY, 0, 0, 0); 231 if (file_ == nullptr) { 232 return IOError(filename_, errno); 233 } 234 eof_retried = true; 235 } else if (eof_retried && r == 0) { 236 s = Status(error::OUT_OF_RANGE, "Read less bytes than requested"); 237 } else if (errno == EINTR || errno == EAGAIN) { 238 // hdfsPread may return EINTR too. Just retry. 239 } else { 240 s = IOError(filename_, errno); 241 } 242 } 243 *result = StringPiece(scratch, dst - scratch); 244 return s; 245 } 246 247 private: 248 string filename_; 249 string hdfs_filename_; 250 LibHDFS* hdfs_; 251 hdfsFS fs_; 252 253 mutable mutex mu_; 254 mutable hdfsFile file_ GUARDED_BY(mu_); 255 }; 256 257 Status HadoopFileSystem::NewRandomAccessFile( 258 const string& fname, std::unique_ptr<RandomAccessFile>* result) { 259 hdfsFS fs = nullptr; 260 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 261 262 hdfsFile file = 263 hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_RDONLY, 0, 0, 0); 264 if (file == nullptr) { 265 return IOError(fname, errno); 266 } 267 result->reset( 268 new HDFSRandomAccessFile(fname, TranslateName(fname), hdfs_, fs, file)); 269 return Status::OK(); 270 } 271 272 class HDFSWritableFile : public WritableFile { 273 public: 274 HDFSWritableFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, hdfsFile file) 275 : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {} 276 277 ~HDFSWritableFile() override { 278 if (file_ != nullptr) { 279 Close().IgnoreError(); 280 } 281 } 282 283 Status Append(const StringPiece& data) override { 284 if (hdfs_->hdfsWrite(fs_, file_, data.data(), 285 static_cast<tSize>(data.size())) == -1) { 286 return IOError(filename_, errno); 287 } 288 return Status::OK(); 289 } 290 291 Status Close() override { 292 Status result; 293 if (hdfs_->hdfsCloseFile(fs_, file_) != 0) { 294 result = 
IOError(filename_, errno); 295 } 296 hdfs_ = nullptr; 297 fs_ = nullptr; 298 file_ = nullptr; 299 return result; 300 } 301 302 Status Flush() override { 303 if (hdfs_->hdfsHFlush(fs_, file_) != 0) { 304 return IOError(filename_, errno); 305 } 306 return Status::OK(); 307 } 308 309 Status Sync() override { 310 if (hdfs_->hdfsHSync(fs_, file_) != 0) { 311 return IOError(filename_, errno); 312 } 313 return Status::OK(); 314 } 315 316 private: 317 string filename_; 318 LibHDFS* hdfs_; 319 hdfsFS fs_; 320 hdfsFile file_; 321 }; 322 323 Status HadoopFileSystem::NewWritableFile( 324 const string& fname, std::unique_ptr<WritableFile>* result) { 325 hdfsFS fs = nullptr; 326 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 327 328 hdfsFile file = 329 hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_WRONLY, 0, 0, 0); 330 if (file == nullptr) { 331 return IOError(fname, errno); 332 } 333 result->reset(new HDFSWritableFile(fname, hdfs_, fs, file)); 334 return Status::OK(); 335 } 336 337 Status HadoopFileSystem::NewAppendableFile( 338 const string& fname, std::unique_ptr<WritableFile>* result) { 339 hdfsFS fs = nullptr; 340 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 341 342 hdfsFile file = hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), 343 O_WRONLY | O_APPEND, 0, 0, 0); 344 if (file == nullptr) { 345 return IOError(fname, errno); 346 } 347 result->reset(new HDFSWritableFile(fname, hdfs_, fs, file)); 348 return Status::OK(); 349 } 350 351 Status HadoopFileSystem::NewReadOnlyMemoryRegionFromFile( 352 const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) { 353 // hadoopReadZero() technically supports this call with the following 354 // caveats: 355 // - It only works up to 2 GB. We'd have to Stat() the file to ensure that 356 // it fits. 357 // - If not on the local filesystem, the entire file will be read, making 358 // it inefficient for callers that assume typical mmap() behavior. 
359 return errors::Unimplemented("HDFS does not support ReadOnlyMemoryRegion"); 360 } 361 362 Status HadoopFileSystem::FileExists(const string& fname) { 363 hdfsFS fs = nullptr; 364 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 365 if (hdfs_->hdfsExists(fs, TranslateName(fname).c_str()) == 0) { 366 return Status::OK(); 367 } 368 return errors::NotFound(fname, " not found."); 369 } 370 371 Status HadoopFileSystem::GetChildren(const string& dir, 372 std::vector<string>* result) { 373 result->clear(); 374 hdfsFS fs = nullptr; 375 TF_RETURN_IF_ERROR(Connect(dir, &fs)); 376 377 // hdfsListDirectory returns nullptr if the directory is empty. Do a separate 378 // check to verify the directory exists first. 379 FileStatistics stat; 380 TF_RETURN_IF_ERROR(Stat(dir, &stat)); 381 382 int entries = 0; 383 hdfsFileInfo* info = 384 hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries); 385 if (info == nullptr) { 386 if (stat.is_directory) { 387 // Assume it's an empty directory. 388 return Status::OK(); 389 } 390 return IOError(dir, errno); 391 } 392 for (int i = 0; i < entries; i++) { 393 result->push_back(io::Basename(info[i].mName).ToString()); 394 } 395 hdfs_->hdfsFreeFileInfo(info, entries); 396 return Status::OK(); 397 } 398 399 Status HadoopFileSystem::DeleteFile(const string& fname) { 400 hdfsFS fs = nullptr; 401 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 402 403 if (hdfs_->hdfsDelete(fs, TranslateName(fname).c_str(), 404 /*recursive=*/0) != 0) { 405 return IOError(fname, errno); 406 } 407 return Status::OK(); 408 } 409 410 Status HadoopFileSystem::CreateDir(const string& dir) { 411 hdfsFS fs = nullptr; 412 TF_RETURN_IF_ERROR(Connect(dir, &fs)); 413 414 if (hdfs_->hdfsCreateDirectory(fs, TranslateName(dir).c_str()) != 0) { 415 return IOError(dir, errno); 416 } 417 return Status::OK(); 418 } 419 420 Status HadoopFileSystem::DeleteDir(const string& dir) { 421 hdfsFS fs = nullptr; 422 TF_RETURN_IF_ERROR(Connect(dir, &fs)); 423 424 // Count the number of entries in the 
directory, and only delete if it's 425 // non-empty. This is consistent with the interface, but note that there's 426 // a race condition where a file may be added after this check, in which 427 // case the directory will still be deleted. 428 int entries = 0; 429 hdfsFileInfo* info = 430 hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries); 431 if (info != nullptr) { 432 hdfs_->hdfsFreeFileInfo(info, entries); 433 } 434 // Due to HDFS bug HDFS-8407, we can't distinguish between an error and empty 435 // folder, expscially for Kerberos enable setup, EAGAIN is quite common when 436 // the call is actually successful. Check again by Stat. 437 if (info == nullptr && errno != 0) { 438 FileStatistics stat; 439 TF_RETURN_IF_ERROR(Stat(dir, &stat)); 440 } 441 442 if (entries > 0) { 443 return errors::FailedPrecondition("Cannot delete a non-empty directory."); 444 } 445 if (hdfs_->hdfsDelete(fs, TranslateName(dir).c_str(), 446 /*recursive=*/1) != 0) { 447 return IOError(dir, errno); 448 } 449 return Status::OK(); 450 } 451 452 Status HadoopFileSystem::GetFileSize(const string& fname, uint64* size) { 453 hdfsFS fs = nullptr; 454 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 455 456 hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str()); 457 if (info == nullptr) { 458 return IOError(fname, errno); 459 } 460 *size = static_cast<uint64>(info->mSize); 461 hdfs_->hdfsFreeFileInfo(info, 1); 462 return Status::OK(); 463 } 464 465 Status HadoopFileSystem::RenameFile(const string& src, const string& target) { 466 hdfsFS fs = nullptr; 467 TF_RETURN_IF_ERROR(Connect(src, &fs)); 468 469 if (hdfs_->hdfsExists(fs, TranslateName(target).c_str()) == 0 && 470 hdfs_->hdfsDelete(fs, TranslateName(target).c_str(), 471 /*recursive=*/0) != 0) { 472 return IOError(target, errno); 473 } 474 475 if (hdfs_->hdfsRename(fs, TranslateName(src).c_str(), 476 TranslateName(target).c_str()) != 0) { 477 return IOError(src, errno); 478 } 479 return Status::OK(); 480 } 481 482 
Status HadoopFileSystem::Stat(const string& fname, FileStatistics* stats) { 483 hdfsFS fs = nullptr; 484 TF_RETURN_IF_ERROR(Connect(fname, &fs)); 485 486 hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str()); 487 if (info == nullptr) { 488 return IOError(fname, errno); 489 } 490 stats->length = static_cast<int64>(info->mSize); 491 stats->mtime_nsec = static_cast<int64>(info->mLastMod) * 1e9; 492 stats->is_directory = info->mKind == kObjectKindDirectory; 493 hdfs_->hdfsFreeFileInfo(info, 1); 494 return Status::OK(); 495 } 496 497 REGISTER_FILE_SYSTEM("hdfs", HadoopFileSystem); 498 REGISTER_FILE_SYSTEM("viewfs", HadoopFileSystem); 499 500 } // namespace tensorflow 501