/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/allocator_retry.h"

#include <algorithm>
#include <chrono>
#include <memory>
#include <vector>

#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {
namespace {

class FakeAllocator {
 public:
  FakeAllocator(size_t cap, int millis_to_wait)
      : memory_capacity_(cap), millis_to_wait_(millis_to_wait) {}

  // AllocateRaw just keeps track of the number of outstanding allocations,
  // not their sizes.  Assume a constant size for each.
  void* AllocateRaw(size_t alignment, size_t num_bytes) {
    return retry_.AllocateRaw(
        [this](size_t a, size_t nb, bool v) {
          // Underlying alloc function: succeed while capacity remains,
          // otherwise report failure by returning nullptr.
          mutex_lock l(mu_);
          if (memory_capacity_ > 0) {
            --memory_capacity_;
            return good_ptr_;
          } else {
            return static_cast<void*>(nullptr);
          }
        },
        millis_to_wait_, alignment, num_bytes);
  }

  // DeallocateRaw ignores the pointer value: it returns one unit of
  // capacity and wakes any allocation attempt waiting inside retry_.
  void DeallocateRaw(void* ptr) {
    mutex_lock l(mu_);
    ++memory_capacity_;
    retry_.NotifyDealloc();
  }

 private:
  AllocatorRetry retry_;
  void* good_ptr_ = reinterpret_cast<void*>(0xdeadbeef);
  mutex mu_;
  size_t memory_capacity_ GUARDED_BY(mu_);
  int millis_to_wait_;
};
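
// A minimal sketch (illustrative only, not called by the tests) of the
// AllocatorRetry contract that FakeAllocator relies on: AllocateRaw
// re-invokes the supplied alloc function until it returns non-null or the
// deadline expires, waking early whenever NotifyDealloc() is called.  With
// an always-failing alloc function and no deallocations it returns nullptr
// after roughly the deadline; the 10 msec value here is arbitrary.
void* RetryDeadlineSketch(AllocatorRetry* retry) {
  return retry->AllocateRaw(
      [](size_t alignment, size_t num_bytes, bool verbose_failure) {
        return static_cast<void*>(nullptr);  // Always report out-of-memory.
      },
      10 /* max millis to wait */, 16 /* alignment */, 1 /* num_bytes */);
}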

// AllocatorRetry is a mechanism to deal with race conditions which
// are inevitable in the TensorFlow runtime where parallel Nodes can
// execute in any order.  Properly testing this feature would use real
// multi-threaded race conditions, but that leads to flaky tests as
// the expected outcome fails to occur with low but non-zero
// probability.  To make these tests reliable we simulate real race
// conditions by forcing parallel threads to take turns in the
// interesting part of their interaction with the allocator.  This
// class is the mechanism that imposes turn taking.
class AlternatingBarrier {
 public:
  explicit AlternatingBarrier(int num_users)
      : num_users_(num_users), next_turn_(0), done_(num_users, false) {}

  void WaitTurn(int user_index) {
    mutex_lock l(mu_);
    int wait_cycles = 0;
    // A user is allowed to proceed out of turn if it waits too long.
    while (next_turn_ != user_index && wait_cycles++ < 10) {
      cv_.wait_for(l, std::chrono::milliseconds(1));
    }
    if (next_turn_ == user_index) {
      IncrementTurn();
      cv_.notify_all();
    }
  }

  // When a user quits, stop reserving it a turn.
  void Done(int user_index) {
    mutex_lock l(mu_);
    done_[user_index] = true;
    if (next_turn_ == user_index) {
      IncrementTurn();
      cv_.notify_all();
    }
  }

 private:
  // Advance next_turn_ to the next user that has not finished, if any.
  void IncrementTurn() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    int skipped = 0;
    while (skipped < num_users_) {
      next_turn_ = (next_turn_ + 1) % num_users_;
      if (!done_[next_turn_]) return;
      ++skipped;
    }
  }

  mutex mu_;
  condition_variable cv_;
  int num_users_;
  int next_turn_ GUARDED_BY(mu_);
  std::vector<bool> done_ GUARDED_BY(mu_);
};
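
// A minimal usage sketch of AlternatingBarrier (illustrative only, not
// called by the tests).  Two workers pass through their "interesting"
// sections strictly in the order 0, 1, 0, 1, ... unless one stalls long
// enough that the other proceeds out of turn.
void BarrierUsageSketch() {
  AlternatingBarrier barrier(2);
  std::vector<Thread*> workers;
  for (int me = 0; me < 2; ++me) {
    workers.push_back(Env::Default()->StartThread(
        ThreadOptions(), "barrier_demo", [&barrier, me]() {
          for (int step = 0; step < 3; ++step) {
            barrier.WaitTurn(me);  // Block until it is this worker's turn.
            // The turn-ordered interaction with the allocator goes here.
          }
          barrier.Done(me);  // Stop reserving this worker a turn.
        }));
  }
  for (auto w : workers) delete w;  // Deleting a Thread joins it.
}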

class GPUAllocatorRetryTest : public ::testing::Test {
 protected:
  GPUAllocatorRetryTest() {}

  void LaunchConsumerThreads(int num_consumers, int cap_needed) {
    barrier_.reset(new AlternatingBarrier(num_consumers));
    consumer_count_.resize(num_consumers, 0);
    for (int i = 0; i < num_consumers; ++i) {
      consumers_.push_back(Env::Default()->StartThread(
          ThreadOptions(), "anon_thread", [this, i, cap_needed]() {
            do {
              void* ptr = nullptr;
              for (int j = 0; j < cap_needed; ++j) {
                barrier_->WaitTurn(i);
                ptr = alloc_->AllocateRaw(16, 1);
                if (ptr == nullptr) {
                  mutex_lock l(mu_);
                  has_failed_ = true;
                  barrier_->Done(i);
                  return;
                }
              }
              ++consumer_count_[i];
              // FakeAllocator ignores the pointer value, so deallocating
              // the last ptr cap_needed times returns all acquired units.
              for (int j = 0; j < cap_needed; ++j) {
                barrier_->WaitTurn(i);
                alloc_->DeallocateRaw(ptr);
              }
            } while (!notifier_.HasBeenNotified());
            barrier_->Done(i);
          }));
    }
  }

  // Wait up to wait_micros microseconds for has_failed_ to equal expected,
  // then terminate all threads.
  void JoinConsumerThreads(bool expected, int wait_micros) {
    while (wait_micros > 0) {
      {
        mutex_lock l(mu_);
        if (has_failed_ == expected) break;
      }
      int interval_micros = std::min(1000, wait_micros);
      Env::Default()->SleepForMicroseconds(interval_micros);
      wait_micros -= interval_micros;
    }
    notifier_.Notify();
    for (auto c : consumers_) {
      // Blocks until thread terminates.
      delete c;
    }
  }

  std::unique_ptr<FakeAllocator> alloc_;
  std::unique_ptr<AlternatingBarrier> barrier_;
  std::vector<Thread*> consumers_;
  std::vector<int> consumer_count_;
  Notification notifier_;
  mutex mu_;
  bool has_failed_ GUARDED_BY(mu_) = false;
  int count_ GUARDED_BY(mu_) = 0;
};

// Verifies correct retrying when memory is slightly overcommitted but
// we allow retry.
TEST_F(GPUAllocatorRetryTest, RetrySuccess) {
  // Support up to 2 simultaneous allocations; wait up to 1000 msec for
  // a chance to alloc.
  alloc_.reset(new FakeAllocator(2, 1000));
  // Launch 3 consumers, each of whom needs 1 unit at a time.
  LaunchConsumerThreads(3, 1);
  // This should be enough time for each consumer to be satisfied many times.
  Env::Default()->SleepForMicroseconds(50000);
  JoinConsumerThreads(false, 0);
  for (int i = 0; i < 3; ++i) {
    LOG(INFO) << "Consumer " << i << " count is " << consumer_count_[i];
  }
  {
    mutex_lock l(mu_);
    EXPECT_FALSE(has_failed_);
  }
  EXPECT_GT(consumer_count_[0], 0);
  EXPECT_GT(consumer_count_[1], 0);
  EXPECT_GT(consumer_count_[2], 0);
}

// Verifies OutOfMemory failure when memory is slightly overcommitted
// and retry is not allowed.  Note that this test will fail, i.e. no
// memory alloc failure will be detected, if it is run in a context that
// does not permit real multi-threaded execution.
TEST_F(GPUAllocatorRetryTest, NoRetryFail) {
  // Support up to 2 simultaneous allocations; wait up to 0 msec for
  // a chance to alloc.
  alloc_.reset(new FakeAllocator(2, 0));
  // Launch 3 consumers, each of whom needs 1 unit at a time.
  LaunchConsumerThreads(3, 1);
  Env::Default()->SleepForMicroseconds(50000);
  // Wait up to 10 seconds for the expected race condition to occur,
  // resulting in failure.
  JoinConsumerThreads(true, 10000000);
  for (int i = 0; i < 3; ++i) {
    LOG(INFO) << "Consumer " << i << " count is " << consumer_count_[i];
  }
  {
    mutex_lock l(mu_);
    EXPECT_TRUE(has_failed_);
  }
}

// Verifies OutOfMemory failure when retry is allowed but memory capacity
// is too low even for retry.
TEST_F(GPUAllocatorRetryTest, RetryInsufficientFail) {
  // Support up to 2 simultaneous allocations; wait up to 1000 msec for
  // a chance to alloc.
  alloc_.reset(new FakeAllocator(2, 1000));
  // Launch 3 consumers, each of whom needs 2 units at a time.  We expect
  // deadlock in which 2 consumers each hold 1 unit and time out trying to
  // get the second.
  LaunchConsumerThreads(3, 2);
  Env::Default()->SleepForMicroseconds(50000);
  // We're forcing a race condition, so this should fail quickly, but
  // give it 10 seconds anyway.
  JoinConsumerThreads(true, 10000000);
  for (int i = 0; i < 3; ++i) {
    LOG(INFO) << "Consumer " << i << " count is " << consumer_count_[i];
  }
  {
    mutex_lock l(mu_);
    EXPECT_TRUE(has_failed_);
  }
}

}  // namespace
}  // namespace tensorflow