1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/compiler/xla/service/cpu/cpu_executable.h" 17 18 #include <stdint.h> 19 #include <algorithm> 20 #include <set> 21 #include <unordered_set> 22 #include <utility> 23 #include <vector> 24 25 #include "llvm/ExecutionEngine/Orc/IRCompileLayer.h" 26 #include "tensorflow/compiler/xla/service/buffer_assignment.h" 27 #include "tensorflow/compiler/xla/service/computation_layout.h" 28 #include "tensorflow/compiler/xla/service/hlo_computation.h" 29 #include "tensorflow/compiler/xla/service/hlo_module.h" 30 #include "tensorflow/compiler/xla/service/logical_buffer.h" 31 #include "tensorflow/compiler/xla/service/shaped_buffer.h" 32 #include "tensorflow/compiler/xla/shape_tree.h" 33 #include "tensorflow/compiler/xla/shape_util.h" 34 #include "tensorflow/compiler/xla/status_macros.h" 35 #include "tensorflow/compiler/xla/types.h" 36 #include "tensorflow/compiler/xla/util.h" 37 #include "tensorflow/compiler/xla/xla_data.pb.h" 38 #include "tensorflow/core/lib/strings/str_util.h" 39 #include "tensorflow/core/lib/strings/strcat.h" 40 #include "tensorflow/core/lib/strings/stringprintf.h" 41 #include "tensorflow/core/platform/env.h" 42 #include "tensorflow/core/platform/logging.h" 43 #include "tensorflow/core/platform/mem.h" 44 #include "tensorflow/core/platform/mutex.h" 45 
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

// Builds an executable around JIT-compiled code.
//
// Takes ownership of the JIT, the buffer assignment and the HLO module, and
// resolves `entry_function_name` to a callable address up front. CHECK-fails if
// the symbol cannot be found.
CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<const HloModule> hlo_module,
    const string& entry_function_name,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so it is also called
  // here, in the constructor, instead of at execution time.
  // NOTE(review): this comment used to say the call must be guarded by the
  // mutex, but no lock is taken here -- confirm that is intended.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
}

// Fills `buffers` with one device allocation per entry of the buffer
// assignment.
//
// `buffers` must already have one slot per allocation (CHECK-enforced). Slots
// for entry-computation parameters and thread-local allocations are skipped,
// slots that are already non-null are reused as-is, and every other slot is
// allocated from `memory_allocator` on `device_ordinal`.
//
// Returns an error if an allocation fails or if the assignment has no unique
// top-level output slice.
Status CpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();

  // Hoisted so the loop compares two signed values instead of mixing the
  // signed index type with an unsigned size().
  const BufferAllocation::Index num_allocations =
      assignment_->Allocations().size();
  for (BufferAllocation::Index i = 0; i < num_allocations; ++i) {
    const auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  // Only used for logging, but the lookup also surfaces an error when the
  // assignment has no unique top-level output slice.
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return Status::OK();
}

// Invokes the JIT-compiled entry function and records timing information.
//
// `arguments` supplies the parameter buffers, `buffers` the per-allocation
// buffers (indexed by allocation index), and `hlo_execution_profile` may be
// null, in which case no profile counters are passed to the compiled code and
// the cycle count in the execution profile is left untouched.
//
// Returns an error if the assignment has no unique top-level output slice.
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The JITed function is called below with these arguments, in order:
  //
  //   function(result, run_options, args_array, temps_array, profile_counters)
  //
  // result: Points at the result.
  // run_options: the ExecutableRunOptions object.
  // args_array: An array of pointers, each of which points to a parameter.
  //               The size of this array is determined by the function's arity
  //               (ProgramShape).
  // temps_array:  An array of pointers, each of which points to a temporary
  //               buffer the computation needs. The size of this array is
  //               determined by buffer analysis.
  // profile_counters: The HLO profile counter array, or null when no
  //               HloExecutionProfile was supplied.
  //
  std::vector<const void*> args_array;
  args_array.reserve(arguments.size());
  for (const ShapedBuffer* argument : arguments) {
    args_array.push_back(argument->root_buffer().opaque());
  }

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  size_t profile_counters_size =
      hlo_execution_profile ? hlo_execution_profile->profile_counters().size()
                            : 0;
  int64* profile_counters =
      hlo_execution_profile
          ? hlo_execution_profile->mutable_profile_counters()->data()
          : nullptr;

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  buffer_pointers.reserve(buffers.size());
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << tensorflow::strings::Printf(
        "  func(void* result, void* params[%zu], void* temps[%zu], "
        "uint64 profile_counters[%zu])",
        args_array.size(), buffer_pointers.size(), profile_counters_size);
    VLOG(3) << tensorflow::strings::Printf("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      tensorflow::strings::StrAppend(out, tensorflow::strings::Printf("%p", p));
    };
    VLOG(3) << tensorflow::strings::Printf(
        "    params = [%s]",
        tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(
        "    temps = [%s]",
        tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf("    profile_counters = %p",
                                           profile_counters);
  }

  compute_function_(result_buffer, run_options, args_array.data(),
                    buffer_pointers.data(), profile_counters);

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    // execution_profile_ is updated under mutex_.
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    // Clamp to at least 1ns so a run is never reported as taking zero time.
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
    // If hlo profiling was disabled then the cycle count is left empty.
    if (hlo_execution_profile) {
      execution_profile_.set_compute_cycle_count(
          hlo_execution_profile->total_cycles_executed(
              *module().entry_computation()));
    }
  }

  return Status::OK();
}

// Logs, at VLOG level 3, the addresses of all buffers flagged in
// `buffers_in_result`. Does nothing when that log level is disabled.
// `buffers` and `buffers_in_result` must have the same size (CHECK-enforced).
static void LogLiveAddresses(
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    const std::vector<bool>& buffers_in_result) {
  if (!VLOG_IS_ON(3)) {
    return;
  }

  CHECK_EQ(buffers.size(), buffers_in_result.size());
  std::vector<const void*> live_out_buffers;
  for (size_t i = 0; i < buffers.size(); ++i) {
    if (buffers_in_result[i]) {
      live_out_buffers.push_back(buffers[i].opaque());
    }
  }
  VLOG(3) << "Live addresses in output marking found "
          << live_out_buffers.size() << " addresses:\n"
          << tensorflow::str_util::Join(
                 live_out_buffers, ", ", [](string* out, const void* address) {
                   tensorflow::strings::StrAppend(
                       out, tensorflow::strings::Printf("%p", address));
                 });
}

// Deallocates every non-null buffer that is not flagged in
// `buffers_in_result`, using `allocator` and the device ordinal of `stream`'s
// parent executor.
//
// `buffers` and `buffers_in_result` must have the same size (CHECK-enforced).
// Returns the first deallocation error encountered; buffers after the failing
// one are not deallocated.
static Status DeallocateTempBuffers(
    DeviceMemoryAllocator* allocator, se::Stream* stream,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    const std::vector<bool>& buffers_in_result) {
  // Both containers are indexed in lockstep below.
  CHECK_EQ(buffers.size(), buffers_in_result.size());

  // Keep the buffers that are marked as live in the output because they are
  // needed by the service. They will be deallocated by the service.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(
          allocator->Deallocate(stream->parent()->device_ordinal(), &alloc));
    }
  }

  return Status::OK();
}

// Builds the ShapedBuffer returned to the caller from `allocated_buffers`.
//
// For every element of the result shape, looks up the single logical buffer
// in the root points-to set, finds the allocation assigned to it, and stores
// that allocation's DeviceMemoryBase in the result. The index of every
// allocation used this way is set to true in `*buffers_in_result`, which must
// have one entry per allocation.
//
// CHECK-fails if a points-to set element is not a singleton, if the source is
// an entry-computation parameter allocation, or if a non-empty buffer is null.
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::CreateResultShapedBuffer(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<perftools::gputools::DeviceMemoryBase>
        allocated_buffers,
    std::vector<bool>* buffers_in_result) {
  se::Stream* stream = run_options->stream();
  auto result_buffer = MakeUnique<ShapedBuffer>(
      /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(),
      stream->parent()->platform(), stream->parent()->device_ordinal());

  // Copy DeviceMemoryBase values which contain the array(s) of the result into
  // the respective location in ShapedBuffer which is returned to the caller.
  TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);
        // The points to set is unambiguous so the set should be a
        // singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as
        // a tuple element. The source instruction should have a
        // non-parameter buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        CHECK(!slice.allocation()->is_entry_computation_parameter());

        const BufferAllocation::Index buffer_index = slice.index();
        const se::DeviceMemoryBase& buffer = allocated_buffers[buffer_index];
        // A null buffer is only acceptable for a zero-sized allocation.
        CHECK(!buffer.is_null() || buffer.size() == 0);
        *device_memory = buffer;
        (*buffers_in_result)[buffer_index] = true;
        return Status::OK();
      }));
  return std::move(result_buffer);
}

// Runs the computation synchronously and returns its result.
//
// Allocates the buffers, calls the compiled function, assembles the result
// ShapedBuffer and then deallocates every buffer that is not part of the
// result. Returns Unimplemented if the root points-to set is ambiguous.
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));
  TF_RETURN_IF_ERROR(ExecuteComputeFunction(
      &run_options->run_options(), arguments, buffers, hlo_execution_profile));

  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<ShapedBuffer> result_buffer,
      CreateResultShapedBuffer(run_options, buffers, &buffers_in_result));

  // Free all buffers not in the result.
  TF_RETURN_IF_ERROR(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                           buffers_in_result));

  return std::move(result_buffer);
}

// Schedules the computation on the host stream and returns the result
// ShapedBuffer immediately; the buffers it refers to are filled in when the
// enqueued task runs.
//
// Returns Unimplemented when hlo profiling is enabled, and an internal error
// when the stream in `run_options` is not backed by a HostStream. The
// ShapedBuffers pointed to by `arguments`, the allocator and the stream must
// stay alive until the enqueued task has run.
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }

  auto* host_stream = dynamic_cast<perftools::gputools::host::HostStream*>(
      run_options->stream()->implementation());
  // dynamic_cast yields null for any other stream implementation; fail with
  // an error before allocating anything rather than dereferencing null below.
  TF_RET_CHECK(host_stream != nullptr)
      << "ExecuteAsyncOnStream requires a stream backed by a HostStream";

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<ShapedBuffer> result_buffer,
      CreateResultShapedBuffer(run_options, buffers, &buffers_in_result));

  LogLiveAddresses(buffers, buffers_in_result);

  // The task below runs after this function has returned. `arguments` is a
  // non-owning view and `run_options` is a borrowed pointer, so either may be
  // gone by then; give the task its own copies of what it needs.
  const std::vector<const ShapedBuffer*> argument_ptrs(arguments.begin(),
                                                       arguments.end());
  const ExecutableRunOptions compute_options = run_options->run_options();

  host_stream->EnqueueTask([this, compute_options, argument_ptrs, buffers,
                            buffers_in_result, memory_allocator, stream]() {
    // Failing a CHECK here is not great, but I don't see an obvious way to
    // return a failed Status asynchronously.
    TF_CHECK_OK(ExecuteComputeFunction(&compute_options, argument_ptrs, buffers,
                                       /*hlo_execution_profile=*/nullptr));
    TF_CHECK_OK(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                      buffers_in_result));
  });

  return std::move(result_buffer);
}

// Returns the size in bytes of `shape`, using the host pointer size both for
// opaque shapes and as the pointer size passed to ShapeUtil::ByteSizeOf.
/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the cpu, opaques are pointers.
  if (ShapeUtil::IsOpaque(shape)) {
    return sizeof(void*);
  }
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

// Returns the points-to set of the entry computation's root instruction.
const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla