Home | History | Annotate | Download | only in cloud
      1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/platform/cloud/file_block_cache.h"
     17 #include <cstring>
     18 #include <memory>
     19 #include "tensorflow/core/lib/gtl/cleanup.h"
     20 #include "tensorflow/core/platform/env.h"
     21 
     22 namespace tensorflow {
     23 
     24 bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
     25   mutex_lock l(block->mu);
     26   if (block->state != FetchState::FINISHED) {
     27     return true;  // No need to check for staleness.
     28   }
     29   if (max_staleness_ == 0) return true;  // Not enforcing staleness.
     30   return env_->NowSeconds() - block->timestamp <= max_staleness_;
     31 }
     32 
     33 std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) {
     34   mutex_lock lock(mu_);
     35   auto entry = block_map_.find(key);
     36   if (entry != block_map_.end()) {
     37     if (BlockNotStale(entry->second)) {
     38       return entry->second;
     39     } else {
     40       // Remove the stale block and continue.
     41       RemoveFile_Locked(key.first);
     42     }
     43   }
     44 
     45   // Insert a new empty block, setting the bookkeeping to sentinel values
     46   // in order to update them as appropriate.
     47   auto new_entry = std::make_shared<Block>();
     48   lru_list_.push_front(key);
     49   lra_list_.push_front(key);
     50   new_entry->lru_iterator = lru_list_.begin();
     51   new_entry->lra_iterator = lra_list_.begin();
     52   new_entry->timestamp = env_->NowSeconds();
     53   block_map_.emplace(std::make_pair(key, new_entry));
     54   return new_entry;
     55 }
     56 
     57 // Remove blocks from the cache until we do not exceed our maximum size.
     58 void FileBlockCache::Trim() {
     59   while (!lru_list_.empty() && cache_size_ > max_bytes_) {
     60     RemoveBlock(block_map_.find(lru_list_.back()));
     61   }
     62 }
     63 
     64 /// Move the block to the front of the LRU list if it isn't already there.
     65 Status FileBlockCache::UpdateLRU(const Key& key,
     66                                  const std::shared_ptr<Block>& block) {
     67   mutex_lock lock(mu_);
     68   if (block->timestamp == 0) {
     69     // The block was evicted from another thread. Allow it to remain evicted.
     70     return Status::OK();
     71   }
     72   if (block->lru_iterator != lru_list_.begin()) {
     73     lru_list_.erase(block->lru_iterator);
     74     lru_list_.push_front(key);
     75     block->lru_iterator = lru_list_.begin();
     76   }
     77 
     78   // Check for inconsistent state. If there is a block later in the same file
     79   // in the cache, and our current block is not block size, this likely means
     80   // we have inconsistent state within the cache. Note: it's possible some
     81   // incomplete reads may still go undetected.
     82   if (block->data.size() < block_size_) {
     83     Key fmax = std::make_pair(key.first, std::numeric_limits<size_t>::max());
     84     auto fcmp = block_map_.upper_bound(fmax);
     85     if (fcmp != block_map_.begin() && key < (--fcmp)->first) {
     86       return errors::Internal("Block cache contents are inconsistent.");
     87     }
     88   }
     89 
     90   Trim();
     91 
     92   return Status::OK();
     93 }
     94 
     95 Status FileBlockCache::MaybeFetch(const Key& key,
     96                                   const std::shared_ptr<Block>& block) {
     97   bool downloaded_block = false;
     98   auto reconcile_state =
     99       gtl::MakeCleanup([this, &downloaded_block, &key, &block] {
    100         // Perform this action in a cleanup callback to avoid locking mu_ after
    101         // locking block->mu.
    102         if (downloaded_block) {
    103           mutex_lock l(mu_);
    104           // Do not update state if the block is already to be evicted.
    105           if (block->timestamp != 0) {
    106             cache_size_ += block->data.size();
    107             // Put to beginning of LRA list.
    108             lra_list_.erase(block->lra_iterator);
    109             lra_list_.push_front(key);
    110             block->lra_iterator = lra_list_.begin();
    111             block->timestamp = env_->NowSeconds();
    112           }
    113         }
    114       });
    115   // Loop until either block content is successfully fetched, or our request
    116   // encounters an error.
    117   mutex_lock l(block->mu);
    118   Status status = Status::OK();
    119   while (true) {
    120     switch (block->state) {
    121       case FetchState::ERROR:
    122         TF_FALLTHROUGH_INTENDED;
    123       case FetchState::CREATED:
    124         block->state = FetchState::FETCHING;
    125         block->mu.unlock();  // Release the lock while making the API call.
    126         block->data.clear();
    127         block->data.resize(block_size_, 0);
    128         size_t bytes_transferred;
    129         status.Update(block_fetcher_(key.first, key.second, block_size_,
    130                                      block->data.data(), &bytes_transferred));
    131         block->mu.lock();  // Reacquire the lock immediately afterwards
    132         if (status.ok()) {
    133           block->data.resize(bytes_transferred, 0);
    134           block->data.shrink_to_fit();
    135           downloaded_block = true;
    136           block->state = FetchState::FINISHED;
    137         } else {
    138           block->state = FetchState::ERROR;
    139         }
    140         block->cond_var.notify_all();
    141         return status;
    142       case FetchState::FETCHING:
    143         block->cond_var.wait_for(l, std::chrono::seconds(60));
    144         if (block->state == FetchState::FINISHED) {
    145           return Status::OK();
    146         }
    147         // Re-loop in case of errors.
    148         break;
    149       case FetchState::FINISHED:
    150         return Status::OK();
    151     }
    152   }
    153   return errors::Internal(
    154       "Control flow should never reach the end of FileBlockCache::Fetch.");
    155 }
    156 
    157 Status FileBlockCache::Read(const string& filename, size_t offset, size_t n,
    158                             char* buffer, size_t* bytes_transferred) {
    159   *bytes_transferred = 0;
    160   if (n == 0) {
    161     return Status::OK();
    162   }
    163   if (block_size_ == 0 || max_bytes_ == 0) {
    164     // The cache is effectively disabled, so we pass the read through to the
    165     // fetcher without breaking it up into blocks.
    166     return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
    167   }
    168   // Calculate the block-aligned start and end of the read.
    169   size_t start = block_size_ * (offset / block_size_);
    170   size_t finish = block_size_ * ((offset + n) / block_size_);
    171   if (finish < offset + n) {
    172     finish += block_size_;
    173   }
    174   size_t total_bytes_transferred = 0;
    175   // Now iterate through the blocks, reading them one at a time.
    176   for (size_t pos = start; pos < finish; pos += block_size_) {
    177     Key key = std::make_pair(filename, pos);
    178     // Look up the block, fetching and inserting it if necessary, and update the
    179     // LRU iterator for the key and block.
    180     std::shared_ptr<Block> block = Lookup(key);
    181     DCHECK(block) << "No block for key " << key.first << "@" << key.second;
    182     TF_RETURN_IF_ERROR(MaybeFetch(key, block));
    183     TF_RETURN_IF_ERROR(UpdateLRU(key, block));
    184     // Copy the relevant portion of the block into the result buffer.
    185     const auto& data = block->data;
    186     if (offset >= pos + data.size()) {
    187       // The requested offset is at or beyond the end of the file. This can
    188       // happen if `offset` is not block-aligned, and the read returns the last
    189       // block in the file, which does not extend all the way out to `offset`.
    190       *bytes_transferred = total_bytes_transferred;
    191       return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
    192                                 " at position ", pos, "with data size ",
    193                                 data.size());
    194     }
    195     auto begin = data.begin();
    196     if (offset > pos) {
    197       // The block begins before the slice we're reading.
    198       begin += offset - pos;
    199     }
    200     auto end = data.end();
    201     if (pos + data.size() > offset + n) {
    202       // The block extends past the end of the slice we're reading.
    203       end -= (pos + data.size()) - (offset + n);
    204     }
    205     if (begin < end) {
    206       size_t bytes_to_copy = end - begin;
    207       memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy);
    208       total_bytes_transferred += bytes_to_copy;
    209     }
    210     if (data.size() < block_size_) {
    211       // The block was a partial block and thus signals EOF at its upper bound.
    212       break;
    213     }
    214   }
    215   *bytes_transferred = total_bytes_transferred;
    216   return Status::OK();
    217 }
    218 
    219 size_t FileBlockCache::CacheSize() const {
    220   mutex_lock lock(mu_);
    221   return cache_size_;
    222 }
    223 
    224 void FileBlockCache::Prune() {
    225   while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
    226     mutex_lock lock(mu_);
    227     uint64 now = env_->NowSeconds();
    228     while (!lra_list_.empty()) {
    229       auto it = block_map_.find(lra_list_.back());
    230       if (now - it->second->timestamp <= max_staleness_) {
    231         // The oldest block is not yet expired. Come back later.
    232         break;
    233       }
    234       // We need to make a copy of the filename here, since it could otherwise
    235       // be used within RemoveFile_Locked after `it` is deleted.
    236       RemoveFile_Locked(std::string(it->first.first));
    237     }
    238   }
    239 }
    240 
    241 void FileBlockCache::Flush() {
    242   mutex_lock lock(mu_);
    243   block_map_.clear();
    244   lru_list_.clear();
    245   lra_list_.clear();
    246   cache_size_ = 0;
    247 }
    248 
    249 void FileBlockCache::RemoveFile(const string& filename) {
    250   mutex_lock lock(mu_);
    251   RemoveFile_Locked(filename);
    252 }
    253 
    254 void FileBlockCache::RemoveFile_Locked(const string& filename) {
    255   Key begin = std::make_pair(filename, 0);
    256   auto it = block_map_.lower_bound(begin);
    257   while (it != block_map_.end() && it->first.first == filename) {
    258     auto next = std::next(it);
    259     RemoveBlock(it);
    260     it = next;
    261   }
    262 }
    263 
    264 void FileBlockCache::RemoveBlock(BlockMap::iterator entry) {
    265   // This signals that the block is removed, and should not be inadvertently
    266   // reinserted into the cache in UpdateLRU.
    267   entry->second->timestamp = 0;
    268   lru_list_.erase(entry->second->lru_iterator);
    269   lra_list_.erase(entry->second->lra_iterator);
    270   cache_size_ -= entry->second->data.size();
    271   block_map_.erase(entry);
    272 }
    273 
    274 }  // namespace tensorflow
    275