/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<const HloModule> hlo_module,
    const string& entry_function_name,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the JIT, so resolve the address
  // here in the single-threaded constructor rather than lazily at execution
  // time.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
}

Status CpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<se::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

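    // Entry computation parameters alias buffers that the caller passes in,
    // and thread-local allocations are expected to be materialized inside the
    // generated code itself, so neither kind needs an allocation here.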
    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return Status::OK();
}

Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** temps_array, int64* profile_counters)
  //
  // result:           Points at the result.
  // run_options:      The ExecutableRunOptions object.
  // args_array:       An array of pointers, each of which points to a
  //                   parameter. The size of this array is determined by the
  //                   function's arity (ProgramShape).
  // temps_array:      An array of pointers, each of which points to a
  //                   temporary buffer the computation needs. The size of this
  //                   array is determined by buffer analysis.
  // profile_counters: An array of cycle counters, one per profiled HLO, or
  //                   null when HLO profiling is disabled.
  //
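  // As an illustrative sketch (names here are hypothetical), a computation
  // with two parameters would be invoked roughly as:
  //
  //   void* args[] = {param0_opaque, param1_opaque};
  //   entry_function(result_opaque, run_options, args, temps_array,
  //                  profile_counters);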
  std::vector<const void*> args_array;
  for (const ShapedBuffer* argument : arguments) {
    args_array.push_back(argument->root_buffer().opaque());
  }

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

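  // When HLO profiling is enabled, the JITed code accumulates one cycle
  // counter per profiled HLO in this array; otherwise a null pointer is
  // passed.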
  size_t profile_counters_size =
      hlo_execution_profile ? hlo_execution_profile->profile_counters().size()
                            : 0;
  int64* profile_counters =
      hlo_execution_profile
          ? hlo_execution_profile->mutable_profile_counters()->data()
          : nullptr;

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << tensorflow::strings::Printf(
        "  func(void* result, void* run_options, void* params[%zu], "
        "void* temps[%zu], int64 profile_counters[%zu])",
        args_array.size(), buffer_pointers.size(), profile_counters_size);
    VLOG(3) << tensorflow::strings::Printf("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      tensorflow::strings::StrAppend(out, tensorflow::strings::Printf("%p", p));
    };
    VLOG(3) << tensorflow::strings::Printf(
        "    params = [%s]",
        tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(
        "    temps = [%s]",
        tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf("    profile_counters = %p",
                                           profile_counters);
  }

  compute_function_(result_buffer, run_options, args_array.data(),
                    buffer_pointers.data(), profile_counters);

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
    // If hlo profiling was disabled then the cycle count is left empty.
    if (hlo_execution_profile) {
      execution_profile_.set_compute_cycle_count(
          hlo_execution_profile->total_cycles_executed(
              *module().entry_computation()));
    }
  }

  return Status::OK();
}

    205 
    206 static void LogLiveAddresses(
    207     tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    208     const std::vector<bool>& buffers_in_result) {
    209   if (!VLOG_IS_ON(3)) {
    210     return;
    211   }
    212 
    213   CHECK_EQ(buffers.size(), buffers_in_result.size());
    214   std::vector<const void*> live_out_buffers;
    215   for (int i = 0; i < buffers.size(); ++i) {
    216     if (buffers_in_result[i]) {
    217       live_out_buffers.push_back(buffers[i].opaque());
    218     }
    219   }
    220   VLOG(3) << "Live addresses in output marking found "
    221           << live_out_buffers.size() << " addresses:\n"
    222           << tensorflow::str_util::Join(
    223                  live_out_buffers, ", ", [](string* out, const void* address) {
    224                    tensorflow::strings::StrAppend(
    225                        out, tensorflow::strings::Printf("%p", address));
    226                  });
    227 }
    228 
static Status DeallocateTempBuffers(
    DeviceMemoryAllocator* allocator, se::Stream* stream,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    const std::vector<bool>& buffers_in_result) {
  // Keep the buffers that are marked live in the result: the service still
  // needs them and will deallocate them itself.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(
          allocator->Deallocate(stream->parent()->device_ordinal(), &alloc));
    }
  }

  return Status::OK();
}

StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::CreateResultShapedBuffer(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> allocated_buffers,
    std::vector<bool>* buffers_in_result) {
  se::Stream* stream = run_options->stream();
  auto result_buffer = MakeUnique<ShapedBuffer>(
      /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(),
      stream->parent()->platform(), stream->parent()->device_ordinal());

  // Copy the DeviceMemoryBase values that contain the array(s) of the result
  // into the respective locations in the ShapedBuffer that is returned to the
  // caller.
  TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);
        // The points-to set is unambiguous, so it should be a singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as
        // a tuple element. The source instruction should have a
        // non-parameter buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        CHECK(!slice.allocation()->is_entry_computation_parameter());

        const BufferAllocation::Index buffer_index = slice.index();
        const se::DeviceMemoryBase& buffer = allocated_buffers[buffer_index];
        CHECK(!buffer.is_null() || buffer.size() == 0);
        *device_memory = buffer;
        (*buffers_in_result)[buffer_index] = true;
        return Status::OK();
      }));
  return std::move(result_buffer);
}

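// Synchronous execution path: allocate the non-parameter buffers, run the
// JITed compute function inline on the calling thread, wrap the live output
// buffers in a ShapedBuffer, and free the remaining temporaries.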
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));
  TF_RETURN_IF_ERROR(ExecuteComputeFunction(
      &run_options->run_options(), arguments, buffers, hlo_execution_profile));

  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<ShapedBuffer> result_buffer,
      CreateResultShapedBuffer(run_options, buffers, &buffers_in_result));

  // Free all buffers not in the result.
  TF_RETURN_IF_ERROR(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                           buffers_in_result));

  return std::move(result_buffer);
}

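// Asynchronous execution path: the result ShapedBuffer is created and
// returned immediately, while the compute function and temp-buffer cleanup
// are enqueued on the host stream to run later. The buffers the result points
// at are only safe to read once the enqueued work has completed.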
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }

  auto* host_stream = dynamic_cast<se::host::HostStream*>(
      run_options->stream()->implementation());
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<ShapedBuffer> result_buffer,
      CreateResultShapedBuffer(run_options, buffers, &buffers_in_result));

  LogLiveAddresses(buffers, buffers_in_result);

  host_stream->EnqueueTask([this, run_options, arguments, buffers,
                            buffers_in_result, memory_allocator, stream]() {
    // Failing a CHECK here is not great, but I don't see an obvious way to
    // return a failed Status asynchronously.
    TF_CHECK_OK(ExecuteComputeFunction(&run_options->run_options(), arguments,
                                       buffers,
                                       /*hlo_execution_profile=*/nullptr));
    TF_CHECK_OK(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                      buffers_in_result));
  });

  return std::move(result_buffer);
}

/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the CPU, opaques are pointers.
  if (ShapeUtil::IsOpaque(shape)) {
    return sizeof(void*);
  }
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

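// Returns the points-to set of the root instruction of the entry computation,
// as computed by buffer assignment's points-to analysis.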
const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla