1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/platform/cloud/file_block_cache.h" 17 #include <cstring> 18 #include "tensorflow/core/lib/core/blocking_counter.h" 19 #include "tensorflow/core/lib/core/status_test_util.h" 20 #include "tensorflow/core/platform/cloud/now_seconds_env.h" 21 #include "tensorflow/core/platform/env.h" 22 #include "tensorflow/core/platform/notification.h" 23 #include "tensorflow/core/platform/test.h" 24 25 namespace tensorflow { 26 namespace { 27 28 Status ReadCache(FileBlockCache* cache, const string& filename, size_t offset, 29 size_t n, std::vector<char>* out) { 30 out->clear(); 31 out->resize(n, 0); 32 size_t bytes_transferred = 0; 33 Status status = 34 cache->Read(filename, offset, n, out->data(), &bytes_transferred); 35 EXPECT_LE(bytes_transferred, n); 36 out->resize(bytes_transferred, n); 37 return status; 38 } 39 40 TEST(FileBlockCacheTest, PassThrough) { 41 const string want_filename = "foo/bar"; 42 const size_t want_offset = 42; 43 const size_t want_n = 1024; 44 int calls = 0; 45 auto fetcher = [&calls, want_filename, want_offset, want_n]( 46 const string& got_filename, size_t got_offset, 47 size_t got_n, char* buffer, size_t* bytes_transferred) { 48 EXPECT_EQ(got_filename, want_filename); 49 EXPECT_EQ(got_offset, want_offset); 50 EXPECT_EQ(got_n, want_n); 51 calls++; 52 memset(buffer, 'x', got_n); 53 *bytes_transferred = got_n; 54 return Status::OK(); 55 }; 56 // If block_size, max_bytes, or both are zero, the cache is a pass-through. 57 FileBlockCache cache1(1, 0, 0, fetcher); 58 FileBlockCache cache2(0, 1, 0, fetcher); 59 FileBlockCache cache3(0, 0, 0, fetcher); 60 std::vector<char> out; 61 TF_EXPECT_OK(ReadCache(&cache1, want_filename, want_offset, want_n, &out)); 62 EXPECT_EQ(calls, 1); 63 TF_EXPECT_OK(ReadCache(&cache2, want_filename, want_offset, want_n, &out)); 64 EXPECT_EQ(calls, 2); 65 TF_EXPECT_OK(ReadCache(&cache3, want_filename, want_offset, want_n, &out)); 66 EXPECT_EQ(calls, 3); 67 } 68 69 TEST(FileBlockCacheTest, BlockAlignment) { 70 // Initialize a 256-byte buffer. This is the file underlying the reads we'll 71 // do in this test. 72 const size_t size = 256; 73 std::vector<char> buf; 74 for (int i = 0; i < size; i++) { 75 buf.push_back(i); 76 } 77 // The fetcher just fetches slices of the buffer. 78 auto fetcher = [&buf](const string& filename, size_t offset, size_t n, 79 char* buffer, size_t* bytes_transferred) { 80 if (offset < buf.size()) { 81 size_t bytes_to_copy = std::min<size_t>(buf.size() - offset, n); 82 memcpy(buffer, buf.data() + offset, bytes_to_copy); 83 *bytes_transferred = bytes_to_copy; 84 } else { 85 *bytes_transferred = 0; 86 } 87 return Status::OK(); 88 }; 89 for (size_t block_size = 2; block_size <= 4; block_size++) { 90 // Make a cache of N-byte block size (1 block) and verify that reads of 91 // varying offsets and lengths return correct data. 92 FileBlockCache cache(block_size, block_size, 0, fetcher); 93 for (size_t offset = 0; offset < 10; offset++) { 94 for (size_t n = block_size - 2; n <= block_size + 2; n++) { 95 std::vector<char> got; 96 TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got)); 97 // Verify the size of the read. 98 if (offset + n <= size) { 99 // Expect a full read. 100 EXPECT_EQ(got.size(), n) << "block size = " << block_size 101 << ", offset = " << offset << ", n = " << n; 102 } else { 103 // Expect a partial read. 104 EXPECT_EQ(got.size(), size - offset) 105 << "block size = " << block_size << ", offset = " << offset 106 << ", n = " << n; 107 } 108 // Verify the contents of the read. 109 std::vector<char>::const_iterator begin = buf.begin() + offset; 110 std::vector<char>::const_iterator end = 111 offset + n > buf.size() ? buf.end() : begin + n; 112 std::vector<char> want(begin, end); 113 EXPECT_EQ(got, want) << "block size = " << block_size 114 << ", offset = " << offset << ", n = " << n; 115 } 116 } 117 } 118 } 119 120 TEST(FileBlockCacheTest, CacheHits) { 121 const size_t block_size = 16; 122 std::set<size_t> calls; 123 auto fetcher = [&calls, block_size](const string& filename, size_t offset, 124 size_t n, char* buffer, 125 size_t* bytes_transferred) { 126 EXPECT_EQ(n, block_size); 127 EXPECT_EQ(offset % block_size, 0); 128 EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset; 129 calls.insert(offset); 130 memset(buffer, 'x', n); 131 *bytes_transferred = n; 132 return Status::OK(); 133 }; 134 const uint32 block_count = 256; 135 FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); 136 std::vector<char> out; 137 out.resize(block_count, 0); 138 // The cache has space for `block_count` blocks. The loop with i = 0 should 139 // fill the cache, and the loop with i = 1 should be all cache hits. The 140 // fetcher checks that it is called once and only once for each offset (to 141 // fetch the corresponding block). 142 for (int i = 0; i < 2; i++) { 143 for (int j = 0; j < block_count; j++) { 144 TF_EXPECT_OK(ReadCache(&cache, "", block_size * j, block_size, &out)); 145 } 146 } 147 } 148 149 TEST(FileBlockCacheTest, OutOfRange) { 150 // Tests reads of a 24-byte file with block size 16. 151 const size_t block_size = 16; 152 const size_t file_size = 24; 153 bool first_block = false; 154 bool second_block = false; 155 auto fetcher = [block_size, file_size, &first_block, &second_block]( 156 const string& filename, size_t offset, size_t n, 157 char* buffer, size_t* bytes_transferred) { 158 EXPECT_EQ(n, block_size); 159 EXPECT_EQ(offset % block_size, 0); 160 size_t bytes_to_copy = 0; 161 if (offset == 0) { 162 // The first block (16 bytes) of the file. 163 memset(buffer, 'x', n); 164 bytes_to_copy = n; 165 first_block = true; 166 } else if (offset == block_size) { 167 // The second block (8 bytes) of the file. 168 bytes_to_copy = file_size - block_size; 169 memset(buffer, 'x', bytes_to_copy); 170 second_block = true; 171 } 172 *bytes_transferred = bytes_to_copy; 173 return Status::OK(); 174 }; 175 FileBlockCache cache(block_size, block_size, 0, fetcher); 176 std::vector<char> out; 177 // Reading the first 16 bytes should be fine. 178 TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out)); 179 EXPECT_TRUE(first_block); 180 EXPECT_EQ(out.size(), block_size); 181 // Reading at offset file_size + 4 will read the second block (since the read 182 // at file_size + 4 = 28 will be aligned to an offset of 16) but will return 183 // OutOfRange because the offset is past the end of the 24-byte file. 184 Status status = ReadCache(&cache, "", file_size + 4, 4, &out); 185 EXPECT_EQ(status.code(), error::OUT_OF_RANGE); 186 EXPECT_TRUE(second_block); 187 // Reading the second full block will return 8 bytes, from a cache hit. 188 second_block = false; 189 TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out)); 190 EXPECT_FALSE(second_block); 191 EXPECT_EQ(out.size(), file_size - block_size); 192 } 193 194 TEST(FileBlockCacheTest, Inconsistent) { 195 // Tests the detection of interrupted reads leading to partially filled blocks 196 // where we expected complete blocks. 197 const size_t block_size = 16; 198 // This fetcher returns OK but only fills in one byte for any offset. 199 auto fetcher = [block_size](const string& filename, size_t offset, size_t n, 200 char* buffer, size_t* bytes_transferred) { 201 EXPECT_EQ(n, block_size); 202 EXPECT_EQ(offset % block_size, 0); 203 EXPECT_GE(n, 1); 204 memset(buffer, 'x', 1); 205 *bytes_transferred = 1; 206 return Status::OK(); 207 }; 208 FileBlockCache cache(block_size, 2 * block_size, 0, fetcher); 209 std::vector<char> out; 210 // Read the second block; this should yield an OK status and a single byte. 211 TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out)); 212 EXPECT_EQ(out.size(), 1); 213 // Now read the first block; this should yield an INTERNAL error because we 214 // had already cached a partial block at a later position. 215 Status status = ReadCache(&cache, "", 0, block_size, &out); 216 EXPECT_EQ(status.code(), error::INTERNAL); 217 } 218 219 TEST(FileBlockCacheTest, LRU) { 220 const size_t block_size = 16; 221 std::list<size_t> calls; 222 auto fetcher = [&calls, block_size](const string& filename, size_t offset, 223 size_t n, char* buffer, 224 size_t* bytes_transferred) { 225 EXPECT_EQ(n, block_size); 226 EXPECT_FALSE(calls.empty()) << "at offset = " << offset; 227 if (!calls.empty()) { 228 EXPECT_EQ(offset, calls.front()); 229 calls.pop_front(); 230 } 231 memset(buffer, 'x', n); 232 *bytes_transferred = n; 233 return Status::OK(); 234 }; 235 const uint32 block_count = 2; 236 FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); 237 std::vector<char> out; 238 // Read blocks from the cache, and verify the LRU behavior based on the 239 // fetcher calls that the cache makes. 240 calls.push_back(0); 241 // Cache miss - drains an element from `calls`. 242 TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); 243 // Cache hit - does not drain an element from `calls`. 244 TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); 245 calls.push_back(block_size); 246 // Cache miss followed by cache hit. 247 TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out)); 248 TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out)); 249 calls.push_back(2 * block_size); 250 // Cache miss followed by cache hit. Causes eviction of LRU element. 251 TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out)); 252 TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out)); 253 // LRU element was at offset 0. Cache miss. 254 calls.push_back(0); 255 TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); 256 // Element at 2 * block_size is still in cache, and this read should update 257 // its position in the LRU list so it doesn't get evicted by the next read. 258 TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out)); 259 // Element at block_size was evicted. Reading this element will also cause 260 // the LRU element (at 0) to be evicted. 261 calls.push_back(block_size); 262 TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out)); 263 // Element at 0 was evicted again. 264 calls.push_back(0); 265 TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); 266 } 267 268 TEST(FileBlockCacheTest, MaxStaleness) { 269 int calls = 0; 270 auto fetcher = [&calls](const string& filename, size_t offset, size_t n, 271 char* buffer, size_t* bytes_transferred) { 272 calls++; 273 memset(buffer, 'x', n); 274 *bytes_transferred = n; 275 return Status::OK(); 276 }; 277 std::vector<char> out; 278 std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv); 279 // Create a cache with max staleness of 2 seconds, and verify that it works as 280 // expected. 281 FileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get()); 282 // Execute the first read to load the block. 283 TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out)); 284 EXPECT_EQ(calls, 1); 285 // Now advance the clock one second at a time and redo the read. The call 286 // count should advance every 3 seconds (i.e. every time the staleness is 287 // greater than 2). 288 for (int i = 1; i <= 10; i++) { 289 env->SetNowSeconds(i + 1); 290 TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out)); 291 EXPECT_EQ(calls, 1 + i / 3); 292 } 293 // Now create a cache with max staleness of 0, and verify that it also works 294 // as expected. 295 calls = 0; 296 env->SetNowSeconds(0); 297 FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get()); 298 // Execute the first read to load the block. 299 TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out)); 300 EXPECT_EQ(calls, 1); 301 // Advance the clock by a huge amount and verify that the cached block is 302 // used to satisfy the read. 303 env->SetNowSeconds(365 * 24 * 60 * 60); // ~1 year, just for fun. 304 TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out)); 305 EXPECT_EQ(calls, 1); 306 } 307 308 TEST(FileBlockCacheTest, RemoveFile) { 309 int calls = 0; 310 auto fetcher = [&calls](const string& filename, size_t offset, size_t n, 311 char* buffer, size_t* bytes_transferred) { 312 calls++; 313 char c = (filename == "a") ? 'a' : (filename == "b") ? 'b' : 'x'; 314 if (offset > 0) { 315 // The first block is lower case and all subsequent blocks are upper case. 316 c = toupper(c); 317 } 318 memset(buffer, c, n); 319 *bytes_transferred = n; 320 return Status::OK(); 321 }; 322 // This cache has space for 4 blocks; we'll read from two files. 323 const size_t n = 3; 324 FileBlockCache cache(8, 32, 0, fetcher); 325 std::vector<char> out; 326 std::vector<char> a(n, 'a'); 327 std::vector<char> b(n, 'b'); 328 std::vector<char> A(n, 'A'); 329 std::vector<char> B(n, 'B'); 330 // Fill the cache. 331 TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out)); 332 EXPECT_EQ(out, a); 333 EXPECT_EQ(calls, 1); 334 TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out)); 335 EXPECT_EQ(out, A); 336 EXPECT_EQ(calls, 2); 337 TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out)); 338 EXPECT_EQ(out, b); 339 EXPECT_EQ(calls, 3); 340 TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out)); 341 EXPECT_EQ(out, B); 342 EXPECT_EQ(calls, 4); 343 // All four blocks should be in the cache now. 344 TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out)); 345 EXPECT_EQ(out, a); 346 TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out)); 347 EXPECT_EQ(out, A); 348 TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out)); 349 EXPECT_EQ(out, b); 350 TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out)); 351 EXPECT_EQ(out, B); 352 EXPECT_EQ(calls, 4); 353 // Remove the blocks from "a". 354 cache.RemoveFile("a"); 355 // Both blocks from "b" should still be there. 356 TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out)); 357 EXPECT_EQ(out, b); 358 TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out)); 359 EXPECT_EQ(out, B); 360 EXPECT_EQ(calls, 4); 361 // The blocks from "a" should not be there. 362 TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out)); 363 EXPECT_EQ(out, a); 364 EXPECT_EQ(calls, 5); 365 TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out)); 366 EXPECT_EQ(out, A); 367 EXPECT_EQ(calls, 6); 368 } 369 370 TEST(FileBlockCacheTest, Prune) { 371 int calls = 0; 372 auto fetcher = [&calls](const string& filename, size_t offset, size_t n, 373 char* buffer, size_t* bytes_transferred) { 374 calls++; 375 memset(buffer, 'x', n); 376 *bytes_transferred = n; 377 return Status::OK(); 378 }; 379 std::vector<char> out; 380 // Our fake environment is initialized with the current timestamp. 381 std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv); 382 uint64 now = Env::Default()->NowSeconds(); 383 env->SetNowSeconds(now); 384 FileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get()); 385 // Read three blocks into the cache, and advance the timestamp by one second 386 // with each read. Start with a block of "a" at the current timestamp `now`. 387 TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out)); 388 // Now load a block of a different file "b" at timestamp `now` + 1 389 env->SetNowSeconds(now + 1); 390 TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out)); 391 // Now load a different block of file "a" at timestamp `now` + 1. When the 392 // first block of "a" expires, this block should also be removed because it 393 // also belongs to file "a". 394 TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out)); 395 // Ensure that all blocks are in the cache (i.e. reads are cache hits). 396 EXPECT_EQ(cache.CacheSize(), 24); 397 EXPECT_EQ(calls, 3); 398 TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out)); 399 TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out)); 400 TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out)); 401 EXPECT_EQ(calls, 3); 402 // Advance the fake timestamp so that "a" becomes stale via its first block. 403 env->SetNowSeconds(now + 2); 404 // The pruning thread periodically compares env->NowSeconds() with the oldest 405 // block's timestamp to see if it should evict any files. At the current fake 406 // timestamp of `now` + 2, file "a" is stale because its first block is stale, 407 // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in 408 // one second of wall time), it should remove "a" and leave "b" alone. 409 uint64 start = Env::Default()->NowSeconds(); 410 do { 411 Env::Default()->SleepForMicroseconds(100000); 412 } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3); 413 // There should be one block left in the cache, and it should be the first 414 // block of "b". 415 EXPECT_EQ(cache.CacheSize(), 8); 416 TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out)); 417 EXPECT_EQ(calls, 3); 418 // Advance the fake time to `now` + 3, at which point "b" becomes stale. 419 env->SetNowSeconds(now + 3); 420 // Wait for the pruner to remove "b". 421 start = Env::Default()->NowSeconds(); 422 do { 423 Env::Default()->SleepForMicroseconds(100000); 424 } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3); 425 // The cache should now be empty. 426 EXPECT_EQ(cache.CacheSize(), 0); 427 } 428 429 TEST(FileBlockCacheTest, ParallelReads) { 430 // This fetcher won't respond until either `callers` threads are calling it 431 // concurrently (at which point it will respond with success to all callers), 432 // or 10 seconds have elapsed (at which point it will respond with an error). 433 const int callers = 4; 434 BlockingCounter counter(callers); 435 auto fetcher = [&counter](const string& filename, size_t offset, size_t n, 436 char* buffer, size_t* bytes_transferred) { 437 counter.DecrementCount(); 438 if (!counter.WaitFor(std::chrono::seconds(10))) { 439 // This avoids having the test time out, which is harder to debug. 440 return errors::FailedPrecondition("desired concurrency not reached"); 441 } 442 memset(buffer, 'x', n); 443 *bytes_transferred = n; 444 return Status::OK(); 445 }; 446 const int block_size = 8; 447 FileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher); 448 std::vector<std::unique_ptr<Thread>> threads; 449 for (int i = 0; i < callers; i++) { 450 threads.emplace_back( 451 Env::Default()->StartThread({}, "caller", [&cache, i, block_size]() { 452 std::vector<char> out; 453 TF_EXPECT_OK( 454 ReadCache(&cache, "a", i * block_size, block_size, &out)); 455 std::vector<char> x(block_size, 'x'); 456 EXPECT_EQ(out, x); 457 })); 458 } 459 // The `threads` destructor blocks until the threads can be joined, once their 460 // respective reads finish (which happens once they are all concurrently being 461 // executed, or 10 seconds have passed). 462 } 463 464 TEST(FileBlockCacheTest, CoalesceConcurrentReads) { 465 // Concurrent reads to the same file blocks should be de-duplicated. 466 const size_t block_size = 16; 467 int num_requests = 0; 468 Notification notification; 469 auto fetcher = [&num_requests, ¬ification, block_size]( 470 const string& filename, size_t offset, size_t n, 471 char* buffer, size_t* bytes_transferred) { 472 EXPECT_EQ(n, block_size); 473 EXPECT_EQ(offset, 0); 474 num_requests++; 475 memset(buffer, 'x', n); 476 *bytes_transferred = n; 477 notification.Notify(); 478 // Wait for other thread to issue read. 479 Env::Default()->SleepForMicroseconds(100000); // 0.1 secs 480 return Status::OK(); 481 }; 482 FileBlockCache cache(block_size, block_size, 0, fetcher); 483 // Fork off thread for parallel read. 484 std::unique_ptr<Thread> concurrent( 485 Env::Default()->StartThread({}, "concurrent", [&cache, block_size] { 486 std::vector<char> out; 487 TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size / 2, &out)); 488 EXPECT_EQ(out.size(), block_size / 2); 489 })); 490 EXPECT_TRUE(WaitForNotificationWithTimeout(¬ification, 10000)) 491 << "Timeout waiting for concurrent thread to start."; 492 std::vector<char> out; 493 TF_EXPECT_OK(ReadCache(&cache, "", block_size / 2, block_size / 2, &out)); 494 EXPECT_EQ(out.size(), block_size / 2); 495 496 EXPECT_EQ(1, num_requests); 497 } 498 499 TEST(FileBlockCacheTest, Flush) { 500 int calls = 0; 501 auto fetcher = [&calls](const string& filename, size_t offset, size_t n, 502 char* buffer, size_t* bytes_transferred) { 503 calls++; 504 memset(buffer, 'x', n); 505 *bytes_transferred = n; 506 return Status::OK(); 507 }; 508 FileBlockCache cache(16, 32, 0, fetcher); 509 std::vector<char> out; 510 TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); 511 TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); 512 EXPECT_EQ(calls, 1); 513 cache.Flush(); 514 TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); 515 EXPECT_EQ(calls, 2); 516 } 517 518 } // namespace 519 } // namespace tensorflow 520