1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/platform/cloud/gcs_file_system.h" 17 #include <stdio.h> 18 #include <unistd.h> 19 #include <algorithm> 20 #include <cstdio> 21 #include <cstdlib> 22 #include <cstring> 23 #include <fstream> 24 #include <vector> 25 #ifdef _WIN32 26 #include <io.h> // for _mktemp 27 #endif 28 #include "include/json/json.h" 29 #include "tensorflow/core/lib/core/errors.h" 30 #include "tensorflow/core/lib/gtl/map_util.h" 31 #include "tensorflow/core/lib/gtl/stl_util.h" 32 #include "tensorflow/core/lib/io/path.h" 33 #include "tensorflow/core/lib/strings/numbers.h" 34 #include "tensorflow/core/lib/strings/str_util.h" 35 #include "tensorflow/core/lib/strings/stringprintf.h" 36 #include "tensorflow/core/platform/cloud/curl_http_request.h" 37 #include "tensorflow/core/platform/cloud/file_block_cache.h" 38 #include "tensorflow/core/platform/cloud/google_auth_provider.h" 39 #include "tensorflow/core/platform/cloud/retrying_utils.h" 40 #include "tensorflow/core/platform/cloud/time_util.h" 41 #include "tensorflow/core/platform/env.h" 42 #include "tensorflow/core/platform/mutex.h" 43 #include "tensorflow/core/platform/protobuf.h" 44 #include "tensorflow/core/platform/thread_annotations.h" 45 46 #ifdef _WIN32 47 #ifdef DeleteFile 48 #undef DeleteFile 49 #endif 50 #endif 51 52 
namespace tensorflow {
namespace {

constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
constexpr char kGcsUploadUriBase[] =
    "https://www.googleapis.com/upload/storage/v1/";
constexpr char kStorageHost[] = "storage.googleapis.com";
constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
constexpr int kGetChildrenDefaultPageSize = 1000;
// The HTTP response code "308 Resume Incomplete".
constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
// The environment variable that overrides the size of the readahead buffer.
// DEPRECATED. Use GCS_BLOCK_SIZE_MB instead.
constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
// The environment variable that overrides the block size for aligned reads from
// GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
constexpr size_t kDefaultBlockSize = 128 * 1024 * 1024;
// The environment variable that overrides the max size of the LRU cache of
// blocks read from GCS. Specified in MB.
constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
constexpr size_t kDefaultMaxCacheSize = 2 * kDefaultBlockSize;
// The environment variable that overrides the maximum staleness of cached file
// contents. Once any block of a file reaches this staleness, all cached blocks
// will be evicted on the next read.
constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
constexpr uint64 kDefaultMaxStaleness = 0;
// The environment variable that overrides the maximum age of entries in the
// Stat cache. A value of 0 (the default) means nothing is cached.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
constexpr uint64 kStatCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
constexpr size_t kStatCacheDefaultMaxEntries = 1024;
// The environment variable that overrides the maximum age of entries in the
// GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// GetMatchingPaths cache.
constexpr char kMatchingPathsCacheMaxEntries[] =
    "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
// The file statistics returned by Stat() for directories.
const FileStatistics DIRECTORY_STAT(0, 0, true);
// Some environments exhibit unreliable DNS resolution. Set this environment
// variable to a positive integer describing the frequency used to refresh the
// userspace DNS cache.
constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
// The environment variable to configure the http request's connection timeout.
constexpr char kRequestConnectionTimeout[] =
    "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
// The environment variable to configure the http request's idle timeout.
constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// metadata requests.
constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// block read requests.
constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// upload requests.
constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
// The environment variable to configure an additional header to send with
// all requests to GCS (format HEADERNAME:HEADERCONTENT)
constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
// The environment variable to configure the throttle (format: <int64>)
constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
// The environment variable to configure the token bucket size (format: <int64>)
constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
// The environment variable that controls the number of tokens per request.
// (format: <int64>)
constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
// The environment variable to configure the initial tokens (format: <int64>)
constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";

/// \brief Creates an empty temporary file and stores its path in 'filename'.
///
/// Returns an Internal error if 'filename' is null or if the file cannot be
/// created.
// TODO: DO NOT use a hardcoded path
Status GetTmpFilename(string* filename) {
  if (!filename) {
    return errors::Internal("'filename' cannot be nullptr.");
  }
#ifndef _WIN32
  char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
  // mkstemp() atomically generates a unique name AND creates the file,
  // avoiding the race of name generation followed by a separate open.
  int fd = mkstemp(buffer);
  if (fd < 0) {
    return errors::Internal("Failed to create a temporary file.");
  }
  close(fd);
#else
  // NOTE(review): this branch uses the POSIX "/tmp" prefix on Windows and
  // _mktemp(), which only generates a name without creating the file —
  // confirm this is intended (see TODO above).
  char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
  char* ret = _mktemp(buffer);
  if (ret == nullptr) {
    return errors::Internal("Failed to create a temporary file.");
  }
#endif
  *filename = buffer;
  return Status::OK();
}

/// \brief Splits a GCS path to a bucket and an object.
///
/// For example, "gs://bucket-name/path/to/file.txt" gets split into
/// "bucket-name" and "path/to/file.txt".
/// If fname only contains the bucket and empty_object_ok = true, the returned
/// object is empty.
158 Status ParseGcsPath(StringPiece fname, bool empty_object_ok, string* bucket, 159 string* object) { 160 if (!bucket || !object) { 161 return errors::Internal("bucket and object cannot be null."); 162 } 163 StringPiece scheme, bucketp, objectp; 164 io::ParseURI(fname, &scheme, &bucketp, &objectp); 165 if (scheme != "gs") { 166 return errors::InvalidArgument("GCS path doesn't start with 'gs://': ", 167 fname); 168 } 169 *bucket = bucketp.ToString(); 170 if (bucket->empty() || *bucket == ".") { 171 return errors::InvalidArgument("GCS path doesn't contain a bucket name: ", 172 fname); 173 } 174 objectp.Consume("/"); 175 *object = objectp.ToString(); 176 if (!empty_object_ok && object->empty()) { 177 return errors::InvalidArgument("GCS path doesn't contain an object name: ", 178 fname); 179 } 180 return Status::OK(); 181 } 182 183 /// Appends a trailing slash if the name doesn't already have one. 184 string MaybeAppendSlash(const string& name) { 185 if (name.empty()) { 186 return "/"; 187 } 188 if (name.back() != '/') { 189 return strings::StrCat(name, "/"); 190 } 191 return name; 192 } 193 194 // io::JoinPath() doesn't work in cases when we want an empty subpath 195 // to result in an appended slash in order for directory markers 196 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/". 197 string JoinGcsPath(const string& path, const string& subpath) { 198 return strings::StrCat(MaybeAppendSlash(path), subpath); 199 } 200 201 /// \brief Returns the given paths appending all their subfolders. 202 /// 203 /// For every path X in the list, every subfolder in X is added to the 204 /// resulting list. 
205 /// For example: 206 /// - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c' 207 /// - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c' 208 std::set<string> AddAllSubpaths(const std::vector<string>& paths) { 209 std::set<string> result; 210 result.insert(paths.begin(), paths.end()); 211 for (const string& path : paths) { 212 StringPiece subpath = io::Dirname(path); 213 while (!subpath.empty()) { 214 result.emplace(subpath.ToString()); 215 subpath = io::Dirname(subpath); 216 } 217 } 218 return result; 219 } 220 221 Status ParseJson(StringPiece json, Json::Value* result) { 222 Json::Reader reader; 223 if (!reader.parse(json.data(), json.data() + json.size(), *result)) { 224 return errors::Internal("Couldn't parse JSON response from GCS."); 225 } 226 return Status::OK(); 227 } 228 229 Status ParseJson(const std::vector<char>& json, Json::Value* result) { 230 return ParseJson(StringPiece{json.data(), json.size()}, result); 231 } 232 233 /// Reads a JSON value with the given name from a parent JSON value. 234 Status GetValue(const Json::Value& parent, const char* name, 235 Json::Value* result) { 236 *result = parent.get(name, Json::Value::null); 237 if (result->isNull()) { 238 return errors::Internal("The field '", name, 239 "' was expected in the JSON response."); 240 } 241 return Status::OK(); 242 } 243 244 /// Reads a string JSON value with the given name from a parent JSON value. 245 Status GetStringValue(const Json::Value& parent, const char* name, 246 string* result) { 247 Json::Value result_value; 248 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value)); 249 if (!result_value.isString()) { 250 return errors::Internal( 251 "The field '", name, 252 "' in the JSON response was expected to be a string."); 253 } 254 *result = result_value.asString(); 255 return Status::OK(); 256 } 257 258 /// Reads a long JSON value with the given name from a parent JSON value. 
Status GetInt64Value(const Json::Value& parent, const char* name,
                     int64* result) {
  Json::Value result_value;
  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
  if (result_value.isNumeric()) {
    *result = result_value.asInt64();
    return Status::OK();
  }
  // Also accept numbers encoded as decimal strings (e.g. "12345").
  if (result_value.isString() &&
      strings::safe_strto64(result_value.asCString(), result)) {
    return Status::OK();
  }
  return errors::Internal(
      "The field '", name,
      "' in the JSON response was expected to be a number.");
}

/// Reads a boolean JSON value with the given name from a parent JSON value.
Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
  Json::Value result_value;
  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
  if (!result_value.isBool()) {
    return errors::Internal(
        "The field '", name,
        "' in the JSON response was expected to be a boolean.");
  }
  *result = result_value.asBool();
  return Status::OK();
}

/// A GCS-based implementation of a random access file with an LRU block cache.
class GcsRandomAccessFile : public RandomAccessFile {
 public:
  GcsRandomAccessFile(const string& filename, FileBlockCache* file_block_cache)
      : filename_(filename), file_block_cache_(file_block_cache) {}

  /// The implementation of reads with an LRU block cache. Thread safe.
  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    *result = StringPiece();
    size_t bytes_transferred;
    TF_RETURN_IF_ERROR(file_block_cache_->Read(filename_, offset, n, scratch,
                                               &bytes_transferred));
    // Expose whatever was read, even on a short read, before reporting EOF.
    *result = StringPiece(scratch, bytes_transferred);
    if (bytes_transferred < n) {
      // This is not an error per se. The RandomAccessFile interface expects
      // that Read returns OutOfRange if fewer bytes were read than requested.
      return errors::OutOfRange("EOF reached, ", result->size(),
                                " bytes were read out of ", n,
                                " bytes requested.");
    }
    return Status::OK();
  }

 private:
  /// The filename of this file.
  const string filename_;
  /// The LRU block cache for this file.
  mutable FileBlockCache* file_block_cache_;  // not owned
};

/// \brief GCS-based implementation of a writeable file.
///
/// Since GCS objects are immutable, this implementation writes to a local
/// tmp file and copies it to GCS on flush/close.
class GcsWritableFile : public WritableFile {
 public:
  GcsWritableFile(const string& bucket, const string& object,
                  GcsFileSystem* filesystem,
                  GcsFileSystem::TimeoutConfig* timeouts,
                  std::function<void()> file_cache_erase,
                  int64 initial_retry_delay_usec)
      : bucket_(bucket),
        object_(object),
        filesystem_(filesystem),
        timeouts_(timeouts),
        file_cache_erase_(std::move(file_cache_erase)),
        sync_needed_(true),
        initial_retry_delay_usec_(initial_retry_delay_usec) {
    // TODO: to make it safer, outfile_ should be constructed from an FD
    // If the tmp file cannot be created, outfile_ stays closed and all
    // subsequent operations fail via CheckWritable().
    if (GetTmpFilename(&tmp_content_filename_).ok()) {
      outfile_.open(tmp_content_filename_,
                    std::ofstream::binary | std::ofstream::app);
    }
  }

  /// \brief Constructs the writable file in append mode.
  ///
  /// tmp_content_filename should contain a path of an existing temporary file
  /// with the content to be appended. The class takes ownership of the
  /// specified tmp file and deletes it on close.
  GcsWritableFile(const string& bucket, const string& object,
                  GcsFileSystem* filesystem, const string& tmp_content_filename,
                  GcsFileSystem::TimeoutConfig* timeouts,
                  std::function<void()> file_cache_erase,
                  int64 initial_retry_delay_usec)
      : bucket_(bucket),
        object_(object),
        filesystem_(filesystem),
        timeouts_(timeouts),
        file_cache_erase_(std::move(file_cache_erase)),
        sync_needed_(true),
        initial_retry_delay_usec_(initial_retry_delay_usec) {
    tmp_content_filename_ = tmp_content_filename;
    // Open in append mode so the pre-existing content is preserved.
    outfile_.open(tmp_content_filename_,
                  std::ofstream::binary | std::ofstream::app);
  }

  ~GcsWritableFile() override { Close().IgnoreError(); }

  Status Append(const StringPiece& data) override {
    TF_RETURN_IF_ERROR(CheckWritable());
    // Mark dirty before writing so a failed write still forces a re-sync.
    sync_needed_ = true;
    outfile_ << data;
    if (!outfile_.good()) {
      return errors::Internal(
          "Could not append to the internal temporary file.");
    }
    return Status::OK();
  }

  Status Close() override {
    if (outfile_.is_open()) {
      // Upload any pending content, then dispose of the local tmp file.
      TF_RETURN_IF_ERROR(Sync());
      outfile_.close();
      std::remove(tmp_content_filename_.c_str());
    }
    return Status::OK();
  }

  Status Flush() override { return Sync(); }

  Status Sync() override {
    TF_RETURN_IF_ERROR(CheckWritable());
    // Skip the upload entirely if nothing changed since the last sync.
    if (!sync_needed_) {
      return Status::OK();
    }
    Status status = SyncImpl();
    if (status.ok()) {
      sync_needed_ = false;
    }
    return status;
  }

 private:
  /// Copies the current version of the file to GCS.
  ///
  /// This SyncImpl() uploads the object to GCS.
  /// In case of a failure, it resumes failed uploads as recommended by the GCS
  /// resumable API documentation. When the whole upload needs to be
  /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
  Status SyncImpl() {
    outfile_.flush();
    if (!outfile_.good()) {
      return errors::Internal(
          "Could not write to the internal temporary file.");
    }
    string session_uri;
    TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri));
    uint64 already_uploaded = 0;
    bool first_attempt = true;
    const Status upload_status = RetryingUtils::CallWithRetries(
        [&first_attempt, &already_uploaded, &session_uri, this]() {
          if (!first_attempt) {
            // On retries, ask GCS how much of the file it already has so the
            // upload can resume from that offset instead of starting over.
            bool completed;
            TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
                session_uri, &completed, &already_uploaded));
            if (completed) {
              // Erase the file from the file cache on every successful write.
              file_cache_erase_();
              // It's unclear why UploadToSession didn't return OK in the
              // previous attempt, but GCS reports that the file is fully
              // uploaded, so succeed.
              return Status::OK();
            }
          }
          first_attempt = false;
          return UploadToSession(session_uri, already_uploaded);
        },
        initial_retry_delay_usec_);
    if (upload_status.code() == errors::Code::NOT_FOUND) {
      // GCS docs recommend retrying the whole upload. We're relying on the
      // RetryingFileSystem to retry the Sync() call.
      return errors::Unavailable(
          strings::StrCat("Upload to gs://", bucket_, "/", object_,
                          " failed, caused by: ", upload_status.ToString()));
    }
    return upload_status;
  }

  /// Returns FailedPrecondition if the local tmp file is not open for writing.
  Status CheckWritable() const {
    if (!outfile_.is_open()) {
      return errors::FailedPrecondition(
          "The internal temporary file is not writable.");
    }
    return Status::OK();
  }

  /// Reports the current size of the local tmp file via the write position.
  Status GetCurrentFileSize(uint64* size) {
    if (size == nullptr) {
      return errors::Internal("'size' cannot be nullptr");
    }
    const auto tellp = outfile_.tellp();
    if (tellp == static_cast<std::streampos>(-1)) {
      return errors::Internal(
          "Could not get the size of the internal temporary file.");
    }
    *size = tellp;
    return Status::OK();
  }

  /// Initiates a new resumable upload session.
  Status CreateNewUploadSession(string* session_uri) {
    if (session_uri == nullptr) {
      return errors::Internal("'session_uri' cannot be nullptr.");
    }
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));

    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));

    request->SetUri(strings::StrCat(
        kGcsUploadUriBase, "b/", bucket_,
        "/o?uploadType=resumable&name=", request->EscapeString(object_)));
    request->AddHeader("X-Upload-Content-Length", std::to_string(file_size));
    request->SetPostEmptyBody();
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_->connect, timeouts_->idle,
                         timeouts_->metadata);
    TF_RETURN_WITH_CONTEXT_IF_ERROR(
        request->Send(), " when initiating an upload to ", GetGcsPath());
    // GCS returns the session URI in the 'Location' response header.
    *session_uri = request->GetResponseHeader("Location");
    if (session_uri->empty()) {
      return errors::Internal("Unexpected response from GCS when writing to ",
                              GetGcsPath(),
                              ": 'Location' header not returned.");
    }
    return Status::OK();
  }

  /// \brief Requests status of a previously initiated upload session.
  ///
  /// If the upload has already succeeded, sets 'completed' to true.
  /// Otherwise sets 'completed' to false and 'uploaded' to the currently
  /// uploaded size in bytes.
  Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
                                    uint64* uploaded) {
    if (completed == nullptr || uploaded == nullptr) {
      return errors::Internal("'completed' and 'uploaded' cannot be nullptr.");
    }
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));

    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
    request->SetUri(session_uri);
    request->SetTimeouts(timeouts_->connect, timeouts_->idle,
                         timeouts_->metadata);
    // An empty PUT with "Content-Range: bytes */<size>" queries how much of
    // the upload GCS has received.
    request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
    request->SetPutEmptyBody();
    const Status& status = request->Send();
    if (status.ok()) {
      *completed = true;
      return Status::OK();
    }
    *completed = false;
    // 308 "Resume Incomplete" is the expected reply for a partial upload; any
    // other error is propagated to the caller.
    if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
      TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ",
                                      GetGcsPath());
    }
    const string& received_range = request->GetResponseHeader("Range");
    if (received_range.empty()) {
      // This means GCS doesn't have any bytes of the file yet.
      *uploaded = 0;
    } else {
      StringPiece range_piece(received_range);
      range_piece.Consume("bytes=");  // May or may not be present.
      std::vector<int64> range_parts;
      if (!str_util::SplitAndParseAsInts(range_piece, '-', &range_parts) ||
          range_parts.size() != 2) {
        return errors::Internal("Unexpected response from GCS when writing ",
                                GetGcsPath(), ": Range header '",
                                received_range, "' could not be parsed.");
      }
      if (range_parts[0] != 0) {
        return errors::Internal("Unexpected response from GCS when writing to ",
                                GetGcsPath(), ": the returned range '",
                                received_range, "' does not start at zero.");
      }
      // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
      *uploaded = range_parts[1] + 1;
    }
    return Status::OK();
  }

  /// Uploads the tmp file's content starting at 'start_offset' to the given
  /// resumable upload session.
  Status UploadToSession(const string& session_uri, uint64 start_offset) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));

    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
    request->SetUri(session_uri);
    if (file_size > 0) {
      request->AddHeader("Content-Range",
                         strings::StrCat("bytes ", start_offset, "-",
                                         file_size - 1, "/", file_size));
    }
    request->SetTimeouts(timeouts_->connect, timeouts_->idle, timeouts_->write);

    TF_RETURN_IF_ERROR(
        request->SetPutFromFile(tmp_content_filename_, start_offset));
    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
                                    GetGcsPath());
    // Erase the file from the file cache on every successful write.
    file_cache_erase_();
    return Status::OK();
  }

  /// Returns the full "gs://bucket/object" path for error messages.
  string GetGcsPath() const {
    return strings::StrCat("gs://", bucket_, "/", object_);
  }

  string bucket_;
  string object_;
  GcsFileSystem* const filesystem_;  // Not owned.
  // Path of the local tmp file holding the content to upload.
  string tmp_content_filename_;
  std::ofstream outfile_;
  GcsFileSystem::TimeoutConfig* timeouts_;
  // Invoked after every successful upload to drop stale cached blocks.
  std::function<void()> file_cache_erase_;
  bool sync_needed_;  // whether there is buffered data that needs to be synced
  int64 initial_retry_delay_usec_;
};

/// A read-only memory region backed by a heap buffer owned by this object.
class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
 public:
  GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
      : data_(std::move(data)), length_(length) {}
  const void* data() override { return reinterpret_cast<void*>(data_.get()); }
  uint64 length() override { return length_; }

 private:
  std::unique_ptr<char[]> data_;
  uint64 length_;
};

// Helper function to extract an environment variable and convert it into a
// value of type T. Returns false if the variable is unset or fails to convert.
template <typename T>
bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
               T* value) {
  const char* env_value = std::getenv(varname);
  if (!env_value) {
    return false;
  }
  return convert(env_value, value);
}

// A no-op "conversion" used with GetEnvVar for raw string values.
bool StringPieceIdentity(StringPiece str, StringPiece* value) {
  *value = str;
  return true;
}

}  // namespace

GcsFileSystem::GcsFileSystem()
    : auth_provider_(new GoogleAuthProvider()),
      http_request_factory_(new CurlHttpRequest::Factory()) {
  uint64 value;
  size_t block_size = kDefaultBlockSize;
  size_t max_bytes = kDefaultMaxCacheSize;
  uint64 max_staleness = kDefaultMaxStaleness;
  // Apply the sys env override for the readahead buffer size if it's provided.
  if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
    block_size = value;
  }
  // Apply the overrides for the block size (MB), max bytes (MB), and max
  // staleness (seconds) if provided.
  if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
    block_size = value * 1024 * 1024;
  }
  if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
    max_bytes = value * 1024 * 1024;
  }
  if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
    max_staleness = value;
  }
  file_block_cache_ = MakeFileBlockCache(block_size, max_bytes, max_staleness);
  // Apply overrides for the stat cache max age and max entries, if provided.
  uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
  size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
  if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
    stat_cache_max_age = value;
  }
  if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
    stat_cache_max_entries = value;
  }
  stat_cache_.reset(new ExpiringLRUCache<FileStatistics>(
      stat_cache_max_age, stat_cache_max_entries));
  // Apply overrides for the matching paths cache max age and max entries, if
  // provided.
  uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
  size_t matching_paths_cache_max_entries =
      kMatchingPathsCacheDefaultMaxEntries;
  if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
    matching_paths_cache_max_age = value;
  }
  if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
                &value)) {
    matching_paths_cache_max_entries = value;
  }
  matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
      matching_paths_cache_max_age, matching_paths_cache_max_entries));

  // The userspace DNS cache is only enabled when a refresh period is set.
  int64 resolve_frequency_secs;
  if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
                &resolve_frequency_secs)) {
    dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
    VLOG(1) << "GCS DNS cache is enabled. " << kResolveCacheSecs << " = "
            << resolve_frequency_secs;
  } else {
    VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
            << " = 0 (or is not set)";
  }

  // Get the additional header (format: "NAME:VALUE").
  StringPiece add_header_contents;
  if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
                &add_header_contents)) {
    size_t split = add_header_contents.find(':', 0);

    if (split != StringPiece::npos) {
      StringPiece header_name = add_header_contents.substr(0, split);
      StringPiece header_value = add_header_contents.substr(split + 1);

      if (!header_name.empty() && !header_value.empty()) {
        additional_header_.reset(new std::pair<const string, const string>(
            header_name.ToString(), header_value.ToString()));

        VLOG(1) << "GCS additional header ENABLED. "
                << "Name: " << additional_header_->first << ", "
                << "Value: " << additional_header_->second;
      } else {
        LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                   << add_header_contents;
      }
    } else {
      LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                 << add_header_contents;
    }
  } else {
    VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
  }

  // Apply the overrides for request timeouts
  uint32 timeout_value;
  if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.connect = timeout_value;
  }
  if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.idle = timeout_value;
  }
  if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.metadata = timeout_value;
  }
  if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.read = timeout_value;
  }
  if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.write = timeout_value;
  }

  // The throttle is only configured when a token rate is explicitly set; the
  // remaining knobs refine the default GcsThrottleConfig.
  int64 token_value;
  if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
    GcsThrottleConfig config;
    config.enabled = true;
    config.token_rate = token_value;

    if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
      config.bucket_size = token_value;
    }

    if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
      config.tokens_per_request = token_value;
    }

    if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
      config.initial_tokens = token_value;
    }
    throttle_.SetConfig(config);
  }
}

GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    size_t block_size, size_t max_bytes, uint64 max_staleness,
    uint64 stat_cache_max_age, size_t stat_cache_max_entries,
    uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, int64 initial_retry_delay_usec,
    TimeoutConfig timeouts,
    std::pair<const string, const string>* additional_header)
    : auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      timeouts_(timeouts),
      initial_retry_delay_usec_(initial_retry_delay_usec),
      additional_header_(additional_header) {}

Status GcsFileSystem::NewRandomAccessFile(
    const string& fname, std::unique_ptr<RandomAccessFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  result->reset(new GcsRandomAccessFile(fname, file_block_cache_.get()));
  return Status::OK();
}

// A helper function to build a FileBlockCache for GcsFileSystem.
std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
    size_t block_size, size_t max_bytes, uint64 max_staleness) {
  std::unique_ptr<FileBlockCache> file_block_cache(
      new FileBlockCache(block_size, max_bytes, max_staleness,
                         [this](const string& filename, size_t offset, size_t n,
                                char* buffer, size_t* bytes_transferred) {
                           return LoadBufferFromGCS(filename, offset, n, buffer,
                                                    bytes_transferred);
                         }));
  return file_block_cache;
}

// A helper function to actually read the data from GCS.
Status GcsFileSystem::LoadBufferFromGCS(const string& filename, size_t offset,
                                        size_t n, char* buffer,
                                        size_t* bytes_transferred) {
  *bytes_transferred = 0;

  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(filename, false, &bucket, &object));

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  "when reading gs://", bucket, "/", object);

  // Read via the XML API host with an HTTP range request for [offset, n).
  request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
                                  request->EscapeString(object)));
  request->SetRange(offset, offset + n - 1);
  request->SetResultBufferDirect(buffer, n);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);

  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
                                  bucket, "/", object);

  size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
  *bytes_transferred = bytes_read;
  VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
          << offset << " of size: " << bytes_read;

  throttle_.RecordResponse(bytes_read);

  if (bytes_read < block_size()) {
    // Check stat cache to see if we encountered an interrupted read.
    FileStatistics stat;
    if (stat_cache_->Lookup(filename, &stat)) {
      if (offset + bytes_read < stat.length) {
        // NOTE(review): %lu assumes size_t == unsigned long — may warn or
        // misprint on platforms where that doesn't hold (e.g. 64-bit
        // Windows); confirm or switch to %zu.
        return errors::Internal(strings::Printf(
            "File contents are inconsistent for file: %s @ %lu.",
            filename.c_str(), offset));
      }
      VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
              << object << " @ " << offset;
    }
  }

  return Status::OK();
}

Status GcsFileSystem::NewWritableFile(const string& fname,
                                      std::unique_ptr<WritableFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  result->reset(new GcsWritableFile(
      bucket, object, this, &timeouts_,
      [this, fname]() { file_block_cache_->RemoveFile(fname); },
      initial_retry_delay_usec_));
  return Status::OK();
}

// Reads the file from GCS in chunks and stores it in a tmp file,
// which is then passed to GcsWritableFile.
Status GcsFileSystem::NewAppendableFile(const string& fname,
                                        std::unique_ptr<WritableFile>* result) {
  std::unique_ptr<RandomAccessFile> reader;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
  std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
  Status status;
  uint64 offset = 0;
  StringPiece read_chunk;

  // Read the file from GCS in chunks and save it to a tmp file.
  string old_content_filename;
  TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
  std::ofstream old_content(old_content_filename, std::ofstream::binary);
  while (true) {
    status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
                          buffer.get());
    if (status.ok()) {
      old_content << read_chunk;
      offset += kReadAppendableFileBufferSize;
    } else if (status.code() == error::OUT_OF_RANGE) {
      // Expected, this means we reached EOF.
      // Flush the final (partial) chunk read before EOF.
      old_content << read_chunk;
      break;
    } else {
      return status;
    }
  }
  old_content.close();

  // Create a writable file and pass the old content to it.
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  result->reset(new GcsWritableFile(
      bucket, object, this, old_content_filename, &timeouts_,
      [this, fname]() { file_block_cache_->RemoveFile(fname); },
      initial_retry_delay_usec_));
  return Status::OK();
}

// Loads the entire file into a heap buffer and exposes it as a read-only
// memory region.
Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
  uint64 size;
  TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
  std::unique_ptr<char[]> data(new char[size]);

  std::unique_ptr<RandomAccessFile> file;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));

  StringPiece piece;
  TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));

  result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
  return Status::OK();
}

// A path "exists" if it is a bucket, an object, or an (implicit) folder,
// checked in that order.
Status GcsFileSystem::FileExists(const string& fname) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
  if (object.empty()) {
    bool result;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
    if (result) {
      return Status::OK();
    }
  }
  bool result;
  TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &result));
  if (result) {
    return Status::OK();
  }
  TF_RETURN_IF_ERROR(FolderExists(fname, &result));
  if (result) {
    return Status::OK();
  }
  return errors::NotFound("The specified path ", fname, " was not found.");
}

Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
                                   const string& object, bool* result) {
  if (!result) {
    return errors::Internal("'result' cannot be nullptr.");
  }
  FileStatistics not_used_stat;
  const Status status = StatForObject(fname, bucket, object, &not_used_stat);
  // NOT_FOUND is mapped to 'false' rather than propagated as an error.
  switch (status.code()) {
    case errors::Code::OK:
      *result = true;
      return Status::OK();
    case errors::Code::NOT_FOUND:
      *result = false;
      return Status::OK();
    default:
      return status;
  }
}

Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
                                    const string& object,
                                    FileStatistics* stat) {
  if (!stat) {
    return errors::Internal("'stat' cannot be nullptr.");
  }
  if (object.empty()) {
    return errors::InvalidArgument(strings::Printf(
        "'object' must be a non-empty string. (File: %s)", fname.c_str()));
  }

  // On a stat-cache miss, fetch the object's size and update time from the
  // GCS metadata endpoint.
  StatCache::ComputeFunc compute_func = [this, &bucket, &object](
                                            const string& fname,
                                            FileStatistics* stat) {
    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                    " when reading metadata of gs://", bucket,
                                    "/", object);

    request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
                                    request->EscapeString(object),
                                    "?fields=size%2Cupdated"));
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
                                    " when reading metadata of gs://", bucket,
                                    "/", object);

    Json::Value root;
    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));

    // Parse file size.
    TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->length));

    // Parse file modification time.
    string updated;
    TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
    // "updated" is an RFC 3339 timestamp; converted to nanoseconds here.
    TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->mtime_nsec)));

    VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
            << " length: " << stat->length
            << "; mtime_nsec: " << stat->mtime_nsec << "; updated: " << updated;

    stat->is_directory = false;
    return Status::OK();
  };

  TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(fname, stat, compute_func));
  if (stat->is_directory) {
    return errors::NotFound(fname, " is a directory.");
  } else {
    return Status::OK();
  }
}

Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
  if (!result) {
    return errors::Internal("'result' cannot be nullptr.");
  }

  // A successful metadata fetch for the bucket means it exists; NOT_FOUND
  // is mapped to 'false'.
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  const Status status = request->Send();
  switch (status.code()) {
    case errors::Code::OK:
      *result = true;
      return Status::OK();
    case errors::Code::NOT_FOUND:
      *result = false;
      return Status::OK();
    default:
      return status;
  }
}

Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
  if (!result) {
    return errors::Internal("'result' cannot be nullptr.");
  }
  // A folder "exists" if listing it (recursively, including the directory
  // marker itself) yields at least one entry.
  StatCache::ComputeFunc compute_func = [this](const string& dirname,
                                               FileStatistics* stat) {
    std::vector<string> children;
    TF_RETURN_IF_ERROR(
        GetChildrenBounded(dirname, 1, &children, true /* recursively */,
                           true /* include_self_directory_marker */));
    if (!children.empty()) {
      *stat = DIRECTORY_STAT;
      return Status::OK();
    } else {
      return errors::InvalidArgument("Not a directory!");
    }
  };
  FileStatistics stat;
  Status s = stat_cache_->LookupOrCompute(dirname, &stat, compute_func);
  if (s.ok()) {
    *result = stat.is_directory;
    return Status::OK();
  }
  // InvalidArgument is compute_func's signal for "not a directory".
  if (errors::IsInvalidArgument(s)) {
    *result = false;
    return Status::OK();
  }
  return s;
}

Status GcsFileSystem::GetChildren(const string& dirname,
                                  std::vector<string>* result) {
  return GetChildrenBounded(dirname, UINT64_MAX, result,
                            false /* recursively */,
                            false /* include_self_directory_marker */);
}

Status GcsFileSystem::GetMatchingPaths(const string& pattern,
                                       std::vector<string>* results) {
  MatchingPathsCache::ComputeFunc compute_func =
      [this](const string& pattern, std::vector<string>* results) {
        results->clear();
        // Find the fixed prefix by looking for the first wildcard.
        const string& fixed_prefix =
            pattern.substr(0, pattern.find_first_of("*?[\\"));
        const string& dir = io::Dirname(fixed_prefix).ToString();
        if (dir.empty()) {
          return errors::InvalidArgument(
              "A GCS pattern doesn't have a bucket name: ", pattern);
        }
        std::vector<string> all_files;
        TF_RETURN_IF_ERROR(GetChildrenBounded(
            dir, UINT64_MAX, &all_files, true /* recursively */,
            false /* include_self_directory_marker */));

        const auto& files_and_folders = AddAllSubpaths(all_files);

        // Match all obtained paths to the input pattern.
        for (const auto& path : files_and_folders) {
          const string& full_path = io::JoinPath(dir, path);
          if (Env::Default()->MatchPath(full_path, pattern)) {
            results->push_back(full_path);
          }
        }
        return Status::OK();
      };
  TF_RETURN_IF_ERROR(
      matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
  return Status::OK();
}

// Lists up to 'max_results' children of 'dirname' via the GCS objects.list
// endpoint, following nextPageToken pagination.
Status GcsFileSystem::GetChildrenBounded(const string& dirname,
                                         uint64 max_results,
                                         std::vector<string>* result,
                                         bool recursive,
                                         bool include_self_directory_marker) {
  if (!result) {
    return errors::InvalidArgument("'result' cannot be null");
  }
  string bucket, object_prefix;
  TF_RETURN_IF_ERROR(
      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));

  string nextPageToken;
  uint64 retrieved_results = 0;
  while (true) {  // A loop over multiple result pages.
    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
    if (recursive) {
      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
    } else {
      // Set "/" as a delimiter to ask GCS to treat subfolders as children
      // and return them in "prefixes".
      uri = strings::StrCat(uri,
                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
      uri = strings::StrCat(uri, "&delimiter=%2F");
    }
    if (!object_prefix.empty()) {
      uri = strings::StrCat(uri,
                            "&prefix=", request->EscapeString(object_prefix));
    }
    if (!nextPageToken.empty()) {
      uri = strings::StrCat(
          uri, "&pageToken=", request->EscapeString(nextPageToken));
    }
    // Only cap the page size once fewer than a default page's worth of
    // results remains to be fetched.
    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
      uri =
          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
    }
    request->SetUri(uri);
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
    Json::Value root;
    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
    const auto items = root.get("items", Json::Value::null);
    if (!items.isNull()) {
      if (!items.isArray()) {
        return errors::Internal(
            "Expected an array 'items' in the GCS response.");
      }
      for (size_t i = 0; i < items.size(); i++) {
        const auto item = items.get(i, Json::Value::null);
        if (!item.isObject()) {
          return errors::Internal(
              "Unexpected JSON format: 'items' should be a list of objects.");
        }
        string name;
        TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
        // The names should be relative to the 'dirname'. That means the
        // 'object_prefix', which is part of 'dirname', should be removed from
        // the beginning of 'name'.
        StringPiece relative_path(name);
        if (!relative_path.Consume(object_prefix)) {
          return errors::Internal(strings::StrCat(
              "Unexpected response: the returned file name ", name,
              " doesn't match the prefix ", object_prefix));
        }
        // An empty relative path is the directory marker object itself.
        if (!relative_path.empty() || include_self_directory_marker) {
          result->emplace_back(relative_path.ToString());
        }
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    const auto prefixes = root.get("prefixes", Json::Value::null);
    if (!prefixes.isNull()) {
      // Subfolders are returned for the non-recursive mode.
      if (!prefixes.isArray()) {
        return errors::Internal(
            "'prefixes' was expected to be an array in the GCS response.");
      }
      for (size_t i = 0; i < prefixes.size(); i++) {
        const auto prefix = prefixes.get(i, Json::Value::null);
        if (prefix.isNull() || !prefix.isString()) {
          return errors::Internal(
              "'prefixes' was expected to be an array of strings in the GCS "
              "response.");
        }
        const string& prefix_str = prefix.asString();
        StringPiece relative_path(prefix_str);
        if (!relative_path.Consume(object_prefix)) {
          return errors::Internal(
              "Unexpected response: the returned folder name ", prefix_str,
              " doesn't match the prefix ", object_prefix);
        }
        result->emplace_back(relative_path.ToString());
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    const auto token = root.get("nextPageToken", Json::Value::null);
    if (token.isNull()) {
      // No more pages: the listing is complete.
      return Status::OK();
    }
    if (!token.isString()) {
      return errors::Internal(
          "Unexpected response: nextPageToken is not a string");
    }
    nextPageToken = token.asString();
  }
}

// Stats a path as, in order: a bucket, a regular object, or an (implicit)
// folder.
Status GcsFileSystem::Stat(const string& fname, FileStatistics* stat) {
  if (!stat) {
    return errors::Internal("'stat' cannot be nullptr.");
  }
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
  // A path with no object component refers to the bucket itself.
  if (object.empty()) {
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    if (is_bucket) {
      *stat = DIRECTORY_STAT;
      return Status::OK();
    }
    return errors::NotFound("The specified bucket ", fname, " was not found.");
  }

  const Status status = StatForObject(fname, bucket, object, stat);
  if (status.ok()) {
    return Status::OK();
  }
  if (status.code() != errors::Code::NOT_FOUND) {
    return status;
  }
  // Not a regular object: fall back to checking for a folder.
  bool is_folder;
  TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
  if (is_folder) {
    *stat = DIRECTORY_STAT;
    return Status::OK();
  }
  return errors::NotFound("The specified path ", fname, " was not found.");
}

Status GcsFileSystem::DeleteFile(const string& fname) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
                                  request->EscapeString(object)));
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  request->SetDeleteRequest();

  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
  // Drop any cached blocks for the deleted file.
  file_block_cache_->RemoveFile(fname);
  return Status::OK();
}

Status GcsFileSystem::CreateDir(const string& dirname) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(dirname, true, &bucket, &object));
  // "Creating" a bucket path only verifies that the bucket exists.
  if (object.empty()) {
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    return is_bucket ? Status::OK()
                     : errors::NotFound("The specified bucket ", dirname,
                                        " was not found.");
  }
  // Create a zero-length directory marker object.
  std::unique_ptr<WritableFile> file;
  TF_RETURN_IF_ERROR(NewWritableFile(MaybeAppendSlash(dirname), &file));
  TF_RETURN_IF_ERROR(file->Close());
  return Status::OK();
}

// Checks that the directory is empty (i.e no objects with this prefix exist).
// Deletes the GCS directory marker if it exists.
Status GcsFileSystem::DeleteDir(const string& dirname) {
  std::vector<string> children;
  // A directory is considered empty either if there are no matching objects
  // with the corresponding name prefix or if there is exactly one matching
  // object and it is the directory marker. Therefore we need to retrieve
  // at most two children for the prefix to detect if a directory is empty.
  TF_RETURN_IF_ERROR(
      GetChildrenBounded(dirname, 2, &children, true /* recursively */,
                         true /* include_self_directory_marker */));

  if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
  }
  if (children.size() == 1 && children[0].empty()) {
    // This is the directory marker object. Delete it.
    return DeleteFile(MaybeAppendSlash(dirname));
  }
  return Status::OK();
}

Status GcsFileSystem::GetFileSize(const string& fname, uint64* file_size) {
  if (!file_size) {
    return errors::Internal("'file_size' cannot be nullptr.");
  }

  // Only validate the name.
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  FileStatistics stat;
  TF_RETURN_IF_ERROR(Stat(fname, &stat));
  *file_size = stat.length;
  return Status::OK();
}

Status GcsFileSystem::RenameFile(const string& src, const string& target) {
  // A non-directory source is renamed as a single object.
  if (!IsDirectory(src).ok()) {
    return RenameObject(src, target);
  }
  // Rename all individual objects in the directory one by one.
  std::vector<string> children;
  TF_RETURN_IF_ERROR(
      GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
                         true /* include_self_directory_marker */));
  for (const string& subpath : children) {
    TF_RETURN_IF_ERROR(
        RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
  }
  return Status::OK();
}

// Uses a GCS API command to copy the object and then deletes the old one.
Status GcsFileSystem::RenameObject(const string& src, const string& target) {
  string src_bucket, src_object, target_bucket, target_object;
  TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
  TF_RETURN_IF_ERROR(
      ParseGcsPath(target, false, &target_bucket, &target_object));

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
                                  request->EscapeString(src_object),
                                  "/rewriteTo/b/", target_bucket, "/o/",
                                  request->EscapeString(target_object)));
  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  std::vector<char> output_buffer;
  request->SetResultBuffer(&output_buffer);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
                                  " to ", target);
  // Flush the target from the block cache. The source will be flushed in the
  // DeleteFile call below.
  file_block_cache_->RemoveFile(target);
  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
  bool done;
  TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
  if (!done) {
    // If GCS didn't complete rewrite in one call, this means that a large file
    // is being copied to a bucket with a different storage class or location,
    // which requires multiple rewrite calls.
    // TODO(surkov): implement multi-step rewrites.
    return errors::Unimplemented(
        "Couldn't rename ", src, " to ", target,
        ": moving large files between buckets with different "
        "locations or storage classes is not supported.");
  }

  // In case the delete API call failed, but the deletion actually happened
  // on the server side, we can't just retry the whole RenameFile operation
  // because the source object is already gone.
  return RetryingUtils::DeleteWithRetries(
      std::bind(&GcsFileSystem::DeleteFile, this, src),
      initial_retry_delay_usec_);
}

// OK iff 'fname' is a bucket or a folder; FailedPrecondition for a regular
// object; NotFound otherwise.
Status GcsFileSystem::IsDirectory(const string& fname) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
  if (object.empty()) {
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    if (is_bucket) {
      return Status::OK();
    }
    return errors::NotFound("The specified bucket gs://", bucket,
                            " was not found.");
  }
  bool is_folder;
  TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
  if (is_folder) {
    return Status::OK();
  }
  bool is_object;
  TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
  if (is_object) {
    return errors::FailedPrecondition("The specified path ", fname,
                                      " is not a directory.");
  }
  return errors::NotFound("The specified path ", fname, " was not found.");
}

// Best-effort recursive delete: failures are counted in 'undeleted_files' /
// 'undeleted_dirs' rather than aborting the whole operation.
Status GcsFileSystem::DeleteRecursively(const string& dirname,
                                        int64* undeleted_files,
                                        int64* undeleted_dirs) {
  if (!undeleted_files || !undeleted_dirs) {
    return errors::Internal(
        "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
  }
  *undeleted_files = 0;
  *undeleted_dirs = 0;
  if (!IsDirectory(dirname).ok()) {
    *undeleted_dirs = 1;
    return Status(
        error::NOT_FOUND,
        strings::StrCat(dirname, " doesn't exist or not a directory."));
  }
  std::vector<string> all_objects;
  // Get all children in the directory recursively.
  TF_RETURN_IF_ERROR(GetChildrenBounded(
      dirname, UINT64_MAX, &all_objects, true /* recursively */,
      true /* include_self_directory_marker */));
  for (const string& object : all_objects) {
    const string& full_path = JoinGcsPath(dirname, object);
    // Delete all objects including directory markers for subfolders.
    // Since DeleteRecursively returns OK if individual file deletions fail,
    // and therefore RetryingFileSystem won't pay attention to the failures,
    // we need to make sure these failures are properly retried.
    const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
        std::bind(&GcsFileSystem::DeleteFile, this, full_path),
        initial_retry_delay_usec_);
    if (!delete_file_status.ok()) {
      if (IsDirectory(full_path).ok()) {
        // The object is a directory marker.
        (*undeleted_dirs)++;
      } else {
        (*undeleted_files)++;
      }
    }
  }
  return Status::OK();
}

// Flushes all caches for filesystem metadata and file contents. Useful for
// reclaiming memory once filesystem operations are done (e.g. model is loaded),
// or for resetting the filesystem to a consistent state.
void GcsFileSystem::FlushCaches() {
  file_block_cache_->Flush();
  stat_cache_->Clear();
  matching_paths_cache_->Clear();
}

// Creates an HttpRequest and sets several parameters that are common to all
// requests. All code (in GcsFileSystem) that creates an HttpRequest should
// go through this method, rather than directly using http_request_factory_.
1460 Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) { 1461 std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()}; 1462 if (dns_cache_) { 1463 dns_cache_->AnnotateRequest(new_request.get()); 1464 } 1465 1466 string auth_token; 1467 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); 1468 1469 new_request->AddAuthBearerHeader(auth_token); 1470 1471 if (additional_header_) { 1472 new_request->AddHeader(additional_header_->first, 1473 additional_header_->second); 1474 } 1475 1476 if (!throttle_.AdmitRequest()) { 1477 return errors::Unavailable("Request throttled"); 1478 } 1479 1480 *request = std::move(new_request); 1481 return Status::OK(); 1482 } 1483 1484 REGISTER_FILE_SYSTEM("gs", RetryingGcsFileSystem); 1485 1486 } // namespace tensorflow 1487