Home | History | Annotate | Download | only in cloud
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/platform/cloud/retrying_file_system.h"
     17 #include <functional>
     18 #include "tensorflow/core/lib/core/errors.h"
     19 #include "tensorflow/core/lib/random/random.h"
     20 #include "tensorflow/core/platform/cloud/retrying_utils.h"
     21 #include "tensorflow/core/platform/env.h"
     22 #include "tensorflow/core/platform/file_system.h"
     23 
     24 namespace tensorflow {
     25 
     26 namespace {
     27 
     28 class RetryingRandomAccessFile : public RandomAccessFile {
     29  public:
     30   RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
     31                            int64 delay_microseconds)
     32       : base_file_(std::move(base_file)),
     33         initial_delay_microseconds_(delay_microseconds) {}
     34 
     35   Status Read(uint64 offset, size_t n, StringPiece* result,
     36               char* scratch) const override {
     37     return RetryingUtils::CallWithRetries(
     38         std::bind(&RandomAccessFile::Read, base_file_.get(), offset, n, result,
     39                   scratch),
     40         initial_delay_microseconds_);
     41   }
     42 
     43  private:
     44   std::unique_ptr<RandomAccessFile> base_file_;
     45   const int64 initial_delay_microseconds_;
     46 };
     47 
     48 class RetryingWritableFile : public WritableFile {
     49  public:
     50   RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
     51                        int64 delay_microseconds)
     52       : base_file_(std::move(base_file)),
     53         initial_delay_microseconds_(delay_microseconds) {}
     54 
     55   ~RetryingWritableFile() override {
     56     // Makes sure the retrying version of Close() is called in the destructor.
     57     Close().IgnoreError();
     58   }
     59 
     60   Status Append(const StringPiece& data) override {
     61     return RetryingUtils::CallWithRetries(
     62         std::bind(&WritableFile::Append, base_file_.get(), data),
     63         initial_delay_microseconds_);
     64   }
     65   Status Close() override {
     66     return RetryingUtils::CallWithRetries(
     67         std::bind(&WritableFile::Close, base_file_.get()),
     68         initial_delay_microseconds_);
     69   }
     70   Status Flush() override {
     71     return RetryingUtils::CallWithRetries(
     72         std::bind(&WritableFile::Flush, base_file_.get()),
     73         initial_delay_microseconds_);
     74   }
     75   Status Sync() override {
     76     return RetryingUtils::CallWithRetries(
     77         std::bind(&WritableFile::Sync, base_file_.get()),
     78         initial_delay_microseconds_);
     79   }
     80 
     81  private:
     82   std::unique_ptr<WritableFile> base_file_;
     83   const int64 initial_delay_microseconds_;
     84 };
     85 
     86 }  // namespace
     87 
     88 Status RetryingFileSystem::NewRandomAccessFile(
     89     const string& filename, std::unique_ptr<RandomAccessFile>* result) {
     90   std::unique_ptr<RandomAccessFile> base_file;
     91   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
     92       std::bind(&FileSystem::NewRandomAccessFile, base_file_system_.get(),
     93                 filename, &base_file),
     94       initial_delay_microseconds_));
     95   result->reset(new RetryingRandomAccessFile(std::move(base_file),
     96                                              initial_delay_microseconds_));
     97   return Status::OK();
     98 }
     99 
    100 Status RetryingFileSystem::NewWritableFile(
    101     const string& filename, std::unique_ptr<WritableFile>* result) {
    102   std::unique_ptr<WritableFile> base_file;
    103   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
    104       std::bind(&FileSystem::NewWritableFile, base_file_system_.get(), filename,
    105                 &base_file),
    106       initial_delay_microseconds_));
    107   result->reset(new RetryingWritableFile(std::move(base_file),
    108                                          initial_delay_microseconds_));
    109   return Status::OK();
    110 }
    111 
    112 Status RetryingFileSystem::NewAppendableFile(
    113     const string& filename, std::unique_ptr<WritableFile>* result) {
    114   std::unique_ptr<WritableFile> base_file;
    115   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
    116       std::bind(&FileSystem::NewAppendableFile, base_file_system_.get(),
    117                 filename, &base_file),
    118       initial_delay_microseconds_));
    119   result->reset(new RetryingWritableFile(std::move(base_file),
    120                                          initial_delay_microseconds_));
    121   return Status::OK();
    122 }
    123 
    124 Status RetryingFileSystem::NewReadOnlyMemoryRegionFromFile(
    125     const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
    126   return RetryingUtils::CallWithRetries(
    127       std::bind(&FileSystem::NewReadOnlyMemoryRegionFromFile,
    128                 base_file_system_.get(), filename, result),
    129       initial_delay_microseconds_);
    130 }
    131 
    132 Status RetryingFileSystem::FileExists(const string& fname) {
    133   return RetryingUtils::CallWithRetries(
    134       std::bind(&FileSystem::FileExists, base_file_system_.get(), fname),
    135       initial_delay_microseconds_);
    136 }
    137 
    138 Status RetryingFileSystem::Stat(const string& fname, FileStatistics* stat) {
    139   return RetryingUtils::CallWithRetries(
    140       std::bind(&FileSystem::Stat, base_file_system_.get(), fname, stat),
    141       initial_delay_microseconds_);
    142 }
    143 
    144 Status RetryingFileSystem::GetChildren(const string& dir,
    145                                        std::vector<string>* result) {
    146   return RetryingUtils::CallWithRetries(
    147       std::bind(&FileSystem::GetChildren, base_file_system_.get(), dir, result),
    148       initial_delay_microseconds_);
    149 }
    150 
    151 Status RetryingFileSystem::GetMatchingPaths(const string& pattern,
    152                                             std::vector<string>* result) {
    153   return RetryingUtils::CallWithRetries(
    154       std::bind(&FileSystem::GetMatchingPaths, base_file_system_.get(), pattern,
    155                 result),
    156       initial_delay_microseconds_);
    157 }
    158 
    159 Status RetryingFileSystem::DeleteFile(const string& fname) {
    160   return RetryingUtils::DeleteWithRetries(
    161       std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname),
    162       initial_delay_microseconds_);
    163 }
    164 
    165 Status RetryingFileSystem::CreateDir(const string& dirname) {
    166   return RetryingUtils::CallWithRetries(
    167       std::bind(&FileSystem::CreateDir, base_file_system_.get(), dirname),
    168       initial_delay_microseconds_);
    169 }
    170 
    171 Status RetryingFileSystem::DeleteDir(const string& dirname) {
    172   return RetryingUtils::DeleteWithRetries(
    173       std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname),
    174       initial_delay_microseconds_);
    175 }
    176 
    177 Status RetryingFileSystem::GetFileSize(const string& fname, uint64* file_size) {
    178   return RetryingUtils::CallWithRetries(
    179       std::bind(&FileSystem::GetFileSize, base_file_system_.get(), fname,
    180                 file_size),
    181       initial_delay_microseconds_);
    182 }
    183 
    184 Status RetryingFileSystem::RenameFile(const string& src, const string& target) {
    185   return RetryingUtils::CallWithRetries(
    186       std::bind(&FileSystem::RenameFile, base_file_system_.get(), src, target),
    187       initial_delay_microseconds_);
    188 }
    189 
    190 Status RetryingFileSystem::IsDirectory(const string& dirname) {
    191   return RetryingUtils::CallWithRetries(
    192       std::bind(&FileSystem::IsDirectory, base_file_system_.get(), dirname),
    193       initial_delay_microseconds_);
    194 }
    195 
    196 Status RetryingFileSystem::DeleteRecursively(const string& dirname,
    197                                              int64* undeleted_files,
    198                                              int64* undeleted_dirs) {
    199   return RetryingUtils::DeleteWithRetries(
    200       std::bind(&FileSystem::DeleteRecursively, base_file_system_.get(),
    201                 dirname, undeleted_files, undeleted_dirs),
    202       initial_delay_microseconds_);
    203 }
    204 
    205 void RetryingFileSystem::FlushCaches() { base_file_system_->FlushCaches(); }
    206 
    207 }  // namespace tensorflow
    208