/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<HloModule> hlo_module, const string& entry_function_name,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so it needs to be
  // guarded by the mutex.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
  VLOG(1) << "compute_function_ at address "
          << reinterpret_cast<void*>(compute_function_);
}

StatusOr<std::pair<std::vector<se::DeviceMemoryBase>,
                   std::vector<OwningDeviceMemory>>>
CpuExecutable::CreateBufferTable(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    absl::Span<const ShapedBuffer* const> arguments) {
  std::vector<se::DeviceMemoryBase> unowning_buffers(
      assignment_->Allocations().size());
  std::vector<OwningDeviceMemory> owning_buffers(
      assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      unowning_buffers[i] = arguments[allocation.parameter_number()]->buffer(
          allocation.param_shape_index());
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_constant()) {
      VLOG(3) << "allocation #" << i << " is a constant";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!owning_buffers[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN(owning_buffers[i], memory_allocator->Allocate(
                                                 device_ordinal, buffer_size));
      unowning_buffers[i] = owning_buffers[i].AsDeviceMemoryBase();

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << owning_buffers[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED(owning_buffers[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return {{std::move(unowning_buffers), std::move(owning_buffers)}};
}

Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    absl::Span<const se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** buffer_table, int64* profile_counters)
  //
  // result: Points at the result buffer.
  // run_options: the ExecutableRunOptions object.
  // args_array: null.
  // buffer_table: An array of pointers, containing pointers to temporary
  //   buffers required by the executable and pointers to entry computation
  //   parameters.
  // profile_counters: An array of profile counters when HLO profiling is
  //   enabled; null otherwise.
  //
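  // For illustration only, a minimal sketch of a call under this convention;
  // temp0, temp1, param0, and result_index are hypothetical names, not part
  // of the real API:
  //
  //   void* buffer_table[] = {temp0, temp1, param0};
  //   compute_function_(/*result=*/buffer_table[result_index],
  //                     /*run_options=*/run_options,
  //                     /*args_array=*/nullptr,
  //                     /*buffer_table=*/buffer_table,
  //                     /*profile_counters=*/nullptr);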

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  size_t profile_counters_size =
      hlo_execution_profile ? hlo_execution_profile->profile_counters().size()
                            : 0;
  int64* profile_counters =
      hlo_execution_profile
          ? hlo_execution_profile->mutable_profile_counters()->data()
          : nullptr;

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << absl::StrFormat(
        "  func(void* result, void* params[null], void* buffer_table[%u], "
        "uint64 profile_counters[%u])",
        buffer_pointers.size(), profile_counters_size);
    VLOG(3) << absl::StrFormat("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      absl::StrAppend(out, absl::StrFormat("%p", p));
    };
    VLOG(3) << "    params = nullptr";
    VLOG(3) << absl::StrFormat(
        "    buffer_table = [%s]",
        absl::StrJoin(buffer_pointers, ", ", ptr_printer));
    VLOG(3) << absl::StrFormat("    profile_counters = %p", profile_counters);
  }

  compute_function_(result_buffer, run_options, nullptr, buffer_pointers.data(),
                    profile_counters);

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
    // If hlo profiling was disabled then the cycle count is left empty.
    if (hlo_execution_profile) {
      execution_profile_.set_compute_cycle_count(
          hlo_execution_profile->total_cycles_executed(
              *module().entry_computation()));
    }
  }

  return Status::OK();
}

StatusOr<ScopedShapedBuffer> CpuExecutable::CreateResultShapedBuffer(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<OwningDeviceMemory> buffers) {
  se::Stream* stream = run_options->stream();
  ScopedShapedBuffer result_buffer(
      /*on_host_shape=*/result_shape(),
      /*on_device_shape=*/result_shape(), run_options->allocator(),
      stream->parent()->device_ordinal());
  const HloInputOutputAliasConfig& input_output_alias =
      module().input_output_alias_config();

  // Move the OwningDeviceMemory values that contain the result's array(s)
  // into their respective locations in the ScopedShapedBuffer that is
  // returned to the caller.
  TF_RETURN_IF_ERROR(result_buffer.buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);
        // The points-to set is unambiguous, so it should be a singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as
        // a tuple element. The source instruction should have a
        // non-parameter buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        const BufferAllocation::Index buffer_index = slice.index();
        OwningDeviceMemory& buffer = buffers[buffer_index];
        if (!slice.allocation()->is_entry_computation_parameter()) {
          // If the buffer coming out of the result is from a parameter, the
          // owning buffer will be null, and that means the caller aliased some
          // parameter buffer to an output one (via the
          // HloInputOutputAliasConfig API). If that is the case, the caller
          // will receive a partially complete scoped shaped buffer, which they
          // will have to fill up on return. Unfortunately the interfaces to
          // the execute APIs are ShapedBuffer-pointer based, which assumes
          // caller ownership, and hence a buffer coming from there cannot be
          // part of the new ScopedShapedBuffer we create for the result (which
          // assumes ownership).
          *device_memory = buffer.Forget();
        } else {
          auto output_alias = input_output_alias.GetAliasedOutput(
              slice.allocation()->parameter_number(),
              slice.allocation()->param_shape_index());
          CHECK(output_alias)
              << "Output buffer is coming from parameter "
              << slice.allocation()->parameter_number() << " at index "
              << slice.allocation()->param_shape_index()
              << ", but no alias exists";
          CHECK_EQ(*output_alias, index);
        }
        return Status::OK();
      }));
  return std::move(result_buffer);
}

StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  TF_ASSIGN_OR_RETURN(
      auto result,
      ExecuteAsyncOnStreamImpl(run_options, arguments, hlo_execution_profile));
  TF_RETURN_IF_ERROR(run_options->stream()->BlockHostUntilDone());
  return std::move(result);
}

StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }
  return ExecuteAsyncOnStreamImpl(run_options, arguments, nullptr);
}

StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteAsyncOnStreamImpl(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  auto* host_stream = dynamic_cast<se::host::HostStream*>(
      run_options->stream()->implementation());
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<OwningDeviceMemory> owning_buffers;
  std::vector<se::DeviceMemoryBase> unowning_buffers;
  TF_ASSIGN_OR_RETURN(
      std::tie(unowning_buffers, owning_buffers),
      CreateBufferTable(memory_allocator, stream->parent()->device_ordinal(),
                        arguments));

  TF_ASSIGN_OR_RETURN(
      ScopedShapedBuffer result,
      CreateResultShapedBuffer(run_options, absl::MakeSpan(owning_buffers)));

  // At this point, `unowning_buffers` contains unowning pointers to all of our
  // buffers, and `owning_buffers` contains owning pointers to the
  // non-live-out buffers.  Enqueue a task which keeps alive the non-live-out
  // buffers.
  //
  // Logically we want this task to capture `owning_buffers` by move, but
  // ultimately our functor needs to be wrapped in a std::function, and that
  // requires its functor to be copyable.  Thus we perpetrate the hack of
  // capturing the buffers "by shared pointer".
  //
  // We also need to change the types of some of the variables we capture:
  // run_options needs to change from a pointer to a value type, and arguments
  // needs to change from a Span into a vector.  We use a struct instead
  // of a lambda to make this explicit.
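  //
  // A minimal sketch of the copyability problem (illustrative, not code from
  // this module): OwningDeviceMemory is move-only, so a closure that
  // init-captures the vector is itself move-only and cannot be stored in a
  // std::function, which requires a CopyConstructible functor:
  //
  //   std::function<void()> f =
  //       [b = std::move(owning_buffers)]() { /* ... */ };  // ill-formed
  //
  // Wrapping the vector in a std::shared_ptr makes the capture copyable.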
  struct AsyncRunTask {
    CpuExecutable* executable;
    ServiceExecutableRunOptions run_options;
    std::vector<se::DeviceMemoryBase> unowning_buffers;
    std::shared_ptr<std::vector<OwningDeviceMemory>> buffers;
    HloExecutionProfile* hlo_execution_profile;

    void operator()() {
      // Failing a CHECK here is not great, but I don't see an obvious way to
      // return a failed Status asynchronously.
      TF_CHECK_OK(executable->ExecuteComputeFunction(
          &run_options.run_options(), unowning_buffers, hlo_execution_profile));
    }
  };
  host_stream->EnqueueTask(
      AsyncRunTask{this, *run_options, std::move(unowning_buffers),
                   std::make_shared<std::vector<OwningDeviceMemory>>(
                       std::move(owning_buffers)),
                   hlo_execution_profile});

  return std::move(result);
}

/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the CPU, opaques are pointers.
  if (shape.IsOpaque()) {
    return sizeof(void*);
  }
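  // Worked example (my numbers, an assumption rather than code in this
  // module): for an f32[2,3] array, ShapeUtil::ByteSizeOf returns
  // 2 * 3 * 4 = 24 bytes; for a tuple, it counts one sizeof(void*) per
  // element, since the CPU backend represents a tuple as an array of
  // pointers to its elements.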
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla