     16 #include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"
     18 #include <stdint.h>
     19 #include <algorithm>
     20 #include <set>
     21 #include <unordered_set>
     22 #include <utility>
     23 #include <vector>
     25 #include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
     26 #include "tensorflow/compiler/xla/service/buffer_assignment.h"
     27 #include "tensorflow/compiler/xla/service/computation_layout.h"
     28 #include "tensorflow/compiler/xla/service/hlo_computation.h"
     29 #include "tensorflow/compiler/xla/service/hlo_module.h"
     30 #include "tensorflow/compiler/xla/service/logical_buffer.h"
     31 #include "tensorflow/compiler/xla/service/shaped_buffer.h"
     32 #include "tensorflow/compiler/xla/shape_tree.h"
     33 #include "tensorflow/compiler/xla/shape_util.h"
     34 #include "tensorflow/compiler/xla/status_macros.h"
     35 #include "tensorflow/compiler/xla/types.h"
     36 #include "tensorflow/compiler/xla/util.h"
     37 #include "tensorflow/compiler/xla/xla_data.pb.h"
     38 #include "tensorflow/core/lib/strings/str_util.h"
     39 #include "tensorflow/core/lib/strings/strcat.h"
     40 #include "tensorflow/core/lib/strings/stringprintf.h"
     41 #include "tensorflow/core/platform/env.h"
     42 #include "tensorflow/core/platform/logging.h"
     43 #include "tensorflow/core/platform/mem.h"
     44 #include "tensorflow/core/platform/mutex.h"
     45 #include "tensorflow/core/platform/types.h"
     46 #include "tensorflow/stream_executor/host/host_stream.h"
     48 namespace se = ::perftools::gputools;
     50 namespace xla {
     51 namespace cpu {
     53 CpuExecutable::CpuExecutable(
     54     std::unique_ptr<SimpleOrcJIT> jit,
     55     std::unique_ptr<const BufferAssignment> assignment,
     56     std::unique_ptr<const HloModule> hlo_module,
     57     const string& entry_function_name,
     58     std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
     59     std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
     60     : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
     61                  std::move(hlo_profile_index_map)),
     62       jit_(std::move(jit)),
     63       assignment_(std::move(assignment)) {
     64   // Resolve symbols in the constructor rather than at execution time to avoid
     65   // races because FindSymbol is not thread safe.
     66   llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry_function_name);
     67   // We expect to find the symbol provided with entry_function_name; otherwise
     68   // this is an internal error.
     69   CHECK(sym) << "Symbol " << entry_function_name << " not found.";
     70   // getAddress can do work under the hood in the jit, so it needs to be
     71   // guarded by the mutex.
     72   compute_function_ =
     73       reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
     74 }
     76 Status CpuExecutable::AllocateBuffers(
     77     DeviceMemoryAllocator* memory_allocator, int device_ordinal,
     78     std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
     79   CHECK_EQ(buffers->size(), assignment_->Allocations().size());
     80   VLOG(3) << "Allocating " << assignment_->Allocations().size()
     81           << " allocations for module " << module().name();
     82   for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
     83        ++i) {
     84     auto& allocation = assignment_->GetAllocation(i);
     86     VLOG(3) << allocation.ToString();
     88     if (allocation.is_entry_computation_parameter()) {
     89       VLOG(3) << "allocation #" << i << " is a parameter";
     90       continue;
     91     }
     93     if (allocation.is_thread_local()) {
     94       VLOG(3) << "buffer #" << i << " is thread-local";
     95       continue;
     96     }
     98     int64 buffer_size = allocation.size();
     99     if (!(*buffers)[i].is_null()) {
    100       VLOG(3) << "buffer #" << i
    101               << " is in the preallocated result ShapedBuffer";
    102     } else {
    103       TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
    104                                              device_ordinal, buffer_size));
    106       VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
    107               << (*buffers)[i].opaque() << "]";
    108     }
    110     // Since the output buffer and all the temporary buffers were written into
    111     // by the JITed code, msan has no way of knowing their memory was
    112     // initialized. Mark them initialized so that msan doesn't flag loads from
    113     // these buffers.
    114     TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
    115   }
    117   TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
    118                       assignment_->GetUniqueTopLevelOutputSlice());
    119   VLOG(3) << "result index: " << result_slice.index();
    121   return Status::OK();
    122 }
    124 Status CpuExecutable::ExecuteComputeFunction(
    125     const ExecutableRunOptions* run_options,
    126     tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    127     tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    128     HloExecutionProfile* hlo_execution_profile) {
    129   // The calling convention for JITed functions is:
    130   //
    131   //  void function(void* result, const void* run_options, void** args_array,
    132   //                void** temps_array)
    133   //
    134   // result: Points at the result.
    135   // run_options: the ExecutableRunOptions object.
    136   // args_array: An array of pointers, each of which points to a parameter.
    137   //               The size of this array is determined by the function's arity
    138   //               (ProgramShape).
    139   // temps_array:  An array of pointers, each of which points to a temporary
    140   //               buffer the computation needs. The size of this array is
    141   //               determined by buffer analysis.
    142   //
    143   std::vector<const void*> args_array;
    144   for (const ShapedBuffer* argument : arguments) {
    145     args_array.push_back(argument->root_buffer().opaque());
    146   }
    148   uint64 start_micros = tensorflow::Env::Default()->NowMicros();
    150   size_t profile_counters_size =
    151       hlo_execution_profile ? hlo_execution_profile->profile_counters().size()
    152                             : 0;
    153   int64* profile_counters =
    154       hlo_execution_profile
    155           ? hlo_execution_profile->mutable_profile_counters()->data()
    156           : nullptr;
    158   // Call the computation function following the calling convention.
    159   std::vector<void*> buffer_pointers;
    160   for (auto& buffer : buffers) {
    161     buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
    162   }
    163   TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
    164                       assignment_->GetUniqueTopLevelOutputSlice());
    165   void* result_buffer = buffer_pointers[result_slice.index()];
    166   if (VLOG_IS_ON(3)) {
    167     VLOG(3) << "Executing compute function:";
    168     VLOG(3) << tensorflow::strings::Printf(
    169         "  func(void* result, void* params[%zu], void* temps[%zu], "
    170         "uint64 profile_counters[%zu])",
    171         args_array.size(), buffer_pointers.size(), profile_counters_size);
    172     VLOG(3) << tensorflow::strings::Printf("    result = %p", result_buffer);
    173     auto ptr_printer = [](string* out, const void* p) {
    174       tensorflow::strings::StrAppend(out, tensorflow::strings::Printf("%p", p));
    175     };
    176     VLOG(3) << tensorflow::strings::Printf(
    177         "    params = [%s]",
    178         tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    179     VLOG(3) << tensorflow::strings::Printf(
    180         "    temps = [%s]",
    181         tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer).c_str());
    182     VLOG(3) << tensorflow::strings::Printf("    profile_counters = %p",
    183                                            profile_counters);
    184   }
    186   compute_function_(result_buffer, run_options, args_array.data(),
    187                     buffer_pointers.data(), profile_counters);
    189   uint64 end_micros = tensorflow::Env::Default()->NowMicros();
    191   {
    192     tensorflow::mutex_lock lock(mutex_);
    193     const double nanoseconds = (end_micros - start_micros) * 1000.0;
    194     execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
    195     // If hlo profiling was disabled then the cycle count is left empty.
    196     if (hlo_execution_profile) {
    197       execution_profile_.set_compute_cycle_count(
    198           hlo_execution_profile->total_cycles_executed(
    199               *module().entry_computation()));
    200     }
    201   }
    203   return Status::OK();
    204 }
    206 static void LogLiveAddresses(
    207     tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    208     const std::vector<bool>& buffers_in_result) {
    209   if (!VLOG_IS_ON(3)) {
    210     return;
    211   }
    213   CHECK_EQ(buffers.size(), buffers_in_result.size());
    214   std::vector<const void*> live_out_buffers;
    215   for (int i = 0; i < buffers.size(); ++i) {
    216     if (buffers_in_result[i]) {
    217       live_out_buffers.push_back(buffers[i].opaque());
    218     }
    219   }
    220   VLOG(3) << "Live addresses in output marking found "
    221           << live_out_buffers.size() << " addresses:\n"
    222           << tensorflow::str_util::Join(
    223                  live_out_buffers, ", ", [](string* out, const void* address) {
    224                    tensorflow::strings::StrAppend(
    225                        out, tensorflow::strings::Printf("%p", address));
    226                  });
    227 }
    229 static Status DeallocateTempBuffers(
    230     DeviceMemoryAllocator* allocator, se::Stream* stream,
    231     tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    232     const std::vector<bool>& buffers_in_result) {
    233   // Keep those buffers in the output of the marked live because they are needed
    234   // by the service. They will be deallocated by the service.
    235   for (size_t i = 0; i < buffers.size(); ++i) {
    236     se::DeviceMemoryBase alloc = buffers[i];
    237     if (!buffers_in_result[i] && !alloc.is_null()) {
    238       VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
    239               << alloc.opaque() << "]";
    240       TF_RETURN_IF_ERROR(
    241           allocator->Deallocate(stream->parent()->device_ordinal(), &alloc));
    242     }
    243   }
    245   return Status::OK();
    246 }
    248 StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::CreateResultShapedBuffer(
    249     const ServiceExecutableRunOptions* run_options,
    250     tensorflow::gtl::ArraySlice<perftools::gputools::DeviceMemoryBase>
    251         allocated_buffers,
    252     std::vector<bool>* buffers_in_result) {
    253   se::Stream* stream = run_options->stream();
    254   auto result_buffer = MakeUnique<ShapedBuffer>(
    255       /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(),
    256       stream->parent()->platform(), stream->parent()->device_ordinal());
    258   // Copy DeviceMemoryBase values which contain the array(s) of the result into
    259   // the respective location in ShapedBuffer which is returned to the caller.
    260   TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus(
    261       [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
    262         const auto& sources = this->GetRootPointsToSet().element(index);
    263         // The points to set is unambiguous so the set should be a
    264         // singleton.
    265         CHECK_EQ(1, sources.size());
    266         const LogicalBuffer* buffer_source = sources[0];
    267         HloInstruction* src = buffer_source->instruction();
    269         // The source for this result buffer can be a nested buffer such as
    270         // a tuple element. The source instruction should have a
    271         // non-parameter buffer assigned.
    272         TF_ASSIGN_OR_RETURN(
    273             const BufferAllocation::Slice slice,
    274             this->assignment_->GetUniqueSlice(src, buffer_source->index()));
    275         CHECK(!slice.allocation()->is_entry_computation_parameter());
    277         const BufferAllocation::Index buffer_index = slice.index();
    278         const se::DeviceMemoryBase& buffer = allocated_buffers[buffer_index];
    279         CHECK(!buffer.is_null() || buffer.size() == 0);
    280         *device_memory = buffer;
    281         (*buffers_in_result)[buffer_index] = true;
    282         return Status::OK();
    283       }));
    284   return std::move(result_buffer);
    285 }
    287 StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    288     const ServiceExecutableRunOptions* run_options,
    289     tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    290     HloExecutionProfile* hlo_execution_profile) {
    291   if (GetRootPointsToSet().IsAmbiguous()) {
    292     return Unimplemented("Points-to set of root instruction is ambiguous");
    293   }
    295   se::Stream* stream = run_options->stream();
    296   DeviceMemoryAllocator* memory_allocator = run_options->allocator();
    297   std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
    299   TF_RETURN_IF_ERROR(AllocateBuffers(
    300       memory_allocator, stream->parent()->device_ordinal(), &buffers));
    301   TF_RETURN_IF_ERROR(ExecuteComputeFunction(
    302       &run_options->run_options(), arguments, buffers, hlo_execution_profile));
    304   std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
    306       std::unique_ptr<ShapedBuffer> result_buffer,
    307       CreateResultShapedBuffer(run_options, buffers, &buffers_in_result));
    309   // Free all buffers not in the result.
    310   TF_RETURN_IF_ERROR(DeallocateTempBuffers(memory_allocator, stream, buffers,
    311                                            buffers_in_result));
    313   return std::move(result_buffer);
    314 }
    316 StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteAsyncOnStream(
    317     const ServiceExecutableRunOptions* run_options,
    318     tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) {
    319   if (hlo_profiling_enabled()) {
    320     return Unimplemented(
    321         "Asynchronous execution on stream with hlo profiling is not yet "
    322         "supported on CPU.");
    323   }
    325   auto* host_stream = dynamic_cast<perftools::gputools::host::HostStream*>(
    326       run_options->stream()->implementation());
    327   se::Stream* stream = run_options->stream();
    328   DeviceMemoryAllocator* memory_allocator = run_options->allocator();
    329   std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
    331   TF_RETURN_IF_ERROR(AllocateBuffers(
    332       memory_allocator, stream->parent()->device_ordinal(), &buffers));
    334   std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
    336       std::unique_ptr<ShapedBuffer> result_buffer,
    337       CreateResultShapedBuffer(run_options, buffers, &buffers_in_result));
    339   LogLiveAddresses(buffers, buffers_in_result);
    341   host_stream->EnqueueTask([this, run_options, arguments, buffers,
    342                             buffers_in_result, memory_allocator, stream]() {
    343     // Failing a CHECK here is not great, but I don't see an obvious way to
    344     // return a failed Status asynchronously.
    345     TF_CHECK_OK(ExecuteComputeFunction(&run_options->run_options(), arguments,
    346                                        buffers,
    347                                        /*hlo_execution_profile=*/nullptr));
    348     TF_CHECK_OK(DeallocateTempBuffers(memory_allocator, stream, buffers,
    349                                       buffers_in_result));
    350   });
    352   return std::move(result_buffer);
    353 }
    355 /*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
    356   // On the cpu, opaques are pointers.
    357   if (ShapeUtil::IsOpaque(shape)) {
    358     return sizeof(void*);
    359   }
    360   return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
    361 }
    363 const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
    364   return assignment_->points_to_analysis().GetPointsToSet(
    365       module().entry_computation()->root_instruction());
    366 }
    368 }  // namespace cpu
    369 }  // namespace xla