Home | History | Annotate | Download | only in cloud
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
     17 #include <stdio.h>
     18 #include <unistd.h>
     19 #include <algorithm>
     20 #include <cstdio>
     21 #include <cstdlib>
     22 #include <cstring>
     23 #include <fstream>
     24 #include <vector>
     25 #ifdef _WIN32
     26 #include <io.h>  // for _mktemp
     27 #endif
     28 #include "include/json/json.h"
     29 #include "tensorflow/core/lib/core/errors.h"
     30 #include "tensorflow/core/lib/gtl/map_util.h"
     31 #include "tensorflow/core/lib/gtl/stl_util.h"
     32 #include "tensorflow/core/lib/io/path.h"
     33 #include "tensorflow/core/lib/strings/numbers.h"
     34 #include "tensorflow/core/lib/strings/str_util.h"
     35 #include "tensorflow/core/lib/strings/stringprintf.h"
     36 #include "tensorflow/core/platform/cloud/curl_http_request.h"
     37 #include "tensorflow/core/platform/cloud/file_block_cache.h"
     38 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
     39 #include "tensorflow/core/platform/cloud/retrying_utils.h"
     40 #include "tensorflow/core/platform/cloud/time_util.h"
     41 #include "tensorflow/core/platform/env.h"
     42 #include "tensorflow/core/platform/mutex.h"
     43 #include "tensorflow/core/platform/protobuf.h"
     44 #include "tensorflow/core/platform/thread_annotations.h"
     45 
     46 #ifdef _WIN32
     47 #ifdef DeleteFile
     48 #undef DeleteFile
     49 #endif
     50 #endif
     51 
     52 namespace tensorflow {
     53 namespace {
     54 
     55 constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
     56 constexpr char kGcsUploadUriBase[] =
     57     "https://www.googleapis.com/upload/storage/v1/";
     58 constexpr char kStorageHost[] = "storage.googleapis.com";
     59 constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
     60 constexpr int kGetChildrenDefaultPageSize = 1000;
     61 // The HTTP response code "308 Resume Incomplete".
     62 constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
     63 // The environment variable that overrides the size of the readahead buffer.
     64 // DEPRECATED. Use GCS_BLOCK_SIZE_MB instead.
     65 constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
     66 // The environment variable that overrides the block size for aligned reads from
     67 // GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
     68 constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
     69 constexpr size_t kDefaultBlockSize = 128 * 1024 * 1024;
     70 // The environment variable that overrides the max size of the LRU cache of
     71 // blocks read from GCS. Specified in MB.
     72 constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
     73 constexpr size_t kDefaultMaxCacheSize = 2 * kDefaultBlockSize;
     74 // The environment variable that overrides the maximum staleness of cached file
     75 // contents. Once any block of a file reaches this staleness, all cached blocks
     76 // will be evicted on the next read.
     77 constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
     78 constexpr uint64 kDefaultMaxStaleness = 0;
     79 // The environment variable that overrides the maximum age of entries in the
     80 // Stat cache. A value of 0 (the default) means nothing is cached.
     81 constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
     82 constexpr uint64 kStatCacheDefaultMaxAge = 0;
     83 // The environment variable that overrides the maximum number of entries in the
     84 // Stat cache.
     85 constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
     86 constexpr size_t kStatCacheDefaultMaxEntries = 1024;
     87 // The environment variable that overrides the maximum age of entries in the
     88 // GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
     89 constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
     90 constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
     91 // The environment variable that overrides the maximum number of entries in the
     92 // GetMatchingPaths cache.
     93 constexpr char kMatchingPathsCacheMaxEntries[] =
     94     "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
     95 constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
     96 // The file statistics returned by Stat() for directories.
     97 const FileStatistics DIRECTORY_STAT(0, 0, true);
     98 // Some environments exhibit unreliable DNS resolution. Set this environment
     99 // variable to a positive integer describing the frequency used to refresh the
    100 // userspace DNS cache.
    101 constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
    102 // The environment variable to configure the http request's connection timeout.
    103 constexpr char kRequestConnectionTimeout[] =
    104     "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
    105 // The environment varaible to configure the http request's idle timeout.
    106 constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
    107 // The environment variable to configure the overall request timeout for
    108 // metadata requests.
    109 constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
    110 // The environment variable to configure the overall request timeout for
    111 // block reads requests.
    112 constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
    113 // The environment variable to configure the overall request timeout for
    114 // upload requests.
    115 constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
    116 // The environment variable to configure an additional header to send with
    117 // all requests to GCS (format HEADERNAME:HEADERCONTENT)
    118 constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
    119 // The environment variable to configure the throttle (format: <int64>)
    120 constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
    121 // The environment variable to configure the token bucket size (format: <int64>)
    122 constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
    123 // The environment variable that controls the number of tokens per request.
    124 // (format: <int64>)
    125 constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
    126 // The environment variable to configure the initial tokens (format: <int64>)
    127 constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";
    128 
    129 // TODO: DO NOT use a hardcoded path
    130 Status GetTmpFilename(string* filename) {
    131   if (!filename) {
    132     return errors::Internal("'filename' cannot be nullptr.");
    133   }
    134 #ifndef _WIN32
    135   char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
    136   int fd = mkstemp(buffer);
    137   if (fd < 0) {
    138     return errors::Internal("Failed to create a temporary file.");
    139   }
    140   close(fd);
    141 #else
    142   char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
    143   char* ret = _mktemp(buffer);
    144   if (ret == nullptr) {
    145     return errors::Internal("Failed to create a temporary file.");
    146   }
    147 #endif
    148   *filename = buffer;
    149   return Status::OK();
    150 }
    151 
    152 /// \brief Splits a GCS path to a bucket and an object.
    153 ///
    154 /// For example, "gs://bucket-name/path/to/file.txt" gets split into
    155 /// "bucket-name" and "path/to/file.txt".
    156 /// If fname only contains the bucket and empty_object_ok = true, the returned
    157 /// object is empty.
    158 Status ParseGcsPath(StringPiece fname, bool empty_object_ok, string* bucket,
    159                     string* object) {
    160   if (!bucket || !object) {
    161     return errors::Internal("bucket and object cannot be null.");
    162   }
    163   StringPiece scheme, bucketp, objectp;
    164   io::ParseURI(fname, &scheme, &bucketp, &objectp);
    165   if (scheme != "gs") {
    166     return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
    167                                    fname);
    168   }
    169   *bucket = bucketp.ToString();
    170   if (bucket->empty() || *bucket == ".") {
    171     return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
    172                                    fname);
    173   }
    174   objectp.Consume("/");
    175   *object = objectp.ToString();
    176   if (!empty_object_ok && object->empty()) {
    177     return errors::InvalidArgument("GCS path doesn't contain an object name: ",
    178                                    fname);
    179   }
    180   return Status::OK();
    181 }
    182 
    183 /// Appends a trailing slash if the name doesn't already have one.
    184 string MaybeAppendSlash(const string& name) {
    185   if (name.empty()) {
    186     return "/";
    187   }
    188   if (name.back() != '/') {
    189     return strings::StrCat(name, "/");
    190   }
    191   return name;
    192 }
    193 
    194 // io::JoinPath() doesn't work in cases when we want an empty subpath
    195 // to result in an appended slash in order for directory markers
    196 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
    197 string JoinGcsPath(const string& path, const string& subpath) {
    198   return strings::StrCat(MaybeAppendSlash(path), subpath);
    199 }
    200 
    201 /// \brief Returns the given paths appending all their subfolders.
    202 ///
    203 /// For every path X in the list, every subfolder in X is added to the
    204 /// resulting list.
    205 /// For example:
    206 ///  - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
    207 ///  - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
    208 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
    209   std::set<string> result;
    210   result.insert(paths.begin(), paths.end());
    211   for (const string& path : paths) {
    212     StringPiece subpath = io::Dirname(path);
    213     while (!subpath.empty()) {
    214       result.emplace(subpath.ToString());
    215       subpath = io::Dirname(subpath);
    216     }
    217   }
    218   return result;
    219 }
    220 
    221 Status ParseJson(StringPiece json, Json::Value* result) {
    222   Json::Reader reader;
    223   if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
    224     return errors::Internal("Couldn't parse JSON response from GCS.");
    225   }
    226   return Status::OK();
    227 }
    228 
    229 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
    230   return ParseJson(StringPiece{json.data(), json.size()}, result);
    231 }
    232 
    233 /// Reads a JSON value with the given name from a parent JSON value.
    234 Status GetValue(const Json::Value& parent, const char* name,
    235                 Json::Value* result) {
    236   *result = parent.get(name, Json::Value::null);
    237   if (result->isNull()) {
    238     return errors::Internal("The field '", name,
    239                             "' was expected in the JSON response.");
    240   }
    241   return Status::OK();
    242 }
    243 
    244 /// Reads a string JSON value with the given name from a parent JSON value.
    245 Status GetStringValue(const Json::Value& parent, const char* name,
    246                       string* result) {
    247   Json::Value result_value;
    248   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
    249   if (!result_value.isString()) {
    250     return errors::Internal(
    251         "The field '", name,
    252         "' in the JSON response was expected to be a string.");
    253   }
    254   *result = result_value.asString();
    255   return Status::OK();
    256 }
    257 
    258 /// Reads a long JSON value with the given name from a parent JSON value.
    259 Status GetInt64Value(const Json::Value& parent, const char* name,
    260                      int64* result) {
    261   Json::Value result_value;
    262   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
    263   if (result_value.isNumeric()) {
    264     *result = result_value.asInt64();
    265     return Status::OK();
    266   }
    267   if (result_value.isString() &&
    268       strings::safe_strto64(result_value.asCString(), result)) {
    269     return Status::OK();
    270   }
    271   return errors::Internal(
    272       "The field '", name,
    273       "' in the JSON response was expected to be a number.");
    274 }
    275 
    276 /// Reads a boolean JSON value with the given name from a parent JSON value.
    277 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
    278   Json::Value result_value;
    279   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
    280   if (!result_value.isBool()) {
    281     return errors::Internal(
    282         "The field '", name,
    283         "' in the JSON response was expected to be a boolean.");
    284   }
    285   *result = result_value.asBool();
    286   return Status::OK();
    287 }
    288 
    289 /// A GCS-based implementation of a random access file with an LRU block cache.
    290 class GcsRandomAccessFile : public RandomAccessFile {
    291  public:
    292   GcsRandomAccessFile(const string& filename, FileBlockCache* file_block_cache)
    293       : filename_(filename), file_block_cache_(file_block_cache) {}
    294 
    295   /// The implementation of reads with an LRU block cache. Thread safe.
    296   Status Read(uint64 offset, size_t n, StringPiece* result,
    297               char* scratch) const override {
    298     *result = StringPiece();
    299     size_t bytes_transferred;
    300     TF_RETURN_IF_ERROR(file_block_cache_->Read(filename_, offset, n, scratch,
    301                                                &bytes_transferred));
    302     *result = StringPiece(scratch, bytes_transferred);
    303     if (bytes_transferred < n) {
    304       // This is not an error per se. The RandomAccessFile interface expects
    305       // that Read returns OutOfRange if fewer bytes were read than requested.
    306       return errors::OutOfRange("EOF reached, ", result->size(),
    307                                 " bytes were read out of ", n,
    308                                 " bytes requested.");
    309     }
    310     return Status::OK();
    311   }
    312 
    313  private:
    314   /// The filename of this file.
    315   const string filename_;
    316   /// The LRU block cache for this file.
    317   mutable FileBlockCache* file_block_cache_;  // not owned
    318 };
    319 
    320 /// \brief GCS-based implementation of a writeable file.
    321 ///
    322 /// Since GCS objects are immutable, this implementation writes to a local
    323 /// tmp file and copies it to GCS on flush/close.
    324 class GcsWritableFile : public WritableFile {
    325  public:
    326   GcsWritableFile(const string& bucket, const string& object,
    327                   GcsFileSystem* filesystem,
    328                   GcsFileSystem::TimeoutConfig* timeouts,
    329                   std::function<void()> file_cache_erase,
    330                   int64 initial_retry_delay_usec)
    331       : bucket_(bucket),
    332         object_(object),
    333         filesystem_(filesystem),
    334         timeouts_(timeouts),
    335         file_cache_erase_(std::move(file_cache_erase)),
    336         sync_needed_(true),
    337         initial_retry_delay_usec_(initial_retry_delay_usec) {
    338     // TODO: to make it safer, outfile_ should be constructed from an FD
    339     if (GetTmpFilename(&tmp_content_filename_).ok()) {
    340       outfile_.open(tmp_content_filename_,
    341                     std::ofstream::binary | std::ofstream::app);
    342     }
    343   }
    344 
    345   /// \brief Constructs the writable file in append mode.
    346   ///
    347   /// tmp_content_filename should contain a path of an existing temporary file
    348   /// with the content to be appended. The class takes onwnership of the
    349   /// specified tmp file and deletes it on close.
    350   GcsWritableFile(const string& bucket, const string& object,
    351                   GcsFileSystem* filesystem, const string& tmp_content_filename,
    352                   GcsFileSystem::TimeoutConfig* timeouts,
    353                   std::function<void()> file_cache_erase,
    354                   int64 initial_retry_delay_usec)
    355       : bucket_(bucket),
    356         object_(object),
    357         filesystem_(filesystem),
    358         timeouts_(timeouts),
    359         file_cache_erase_(std::move(file_cache_erase)),
    360         sync_needed_(true),
    361         initial_retry_delay_usec_(initial_retry_delay_usec) {
    362     tmp_content_filename_ = tmp_content_filename;
    363     outfile_.open(tmp_content_filename_,
    364                   std::ofstream::binary | std::ofstream::app);
    365   }
    366 
    367   ~GcsWritableFile() override { Close().IgnoreError(); }
    368 
    369   Status Append(const StringPiece& data) override {
    370     TF_RETURN_IF_ERROR(CheckWritable());
    371     sync_needed_ = true;
    372     outfile_ << data;
    373     if (!outfile_.good()) {
    374       return errors::Internal(
    375           "Could not append to the internal temporary file.");
    376     }
    377     return Status::OK();
    378   }
    379 
    380   Status Close() override {
    381     if (outfile_.is_open()) {
    382       TF_RETURN_IF_ERROR(Sync());
    383       outfile_.close();
    384       std::remove(tmp_content_filename_.c_str());
    385     }
    386     return Status::OK();
    387   }
    388 
    389   Status Flush() override { return Sync(); }
    390 
    391   Status Sync() override {
    392     TF_RETURN_IF_ERROR(CheckWritable());
    393     if (!sync_needed_) {
    394       return Status::OK();
    395     }
    396     Status status = SyncImpl();
    397     if (status.ok()) {
    398       sync_needed_ = false;
    399     }
    400     return status;
    401   }
    402 
    403  private:
    404   /// Copies the current version of the file to GCS.
    405   ///
    406   /// This SyncImpl() uploads the object to GCS.
    407   /// In case of a failure, it resumes failed uploads as recommended by the GCS
    408   /// resumable API documentation. When the whole upload needs to be
    409   /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
    410   Status SyncImpl() {
    411     outfile_.flush();
    412     if (!outfile_.good()) {
    413       return errors::Internal(
    414           "Could not write to the internal temporary file.");
    415     }
    416     string session_uri;
    417     TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri));
    418     uint64 already_uploaded = 0;
    419     bool first_attempt = true;
    420     const Status upload_status = RetryingUtils::CallWithRetries(
    421         [&first_attempt, &already_uploaded, &session_uri, this]() {
    422           if (!first_attempt) {
    423             bool completed;
    424             TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
    425                 session_uri, &completed, &already_uploaded));
    426             if (completed) {
    427               // Erase the file from the file cache on every successful write.
    428               file_cache_erase_();
    429               // It's unclear why UploadToSession didn't return OK in the
    430               // previous attempt, but GCS reports that the file is fully
    431               // uploaded, so succeed.
    432               return Status::OK();
    433             }
    434           }
    435           first_attempt = false;
    436           return UploadToSession(session_uri, already_uploaded);
    437         },
    438         initial_retry_delay_usec_);
    439     if (upload_status.code() == errors::Code::NOT_FOUND) {
    440       // GCS docs recommend retrying the whole upload. We're relying on the
    441       // RetryingFileSystem to retry the Sync() call.
    442       return errors::Unavailable(
    443           strings::StrCat("Upload to gs://", bucket_, "/", object_,
    444                           " failed, caused by: ", upload_status.ToString()));
    445     }
    446     return upload_status;
    447   }
    448 
    449   Status CheckWritable() const {
    450     if (!outfile_.is_open()) {
    451       return errors::FailedPrecondition(
    452           "The internal temporary file is not writable.");
    453     }
    454     return Status::OK();
    455   }
    456 
    457   Status GetCurrentFileSize(uint64* size) {
    458     if (size == nullptr) {
    459       return errors::Internal("'size' cannot be nullptr");
    460     }
    461     const auto tellp = outfile_.tellp();
    462     if (tellp == static_cast<std::streampos>(-1)) {
    463       return errors::Internal(
    464           "Could not get the size of the internal temporary file.");
    465     }
    466     *size = tellp;
    467     return Status::OK();
    468   }
    469 
    470   /// Initiates a new resumable upload session.
    471   Status CreateNewUploadSession(string* session_uri) {
    472     if (session_uri == nullptr) {
    473       return errors::Internal("'session_uri' cannot be nullptr.");
    474     }
    475     uint64 file_size;
    476     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    477 
    478     std::vector<char> output_buffer;
    479     std::unique_ptr<HttpRequest> request;
    480     TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
    481 
    482     request->SetUri(strings::StrCat(
    483         kGcsUploadUriBase, "b/", bucket_,
    484         "/o?uploadType=resumable&name=", request->EscapeString(object_)));
    485     request->AddHeader("X-Upload-Content-Length", std::to_string(file_size));
    486     request->SetPostEmptyBody();
    487     request->SetResultBuffer(&output_buffer);
    488     request->SetTimeouts(timeouts_->connect, timeouts_->idle,
    489                          timeouts_->metadata);
    490     TF_RETURN_WITH_CONTEXT_IF_ERROR(
    491         request->Send(), " when initiating an upload to ", GetGcsPath());
    492     *session_uri = request->GetResponseHeader("Location");
    493     if (session_uri->empty()) {
    494       return errors::Internal("Unexpected response from GCS when writing to ",
    495                               GetGcsPath(),
    496                               ": 'Location' header not returned.");
    497     }
    498     return Status::OK();
    499   }
    500 
    501   /// \brief Requests status of a previously initiated upload session.
    502   ///
    503   /// If the upload has already succeeded, sets 'completed' to true.
    504   /// Otherwise sets 'completed' to false and 'uploaded' to the currently
    505   /// uploaded size in bytes.
    506   Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
    507                                     uint64* uploaded) {
    508     if (completed == nullptr || uploaded == nullptr) {
    509       return errors::Internal("'completed' and 'uploaded' cannot be nullptr.");
    510     }
    511     uint64 file_size;
    512     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    513 
    514     std::unique_ptr<HttpRequest> request;
    515     TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
    516     request->SetUri(session_uri);
    517     request->SetTimeouts(timeouts_->connect, timeouts_->idle,
    518                          timeouts_->metadata);
    519     request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
    520     request->SetPutEmptyBody();
    521     const Status& status = request->Send();
    522     if (status.ok()) {
    523       *completed = true;
    524       return Status::OK();
    525     }
    526     *completed = false;
    527     if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
    528       TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ",
    529                                       GetGcsPath());
    530     }
    531     const string& received_range = request->GetResponseHeader("Range");
    532     if (received_range.empty()) {
    533       // This means GCS doesn't have any bytes of the file yet.
    534       *uploaded = 0;
    535     } else {
    536       StringPiece range_piece(received_range);
    537       range_piece.Consume("bytes=");  // May or may not be present.
    538       std::vector<int64> range_parts;
    539       if (!str_util::SplitAndParseAsInts(range_piece, '-', &range_parts) ||
    540           range_parts.size() != 2) {
    541         return errors::Internal("Unexpected response from GCS when writing ",
    542                                 GetGcsPath(), ": Range header '",
    543                                 received_range, "' could not be parsed.");
    544       }
    545       if (range_parts[0] != 0) {
    546         return errors::Internal("Unexpected response from GCS when writing to ",
    547                                 GetGcsPath(), ": the returned range '",
    548                                 received_range, "' does not start at zero.");
    549       }
    550       // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
    551       *uploaded = range_parts[1] + 1;
    552     }
    553     return Status::OK();
    554   }
    555 
    556   Status UploadToSession(const string& session_uri, uint64 start_offset) {
    557     uint64 file_size;
    558     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    559 
    560     std::unique_ptr<HttpRequest> request;
    561     TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
    562     request->SetUri(session_uri);
    563     if (file_size > 0) {
    564       request->AddHeader("Content-Range",
    565                          strings::StrCat("bytes ", start_offset, "-",
    566                                          file_size - 1, "/", file_size));
    567     }
    568     request->SetTimeouts(timeouts_->connect, timeouts_->idle, timeouts_->write);
    569 
    570     TF_RETURN_IF_ERROR(
    571         request->SetPutFromFile(tmp_content_filename_, start_offset));
    572     TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
    573                                     GetGcsPath());
    574     // Erase the file from the file cache on every successful write.
    575     file_cache_erase_();
    576     return Status::OK();
    577   }
    578 
    579   string GetGcsPath() const {
    580     return strings::StrCat("gs://", bucket_, "/", object_);
    581   }
    582 
    583   string bucket_;
    584   string object_;
    585   GcsFileSystem* const filesystem_;  // Not owned.
    586   string tmp_content_filename_;
    587   std::ofstream outfile_;
    588   GcsFileSystem::TimeoutConfig* timeouts_;
    589   std::function<void()> file_cache_erase_;
    590   bool sync_needed_;  // whether there is buffered data that needs to be synced
    591   int64 initial_retry_delay_usec_;
    592 };
    593 
    594 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
    595  public:
    596   GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
    597       : data_(std::move(data)), length_(length) {}
    598   const void* data() override { return reinterpret_cast<void*>(data_.get()); }
    599   uint64 length() override { return length_; }
    600 
    601  private:
    602   std::unique_ptr<char[]> data_;
    603   uint64 length_;
    604 };
    605 
    606 // Helper function to extract an environment variable and convert it into a
    607 // value of type T.
    608 template <typename T>
    609 bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
    610                T* value) {
    611   const char* env_value = std::getenv(varname);
    612   if (!env_value) {
    613     return false;
    614   }
    615   return convert(env_value, value);
    616 }
    617 
    618 bool StringPieceIdentity(StringPiece str, StringPiece* value) {
    619   *value = str;
    620   return true;
    621 }
    622 
    623 }  // namespace
    624 
    625 GcsFileSystem::GcsFileSystem()
    626     : auth_provider_(new GoogleAuthProvider()),
    627       http_request_factory_(new CurlHttpRequest::Factory()) {
    628   uint64 value;
    629   size_t block_size = kDefaultBlockSize;
    630   size_t max_bytes = kDefaultMaxCacheSize;
    631   uint64 max_staleness = kDefaultMaxStaleness;
    632   // Apply the sys env override for the readahead buffer size if it's provided.
    633   if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
    634     block_size = value;
    635   }
    636   // Apply the overrides for the block size (MB), max bytes (MB), and max
    637   // staleness (seconds) if provided.
    638   if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
    639     block_size = value * 1024 * 1024;
    640   }
    641   if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
    642     max_bytes = value * 1024 * 1024;
    643   }
    644   if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
    645     max_staleness = value;
    646   }
    647   file_block_cache_ = MakeFileBlockCache(block_size, max_bytes, max_staleness);
    648   // Apply overrides for the stat cache max age and max entries, if provided.
    649   uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
    650   size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
    651   if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
    652     stat_cache_max_age = value;
    653   }
    654   if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
    655     stat_cache_max_entries = value;
    656   }
    657   stat_cache_.reset(new ExpiringLRUCache<FileStatistics>(
    658       stat_cache_max_age, stat_cache_max_entries));
    659   // Apply overrides for the matching paths cache max age and max entries, if
    660   // provided.
    661   uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
    662   size_t matching_paths_cache_max_entries =
    663       kMatchingPathsCacheDefaultMaxEntries;
    664   if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
    665     matching_paths_cache_max_age = value;
    666   }
    667   if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
    668                 &value)) {
    669     matching_paths_cache_max_entries = value;
    670   }
    671   matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
    672       matching_paths_cache_max_age, matching_paths_cache_max_entries));
    673 
    674   int64 resolve_frequency_secs;
    675   if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
    676                 &resolve_frequency_secs)) {
    677     dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
    678     VLOG(1) << "GCS DNS cache is enabled.  " << kResolveCacheSecs << " = "
    679             << resolve_frequency_secs;
    680   } else {
    681     VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
    682             << " = 0 (or is not set)";
    683   }
    684 
    685   // Get the additional header
    686   StringPiece add_header_contents;
    687   if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
    688                 &add_header_contents)) {
    689     size_t split = add_header_contents.find(':', 0);
    690 
    691     if (split != StringPiece::npos) {
    692       StringPiece header_name = add_header_contents.substr(0, split);
    693       StringPiece header_value = add_header_contents.substr(split + 1);
    694 
    695       if (!header_name.empty() && !header_value.empty()) {
    696         additional_header_.reset(new std::pair<const string, const string>(
    697             header_name.ToString(), header_value.ToString()));
    698 
    699         VLOG(1) << "GCS additional header ENABLED. "
    700                 << "Name: " << additional_header_->first << ", "
    701                 << "Value: " << additional_header_->second;
    702       } else {
    703         LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
    704                    << add_header_contents;
    705       }
    706     } else {
    707       LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
    708                  << add_header_contents;
    709     }
    710   } else {
    711     VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
    712   }
    713 
    714   // Apply the overrides for request timeouts
    715   uint32 timeout_value;
    716   if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
    717                 &timeout_value)) {
    718     timeouts_.connect = timeout_value;
    719   }
    720   if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
    721     timeouts_.idle = timeout_value;
    722   }
    723   if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
    724                 &timeout_value)) {
    725     timeouts_.metadata = timeout_value;
    726   }
    727   if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    728     timeouts_.read = timeout_value;
    729   }
    730   if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    731     timeouts_.write = timeout_value;
    732   }
    733 
    734   int64 token_value;
    735   if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
    736     GcsThrottleConfig config;
    737     config.enabled = true;
    738     config.token_rate = token_value;
    739 
    740     if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
    741       config.bucket_size = token_value;
    742     }
    743 
    744     if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
    745       config.tokens_per_request = token_value;
    746     }
    747 
    748     if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
    749       config.initial_tokens = token_value;
    750     }
    751     throttle_.SetConfig(config);
    752   }
    753 }
    754 
// Constructs a GcsFileSystem from explicitly supplied collaborators and
// settings.
//
// Arguments:
//   auth_provider, http_request_factory: moved into the members of the same
//     name (with a trailing underscore).
//   block_size, max_bytes, max_staleness: forwarded to MakeFileBlockCache() to
//     build file_block_cache_.
//   stat_cache_max_age, stat_cache_max_entries: forwarded to the StatCache
//     constructor.
//   matching_paths_cache_max_age, matching_paths_cache_max_entries: forwarded
//     to the MatchingPathsCache constructor.
//   initial_retry_delay_usec: stored in initial_retry_delay_usec_.
//   timeouts: copied into timeouts_.
//   additional_header: raw pointer used to initialize additional_header_.
//     NOTE(review): additional_header_ is reset() with a new'd pair elsewhere
//     in this file, so it presumably takes ownership of this pointer --
//     confirm against the member declaration.
GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    size_t block_size, size_t max_bytes, uint64 max_staleness,
    uint64 stat_cache_max_age, size_t stat_cache_max_entries,
    uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, int64 initial_retry_delay_usec,
    TimeoutConfig timeouts,
    std::pair<const string, const string>* additional_header)
    : auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      timeouts_(timeouts),
      initial_retry_delay_usec_(initial_retry_delay_usec),
      additional_header_(additional_header) {}
    774 
    775 Status GcsFileSystem::NewRandomAccessFile(
    776     const string& fname, std::unique_ptr<RandomAccessFile>* result) {
    777   string bucket, object;
    778   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
    779   result->reset(new GcsRandomAccessFile(fname, file_block_cache_.get()));
    780   return Status::OK();
    781 }
    782 
    783 // A helper function to build a FileBlockCache for GcsFileSystem.
    784 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
    785     size_t block_size, size_t max_bytes, uint64 max_staleness) {
    786   std::unique_ptr<FileBlockCache> file_block_cache(
    787       new FileBlockCache(block_size, max_bytes, max_staleness,
    788                          [this](const string& filename, size_t offset, size_t n,
    789                                 char* buffer, size_t* bytes_transferred) {
    790                            return LoadBufferFromGCS(filename, offset, n, buffer,
    791                                                     bytes_transferred);
    792                          }));
    793   return file_block_cache;
    794 }
    795 
    796 // A helper function to actually read the data from GCS.
    797 Status GcsFileSystem::LoadBufferFromGCS(const string& filename, size_t offset,
    798                                         size_t n, char* buffer,
    799                                         size_t* bytes_transferred) {
    800   *bytes_transferred = 0;
    801 
    802   string bucket, object;
    803   TF_RETURN_IF_ERROR(ParseGcsPath(filename, false, &bucket, &object));
    804 
    805   std::unique_ptr<HttpRequest> request;
    806   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
    807                                   "when reading gs://", bucket, "/", object);
    808 
    809   request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
    810                                   request->EscapeString(object)));
    811   request->SetRange(offset, offset + n - 1);
    812   request->SetResultBufferDirect(buffer, n);
    813   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);
    814 
    815   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
    816                                   bucket, "/", object);
    817 
    818   size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
    819   *bytes_transferred = bytes_read;
    820   VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
    821           << offset << " of size: " << bytes_read;
    822 
    823   throttle_.RecordResponse(bytes_read);
    824 
    825   if (bytes_read < block_size()) {
    826     // Check stat cache to see if we encountered an interrupted read.
    827     FileStatistics stat;
    828     if (stat_cache_->Lookup(filename, &stat)) {
    829       if (offset + bytes_read < stat.length) {
    830         return errors::Internal(strings::Printf(
    831             "File contents are inconsistent for file: %s @ %lu.",
    832             filename.c_str(), offset));
    833       }
    834       VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
    835               << object << " @ " << offset;
    836     }
    837   }
    838 
    839   return Status::OK();
    840 }
    841 
    842 Status GcsFileSystem::NewWritableFile(const string& fname,
    843                                       std::unique_ptr<WritableFile>* result) {
    844   string bucket, object;
    845   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
    846   result->reset(new GcsWritableFile(
    847       bucket, object, this, &timeouts_,
    848       [this, fname]() { file_block_cache_->RemoveFile(fname); },
    849       initial_retry_delay_usec_));
    850   return Status::OK();
    851 }
    852 
    853 // Reads the file from GCS in chunks and stores it in a tmp file,
    854 // which is then passed to GcsWritableFile.
    855 Status GcsFileSystem::NewAppendableFile(const string& fname,
    856                                         std::unique_ptr<WritableFile>* result) {
    857   std::unique_ptr<RandomAccessFile> reader;
    858   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
    859   std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
    860   Status status;
    861   uint64 offset = 0;
    862   StringPiece read_chunk;
    863 
    864   // Read the file from GCS in chunks and save it to a tmp file.
    865   string old_content_filename;
    866   TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
    867   std::ofstream old_content(old_content_filename, std::ofstream::binary);
    868   while (true) {
    869     status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
    870                           buffer.get());
    871     if (status.ok()) {
    872       old_content << read_chunk;
    873       offset += kReadAppendableFileBufferSize;
    874     } else if (status.code() == error::OUT_OF_RANGE) {
    875       // Expected, this means we reached EOF.
    876       old_content << read_chunk;
    877       break;
    878     } else {
    879       return status;
    880     }
    881   }
    882   old_content.close();
    883 
    884   // Create a writable file and pass the old content to it.
    885   string bucket, object;
    886   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
    887   result->reset(new GcsWritableFile(
    888       bucket, object, this, old_content_filename, &timeouts_,
    889       [this, fname]() { file_block_cache_->RemoveFile(fname); },
    890       initial_retry_delay_usec_));
    891   return Status::OK();
    892 }
    893 
    894 Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
    895     const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
    896   uint64 size;
    897   TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
    898   std::unique_ptr<char[]> data(new char[size]);
    899 
    900   std::unique_ptr<RandomAccessFile> file;
    901   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
    902 
    903   StringPiece piece;
    904   TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
    905 
    906   result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
    907   return Status::OK();
    908 }
    909 
    910 Status GcsFileSystem::FileExists(const string& fname) {
    911   string bucket, object;
    912   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
    913   if (object.empty()) {
    914     bool result;
    915     TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
    916     if (result) {
    917       return Status::OK();
    918     }
    919   }
    920   bool result;
    921   TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &result));
    922   if (result) {
    923     return Status::OK();
    924   }
    925   TF_RETURN_IF_ERROR(FolderExists(fname, &result));
    926   if (result) {
    927     return Status::OK();
    928   }
    929   return errors::NotFound("The specified path ", fname, " was not found.");
    930 }
    931 
    932 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
    933                                    const string& object, bool* result) {
    934   if (!result) {
    935     return errors::Internal("'result' cannot be nullptr.");
    936   }
    937   FileStatistics not_used_stat;
    938   const Status status = StatForObject(fname, bucket, object, &not_used_stat);
    939   switch (status.code()) {
    940     case errors::Code::OK:
    941       *result = true;
    942       return Status::OK();
    943     case errors::Code::NOT_FOUND:
    944       *result = false;
    945       return Status::OK();
    946     default:
    947       return status;
    948   }
    949 }
    950 
    951 Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
    952                                     const string& object,
    953                                     FileStatistics* stat) {
    954   if (!stat) {
    955     return errors::Internal("'stat' cannot be nullptr.");
    956   }
    957   if (object.empty()) {
    958     return errors::InvalidArgument(strings::Printf(
    959         "'object' must be a non-empty string. (File: %s)", fname.c_str()));
    960   }
    961 
    962   StatCache::ComputeFunc compute_func = [this, &bucket, &object](
    963                                             const string& fname,
    964                                             FileStatistics* stat) {
    965     std::vector<char> output_buffer;
    966     std::unique_ptr<HttpRequest> request;
    967     TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
    968                                     " when reading metadata of gs://", bucket,
    969                                     "/", object);
    970 
    971     request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
    972                                     request->EscapeString(object),
    973                                     "?fields=size%2Cupdated"));
    974     request->SetResultBuffer(&output_buffer);
    975     request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
    976 
    977     TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
    978                                     " when reading metadata of gs://", bucket,
    979                                     "/", object);
    980 
    981     Json::Value root;
    982     TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
    983 
    984     // Parse file size.
    985     TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->length));
    986 
    987     // Parse file modification time.
    988     string updated;
    989     TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
    990     TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->mtime_nsec)));
    991 
    992     VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
    993             << " length: " << stat->length
    994             << "; mtime_nsec: " << stat->mtime_nsec << "; updated: " << updated;
    995 
    996     stat->is_directory = false;
    997     return Status::OK();
    998   };
    999 
   1000   TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(fname, stat, compute_func));
   1001   if (stat->is_directory) {
   1002     return errors::NotFound(fname, " is a directory.");
   1003   } else {
   1004     return Status::OK();
   1005   }
   1006 }
   1007 
   1008 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
   1009   if (!result) {
   1010     return errors::Internal("'result' cannot be nullptr.");
   1011   }
   1012 
   1013   std::unique_ptr<HttpRequest> request;
   1014   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
   1015   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
   1016   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
   1017   const Status status = request->Send();
   1018   switch (status.code()) {
   1019     case errors::Code::OK:
   1020       *result = true;
   1021       return Status::OK();
   1022     case errors::Code::NOT_FOUND:
   1023       *result = false;
   1024       return Status::OK();
   1025     default:
   1026       return status;
   1027   }
   1028 }
   1029 
   1030 Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
   1031   if (!result) {
   1032     return errors::Internal("'result' cannot be nullptr.");
   1033   }
   1034   StatCache::ComputeFunc compute_func = [this](const string& dirname,
   1035                                                FileStatistics* stat) {
   1036     std::vector<string> children;
   1037     TF_RETURN_IF_ERROR(
   1038         GetChildrenBounded(dirname, 1, &children, true /* recursively */,
   1039                            true /* include_self_directory_marker */));
   1040     if (!children.empty()) {
   1041       *stat = DIRECTORY_STAT;
   1042       return Status::OK();
   1043     } else {
   1044       return errors::InvalidArgument("Not a directory!");
   1045     }
   1046   };
   1047   FileStatistics stat;
   1048   Status s = stat_cache_->LookupOrCompute(dirname, &stat, compute_func);
   1049   if (s.ok()) {
   1050     *result = stat.is_directory;
   1051     return Status::OK();
   1052   }
   1053   if (errors::IsInvalidArgument(s)) {
   1054     *result = false;
   1055     return Status::OK();
   1056   }
   1057   return s;
   1058 }
   1059 
   1060 Status GcsFileSystem::GetChildren(const string& dirname,
   1061                                   std::vector<string>* result) {
   1062   return GetChildrenBounded(dirname, UINT64_MAX, result,
   1063                             false /* recursively */,
   1064                             false /* include_self_directory_marker */);
   1065 }
   1066 
   1067 Status GcsFileSystem::GetMatchingPaths(const string& pattern,
   1068                                        std::vector<string>* results) {
   1069   MatchingPathsCache::ComputeFunc compute_func =
   1070       [this](const string& pattern, std::vector<string>* results) {
   1071         results->clear();
   1072         // Find the fixed prefix by looking for the first wildcard.
   1073         const string& fixed_prefix =
   1074             pattern.substr(0, pattern.find_first_of("*?[\\"));
   1075         const string& dir = io::Dirname(fixed_prefix).ToString();
   1076         if (dir.empty()) {
   1077           return errors::InvalidArgument(
   1078               "A GCS pattern doesn't have a bucket name: ", pattern);
   1079         }
   1080         std::vector<string> all_files;
   1081         TF_RETURN_IF_ERROR(GetChildrenBounded(
   1082             dir, UINT64_MAX, &all_files, true /* recursively */,
   1083             false /* include_self_directory_marker */));
   1084 
   1085         const auto& files_and_folders = AddAllSubpaths(all_files);
   1086 
   1087         // Match all obtained paths to the input pattern.
   1088         for (const auto& path : files_and_folders) {
   1089           const string& full_path = io::JoinPath(dir, path);
   1090           if (Env::Default()->MatchPath(full_path, pattern)) {
   1091             results->push_back(full_path);
   1092           }
   1093         }
   1094         return Status::OK();
   1095       };
   1096   TF_RETURN_IF_ERROR(
   1097       matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
   1098   return Status::OK();
   1099 }
   1100 
// Lists the objects (and, in non-recursive mode, the sub-folder prefixes)
// under 'dirname' and appends their names, relative to 'dirname', to 'result'.
//
// Arguments:
//   dirname: the directory to list; it is passed through MaybeAppendSlash()
//     before being parsed into bucket and object prefix.
//   max_results: upper bound on the number of entries processed. The function
//     returns OK as soon as this many items/prefixes have been counted.
//   result: output vector, must not be null. Entries are appended; the vector
//     is not cleared here.
//   recursive: when false, "/" is sent as delimiter and the "prefixes" field
//     is requested as well, so sub-folders come back as entries.
//   include_self_directory_marker: when true, an item whose name equals the
//     prefix itself (i.e. an empty relative path) is appended too.
//
// Returns InvalidArgument if 'result' is null, Internal if the JSON response
// does not have the expected shape, or any error from parsing the path,
// creating/sending a request or parsing the JSON.
Status GcsFileSystem::GetChildrenBounded(const string& dirname,
                                         uint64 max_results,
                                         std::vector<string>* result,
                                         bool recursive,
                                         bool include_self_directory_marker) {
  if (!result) {
    return errors::InvalidArgument("'result' cannot be null");
  }
  string bucket, object_prefix;
  TF_RETURN_IF_ERROR(
      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));

  // Empty for the first page; afterwards the token returned by the previous
  // response.
  string nextPageToken;
  // Number of items and prefixes counted so far across all pages.
  uint64 retrieved_results = 0;
  while (true) {  // A loop over multiple result pages.
    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
    if (recursive) {
      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
    } else {
      // Set "/" as a delimiter to ask GCS to treat subfolders as children
      // and return them in "prefixes".
      uri = strings::StrCat(uri,
                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
      uri = strings::StrCat(uri, "&delimiter=%2F");
    }
    if (!object_prefix.empty()) {
      uri = strings::StrCat(uri,
                            "&prefix=", request->EscapeString(object_prefix));
    }
    if (!nextPageToken.empty()) {
      uri = strings::StrCat(
          uri, "&pageToken=", request->EscapeString(nextPageToken));
    }
    // Only cap the page size when fewer results than a default page are still
    // wanted.
    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
      uri =
          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
    }
    request->SetUri(uri);
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
    Json::Value root;
    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
    // "items" holds the objects of this page; it may be absent.
    const auto items = root.get("items", Json::Value::null);
    if (!items.isNull()) {
      if (!items.isArray()) {
        return errors::Internal(
            "Expected an array 'items' in the GCS response.");
      }
      for (size_t i = 0; i < items.size(); i++) {
        const auto item = items.get(i, Json::Value::null);
        if (!item.isObject()) {
          return errors::Internal(
              "Unexpected JSON format: 'items' should be a list of objects.");
        }
        string name;
        TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
        // The names should be relative to the 'dirname'. That means the
        // 'object_prefix', which is part of 'dirname', should be removed from
        // the beginning of 'name'.
        StringPiece relative_path(name);
        if (!relative_path.Consume(object_prefix)) {
          return errors::Internal(strings::StrCat(
              "Unexpected response: the returned file name ", name,
              " doesn't match the prefix ", object_prefix));
        }
        // An empty relative path is the directory's own marker object.
        if (!relative_path.empty() || include_self_directory_marker) {
          result->emplace_back(relative_path.ToString());
        }
        // The item counts against 'max_results' even if it was skipped above.
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    const auto prefixes = root.get("prefixes", Json::Value::null);
    if (!prefixes.isNull()) {
      // Subfolders are returned for the non-recursive mode.
      if (!prefixes.isArray()) {
        return errors::Internal(
            "'prefixes' was expected to be an array in the GCS response.");
      }
      for (size_t i = 0; i < prefixes.size(); i++) {
        const auto prefix = prefixes.get(i, Json::Value::null);
        if (prefix.isNull() || !prefix.isString()) {
          return errors::Internal(
              "'prefixes' was expected to be an array of strings in the GCS "
              "response.");
        }
        const string& prefix_str = prefix.asString();
        StringPiece relative_path(prefix_str);
        if (!relative_path.Consume(object_prefix)) {
          return errors::Internal(
              "Unexpected response: the returned folder name ", prefix_str,
              " doesn't match the prefix ", object_prefix);
        }
        result->emplace_back(relative_path.ToString());
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    // No token means this was the last page.
    const auto token = root.get("nextPageToken", Json::Value::null);
    if (token.isNull()) {
      return Status::OK();
    }
    if (!token.isString()) {
      return errors::Internal(
          "Unexpected response: nextPageToken is not a string");
    }
    nextPageToken = token.asString();
  }
}
   1217 
   1218 Status GcsFileSystem::Stat(const string& fname, FileStatistics* stat) {
   1219   if (!stat) {
   1220     return errors::Internal("'stat' cannot be nullptr.");
   1221   }
   1222   string bucket, object;
   1223   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
   1224   if (object.empty()) {
   1225     bool is_bucket;
   1226     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
   1227     if (is_bucket) {
   1228       *stat = DIRECTORY_STAT;
   1229       return Status::OK();
   1230     }
   1231     return errors::NotFound("The specified bucket ", fname, " was not found.");
   1232   }
   1233 
   1234   const Status status = StatForObject(fname, bucket, object, stat);
   1235   if (status.ok()) {
   1236     return Status::OK();
   1237   }
   1238   if (status.code() != errors::Code::NOT_FOUND) {
   1239     return status;
   1240   }
   1241   bool is_folder;
   1242   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
   1243   if (is_folder) {
   1244     *stat = DIRECTORY_STAT;
   1245     return Status::OK();
   1246   }
   1247   return errors::NotFound("The specified path ", fname, " was not found.");
   1248 }
   1249 
   1250 Status GcsFileSystem::DeleteFile(const string& fname) {
   1251   string bucket, object;
   1252   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
   1253 
   1254   std::unique_ptr<HttpRequest> request;
   1255   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
   1256   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
   1257                                   request->EscapeString(object)));
   1258   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
   1259   request->SetDeleteRequest();
   1260 
   1261   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
   1262   file_block_cache_->RemoveFile(fname);
   1263   return Status::OK();
   1264 }
   1265 
   1266 Status GcsFileSystem::CreateDir(const string& dirname) {
   1267   string bucket, object;
   1268   TF_RETURN_IF_ERROR(ParseGcsPath(dirname, true, &bucket, &object));
   1269   if (object.empty()) {
   1270     bool is_bucket;
   1271     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
   1272     return is_bucket ? Status::OK()
   1273                      : errors::NotFound("The specified bucket ", dirname,
   1274                                         " was not found.");
   1275   }
   1276   // Create a zero-length directory marker object.
   1277   std::unique_ptr<WritableFile> file;
   1278   TF_RETURN_IF_ERROR(NewWritableFile(MaybeAppendSlash(dirname), &file));
   1279   TF_RETURN_IF_ERROR(file->Close());
   1280   return Status::OK();
   1281 }
   1282 
   1283 // Checks that the directory is empty (i.e no objects with this prefix exist).
   1284 // Deletes the GCS directory marker if it exists.
   1285 Status GcsFileSystem::DeleteDir(const string& dirname) {
   1286   std::vector<string> children;
   1287   // A directory is considered empty either if there are no matching objects
   1288   // with the corresponding name prefix or if there is exactly one matching
   1289   // object and it is the directory marker. Therefore we need to retrieve
   1290   // at most two children for the prefix to detect if a directory is empty.
   1291   TF_RETURN_IF_ERROR(
   1292       GetChildrenBounded(dirname, 2, &children, true /* recursively */,
   1293                          true /* include_self_directory_marker */));
   1294 
   1295   if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
   1296     return errors::FailedPrecondition("Cannot delete a non-empty directory.");
   1297   }
   1298   if (children.size() == 1 && children[0].empty()) {
   1299     // This is the directory marker object. Delete it.
   1300     return DeleteFile(MaybeAppendSlash(dirname));
   1301   }
   1302   return Status::OK();
   1303 }
   1304 
   1305 Status GcsFileSystem::GetFileSize(const string& fname, uint64* file_size) {
   1306   if (!file_size) {
   1307     return errors::Internal("'file_size' cannot be nullptr.");
   1308   }
   1309 
   1310   // Only validate the name.
   1311   string bucket, object;
   1312   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
   1313 
   1314   FileStatistics stat;
   1315   TF_RETURN_IF_ERROR(Stat(fname, &stat));
   1316   *file_size = stat.length;
   1317   return Status::OK();
   1318 }
   1319 
   1320 Status GcsFileSystem::RenameFile(const string& src, const string& target) {
   1321   if (!IsDirectory(src).ok()) {
   1322     return RenameObject(src, target);
   1323   }
   1324   // Rename all individual objects in the directory one by one.
   1325   std::vector<string> children;
   1326   TF_RETURN_IF_ERROR(
   1327       GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
   1328                          true /* include_self_directory_marker */));
   1329   for (const string& subpath : children) {
   1330     TF_RETURN_IF_ERROR(
   1331         RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
   1332   }
   1333   return Status::OK();
   1334 }
   1335 
   1336 // Uses a GCS API command to copy the object and then deletes the old one.
   1337 Status GcsFileSystem::RenameObject(const string& src, const string& target) {
   1338   string src_bucket, src_object, target_bucket, target_object;
   1339   TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
   1340   TF_RETURN_IF_ERROR(
   1341       ParseGcsPath(target, false, &target_bucket, &target_object));
   1342 
   1343   std::unique_ptr<HttpRequest> request;
   1344   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
   1345   request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
   1346                                   request->EscapeString(src_object),
   1347                                   "/rewriteTo/b/", target_bucket, "/o/",
   1348                                   request->EscapeString(target_object)));
   1349   request->SetPostEmptyBody();
   1350   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
   1351   std::vector<char> output_buffer;
   1352   request->SetResultBuffer(&output_buffer);
   1353   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
   1354                                   " to ", target);
   1355   // Flush the target from the block cache.  The source will be flushed in the
   1356   // DeleteFile call below.
   1357   file_block_cache_->RemoveFile(target);
   1358   Json::Value root;
   1359   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
   1360   bool done;
   1361   TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
   1362   if (!done) {
   1363     // If GCS didn't complete rewrite in one call, this means that a large file
   1364     // is being copied to a bucket with a different storage class or location,
   1365     // which requires multiple rewrite calls.
   1366     // TODO(surkov): implement multi-step rewrites.
   1367     return errors::Unimplemented(
   1368         "Couldn't rename ", src, " to ", target,
   1369         ": moving large files between buckets with different "
   1370         "locations or storage classes is not supported.");
   1371   }
   1372 
   1373   // In case the delete API call failed, but the deletion actually happened
   1374   // on the server side, we can't just retry the whole RenameFile operation
   1375   // because the source object is already gone.
   1376   return RetryingUtils::DeleteWithRetries(
   1377       std::bind(&GcsFileSystem::DeleteFile, this, src),
   1378       initial_retry_delay_usec_);
   1379 }
   1380 
   1381 Status GcsFileSystem::IsDirectory(const string& fname) {
   1382   string bucket, object;
   1383   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
   1384   if (object.empty()) {
   1385     bool is_bucket;
   1386     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
   1387     if (is_bucket) {
   1388       return Status::OK();
   1389     }
   1390     return errors::NotFound("The specified bucket gs://", bucket,
   1391                             " was not found.");
   1392   }
   1393   bool is_folder;
   1394   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
   1395   if (is_folder) {
   1396     return Status::OK();
   1397   }
   1398   bool is_object;
   1399   TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
   1400   if (is_object) {
   1401     return errors::FailedPrecondition("The specified path ", fname,
   1402                                       " is not a directory.");
   1403   }
   1404   return errors::NotFound("The specified path ", fname, " was not found.");
   1405 }
   1406 
   1407 Status GcsFileSystem::DeleteRecursively(const string& dirname,
   1408                                         int64* undeleted_files,
   1409                                         int64* undeleted_dirs) {
   1410   if (!undeleted_files || !undeleted_dirs) {
   1411     return errors::Internal(
   1412         "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
   1413   }
   1414   *undeleted_files = 0;
   1415   *undeleted_dirs = 0;
   1416   if (!IsDirectory(dirname).ok()) {
   1417     *undeleted_dirs = 1;
   1418     return Status(
   1419         error::NOT_FOUND,
   1420         strings::StrCat(dirname, " doesn't exist or not a directory."));
   1421   }
   1422   std::vector<string> all_objects;
   1423   // Get all children in the directory recursively.
   1424   TF_RETURN_IF_ERROR(GetChildrenBounded(
   1425       dirname, UINT64_MAX, &all_objects, true /* recursively */,
   1426       true /* include_self_directory_marker */));
   1427   for (const string& object : all_objects) {
   1428     const string& full_path = JoinGcsPath(dirname, object);
   1429     // Delete all objects including directory markers for subfolders.
   1430     // Since DeleteRecursively returns OK if individual file deletions fail,
   1431     // and therefore RetryingFileSystem won't pay attention to the failures,
   1432     // we need to make sure these failures are properly retried.
   1433     const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
   1434         std::bind(&GcsFileSystem::DeleteFile, this, full_path),
   1435         initial_retry_delay_usec_);
   1436     if (!delete_file_status.ok()) {
   1437       if (IsDirectory(full_path).ok()) {
   1438         // The object is a directory marker.
   1439         (*undeleted_dirs)++;
   1440       } else {
   1441         (*undeleted_files)++;
   1442       }
   1443     }
   1444   }
   1445   return Status::OK();
   1446 }
   1447 
   1448 // Flushes all caches for filesystem metadata and file contents. Useful for
   1449 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
   1450 // or for resetting the filesystem to a consistent state.
   1451 void GcsFileSystem::FlushCaches() {
   1452   file_block_cache_->Flush();
   1453   stat_cache_->Clear();
   1454   matching_paths_cache_->Clear();
   1455 }
   1456 
   1457 // Creates an HttpRequest and sets several parameters that are common to all
   1458 // requests.  All code (in GcsFileSystem) that creates an HttpRequest should
   1459 // go through this method, rather than directly using http_request_factory_.
   1460 Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
   1461   std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
   1462   if (dns_cache_) {
   1463     dns_cache_->AnnotateRequest(new_request.get());
   1464   }
   1465 
   1466   string auth_token;
   1467   TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
   1468 
   1469   new_request->AddAuthBearerHeader(auth_token);
   1470 
   1471   if (additional_header_) {
   1472     new_request->AddHeader(additional_header_->first,
   1473                            additional_header_->second);
   1474   }
   1475 
   1476   if (!throttle_.AdmitRequest()) {
   1477     return errors::Unavailable("Request throttled");
   1478   }
   1479 
   1480   *request = std::move(new_request);
   1481   return Status::OK();
   1482 }
   1483 
   1484 REGISTER_FILE_SYSTEM("gs", RetryingGcsFileSystem);  // Registers RetryingGcsFileSystem under the "gs" scheme.
   1485 
   1486 }  // namespace tensorflow
   1487