     16 #define EIGEN_USE_THREADS
     18 #include "tensorflow/compiler/xla/service/backend.h"
     20 #include <algorithm>
     21 #include <string>
     22 #include <utility>
     24 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
     25 #include "tensorflow/compiler/xla/service/compiler.h"
     26 #include "tensorflow/compiler/xla/service/platform_util.h"
     27 #include "tensorflow/compiler/xla/status_macros.h"
     28 #include "tensorflow/compiler/xla/statusor.h"
     29 #include "tensorflow/compiler/xla/types.h"
     30 #include "tensorflow/compiler/xla/util.h"
     31 #include "tensorflow/core/common_runtime/eigen_thread_pool.h"
     32 #include "tensorflow/core/lib/core/errors.h"
     33 #include "tensorflow/core/lib/core/threadpool.h"
     34 #include "tensorflow/core/platform/cpu_info.h"
     35 #include "tensorflow/core/platform/env.h"
     36 #include "tensorflow/core/platform/logging.h"
     37 #include "tensorflow/core/platform/stream_executor_no_cuda.h"
     39 namespace se = ::perftools::gputools;
     41 namespace xla {
     43 BackendOptions& BackendOptions::set_platform(
     44     perftools::gputools::Platform* platform) {
     45   platform_ = platform;
     46   return *this;
     47 }
     49 perftools::gputools::Platform* BackendOptions::platform() const {
     50   return platform_;
     51 }
     53 BackendOptions& BackendOptions::set_intra_op_parallelism_threads(
     54     int num_threads) {
     55   intra_op_parallelism_threads_ = num_threads;
     56   return *this;
     57 }
     59 int BackendOptions::intra_op_parallelism_threads() const {
     60   return intra_op_parallelism_threads_;
     61 }
     63 // Define this in .cc file to avoid having to include eigen or forward declare
     64 // these types in the header.
     65 struct Backend::EigenThreadPoolWrapper {
     66   explicit EigenThreadPoolWrapper(const int num_threads)
     67       : pool(new tensorflow::thread::ThreadPool(tensorflow::Env::Default(),
     68                                                 "XLAEigen", num_threads)),
     69         wrapper(new tensorflow::EigenThreadPoolWrapper(pool.get())),
     70         device(new Eigen::ThreadPoolDevice(wrapper.get(),
     71                                            wrapper->NumThreads())) {}
     73   std::unique_ptr<tensorflow::thread::ThreadPool> pool;
     74   std::unique_ptr<tensorflow::EigenThreadPoolWrapper> wrapper;
     75   std::unique_ptr<Eigen::ThreadPoolDevice> device;
     76 };
     78 /* static */ StatusOr<std::unique_ptr<Backend>> Backend::CreateBackend(
     79     const BackendOptions& options) {
     80   perftools::gputools::Platform* platform = options.platform();
     81   TF_ASSIGN_OR_RETURN(auto compiler, Compiler::GetForPlatform(platform));
     82   TF_ASSIGN_OR_RETURN(auto stream_executors,
     83                       PlatformUtil::GetStreamExecutors(platform));
     84   TF_ASSIGN_OR_RETURN(auto transfer_manager,
     85                       TransferManager::GetForPlatform(platform));
     86   TF_ASSIGN_OR_RETURN(auto computation_placer,
     87                       ComputationPlacer::GetForPlatform(platform));
     88   std::unique_ptr<Backend> backend(
     89       new Backend(platform, compiler, stream_executors, transfer_manager,
     90                   computation_placer, options.intra_op_parallelism_threads()));
     91   return std::move(backend);
     92 }
     94 /* static */ StatusOr<std::unique_ptr<Backend>>
     95 Backend::CreateDefaultBackend() {
     96   TF_ASSIGN_OR_RETURN(se::Platform * platform,
     97                       PlatformUtil::GetDefaultPlatform());
     98   BackendOptions backend_options;
     99   backend_options.set_platform(platform);
    100   return CreateBackend(backend_options);
    101 }
    103 StatusOr<Backend::StreamPtr> Backend::BorrowStream(int device_ordinal) {
    104   TF_ASSIGN_OR_RETURN(auto exec, stream_executor(device_ordinal));
    105   return BorrowStream(exec);
    106 }
    108 StatusOr<Backend::StreamPtr> Backend::BorrowStream(
    109     se::StreamExecutor* executor) {
    110   tensorflow::mutex_lock l(mu_);
    111   if (0 == stream_pools_.count(executor)) {
    112     stream_pools_.emplace(std::piecewise_construct,
    113                           std::forward_as_tuple(executor),
    114                           std::forward_as_tuple([executor]() {
    115                             auto stream = MakeUnique<se::Stream>(executor);
    116                             stream->Init();
    117                             return stream;
    118                           }));
    119   }
    120   return stream_pools_.at(executor).Allocate();
    121 }
    123 Backend::Backend(
    124     perftools::gputools::Platform* platform, Compiler* compiler,
    125     tensorflow::gtl::ArraySlice<se::StreamExecutor*> stream_executors,
    126     TransferManager* transfer_manager, ComputationPlacer* computation_placer,
    127     int intra_op_parallelism_threads)
    128     : platform_(platform),
    129       compiler_(compiler),
    130       transfer_manager_(transfer_manager),
    131       computation_placer_(computation_placer) {
    132   // The given set of stream executors set may include invalid executors.
    133   for (se::StreamExecutor* exec : stream_executors) {
    134     if (exec != nullptr) {
    135       stream_executors_.push_back(exec);
    136     }
    137   }
    138   // Create a memory allocator for the valid stream executors.
    139   memory_allocator_ =
    140       MakeUnique<StreamExecutorMemoryAllocator>(platform, stream_executors);
    141   CHECK(!stream_executors_.empty())
    142       << "Service found no devices for backend " << platform_->Name() << '.';
    144   if (platform->id() == se::host::kHostPlatformId) {
    145     inter_op_thread_pool_.reset(new tensorflow::thread::ThreadPool(
    146         tensorflow::Env::Default(), "xla_inter_op",
    147         tensorflow::port::NumSchedulableCPUs()));
    148     const int num_threads = intra_op_parallelism_threads > 0
    149                                 ? intra_op_parallelism_threads
    150                                 : tensorflow::port::NumSchedulableCPUs();
    151     intra_op_thread_pool_wrapper_.reset(
    152         new EigenThreadPoolWrapper(num_threads));
    153   }
    154 }
    156 Backend::~Backend() {}
    158 int Backend::default_device_ordinal() const {
    159   return default_stream_executor()->device_ordinal();
    160 }
    162 tensorflow::thread::ThreadPool* Backend::inter_op_thread_pool() const {
    163   return inter_op_thread_pool_.get();
    164 }
    166 const Eigen::ThreadPoolDevice* Backend::eigen_intra_op_thread_pool_device()
    167     const {
    168   if (intra_op_thread_pool_wrapper_ == nullptr) {
    169     return nullptr;
    170   }
    171   return intra_op_thread_pool_wrapper_->device.get();
    172 }
    174 tensorflow::thread::ThreadPool* Backend::eigen_intra_op_thread_pool() const {
    175   if (intra_op_thread_pool_wrapper_ == nullptr) {
    176     return nullptr;
    177   }
    178   return intra_op_thread_pool_wrapper_->pool.get();
    179 }
    181 StatusOr<perftools::gputools::StreamExecutor*> Backend::stream_executor(
    182     int device_ordinal) const {
    183   if (device_ordinal < 0 ||
    184       device_ordinal > stream_executors_.back()->device_ordinal()) {
    185     return InvalidArgument(
    186         "Invalid device ordinal value (%d). Valid range is [0, %d].",
    187         device_ordinal, stream_executors_.back()->device_ordinal());
    188   }
    189   for (auto* executor : stream_executors_) {
    190     if (executor->device_ordinal() == device_ordinal) {
    191       return executor;
    192     }
    193   }
    194   return InvalidArgument("device %s not supported by XLA service",
    195                          device_name(device_ordinal).c_str());
    196 }
    198 StatusOr<bool> Backend::devices_equivalent(int device_ordinal_a,
    199                                            int device_ordinal_b) {
    200   // Use the name from device description to determine equivalence. This is a
    201   // bit crude but works for GPUs which is the important case where we compile
    202   // an executable for one GPU and want to know if it will run (well) on
    203   // another.
    204   TF_ASSIGN_OR_RETURN(perftools::gputools::StreamExecutor * executor_a,
    205                       stream_executor(device_ordinal_a));
    206   TF_ASSIGN_OR_RETURN(perftools::gputools::StreamExecutor * executor_b,
    207                       stream_executor(device_ordinal_b));
    208   return (executor_a->GetDeviceDescription().name() ==
    209           executor_b->GetDeviceDescription().name());
    210 }
    212 Status Backend::ResetDevices() {
    213   return transfer_manager_->ResetDevices(stream_executors_);
    214 }
    216 }  // namespace xla