1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_FILE_BLOCK_CACHE_H_ 17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_FILE_BLOCK_CACHE_H_ 18 19 #include <functional> 20 #include <list> 21 #include <map> 22 #include <memory> 23 #include <string> 24 #include <vector> 25 #include "tensorflow/core/lib/core/status.h" 26 #include "tensorflow/core/lib/core/stringpiece.h" 27 #include "tensorflow/core/platform/env.h" 28 #include "tensorflow/core/platform/mutex.h" 29 #include "tensorflow/core/platform/notification.h" 30 #include "tensorflow/core/platform/thread_annotations.h" 31 #include "tensorflow/core/platform/types.h" 32 33 namespace tensorflow { 34 35 /// \brief An LRU block cache of file contents, keyed by {filename, offset}. 36 /// 37 /// This class should be shared by read-only random access files on a remote 38 /// filesystem (e.g. GCS). 39 class FileBlockCache { 40 public: 41 /// The callback executed when a block is not found in the cache, and needs to 42 /// be fetched from the backing filesystem. This callback is provided when the 43 /// cache is constructed. The returned Status should be OK as long as the 44 /// read from the remote filesystem succeeded (similar to the semantics of the 45 /// read(2) system call). 46 typedef std::function<Status(const string& filename, size_t offset, 47 size_t buffer_size, char* buffer, 48 size_t* bytes_transferred)> 49 BlockFetcher; 50 51 FileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, 52 BlockFetcher block_fetcher, Env* env = Env::Default()) 53 : block_size_(block_size), 54 max_bytes_(max_bytes), 55 max_staleness_(max_staleness), 56 block_fetcher_(block_fetcher), 57 env_(env) { 58 if (max_staleness_ > 0) { 59 pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", 60 [this] { Prune(); })); 61 } 62 } 63 64 ~FileBlockCache() { 65 if (pruning_thread_) { 66 stop_pruning_thread_.Notify(); 67 // Destroying pruning_thread_ will block until Prune() receives the above 68 // notification and returns. 69 pruning_thread_.reset(); 70 } 71 } 72 73 /// Read `n` bytes from `filename` starting at `offset` into `out`. This 74 /// method will return: 75 /// 76 /// 1) The error from the remote filesystem, if the read from the remote 77 /// filesystem failed. 78 /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded, 79 /// but the read returned a partial block, and the LRU cache contained a 80 /// block at a higher offset (indicating that the partial block should have 81 /// been a full block). 82 /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but 83 /// the file contents do not extend past `offset` and thus nothing was 84 /// placed in `out`. 85 /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed 86 /// in `out`). 87 Status Read(const string& filename, size_t offset, size_t n, char* buffer, 88 size_t* bytes_transferred); 89 90 /// Remove all cached blocks for `filename`. 91 void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_); 92 93 /// Remove all cached data. 94 void Flush() LOCKS_EXCLUDED(mu_); 95 96 /// Accessors for cache parameters. 97 size_t block_size() const { return block_size_; } 98 size_t max_bytes() const { return max_bytes_; } 99 uint64 max_staleness() const { return max_staleness_; } 100 101 /// The current size (in bytes) of the cache. 102 size_t CacheSize() const LOCKS_EXCLUDED(mu_); 103 104 private: 105 /// The size of the blocks stored in the LRU cache, as well as the size of the 106 /// reads from the underlying filesystem. 107 const size_t block_size_; 108 /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. 109 const size_t max_bytes_; 110 /// The maximum staleness of any block in the LRU cache, in seconds. 111 const uint64 max_staleness_; 112 /// The callback to read a block from the underlying filesystem. 113 const BlockFetcher block_fetcher_; 114 /// The Env from which we read timestamps. 115 Env* const env_; // not owned 116 117 /// \brief The key type for the file block cache. 118 /// 119 /// The file block cache key is a {filename, offset} pair. 120 typedef std::pair<string, size_t> Key; 121 122 /// \brief The state of a block. 123 /// 124 /// A block begins in the CREATED stage. The first thread will attempt to read 125 /// the block from the filesystem, transitioning the state of the block to 126 /// FETCHING. After completing, if the read was successful the state should 127 /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can 128 /// re-fetch the block if the state is ERROR. 129 enum class FetchState { 130 CREATED, 131 FETCHING, 132 FINISHED, 133 ERROR, 134 }; 135 136 /// \brief A block of a file. 137 /// 138 /// A file block consists of the block data, the block's current position in 139 /// the LRU cache, the timestamp (seconds since epoch) at which the block 140 /// was cached, a coordination lock, and state & condition variables. 141 /// 142 /// Thread safety: 143 /// The iterator and timestamp fields should only be accessed while holding 144 /// the block-cache-wide mu_ instance variable. The state variable should only 145 /// be accessed while holding the Block's mu lock. The data vector should only 146 /// be accessed after state == FINISHED, and it should never be modified. 147 /// 148 /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock 149 /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking 150 /// mu_. 151 struct Block { 152 /// The block data. 153 std::vector<char> data; 154 /// A list iterator pointing to the block's position in the LRU list. 155 std::list<Key>::iterator lru_iterator; 156 /// A list iterator pointing to the block's position in the LRA list. 157 std::list<Key>::iterator lra_iterator; 158 /// The timestamp (seconds since epoch) at which the block was cached. 159 uint64 timestamp; 160 /// Mutex to guard state variable 161 mutex mu; 162 /// The state of the block. 163 FetchState state GUARDED_BY(mu) = FetchState::CREATED; 164 /// Wait on cond_var if state is FETCHING. 165 condition_variable cond_var; 166 }; 167 168 /// \brief The block map type for the file block cache. 169 /// 170 /// The block map is an ordered map from Key to Block. 171 typedef std::map<Key, std::shared_ptr<Block>> BlockMap; 172 173 /// Prune the cache by removing files with expired blocks. 174 void Prune() LOCKS_EXCLUDED(mu_); 175 176 bool BlockNotStale(const std::shared_ptr<Block>& block) 177 EXCLUSIVE_LOCKS_REQUIRED(mu_); 178 179 /// Look up a Key in the block cache. 180 std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_); 181 182 Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block) 183 LOCKS_EXCLUDED(mu_); 184 185 /// Trim the block cache to make room for another entry. 186 void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_); 187 188 /// Update the LRU iterator for the block at `key`. 189 Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block) 190 LOCKS_EXCLUDED(mu_); 191 192 /// Remove all blocks of a file, with mu_ already held. 193 void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); 194 195 /// Remove the block `entry` from the block map and LRU list, and update the 196 /// cache size accordingly. 197 void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); 198 199 /// The cache pruning thread that removes files with expired blocks. 200 std::unique_ptr<Thread> pruning_thread_; 201 202 /// Notification for stopping the cache pruning thread. 203 Notification stop_pruning_thread_; 204 205 /// Guards access to the block map, LRU list, and cached byte count. 206 mutable mutex mu_; 207 208 /// The block map (map from Key to Block). 209 BlockMap block_map_ GUARDED_BY(mu_); 210 211 /// The LRU list of block keys. The front of the list identifies the most 212 /// recently accessed block. 213 std::list<Key> lru_list_ GUARDED_BY(mu_); 214 215 /// The LRA (least recently added) list of block keys. The front of the list 216 /// identifies the most recently added block. 217 /// 218 /// Note: blocks are added to lra_list_ only after they have successfully been 219 /// fetched from the underlying block store. 220 std::list<Key> lra_list_ GUARDED_BY(mu_); 221 222 /// The combined number of bytes in all of the cached blocks. 223 size_t cache_size_ GUARDED_BY(mu_) = 0; 224 }; 225 226 } // namespace tensorflow 227 228 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_FILE_BLOCK_CACHE_H_ 229