/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifdef GOOGLE_CUDA

#include <algorithm>
#include <random>
#include <vector>

#include "tensorflow/contrib/nccl/kernels/nccl_manager.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_device.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/test.h"

namespace tensorflow {

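// Returns the GPU devices available to the test, created with a small
// per-process GPU memory fraction (0.1). Any non-GPU devices returned by the
// device factory are deleted immediately.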
static std::vector<BaseGPUDevice*> GetGPUDevices() {
  std::vector<Device*> devices;
  SessionOptions session_options;
  session_options.config.mutable_gpu_options()
      ->set_per_process_gpu_memory_fraction(0.1);
  session_options.env = Env::Default();
  Status s = DeviceFactory::GetFactory(DEVICE_GPU)
                 ->AddDevices(session_options, "", &devices);
  TF_CHECK_OK(s);
  std::vector<BaseGPUDevice*> gpus;
  for (Device* d : devices) {
    if (d->device_type() == "GPU") {
      gpus.push_back(static_cast<BaseGPUDevice*>(d));
    } else {
      delete d;
    }
  }
  return gpus;
}

class NcclManagerTest : public ::testing::Test {
 protected:
  static void SetUpTestCase() {
    setenv("NCCL_DEBUG", "INFO", 1 /* replace */);
    devices = new std::vector<BaseGPUDevice*>(GetGPUDevices());
    CHECK(!devices->empty());
    LOG(ERROR) << "Running test with " << devices->size() << " gpus";
  }
  static void TearDownTestCase() {
    for (auto device : *devices) delete device;
    delete devices;
  }

  static Allocator* gpu_allocator(BaseGPUDevice* device) {
    return device->GetStepAllocator(AllocatorAttributes(),
                                    nullptr /* step_resource_manager */);
  }

  static std::vector<BaseGPUDevice*>* devices;

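  // Wraps a raw device pointer in a StreamExecutor DeviceMemory handle so it
  // can be used with Stream::ThenMemcpy. The handle does not own the memory.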
  template <typename Scalar>
  perftools::gputools::DeviceMemory<Scalar> AsDeviceMemory(
      const Scalar* cuda_memory) {
    perftools::gputools::DeviceMemoryBase wrapped(
        const_cast<Scalar*>(cuda_memory));
    perftools::gputools::DeviceMemory<Scalar> typed(wrapped);
    return typed;
  }

  // A single all-reduce to apply.
  struct TestCase {
    string key;
    std::vector<Tensor> ins;
    std::vector<Tensor> outs;
    Tensor expected;

    mutex mu;
    Status final_status;
    int num_completed = 0;
  };

  TestCase* MakeTestCase(int num_ranks, ncclRedOp_t reduction_op,
                         TensorShape shape, float value_offset) {
    TestCase* test_case = new TestCase();
    test_case->expected = Tensor(DT_FLOAT, shape);
    if (reduction_op == ncclProd) {
      test::FillFn<float>(&test_case->expected, [](int) { return 1; });
    } else if (reduction_op == ncclSum) {
      test::FillFn<float>(&test_case->expected, [](int) { return 0; });
    } else if (reduction_op == ncclMax) {
      test::FillFn<float>(&test_case->expected, [](int) {
        return -1 * std::numeric_limits<float>::max();
      });
    } else if (reduction_op == ncclMin) {
      test::FillFn<float>(&test_case->expected, [](int) {
        return std::numeric_limits<float>::max();
      });
    } else {
      LOG(FATAL) << "Invalid reduction_op " << reduction_op;
    }

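    // Each rank's input is filled with value_offset + (index + 1) * 10^rank,
    // so every rank contributes a distinguishable value. The expected
    // all-reduce result is accumulated on the host as each input is created.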
    int mult = 1;
    for (int i = 0; i < num_ranks; ++i) {
      auto* device = devices->at(i % devices->size());
      auto* stream = device->tensorflow_gpu_device_info()->stream;

      Tensor in_cpu(DT_FLOAT, shape);
      test::FillFn<float>(&in_cpu, [mult, value_offset](int index) {
        return value_offset + (index + 1) * mult;
      });
      for (int j = 0; j < shape.num_elements(); ++j) {
        auto in_val = in_cpu.flat<float>()(j);
        auto out_expr = test_case->expected.flat<float>();
        if (reduction_op == ncclProd) {
          out_expr(j) *= in_val;
        } else if (reduction_op == ncclSum) {
          out_expr(j) += in_val;
        } else if (reduction_op == ncclMax) {
          if (in_val > out_expr(j)) {
            out_expr(j) = in_val;
          }
        } else if (reduction_op == ncclMin) {
          if (in_val < out_expr(j)) {
            out_expr(j) = in_val;
          }
        }
      }

      mult *= 10;
      test_case->ins.emplace_back(gpu_allocator(device), DT_FLOAT, shape);
      test_case->outs.emplace_back(gpu_allocator(device), DT_FLOAT, shape);

      const Tensor& in_gpu = test_case->ins.back();
      auto in_gpu_mem = AsDeviceMemory(in_gpu.flat<float>().data());
      stream->ThenMemcpy(&in_gpu_mem, in_cpu.flat<float>().data(),
                         in_cpu.TotalBytes());
    }
    return test_case;
  }

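  // Returns a done callback that records completion of one rank's part of the
  // collective and folds any error into the test case's final status.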
  NcclManager::DoneCallback CreateDoneCallback(TestCase* test_case) {
    return [this, test_case](Status s) {
      mutex_lock l(test_case->mu);
      ++test_case->num_completed;
      test_case->final_status.Update(s);
    };
  }

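  // Polls until every rank's done callback has fired, then copies each output
  // tensor back to the host and compares it against the expected result.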
  void VerifyResults(const string& case_label, TestCase* test_case) {
    // Wait for the done callbacks from all ranks to be called.
    {
      test_case->mu.lock();
      while (test_case->num_completed != test_case->outs.size()) {
        test_case->mu.unlock();
        Env::Default()->SleepForMicroseconds(10);
        test_case->mu.lock();
      }
      test_case->mu.unlock();
    }
    // Copy memory to host and verify.
    for (int i = 0; i < test_case->outs.size(); ++i) {
      auto* device = devices->at(i % devices->size());
      auto* stream = device->tensorflow_gpu_device_info()->stream;
      const Tensor& out_gpu = test_case->outs[i];
      Tensor out_cpu(DT_FLOAT, out_gpu.shape());
      auto out_gpu_mem = AsDeviceMemory(out_gpu.flat<float>().data());
      stream->ThenMemcpy(out_cpu.flat<float>().data(), out_gpu_mem,
                         out_cpu.TotalBytes());
      SE_ASSERT_OK(stream->BlockHostUntilDone());
      test::ExpectTensorEqual<float>(test_case->expected, out_cpu);
    }
  }
};
std::vector<BaseGPUDevice*>* NcclManagerTest::devices = nullptr;

// Test a basic all-reduce with each supported reduction op.
TEST_F(NcclManagerTest, BasicSumReduction) {
  const int num_ranks = 3;

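  // Casting 0..3 to ncclRedOp_t exercises the four reduction ops that
  // MakeTestCase supports (sum, prod, max, and min).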
  for (int op = 0; op < 4; ++op) {
    ncclRedOp_t reduction_op = static_cast<ncclRedOp_t>(op);
    std::unique_ptr<TestCase> test_case(
        MakeTestCase(num_ranks, reduction_op, TensorShape({2, 3}), 0));
    for (int device_num = 0; device_num < num_ranks; ++device_num) {
      auto* device = devices->at(device_num % devices->size());
      auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
      auto* stream = device->tensorflow_gpu_device_info()->stream;
      NcclManager::instance()->AddToAllReduce(
          num_ranks, "allreduce", reduction_op, device->executor(),
          device->gpu_id(), event_mgr, stream, &test_case->ins[device_num],
          &test_case->outs[device_num], CreateDoneCallback(test_case.get()));
    }

    LOG(ERROR) << "Verifying results";
    VerifyResults("test_case", test_case.get());
  }
}

// Same as the Basic test, but with multiple threads launching parts of many
// reductions.
//
// Multi-rank execution is currently scaled down because it can hang on some
// GPUs (e.g. K20m) when run with num_ranks > devices->size().
// To test the higher settings, increase num_ranks,
// num_collectives_per_iteration and time_limit_micros.
TEST_F(NcclManagerTest, MultipleCallers) {
  const int num_ranks = 1;                      // 2;
  const int num_collectives_per_iteration = 1;  // 1000;
  const int num_threads = 3;
  const int time_limit_micros = 1;  // 60 * 30 * 1000 * 1000;

  int64 start = Env::Default()->NowMicros();
  srand(Env::Default()->NowMicros());

  for (;;) {
    std::vector<std::pair<int, int>> case_and_device_num;
    std::vector<std::unique_ptr<TestCase>> test_cases;
    for (int i = 0; i < num_collectives_per_iteration; ++i) {
      test_cases.emplace_back(
          MakeTestCase(num_ranks, ncclSum,
                       TensorShape({100, i % 5 + 1, i % 3 + 1}), i + 0.1 * i));
      for (int j = 0; j < num_ranks; ++j) {
        case_and_device_num.emplace_back(i, j);
      }
    }

    for (int i = 0; i < num_ranks; ++i) {
      auto* device = devices->at(i % devices->size());
      auto* stream = device->tensorflow_gpu_device_info()->stream;
      SE_ASSERT_OK(stream->BlockHostUntilDone());
    }

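    // Shuffle the (collective, rank) pairs so that the ranks of a given
    // collective are submitted from different threads in arbitrary order.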
    std::shuffle(case_and_device_num.begin(), case_and_device_num.end(),
                 std::mt19937(std::random_device()()));

    mutex mu;  // guards case_and_device_num.
    std::unique_ptr<thread::ThreadPool> pool(
        new thread::ThreadPool(Env::Default(), "test", num_threads));
    const int to_schedule = case_and_device_num.size();
    for (int i = 0; i < to_schedule; ++i) {
      auto fn = [&]() {
        int device_num;
        int test_num;
        {
          mutex_lock l(mu);
          test_num = case_and_device_num.back().first;
          device_num = case_and_device_num.back().second;
          case_and_device_num.pop_back();
        }
        auto* device = devices->at(device_num % devices->size());
        auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
        auto* stream = device->tensorflow_gpu_device_info()->stream;
        TestCase* test_case = test_cases[test_num].get();
        NcclManager::instance()->AddToAllReduce(
            num_ranks, strings::StrCat("allreduce", test_num), ncclSum,
            device->executor(), device->gpu_id(), event_mgr, stream,
            &test_case->ins[device_num], &test_case->outs[device_num],
            CreateDoneCallback(test_case));
      };
      pool->Schedule(fn);
    }
    pool.reset();  // wait for all work to be scheduled.

    LOG(ERROR) << "Verifying results for " << num_collectives_per_iteration
               << " collectives";
    for (int i = 0; i < test_cases.size(); ++i) {
      VerifyResults(strings::StrCat("collective", i), test_cases[i].get());
    }

    int64 delta = Env::Default()->NowMicros() - start;
    if (delta > time_limit_micros) {
      LOG(ERROR) << "Ran for " << delta << " us; quitting";
      break;
    }
  }
}

}  // namespace tensorflow

#endif  // GOOGLE_CUDA