/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<HloModule> hlo_module, const string& entry_function_name,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so it needs to be
  // guarded by the mutex.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
  VLOG(1) << "compute_function_ at address "
          << reinterpret_cast<void*>(compute_function_);
}
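// For reference, a minimal sketch of ComputeFunctionType, whose actual
// declaration lives in cpu_executable.h; this sketch assumes the
// five-argument JIT calling convention documented in ExecuteComputeFunction
// below:
//
//   using ComputeFunctionType =
//       void (*)(void* result, const ExecutableRunOptions* run_options,
//                const void** args, void** buffer_table,
//                int64* profile_counters);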
StatusOr<std::pair<std::vector<se::DeviceMemoryBase>,
                   std::vector<OwningDeviceMemory>>>
CpuExecutable::CreateBufferTable(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    absl::Span<const ShapedBuffer* const> arguments) {
  std::vector<se::DeviceMemoryBase> unowning_buffers(
      assignment_->Allocations().size());
  std::vector<OwningDeviceMemory> owning_buffers(
      assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      unowning_buffers[i] = arguments[allocation.parameter_number()]->buffer(
          allocation.param_shape_index());
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_constant()) {
      VLOG(3) << "allocation #" << i << " is a constant";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!owning_buffers[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN(owning_buffers[i], memory_allocator->Allocate(
                                                 device_ordinal, buffer_size));
      unowning_buffers[i] = owning_buffers[i].AsDeviceMemoryBase();

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << owning_buffers[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED(owning_buffers[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return {{std::move(unowning_buffers), std::move(owning_buffers)}};
}
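// Illustrative example (hypothetical module, not taken from any real test):
// for an entry computation with two parameters and a single temporary
// allocation, the loop above would produce
//
//   unowning_buffers = {param0_mem, param1_mem, temp0_mem}
//   owning_buffers   = {null,       null,       temp0_mem}
//
// i.e. every allocation is reachable through unowning_buffers, indexed by
// BufferAllocation::Index, while owning_buffers owns only the memory this
// function itself allocated.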
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    absl::Span<const se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** buffer_table, int64* profile_counters)
  //
  // result: Points at the result.
  // run_options: the ExecutableRunOptions object.
  // args_array: null
  // buffer_table: An array of pointers, containing pointers to temporary
  //   buffers required by the executable and pointers to entry computation
  //   parameters.
  // profile_counters: An array of profile counters, or null if hlo profiling
  //   is disabled.
  //

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  size_t profile_counters_size =
      hlo_execution_profile ? hlo_execution_profile->profile_counters().size()
                            : 0;
  int64* profile_counters =
      hlo_execution_profile
          ? hlo_execution_profile->mutable_profile_counters()->data()
          : nullptr;

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << absl::StrFormat(
        "  func(void* result, void* params[null], void* buffer_table[%u], "
        "uint64 profile_counters[%u])",
        buffer_pointers.size(), profile_counters_size);
    VLOG(3) << absl::StrFormat("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      absl::StrAppend(out, absl::StrFormat("%p", p));
    };
    VLOG(3) << "    params = nullptr";
    VLOG(3) << absl::StrFormat(
        "    buffer_table = [%s]",
        absl::StrJoin(buffer_pointers, ", ", ptr_printer));
    VLOG(3) << absl::StrFormat("    profile_counters = %p", profile_counters);
  }

  compute_function_(result_buffer, run_options, nullptr,
                    buffer_pointers.data(), profile_counters);

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
    // If hlo profiling was disabled then the cycle count is left empty.
    if (hlo_execution_profile) {
      execution_profile_.set_compute_cycle_count(
          hlo_execution_profile->total_cycles_executed(
              *module().entry_computation()));
    }
  }

  return Status::OK();
}
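// Conceptual sketch of a JITed entry function for a hypothetical scalar f32
// add, to illustrate the calling convention above (illustrative only; real
// entry functions are emitted as LLVM IR by the CPU backend):
//
//   void entry(void* result, const void* run_options, void** args_array,
//              void** buffer_table, int64* profile_counters) {
//     const float* lhs = static_cast<const float*>(buffer_table[0]);
//     const float* rhs = static_cast<const float*>(buffer_table[1]);
//     float* out = static_cast<float*>(buffer_table[2]);  // aliases `result`
//     *out = *lhs + *rhs;
//   }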
StatusOr<ScopedShapedBuffer> CpuExecutable::CreateResultShapedBuffer(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<OwningDeviceMemory> buffers) {
  se::Stream* stream = run_options->stream();
  ScopedShapedBuffer result_buffer(
      /*on_host_shape=*/result_shape(),
      /*on_device_shape=*/result_shape(), run_options->allocator(),
      stream->parent()->device_ordinal());
  const HloInputOutputAliasConfig& input_output_alias =
      module().input_output_alias_config();

  // Move OwningDeviceMemory values which contain the array(s) of the result
  // into the respective location in ScopedShapedBuffer which is returned to
  // the caller.
  TF_RETURN_IF_ERROR(result_buffer.buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);
        // The points-to set is unambiguous, so it should be a singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as
        // a tuple element. The source instruction should have a
        // non-parameter buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        const BufferAllocation::Index buffer_index = slice.index();
        OwningDeviceMemory& buffer = buffers[buffer_index];
        if (!slice.allocation()->is_entry_computation_parameter()) {
          // If the buffer coming out of the result is from a parameter, the
          // owning buffer will be null, and that means the caller aliased some
          // parameter buffer to an output one (via the
          // HloInputOutputAliasConfig API). If that is the case, the caller
          // will receive a partially complete scoped shaped buffer, which they
          // will have to fill up on return. Unfortunately the interface to the
          // execute APIs is ShapedBuffer-pointer based, which assumes caller
          // ownership, and hence a buffer coming from there cannot be part of
          // the new ScopedShapedBuffer we create for the result (which assumes
          // ownership).
          *device_memory = buffer.Forget();
        } else {
          auto output_alias = input_output_alias.GetAliasedOutput(
              slice.allocation()->parameter_number(),
              slice.allocation()->param_shape_index());
          CHECK(output_alias)
              << "Output buffer is coming from parameter "
              << slice.allocation()->parameter_number() << " at index "
              << slice.allocation()->param_shape_index()
              << ", but no alias exists";
          CHECK_EQ(*output_alias, index);
        }
        return Status::OK();
      }));
  return std::move(result_buffer);
}
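// Illustrative aliasing case (hypothetical): if the caller configured
// HloInputOutputAliasConfig so that parameter 0 aliases output index {},
// then the owning buffer for the root slice is null above, Forget() is never
// called for it, and the returned ScopedShapedBuffer has a null entry at {}
// which the caller is expected to fill in with the aliased parameter buffer.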
StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  TF_ASSIGN_OR_RETURN(
      auto result,
      ExecuteAsyncOnStreamImpl(run_options, arguments, hlo_execution_profile));
  TF_RETURN_IF_ERROR(run_options->stream()->BlockHostUntilDone());
  return std::move(result);
}

StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }
  return ExecuteAsyncOnStreamImpl(run_options, arguments, nullptr);
}

StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteAsyncOnStreamImpl(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  auto* host_stream = dynamic_cast<se::host::HostStream*>(
      run_options->stream()->implementation());
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<OwningDeviceMemory> owning_buffers;
  std::vector<se::DeviceMemoryBase> unowning_buffers;
  TF_ASSIGN_OR_RETURN(
      std::tie(unowning_buffers, owning_buffers),
      CreateBufferTable(memory_allocator, stream->parent()->device_ordinal(),
                        arguments));

  TF_ASSIGN_OR_RETURN(
      ScopedShapedBuffer result,
      CreateResultShapedBuffer(run_options, absl::MakeSpan(owning_buffers)));

  // At this point, `unowning_buffers` contains unowning pointers to all of
  // our buffers, and `owning_buffers` contains owning pointers to the
  // non-live-out buffers. Enqueue a task which keeps alive the non-live-out
  // buffers.
  //
  // Logically we want the task to capture `owning_buffers` by move, but
  // ultimately our functor needs to be wrapped in a std::function, and that
  // requires the functor to be copyable. Thus we perpetrate the hack of
  // capturing the buffers "by shared pointer".
  //
  // We also need to change the types of some of the variables we capture:
  // run_options needs to change from a pointer to a value type, and
  // unowning_buffers needs to change from a Span into a vector. We use a
  // struct instead of a lambda to make this explicit.
  struct AsyncRunTask {
    CpuExecutable* executable;
    ServiceExecutableRunOptions run_options;
    std::vector<se::DeviceMemoryBase> unowning_buffers;
    std::shared_ptr<std::vector<OwningDeviceMemory>> buffers;
    HloExecutionProfile* hlo_execution_profile;

    void operator()() {
      // Failing a CHECK here is not great, but I don't see an obvious way to
      // return a failed Status asynchronously.
      TF_CHECK_OK(executable->ExecuteComputeFunction(
          &run_options.run_options(), unowning_buffers,
          hlo_execution_profile));
    }
  };
  host_stream->EnqueueTask(
      AsyncRunTask{this, *run_options, std::move(unowning_buffers),
                   std::make_shared<std::vector<OwningDeviceMemory>>(
                       std::move(owning_buffers)),
                   hlo_execution_profile});

  return std::move(result);
}

/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the CPU, opaques are pointers.
  if (shape.IsOpaque()) {
    return sizeof(void*);
  }
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla