1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/platform/cloud/retrying_file_system.h" 17 #include <functional> 18 #include "tensorflow/core/lib/core/errors.h" 19 #include "tensorflow/core/lib/random/random.h" 20 #include "tensorflow/core/platform/cloud/retrying_utils.h" 21 #include "tensorflow/core/platform/env.h" 22 #include "tensorflow/core/platform/file_system.h" 23 24 namespace tensorflow { 25 26 namespace { 27 28 class RetryingRandomAccessFile : public RandomAccessFile { 29 public: 30 RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file, 31 int64 delay_microseconds) 32 : base_file_(std::move(base_file)), 33 initial_delay_microseconds_(delay_microseconds) {} 34 35 Status Read(uint64 offset, size_t n, StringPiece* result, 36 char* scratch) const override { 37 return RetryingUtils::CallWithRetries( 38 std::bind(&RandomAccessFile::Read, base_file_.get(), offset, n, result, 39 scratch), 40 initial_delay_microseconds_); 41 } 42 43 private: 44 std::unique_ptr<RandomAccessFile> base_file_; 45 const int64 initial_delay_microseconds_; 46 }; 47 48 class RetryingWritableFile : public WritableFile { 49 public: 50 RetryingWritableFile(std::unique_ptr<WritableFile> base_file, 51 int64 delay_microseconds) 52 : base_file_(std::move(base_file)), 53 initial_delay_microseconds_(delay_microseconds) {} 54 55 ~RetryingWritableFile() override { 56 // Makes sure the retrying version of Close() is called in the destructor. 57 Close().IgnoreError(); 58 } 59 60 Status Append(const StringPiece& data) override { 61 return RetryingUtils::CallWithRetries( 62 std::bind(&WritableFile::Append, base_file_.get(), data), 63 initial_delay_microseconds_); 64 } 65 Status Close() override { 66 return RetryingUtils::CallWithRetries( 67 std::bind(&WritableFile::Close, base_file_.get()), 68 initial_delay_microseconds_); 69 } 70 Status Flush() override { 71 return RetryingUtils::CallWithRetries( 72 std::bind(&WritableFile::Flush, base_file_.get()), 73 initial_delay_microseconds_); 74 } 75 Status Sync() override { 76 return RetryingUtils::CallWithRetries( 77 std::bind(&WritableFile::Sync, base_file_.get()), 78 initial_delay_microseconds_); 79 } 80 81 private: 82 std::unique_ptr<WritableFile> base_file_; 83 const int64 initial_delay_microseconds_; 84 }; 85 86 } // namespace 87 88 Status RetryingFileSystem::NewRandomAccessFile( 89 const string& filename, std::unique_ptr<RandomAccessFile>* result) { 90 std::unique_ptr<RandomAccessFile> base_file; 91 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries( 92 std::bind(&FileSystem::NewRandomAccessFile, base_file_system_.get(), 93 filename, &base_file), 94 initial_delay_microseconds_)); 95 result->reset(new RetryingRandomAccessFile(std::move(base_file), 96 initial_delay_microseconds_)); 97 return Status::OK(); 98 } 99 100 Status RetryingFileSystem::NewWritableFile( 101 const string& filename, std::unique_ptr<WritableFile>* result) { 102 std::unique_ptr<WritableFile> base_file; 103 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries( 104 std::bind(&FileSystem::NewWritableFile, base_file_system_.get(), filename, 105 &base_file), 106 initial_delay_microseconds_)); 107 result->reset(new RetryingWritableFile(std::move(base_file), 108 initial_delay_microseconds_)); 109 return Status::OK(); 110 } 111 112 Status RetryingFileSystem::NewAppendableFile( 113 const string& filename, std::unique_ptr<WritableFile>* result) { 114 std::unique_ptr<WritableFile> base_file; 115 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries( 116 std::bind(&FileSystem::NewAppendableFile, base_file_system_.get(), 117 filename, &base_file), 118 initial_delay_microseconds_)); 119 result->reset(new RetryingWritableFile(std::move(base_file), 120 initial_delay_microseconds_)); 121 return Status::OK(); 122 } 123 124 Status RetryingFileSystem::NewReadOnlyMemoryRegionFromFile( 125 const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) { 126 return RetryingUtils::CallWithRetries( 127 std::bind(&FileSystem::NewReadOnlyMemoryRegionFromFile, 128 base_file_system_.get(), filename, result), 129 initial_delay_microseconds_); 130 } 131 132 Status RetryingFileSystem::FileExists(const string& fname) { 133 return RetryingUtils::CallWithRetries( 134 std::bind(&FileSystem::FileExists, base_file_system_.get(), fname), 135 initial_delay_microseconds_); 136 } 137 138 Status RetryingFileSystem::Stat(const string& fname, FileStatistics* stat) { 139 return RetryingUtils::CallWithRetries( 140 std::bind(&FileSystem::Stat, base_file_system_.get(), fname, stat), 141 initial_delay_microseconds_); 142 } 143 144 Status RetryingFileSystem::GetChildren(const string& dir, 145 std::vector<string>* result) { 146 return RetryingUtils::CallWithRetries( 147 std::bind(&FileSystem::GetChildren, base_file_system_.get(), dir, result), 148 initial_delay_microseconds_); 149 } 150 151 Status RetryingFileSystem::GetMatchingPaths(const string& pattern, 152 std::vector<string>* result) { 153 return RetryingUtils::CallWithRetries( 154 std::bind(&FileSystem::GetMatchingPaths, base_file_system_.get(), pattern, 155 result), 156 initial_delay_microseconds_); 157 } 158 159 Status RetryingFileSystem::DeleteFile(const string& fname) { 160 return RetryingUtils::DeleteWithRetries( 161 std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname), 162 initial_delay_microseconds_); 163 } 164 165 Status RetryingFileSystem::CreateDir(const string& dirname) { 166 return RetryingUtils::CallWithRetries( 167 std::bind(&FileSystem::CreateDir, base_file_system_.get(), dirname), 168 initial_delay_microseconds_); 169 } 170 171 Status RetryingFileSystem::DeleteDir(const string& dirname) { 172 return RetryingUtils::DeleteWithRetries( 173 std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname), 174 initial_delay_microseconds_); 175 } 176 177 Status RetryingFileSystem::GetFileSize(const string& fname, uint64* file_size) { 178 return RetryingUtils::CallWithRetries( 179 std::bind(&FileSystem::GetFileSize, base_file_system_.get(), fname, 180 file_size), 181 initial_delay_microseconds_); 182 } 183 184 Status RetryingFileSystem::RenameFile(const string& src, const string& target) { 185 return RetryingUtils::CallWithRetries( 186 std::bind(&FileSystem::RenameFile, base_file_system_.get(), src, target), 187 initial_delay_microseconds_); 188 } 189 190 Status RetryingFileSystem::IsDirectory(const string& dirname) { 191 return RetryingUtils::CallWithRetries( 192 std::bind(&FileSystem::IsDirectory, base_file_system_.get(), dirname), 193 initial_delay_microseconds_); 194 } 195 196 Status RetryingFileSystem::DeleteRecursively(const string& dirname, 197 int64* undeleted_files, 198 int64* undeleted_dirs) { 199 return RetryingUtils::DeleteWithRetries( 200 std::bind(&FileSystem::DeleteRecursively, base_file_system_.get(), 201 dirname, undeleted_files, undeleted_dirs), 202 initial_delay_microseconds_); 203 } 204 205 void RetryingFileSystem::FlushCaches() { base_file_system_->FlushCaches(); } 206 207 } // namespace tensorflow 208