/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifdef GOOGLE_CUDA

#include <algorithm>
#include <random>
#include <vector>

#include "tensorflow/contrib/nccl/kernels/nccl_manager.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_device.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/test.h"

namespace tensorflow {

// Returns all available GPU devices. Non-GPU devices created by the factory
// are deleted on the spot, since these tests only exercise GPUs.
static std::vector<BaseGPUDevice*> GetGPUDevices() {
  std::vector<Device*> devices;
  SessionOptions session_options;
  session_options.config.mutable_gpu_options()
      ->set_per_process_gpu_memory_fraction(0.1);
  session_options.env = Env::Default();
  Status s = DeviceFactory::GetFactory(DEVICE_GPU)
                 ->AddDevices(session_options, "", &devices);
  TF_CHECK_OK(s);
  std::vector<BaseGPUDevice*> gpus;
  for (Device* d : devices) {
    if (d->device_type() == "GPU") {
      gpus.push_back(static_cast<BaseGPUDevice*>(d));
    } else {
      delete d;
    }
  }
  return gpus;
}

class NcclManagerTest : public ::testing::Test {
 protected:
  static void SetUpTestCase() {
    setenv("NCCL_DEBUG", "INFO", 1 /* replace */);
    devices = new std::vector<BaseGPUDevice*>(GetGPUDevices());
    CHECK(!devices->empty());
    LOG(ERROR) << "Running test with " << devices->size() << " gpus";
  }

  static void TearDownTestCase() {
    for (auto device : *devices) delete device;
    delete devices;
  }

  static Allocator* gpu_allocator(BaseGPUDevice* device) {
    return device->GetStepAllocator(AllocatorAttributes(),
                                    nullptr /* step_resource_manager */);
  }

  static std::vector<BaseGPUDevice*>* devices;

  // Wraps a raw CUDA pointer in a typed DeviceMemory handle. The handle does
  // not own the allocation, so the backing tensor must outlive any stream
  // operation that uses it.
  template <typename Scalar>
  perftools::gputools::DeviceMemory<Scalar> AsDeviceMemory(
      const Scalar* cuda_memory) {
    perftools::gputools::DeviceMemoryBase wrapped(
        const_cast<Scalar*>(cuda_memory));
    perftools::gputools::DeviceMemory<Scalar> typed(wrapped);
    return typed;
  }

  // A single all-reduce to apply.
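  // `ins` and `outs` hold one device tensor per rank; `expected` holds the
  // host-side reference result. `mu` guards `final_status` and the count of
  // completed done callbacks.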
  struct TestCase {
    string key;
    std::vector<Tensor> ins;
    std::vector<Tensor> outs;
    Tensor expected;

    mutex mu;
    Status final_status;
    int num_completed = 0;
  };

  // Builds a TestCase with `num_ranks` input tensors, each resident on its
  // rank's GPU, and computes the expected reduction result on the host.
  TestCase* MakeTestCase(int num_ranks, ncclRedOp_t reduction_op,
                         TensorShape shape, float value_offset) {
    TestCase* test_case = new TestCase();
    test_case->expected = Tensor(DT_FLOAT, shape);
    // Initialize the expected output with the identity element of the
    // reduction.
    if (reduction_op == ncclProd) {
      test::FillFn<float>(&test_case->expected, [](int) { return 1; });
    } else if (reduction_op == ncclSum) {
      test::FillFn<float>(&test_case->expected, [](int) { return 0; });
    } else if (reduction_op == ncclMax) {
      test::FillFn<float>(&test_case->expected, [](int) {
        return -1 * std::numeric_limits<float>::max();
      });
    } else if (reduction_op == ncclMin) {
      test::FillFn<float>(&test_case->expected, [](int) {
        return std::numeric_limits<float>::max();
      });
    } else {
      LOG(FATAL) << "Invalid reduction_op " << reduction_op;
    }

    // Scale each rank's input by a different power of ten so every rank
    // contributes distinguishable values.
    int mult = 1;
    for (int i = 0; i < num_ranks; ++i) {
      auto* device = devices->at(i % devices->size());
      auto* stream = device->tensorflow_gpu_device_info()->stream;

      Tensor in_cpu(DT_FLOAT, shape);
      test::FillFn<float>(&in_cpu, [mult, value_offset](int index) {
        return value_offset + (index + 1) * mult;
      });
      // Fold this rank's input into the expected result on the host.
      for (int j = 0; j < shape.num_elements(); ++j) {
        auto in_val = in_cpu.flat<float>()(j);
        auto out_expr = test_case->expected.flat<float>();
        if (reduction_op == ncclProd) {
          out_expr(j) *= in_val;
        } else if (reduction_op == ncclSum) {
          out_expr(j) += in_val;
        } else if (reduction_op == ncclMax) {
          if (in_val > out_expr(j)) {
            out_expr(j) = in_val;
          }
        } else if (reduction_op == ncclMin) {
          if (in_val < out_expr(j)) {
            out_expr(j) = in_val;
          }
        }
      }

      mult *= 10;
      test_case->ins.emplace_back(gpu_allocator(device), DT_FLOAT, shape);
      test_case->outs.emplace_back(gpu_allocator(device), DT_FLOAT, shape);

      const Tensor& in_gpu = test_case->ins.back();
      auto in_gpu_mem = AsDeviceMemory(in_gpu.flat<float>().data());
      stream->ThenMemcpy(&in_gpu_mem, in_cpu.flat<float>().data(),
                         in_cpu.TotalBytes());
    }
    return test_case;
  }

  NcclManager::DoneCallback CreateDoneCallback(TestCase* test_case) {
    return [this, test_case](Status s) {
      mutex_lock l(test_case->mu);
      ++test_case->num_completed;
      test_case->final_status.Update(s);
    };
  }

  void VerifyResults(const string& case_label, TestCase* test_case) {
    // Wait for the done callback of every rank to be called.
    {
      test_case->mu.lock();
      while (test_case->num_completed != test_case->outs.size()) {
        test_case->mu.unlock();
        Env::Default()->SleepForMicroseconds(10);
        test_case->mu.lock();
      }
      test_case->mu.unlock();
    }
    // All callbacks have run, so reading final_status without the lock cannot
    // race with further updates.
    TF_ASSERT_OK(test_case->final_status) << " in " << case_label;
    // Copy memory to host and verify.
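    // Each output is copied back on its own device's stream;
    // BlockHostUntilDone() below ensures the copy has completed before the
    // host-side comparison runs.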
    for (int i = 0; i < test_case->outs.size(); ++i) {
      auto* device = devices->at(i % devices->size());
      auto* stream = device->tensorflow_gpu_device_info()->stream;
      const Tensor& out_gpu = test_case->outs[i];
      Tensor out_cpu(DT_FLOAT, out_gpu.shape());
      auto out_gpu_mem = AsDeviceMemory(out_gpu.flat<float>().data());
      stream->ThenMemcpy(out_cpu.flat<float>().data(), out_gpu_mem,
                         out_cpu.TotalBytes());
      SE_ASSERT_OK(stream->BlockHostUntilDone());
      test::ExpectTensorEqual<float>(test_case->expected, out_cpu);
    }
  }
};

std::vector<BaseGPUDevice*>* NcclManagerTest::devices = nullptr;

// Tests a basic reduction across num_ranks ranks for each of the four
// reduction ops (sum, prod, max, min).
TEST_F(NcclManagerTest, BasicSumReduction) {
  const int num_ranks = 3;

  for (int op = 0; op < 4; ++op) {
    ncclRedOp_t reduction_op = static_cast<ncclRedOp_t>(op);
    std::unique_ptr<TestCase> test_case(
        MakeTestCase(num_ranks, reduction_op, TensorShape({2, 3}), 0));
    for (int device_num = 0; device_num < num_ranks; ++device_num) {
      auto* device = devices->at(device_num % devices->size());
      auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
      auto* stream = device->tensorflow_gpu_device_info()->stream;
      NcclManager::instance()->AddToAllReduce(
          num_ranks, "allreduce", reduction_op, device->executor(),
          device->gpu_id(), event_mgr, stream, &test_case->ins[device_num],
          &test_case->outs[device_num], CreateDoneCallback(test_case.get()));
    }

    LOG(ERROR) << "Verifying results";
    VerifyResults("test_case", test_case.get());
  }
}

// Same as the Basic test, but with multiple threads launching parts of many
// reductions.
//
// Multi-rank execution is currently scaled down because it can hang on some
// GPUs (e.g. K20m) when run with num_ranks > devices->size(). To test the
// higher settings, increase num_ranks, num_collectives_per_iteration, and
// time_limit_micros.
TEST_F(NcclManagerTest, MultipleCallers) {
  const int num_ranks = 1;                      // 2;
  const int num_collectives_per_iteration = 1;  // 1000;
  const int num_threads = 3;
  const int time_limit_micros = 1;  // 60 * 30 * 1000 * 1000;

  int64 start = Env::Default()->NowMicros();
  srand(Env::Default()->NowMicros());

  for (;;) {
    std::vector<std::pair<int, int>> case_and_device_num;
    std::vector<std::unique_ptr<TestCase>> test_cases;
    for (int i = 0; i < num_collectives_per_iteration; ++i) {
      test_cases.emplace_back(
          MakeTestCase(num_ranks, ncclSum,
                       TensorShape({100, i % 5 + 1, i % 3 + 1}), i + 0.1 * i));
      for (int j = 0; j < num_ranks; ++j) {
        case_and_device_num.emplace_back(i, j);
      }
    }

    // Make sure the input copies issued by MakeTestCase have completed before
    // the worker threads start launching reductions.
    for (int i = 0; i < num_ranks; ++i) {
      auto* device = devices->at(i % devices->size());
      auto* stream = device->tensorflow_gpu_device_info()->stream;
      SE_ASSERT_OK(stream->BlockHostUntilDone());
    }

    std::shuffle(case_and_device_num.begin(), case_and_device_num.end(),
                 std::mt19937(std::random_device()()));

    mutex mu;  // guards case_and_device_num.
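    // Worker threads pop shuffled (collective, rank) pairs, so the
    // AddToAllReduce calls for any one collective arrive interleaved with
    // those of other collectives, mimicking concurrent callers.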
    std::unique_ptr<thread::ThreadPool> pool(
        new thread::ThreadPool(Env::Default(), "test", num_threads));
    const int to_schedule = case_and_device_num.size();
    for (int i = 0; i < to_schedule; ++i) {
      auto fn = [&]() {
        int device_num;
        int test_num;
        {
          mutex_lock l(mu);
          test_num = case_and_device_num.back().first;
          device_num = case_and_device_num.back().second;
          case_and_device_num.pop_back();
        }
        auto* device = devices->at(device_num % devices->size());
        auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
        auto* stream = device->tensorflow_gpu_device_info()->stream;
        TestCase* test_case = test_cases[test_num].get();
        NcclManager::instance()->AddToAllReduce(
            num_ranks, strings::StrCat("allreduce", test_num), ncclSum,
            device->executor(), device->gpu_id(), event_mgr, stream,
            &test_case->ins[device_num], &test_case->outs[device_num],
            CreateDoneCallback(test_case));
      };
      pool->Schedule(fn);
    }
    pool.reset();  // Wait until all reductions have been scheduled.

    LOG(ERROR) << "Verifying results for " << num_collectives_per_iteration
               << " collectives";
    for (int i = 0; i < test_cases.size(); ++i) {
      VerifyResults(strings::StrCat("collective", i), test_cases[i].get());
    }

    int64 delta = Env::Default()->NowMicros() - start;
    if (delta > time_limit_micros) {
      LOG(ERROR) << "Ran for " << delta << " micros; quitting";
      break;
    }
  }
}

}  // namespace tensorflow

#endif  // GOOGLE_CUDA