Home | History | Annotate | Download | only in cloud
      1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_FILE_BLOCK_CACHE_H_
     17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_FILE_BLOCK_CACHE_H_
     18 
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/stringpiece.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/notification.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
     32 
     33 namespace tensorflow {
     34 
     35 /// \brief An LRU block cache of file contents, keyed by {filename, offset}.
     36 ///
     37 /// This class should be shared by read-only random access files on a remote
     38 /// filesystem (e.g. GCS).
     39 class FileBlockCache {
     40  public:
     41   /// The callback executed when a block is not found in the cache, and needs to
     42   /// be fetched from the backing filesystem. This callback is provided when the
     43   /// cache is constructed. The returned Status should be OK as long as the
     44   /// read from the remote filesystem succeeded (similar to the semantics of the
     45   /// read(2) system call).
     46   typedef std::function<Status(const string& filename, size_t offset,
     47                                size_t buffer_size, char* buffer,
     48                                size_t* bytes_transferred)>
     49       BlockFetcher;
     50 
     51   FileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness,
     52                  BlockFetcher block_fetcher, Env* env = Env::Default())
     53       : block_size_(block_size),
     54         max_bytes_(max_bytes),
     55         max_staleness_(max_staleness),
     56         block_fetcher_(block_fetcher),
     57         env_(env) {
     58     if (max_staleness_ > 0) {
     59       pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
     60                                               [this] { Prune(); }));
     61     }
     62   }
     63 
     64   ~FileBlockCache() {
     65     if (pruning_thread_) {
     66       stop_pruning_thread_.Notify();
     67       // Destroying pruning_thread_ will block until Prune() receives the above
     68       // notification and returns.
     69       pruning_thread_.reset();
     70     }
     71   }
     72 
     73   /// Read `n` bytes from `filename` starting at `offset` into `out`. This
     74   /// method will return:
     75   ///
     76   /// 1) The error from the remote filesystem, if the read from the remote
     77   ///    filesystem failed.
     78   /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded,
     79   ///    but the read returned a partial block, and the LRU cache contained a
     80   ///    block at a higher offset (indicating that the partial block should have
     81   ///    been a full block).
     82   /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but
     83   ///    the file contents do not extend past `offset` and thus nothing was
     84   ///    placed in `out`.
     85   /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed
     86   ///    in `out`).
     87   Status Read(const string& filename, size_t offset, size_t n, char* buffer,
     88               size_t* bytes_transferred);
     89 
     90   /// Remove all cached blocks for `filename`.
     91   void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_);
     92 
     93   /// Remove all cached data.
     94   void Flush() LOCKS_EXCLUDED(mu_);
     95 
     96   /// Accessors for cache parameters.
     97   size_t block_size() const { return block_size_; }
     98   size_t max_bytes() const { return max_bytes_; }
     99   uint64 max_staleness() const { return max_staleness_; }
    100 
    101   /// The current size (in bytes) of the cache.
    102   size_t CacheSize() const LOCKS_EXCLUDED(mu_);
    103 
    104  private:
    105   /// The size of the blocks stored in the LRU cache, as well as the size of the
    106   /// reads from the underlying filesystem.
    107   const size_t block_size_;
    108   /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache.
    109   const size_t max_bytes_;
    110   /// The maximum staleness of any block in the LRU cache, in seconds.
    111   const uint64 max_staleness_;
    112   /// The callback to read a block from the underlying filesystem.
    113   const BlockFetcher block_fetcher_;
    114   /// The Env from which we read timestamps.
    115   Env* const env_;  // not owned
    116 
    117   /// \brief The key type for the file block cache.
    118   ///
    119   /// The file block cache key is a {filename, offset} pair.
    120   typedef std::pair<string, size_t> Key;
    121 
    122   /// \brief The state of a block.
    123   ///
    124   /// A block begins in the CREATED stage. The first thread will attempt to read
    125   /// the block from the filesystem, transitioning the state of the block to
    126   /// FETCHING. After completing, if the read was successful the state should
    127   /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can
    128   /// re-fetch the block if the state is ERROR.
    129   enum class FetchState {
    130     CREATED,
    131     FETCHING,
    132     FINISHED,
    133     ERROR,
    134   };
    135 
    136   /// \brief A block of a file.
    137   ///
    138   /// A file block consists of the block data, the block's current position in
    139   /// the LRU cache, the timestamp (seconds since epoch) at which the block
    140   /// was cached, a coordination lock, and state & condition variables.
    141   ///
    142   /// Thread safety:
    143   /// The iterator and timestamp fields should only be accessed while holding
    144   /// the block-cache-wide mu_ instance variable. The state variable should only
    145   /// be accessed while holding the Block's mu lock. The data vector should only
    146   /// be accessed after state == FINISHED, and it should never be modified.
    147   ///
    148   /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock
    149   /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking
    150   /// mu_.
    151   struct Block {
    152     /// The block data.
    153     std::vector<char> data;
    154     /// A list iterator pointing to the block's position in the LRU list.
    155     std::list<Key>::iterator lru_iterator;
    156     /// A list iterator pointing to the block's position in the LRA list.
    157     std::list<Key>::iterator lra_iterator;
    158     /// The timestamp (seconds since epoch) at which the block was cached.
    159     uint64 timestamp;
    160     /// Mutex to guard state variable
    161     mutex mu;
    162     /// The state of the block.
    163     FetchState state GUARDED_BY(mu) = FetchState::CREATED;
    164     /// Wait on cond_var if state is FETCHING.
    165     condition_variable cond_var;
    166   };
    167 
    168   /// \brief The block map type for the file block cache.
    169   ///
    170   /// The block map is an ordered map from Key to Block.
    171   typedef std::map<Key, std::shared_ptr<Block>> BlockMap;
    172 
    173   /// Prune the cache by removing files with expired blocks.
    174   void Prune() LOCKS_EXCLUDED(mu_);
    175 
    176   bool BlockNotStale(const std::shared_ptr<Block>& block)
    177       EXCLUSIVE_LOCKS_REQUIRED(mu_);
    178 
    179   /// Look up a Key in the block cache.
    180   std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_);
    181 
    182   Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block)
    183       LOCKS_EXCLUDED(mu_);
    184 
    185   /// Trim the block cache to make room for another entry.
    186   void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_);
    187 
    188   /// Update the LRU iterator for the block at `key`.
    189   Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block)
    190       LOCKS_EXCLUDED(mu_);
    191 
    192   /// Remove all blocks of a file, with mu_ already held.
    193   void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_);
    194 
    195   /// Remove the block `entry` from the block map and LRU list, and update the
    196   /// cache size accordingly.
    197   void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_);
    198 
    199   /// The cache pruning thread that removes files with expired blocks.
    200   std::unique_ptr<Thread> pruning_thread_;
    201 
    202   /// Notification for stopping the cache pruning thread.
    203   Notification stop_pruning_thread_;
    204 
    205   /// Guards access to the block map, LRU list, and cached byte count.
    206   mutable mutex mu_;
    207 
    208   /// The block map (map from Key to Block).
    209   BlockMap block_map_ GUARDED_BY(mu_);
    210 
    211   /// The LRU list of block keys. The front of the list identifies the most
    212   /// recently accessed block.
    213   std::list<Key> lru_list_ GUARDED_BY(mu_);
    214 
    215   /// The LRA (least recently added) list of block keys. The front of the list
    216   /// identifies the most recently added block.
    217   ///
    218   /// Note: blocks are added to lra_list_ only after they have successfully been
    219   /// fetched from the underlying block store.
    220   std::list<Key> lra_list_ GUARDED_BY(mu_);
    221 
    222   /// The combined number of bytes in all of the cached blocks.
    223   size_t cache_size_ GUARDED_BY(mu_) = 0;
    224 };
    225 
    226 }  // namespace tensorflow
    227 
    228 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_FILE_BLOCK_CACHE_H_
    229