/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// See docs in ../ops/data_flow_ops.cc.

#include <limits.h>
#include <atomic>
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

typedef Eigen::ThreadPoolDevice CPUDevice;
typedef Eigen::GpuDevice GPUDevice;
#ifdef TENSORFLOW_USE_SYCL
typedef Eigen::SyclDevice SYCLDevice;
#endif  // TENSORFLOW_USE_SYCL

class Stack : public ResourceBase {
 public:
  static std::atomic<int64> stack_counter;

  struct TensorAndAllocation {
    Tensor tensor;
    AllocatorAttributes alloc_attrs;
    bool swapped_to_cpu;
  };

  Stack(const DataType& elem_type, const string& stack_name, int max_size)
      : elem_type_(elem_type),
        stack_name_(stack_name),
        max_size_(max_size),
        closed_(false) {}

  Status Push(const TensorAndAllocation& value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (max_size_ >= 0 && stack_.size() >= max_size_) {
      return errors::InvalidArgument("Stack[", stack_name_, "] overflowed ",
                                     "its max_size (", max_size_, ")");
    }
    stack_.push_back(value);
    return Status::OK();
  }

  Status Pop(TensorAndAllocation* value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (stack_.empty()) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] is empty when calling Pop().");
    }
    *value = stack_.back();
    stack_.pop_back();
    return Status::OK();
  }

  // We don't swap the first tensor on the stack and any subsequent tensors
  // that share the buffer with the first tensor.
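  // As a result, aliases of the first tensor's buffer always stay in device
  // memory, even when swap_memory is enabled on the push op.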
  bool IsUsefulToSwap(const Tensor& tensor) const {
    mutex_lock l(mu_);
    if (stack_.empty()) {
      return false;
    }
    const Tensor& first = stack_.front().tensor;
    return !tensor.SharesBufferWith(first);
  }

  void Close() {
    mutex_lock l(mu_);
    stack_.clear();
    closed_ = true;
  }

  DataType ElemType() { return elem_type_; }

  string DebugString() override {
    mutex_lock l(mu_);
    return strings::StrCat("Stack[", stack_name_, "]");
  }

  const string& stack_name() { return stack_name_; }

 private:
  friend class StackOp;
  mutex* mu() { return &mu_; }

  mutable mutex mu_;
  DataType elem_type_;
  const string stack_name_;
  Tensor handle_;
  int max_size_;
  bool closed_ GUARDED_BY(mu_);
  std::vector<TensorAndAllocation> stack_ GUARDED_BY(mu_);

  Status CheckNotClosed() const EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (closed_) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] has already been closed.");
    }
    return Status::OK();
  }
};

Status GetStack(OpKernelContext* ctx, Stack** stack) {
  string key;
  if (ctx->input_dtype(0) == DT_RESOURCE) {
    auto resource = ctx->input(0).flat<ResourceHandle>()(0);
    key = resource.name();
  } else {
    Tensor Tstack_handle = ctx->mutable_input(0, false);
    if (Tstack_handle.NumElements() != 2) {
      return errors::InvalidArgument(
          "Stack handle must have two elements, but had shape: ",
          Tstack_handle.shape().DebugString());
    }
    const string& container = Tstack_handle.flat<string>()(0);
    const string& stack_name = Tstack_handle.flat<string>()(1);
    key = strings::StrCat(container, stack_name);
  }
  ResourceMgr* rm = ctx->resource_manager();
  if (rm == nullptr) {
    return errors::Internal("No resource manager.");
  }
  auto* step_container = ctx->step_container();
  if (step_container == nullptr) {
    return errors::Internal("No step container.");
  }
  TF_RETURN_IF_ERROR(rm->Lookup(step_container->name(), key, stack));
  return Status::OK();
}

std::atomic<int64> Stack::stack_counter{0};

// A per-run local stack. The stack uses a "per-step" resource manager which
// ensures correct garbage collection on error or successful completion.
class StackOp : public OpKernel {
 public:
  explicit StackOp(OpKernelConstruction* context) : OpKernel(context) {
    OP_REQUIRES_OK(context, context->GetAttr("elem_type", &elem_type_));
    OP_REQUIRES_OK(context, context->GetAttr("stack_name", &stack_name_));
    if (stack_name_.empty()) stack_name_ = name();
  }

  void Compute(OpKernelContext* ctx) override {
    int32 size = std::numeric_limits<int32>::max();
    if (ctx->num_inputs() > 0) {
      const Tensor* tensor_size;
      OP_REQUIRES_OK(ctx, ctx->input("max_size", &tensor_size));

      OP_REQUIRES(ctx, TensorShapeUtils::IsScalar(tensor_size->shape()),
                  errors::InvalidArgument(
                      "Stack size must be a scalar, but had shape: ",
                      tensor_size->shape().DebugString()));

      int32 size_value = tensor_size->scalar<int32>()();
      if (size_value >= 0) {
        size = size_value;
      }
    }

    static const char kContainer[] = "_stacks";
    auto stack_id = Stack::stack_counter.fetch_add(1);
    string stack_name = strings::StrCat(stack_name_, "_", stack_id);
    // Store the handle in a per-step container.
    ResourceMgr* rm = ctx->resource_manager();
    OP_REQUIRES(ctx, rm != nullptr, errors::Internal("No resource manager."));
    string key = strings::StrCat(kContainer, stack_name);
    Stack* stack = new Stack(elem_type_, stack_name, size);
    auto* step_container = ctx->step_container();
    OP_REQUIRES(ctx, step_container != nullptr,
                errors::Internal("No step container."));
    OP_REQUIRES_OK(ctx, rm->Create(step_container->name(), key, stack));
    if (IsRefType(ctx->expected_output_dtype(0))) {
      // Create the stack handle.
      AllocatorAttributes alloc_attr;
      alloc_attr.set_on_host(true);
      OP_REQUIRES_OK(ctx, ctx->allocate_temp(tensorflow::DT_STRING,
                                             tensorflow::TensorShape({2}),
                                             &stack->handle_, alloc_attr));
      auto handle = stack->handle_.flat<string>();
      handle(0) = kContainer;
      handle(1) = std::move(stack_name);
      ctx->set_output_ref(0, stack->mu(), &stack->handle_);
    } else {
      Tensor* handle;
      OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &handle));
      handle->flat<ResourceHandle>()(0) =
          MakePerStepResourceHandle<Stack>(ctx, key);
    }
  }

 private:
  DataType elem_type_;
  string stack_name_;

  TF_DISALLOW_COPY_AND_ASSIGN(StackOp);
};

REGISTER_KERNEL_BUILDER(Name("Stack").Device(DEVICE_CPU), StackOp);
REGISTER_KERNEL_BUILDER(Name("Stack").Device(DEVICE_GPU).HostMemory("handle"),
                        StackOp);
REGISTER_KERNEL_BUILDER(Name("StackV2").Device(DEVICE_CPU), StackOp);
REGISTER_KERNEL_BUILDER(Name("StackV2")
                            .Device(DEVICE_GPU)
                            .HostMemory("max_size")
                            .HostMemory("handle"),
                        StackOp);
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("Stack").Device(DEVICE_SYCL).HostMemory("handle"),
                        StackOp);
REGISTER_KERNEL_BUILDER(Name("StackV2")
                            .Device(DEVICE_SYCL)
                            .HostMemory("max_size")
                            .HostMemory("handle"),
                        StackOp);
#endif  // TENSORFLOW_USE_SYCL

template <typename Device>
class StackPushOp : public AsyncOpKernel {
 public:
  explicit StackPushOp(OpKernelConstruction* context) : AsyncOpKernel(context) {
    OP_REQUIRES_OK(context, context->GetAttr("swap_memory", &swap_memory_));
  }

  void ComputeAsync(OpKernelContext* ctx, DoneCallback done) override {
    // Get the stack from the handle.
    Stack* stack = nullptr;
    OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
    core::ScopedUnref unref(stack);

    if (ctx->input_dtype(1) != stack->ElemType()) {
      ctx->CtxFailure(errors::InvalidArgument("Must have type ",
                                              stack->ElemType(), " but got ",
                                              ctx->input_dtype(1)));
      done();
      return;
    }

    // Push the tensor onto the stack. Swap the tensor to CPU if instructed.
    const Tensor& tensor = ctx->input(1);
    AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
    // For now, we use a simple heuristic for swapping: A GPU tensor is moved
    // to CPU if the tensor has more than kCopyThreshold bytes and the GPU
    // allocator says more than kOccupancy of the memory is in use.
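    // For example, with kOccupancy = 0.7 and a (hypothetical) allocator
    // bytes_limit of 8 GiB, a push starts swapping only once bytes_in_use
    // exceeds roughly 5.6 GiB, and only for tensors larger than 2048 bytes.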
    static constexpr int kCopyThreshold = 2048;
    static constexpr double kOccupancy = 0.7;
    if (swap_memory_ && !alloc_attrs.on_host() &&
        (std::is_same<Device, GPUDevice>::value
#ifdef TENSORFLOW_USE_SYCL
         || std::is_same<Device, SYCLDevice>::value
#endif  // TENSORFLOW_USE_SYCL
         ) &&
        tensor.TotalBytes() > kCopyThreshold && stack->IsUsefulToSwap(tensor)) {
      DeviceContext* device_ctxt = ctx->op_device_context();
      auto device = static_cast<tensorflow::Device*>(ctx->device());
      Allocator* allocator = device->GetAllocator(alloc_attrs);
      AllocatorStats stats;
      allocator->GetStats(&stats);
      if (stats.bytes_in_use > (stats.bytes_limit * kOccupancy)) {
        // Asynchronously copy the tensor from GPU to CPU memory.
        // TODO(yuanbyu): Swap the oldest tensor first.
        AllocatorAttributes host_alloc_attrs;
        host_alloc_attrs.set_gpu_compatible(true);
        host_alloc_attrs.set_on_host(true);
        Allocator* cpu_allocator = device->GetAllocator(host_alloc_attrs);
        Tensor* cpu_tensor =
            new Tensor(cpu_allocator, tensor.dtype(), tensor.shape());
        device_ctxt->CopyDeviceTensorToCPU(
            &tensor, "StackPush", device, cpu_tensor,
            [cpu_tensor, stack, ctx, done](const Status& s) {
              ctx->SetStatus(s);
              if (s.ok()) {
                AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
                ctx->SetStatus(stack->Push({*cpu_tensor, alloc_attrs, true}));
              }
              if (ctx->status().ok()) {
                ctx->set_output(0, *cpu_tensor);
              }
              done();
              delete cpu_tensor;
            });
        return;
      }
    }

    // Execute synchronously if not swapped.
    OP_REQUIRES_OK_ASYNC(ctx, stack->Push({tensor, alloc_attrs, false}), done);
    ctx->set_output(0, tensor);
    done();
  }

  bool IsExpensive() override { return false; }

 private:
  bool swap_memory_;
};

REGISTER_KERNEL_BUILDER(Name("StackPush").Device(DEVICE_CPU),
                        StackPushOp<CPUDevice>);
REGISTER_KERNEL_BUILDER(Name("StackPushV2").Device(DEVICE_CPU),
                        StackPushOp<CPUDevice>);

#define REGISTER_GPU_KERNEL(type)                         \
  REGISTER_KERNEL_BUILDER(Name("StackPush")               \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("handle")       \
                              .TypeConstraint<type>("T"), \
                          StackPushOp<GPUDevice>);        \
  REGISTER_KERNEL_BUILDER(Name("StackPushV2")             \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("handle")       \
                              .TypeConstraint<type>("T"), \
                          StackPushOp<GPUDevice>);

TF_CALL_NUMBER_TYPES_NO_INT32(REGISTER_GPU_KERNEL);
#undef REGISTER_GPU_KERNEL

// Special GPU kernels for int32 and bool.
// TODO(b/25387198): Also enable int32 in device memory. This kernel
// registration requires all int32 inputs and outputs to be in host memory.
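// Since "elem" is placed in host memory for these registrations,
// alloc_attrs.on_host() is true in StackPushOp and the swap-to-CPU path is
// never taken.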
#define REGISTER_GPU_HOST_KERNEL(type)                    \
  REGISTER_KERNEL_BUILDER(Name("StackPush")               \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("handle")       \
                              .HostMemory("elem")         \
                              .HostMemory("output")       \
                              .TypeConstraint<type>("T"), \
                          StackPushOp<GPUDevice>);        \
  REGISTER_KERNEL_BUILDER(Name("StackPushV2")             \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("handle")       \
                              .HostMemory("elem")         \
                              .HostMemory("output")       \
                              .TypeConstraint<type>("T"), \
                          StackPushOp<GPUDevice>);

REGISTER_GPU_HOST_KERNEL(int32);
REGISTER_GPU_HOST_KERNEL(bool);

#undef REGISTER_GPU_HOST_KERNEL

#ifdef TENSORFLOW_USE_SYCL
#define REGISTER_SYCL_KERNEL(type)                        \
  REGISTER_KERNEL_BUILDER(Name("StackPush")               \
                              .Device(DEVICE_SYCL)        \
                              .HostMemory("handle")       \
                              .TypeConstraint<type>("T"), \
                          StackPushOp<SYCLDevice>);

TF_CALL_GPU_NUMBER_TYPES(REGISTER_SYCL_KERNEL);

#define REGISTER_SYCL_HOST_KERNEL(type)                   \
  REGISTER_KERNEL_BUILDER(Name("StackPush")               \
                              .Device(DEVICE_SYCL)        \
                              .HostMemory("handle")       \
                              .HostMemory("elem")         \
                              .HostMemory("output")       \
                              .TypeConstraint<type>("T"), \
                          StackPushOp<SYCLDevice>)

REGISTER_SYCL_HOST_KERNEL(int32);
REGISTER_SYCL_HOST_KERNEL(bool);
#undef REGISTER_SYCL_KERNEL
#undef REGISTER_SYCL_HOST_KERNEL
#endif  // TENSORFLOW_USE_SYCL

class StackPopOp : public AsyncOpKernel {
 public:
  explicit StackPopOp(OpKernelConstruction* context) : AsyncOpKernel(context) {}

  void ComputeAsync(OpKernelContext* ctx, DoneCallback done) override {
    // Get the stack from the handle.
    Stack* stack = nullptr;
    OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
    core::ScopedUnref unref(stack);

    // Pop the tensor. Transfer the tensor back to device if it was
    // swapped out to CPU.
    Stack::TensorAndAllocation value;
    OP_REQUIRES_OK_ASYNC(ctx, stack->Pop(&value), done);
    if (value.swapped_to_cpu) {
      // Asynchronously copy the tensor back from CPU to GPU memory.
      DeviceContext* device_ctxt = ctx->op_device_context();
      Device* device = static_cast<Device*>(ctx->device());
      Tensor* cpu_tensor = &value.tensor;
      Allocator* gpu_allocator = device->GetAllocator(value.alloc_attrs);
      Tensor* device_tensor =
          new Tensor(gpu_allocator, cpu_tensor->dtype(), cpu_tensor->shape());
      device_ctxt->CopyCPUTensorToDevice(
          cpu_tensor, device, device_tensor,
          [device_tensor, ctx, done](const Status& s) {
            ctx->SetStatus(s);
            if (s.ok()) {
              ctx->set_output(0, *device_tensor);
            }
            done();
            delete device_tensor;
          });
    } else {
      // Execute synchronously if not swapped.
      ctx->set_output(0, value.tensor);
      done();
    }
  }

  bool IsExpensive() override { return false; }
};

REGISTER_KERNEL_BUILDER(Name("StackPop").Device(DEVICE_CPU), StackPopOp);
REGISTER_KERNEL_BUILDER(Name("StackPopV2").Device(DEVICE_CPU), StackPopOp);

#define REGISTER_GPU_KERNEL(type)                                 \
  REGISTER_KERNEL_BUILDER(Name("StackPop")                        \
                              .Device(DEVICE_GPU)                 \
                              .HostMemory("handle")               \
                              .TypeConstraint<type>("elem_type"), \
                          StackPopOp);                            \
  REGISTER_KERNEL_BUILDER(Name("StackPopV2")                      \
                              .Device(DEVICE_GPU)                 \
                              .HostMemory("handle")               \
                              .TypeConstraint<type>("elem_type"), \
                          StackPopOp);

TF_CALL_NUMBER_TYPES_NO_INT32(REGISTER_GPU_KERNEL);
#undef REGISTER_GPU_KERNEL

// Special GPU kernels for int32 and bool.
// TODO(b/25387198): Also enable int32 in device memory. This kernel
// registration requires all int32 inputs and outputs to be in host memory.
#define REGISTER_GPU_HOST_KERNEL(type)                            \
  REGISTER_KERNEL_BUILDER(Name("StackPop")                        \
                              .Device(DEVICE_GPU)                 \
                              .HostMemory("handle")               \
                              .HostMemory("elem")                 \
                              .TypeConstraint<type>("elem_type"), \
                          StackPopOp);                            \
  REGISTER_KERNEL_BUILDER(Name("StackPopV2")                      \
                              .Device(DEVICE_GPU)                 \
                              .HostMemory("handle")               \
                              .HostMemory("elem")                 \
                              .TypeConstraint<type>("elem_type"), \
                          StackPopOp);

REGISTER_GPU_HOST_KERNEL(int32);
REGISTER_GPU_HOST_KERNEL(bool);

#undef REGISTER_GPU_HOST_KERNEL

#ifdef TENSORFLOW_USE_SYCL
#define REGISTER_SYCL_KERNEL(type)                                \
  REGISTER_KERNEL_BUILDER(Name("StackPop")                        \
                              .Device(DEVICE_SYCL)                \
                              .HostMemory("handle")               \
                              .TypeConstraint<type>("elem_type"), \
                          StackPopOp)

TF_CALL_GPU_NUMBER_TYPES(REGISTER_SYCL_KERNEL);

#define REGISTER_SYCL_HOST_KERNEL(type)                           \
  REGISTER_KERNEL_BUILDER(Name("StackPop")                        \
                              .Device(DEVICE_SYCL)                \
                              .HostMemory("handle")               \
                              .HostMemory("elem")                 \
                              .TypeConstraint<type>("elem_type"), \
                          StackPopOp)

REGISTER_SYCL_HOST_KERNEL(int32);
REGISTER_SYCL_HOST_KERNEL(bool);

#undef REGISTER_SYCL_KERNEL
#undef REGISTER_SYCL_HOST_KERNEL
#endif  // TENSORFLOW_USE_SYCL

class StackCloseOp : public OpKernel {
 public:
  explicit StackCloseOp(OpKernelConstruction* context) : OpKernel(context) {}

  void Compute(OpKernelContext* ctx) override {
    Stack* stack = nullptr;
    OP_REQUIRES_OK(ctx, GetStack(ctx, &stack));
    core::ScopedUnref unref(stack);
    stack->Close();
  }

  bool IsExpensive() override { return false; }
};

REGISTER_KERNEL_BUILDER(Name("StackClose").Device(DEVICE_CPU), StackCloseOp);
REGISTER_KERNEL_BUILDER(
    Name("StackClose").Device(DEVICE_GPU).HostMemory("handle"), StackCloseOp);
REGISTER_KERNEL_BUILDER(Name("StackCloseV2").Device(DEVICE_CPU), StackCloseOp);
REGISTER_KERNEL_BUILDER(
    Name("StackCloseV2").Device(DEVICE_GPU).HostMemory("handle"),
    StackCloseOp);
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(
    Name("StackClose").Device(DEVICE_SYCL).HostMemory("handle"), StackCloseOp);
REGISTER_KERNEL_BUILDER(
    Name("StackCloseV2").Device(DEVICE_SYCL).HostMemory("handle"),
    StackCloseOp);
#endif  // TENSORFLOW_USE_SYCL

}  // namespace tensorflow