1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/compiler/xla/service/cpu/parallel_task_assignment.h" 17 18 #include "tensorflow/compiler/xla/service/cpu/dot_op_emitter.h" 19 #include "tensorflow/compiler/xla/service/cpu/ir_emission_utils.h" 20 #include "tensorflow/compiler/xla/service/cpu/shape_partition.h" 21 #include "tensorflow/compiler/xla/service/hlo_computation.h" 22 #include "tensorflow/compiler/xla/service/hlo_instruction.h" 23 #include "tensorflow/compiler/xla/service/hlo_opcode.h" 24 25 namespace xla { 26 namespace cpu { 27 28 class SimpleCostModel : public ParallelCostModel { 29 public: 30 SimpleCostModel(const int64 max_parallelism, 31 const HloCostAnalysis::ShapeSizeFunction& shape_size) 32 : max_parallelism_(max_parallelism), shape_size_(shape_size) {} 33 ~SimpleCostModel() override {} 34 35 int64 GetParallelTaskCount(HloInstruction* instruction) override { 36 // Simple cost model based on hlo size and typical L2 cache size. 37 const int64 instruction_cost = shape_size_(instruction->shape()); 38 const int64 min_cost_per_thread = 256LL << 10; // 256KB L2 Cache size. 39 // Return target parallel task count in [1, max_parallelism_]. 40 return std::min(max_parallelism_, 41 std::max(1LL, instruction_cost / min_cost_per_thread)); 42 } 43 44 private: 45 const int64 max_parallelism_; 46 const HloCostAnalysis::ShapeSizeFunction shape_size_; 47 }; 48 49 class DefaultCostModel : public ParallelCostModel { 50 public: 51 DefaultCostModel(const int64 max_parallelism, 52 const HloCostAnalysis::ShapeSizeFunction& shape_size, 53 std::unique_ptr<HloCostAnalysis> cost_analysis) 54 : max_parallelism_(max_parallelism), 55 shape_size_(shape_size), 56 cost_analysis_(std::move(cost_analysis)) {} 57 ~DefaultCostModel() override {} 58 59 int64 GetParallelTaskCount(HloInstruction* instruction) override { 60 // Parameters for parallel task count computation. 61 int64 instruction_cost; 62 int64 min_cost_per_thread; 63 int64 max_parallelism; 64 // Calculate flops-to-bytes-ratio for 'instruction'. 65 const int64 bytes_accessed = 66 std::max(1LL, cost_analysis_->bytes_accessed(*instruction)); 67 const float flops_to_bytes_ratio = 68 cost_analysis_->flop_count(*instruction) / 69 static_cast<float>(bytes_accessed); 70 // Check for I/O bound instructions. 71 if (flops_to_bytes_ratio <= 1.0) { 72 // Limit max parallelism for I/O bound instructions by assuming a 73 // sub-linear scaling function (fit based on empirical benchmark results). 74 // TODO(29630486) Develop system bandwidth model. 75 max_parallelism = 76 std::ceil(std::sqrt(tensorflow::port::NumSchedulableCPUs())); 77 // Use shape size instruction cost and L2 cache size min per-thread cost. 78 instruction_cost = shape_size_(instruction->shape()); 79 min_cost_per_thread = 256LL << 10; // 256KB L2 Cache size. 80 } else { 81 // Use max parallelism for compute bound instructions. 82 max_parallelism = max_parallelism_; 83 // Calculate the instruction cost in cycles. 84 // TODO(29630486) Improve on this linear cost model. 85 // Consider making 'min_cost_per_thread' be a function of the target 86 // bandwidth limit for instructions with low arithmetic complexity. 87 instruction_cost = 88 1 * cost_analysis_->flop_count(*instruction) + 89 2 * cost_analysis_->transcendental_count(*instruction) + 90 10 * cost_analysis_->bytes_accessed(*instruction); 91 // Minimum per-thread cost is 100us of work on a 2GHz core. 92 min_cost_per_thread = 100000; 93 } 94 // Return target parallel task count in [1, max_parallelism_]. 95 return std::min(max_parallelism, 96 std::max(1LL, instruction_cost / min_cost_per_thread)); 97 } 98 99 private: 100 const int64 max_parallelism_; 101 const HloCostAnalysis::ShapeSizeFunction shape_size_; 102 const std::unique_ptr<HloCostAnalysis> cost_analysis_; 103 }; 104 105 ParallelTaskAssignment::ParallelTaskAssignment( 106 const int64 max_parallelism, 107 const HloCostAnalysis::ShapeSizeFunction& shape_size, HloModule* module) { 108 VLOG(1) << "ParallelTaskAssignment max_parallelism: " << max_parallelism; 109 // Run cost analysis on 'module'. 110 auto cost_analysis = MakeUnique<HloCostAnalysis>(shape_size); 111 HloComputation* computation = module->entry_computation(); 112 Status status = computation->root_instruction()->Accept(cost_analysis.get()); 113 if (status.ok()) { 114 // Set default cost model based on 'cost_analysis'. 115 cost_model_.reset(new DefaultCostModel(max_parallelism, shape_size, 116 std::move(cost_analysis))); 117 } else { 118 // Fall back to a simple cost model based on hlo size and L2 cache size. 119 // Note that HloCostAnalysis can returns an error status (likely because 120 // HLOs like CustomCall are not yet implemented in the HloCostAnalysis). 121 cost_model_.reset(new SimpleCostModel(max_parallelism, shape_size)); 122 } 123 } 124 125 int64 ParallelTaskAssignment::GetTargetParallelTaskCount( 126 HloInstruction* instruction) { 127 // Currently, we do not assign parallel tasks to instructions with at least 128 // one of the following properties: 129 // *) Internal threading (library calls to kConv, kDot, kFft, kCustomCall). 130 // *) Emit custom loops (kSelectAndScatter, FusionKind::kTransposeDot). 131 // *) Tuple-shaped. 132 // TODO(b/27458679) Parallelize instructions which are skipped here. 133 if (instruction->opcode() == HloOpcode::kParameter || 134 instruction->opcode() == HloOpcode::kConstant || 135 instruction->opcode() == HloOpcode::kCall || 136 instruction->opcode() == HloOpcode::kCustomCall || 137 instruction->opcode() == HloOpcode::kSelectAndScatter || 138 instruction->opcode() == HloOpcode::kGetTupleElement || 139 instruction->opcode() == HloOpcode::kBitcast || 140 instruction->opcode() == HloOpcode::kFft || 141 (instruction->opcode() == HloOpcode::kConvolution && 142 PotentiallyImplementedAsEigenConvolution(*instruction)) || 143 PotentiallyImplementedAsEigenDot(*instruction) || 144 (instruction->opcode() == HloOpcode::kFusion && 145 instruction->fusion_kind() != HloInstruction::FusionKind::kLoop) || 146 ShapeUtil::IsTuple(instruction->shape())) { 147 return 1; 148 } 149 // Consult 'cost_model_' to compute target parallel task count. 150 return cost_model_->GetParallelTaskCount(instruction); 151 } 152 153 StatusOr<bool> ParallelTaskAssigner::Run(HloModule* module) { 154 XLA_VLOG_LINES(2, "ParallelTaskAssigner ENTRY"); 155 XLA_VLOG_LINES(3, module->ToString()); 156 // Compute target parallel task counts for all instructions in 'module'. 157 HloToParallelTasks hlo_to_parallel_tasks; 158 ComputeTargetParallelTasks(module, &hlo_to_parallel_tasks); 159 160 // Assign parallel tasks to target specific instructions in 'module'. 161 // TODO(b/27458679) Support inter-op parallelism. 162 bool changed = AssignParallelTasks(module, hlo_to_parallel_tasks); 163 164 XLA_VLOG_LINES(2, "ParallelTaskAssigner EXIT"); 165 XLA_VLOG_LINES(3, module->ToString()); 166 return changed; 167 } 168 169 bool ParallelTaskAssigner::AssignParallelTasks( 170 HloModule* module, const HloToParallelTasks& hlo_to_parallel_tasks) { 171 return AssignParallelTasksHelper(module, module->entry_computation(), 172 hlo_to_parallel_tasks); 173 } 174 175 bool ParallelTaskAssigner::AssignParallelTasksHelper( 176 HloModule* module, HloComputation* computation, 177 const HloToParallelTasks& hlo_to_parallel_tasks) { 178 bool changed = false; 179 // Snapshot set of instructions because outlining modifies the set below. 180 std::vector<HloInstruction*> instructions(computation->instructions().begin(), 181 computation->instructions().end()); 182 for (auto* instruction : instructions) { 183 // Assign parallel tasks to sub-computations for While and Call HLOs. 184 // TODO(b/27458679) Evaluate alternative intra-op parallelsim placement, 185 // and support other callable computations like reduce. 186 if (instruction->opcode() == HloOpcode::kWhile) { 187 changed |= AssignParallelTasksHelper(module, instruction->while_body(), 188 hlo_to_parallel_tasks); 189 continue; 190 } else if (instruction->opcode() == HloOpcode::kCall) { 191 changed |= AssignParallelTasksHelper(module, instruction->to_apply(), 192 hlo_to_parallel_tasks); 193 continue; 194 } 195 // Skip if no parallel tasks were computed in first pass. 196 auto it = hlo_to_parallel_tasks.find(instruction); 197 if (it == hlo_to_parallel_tasks.end()) { 198 continue; 199 } 200 // Get target parallel task count computed for 'instruction'. 201 const int64 target_parallel_task_count = (*it).second; 202 // Assign feasible dimension partitions (based on actual dimension sizes). 203 auto dim_partition_counts = ShapePartitionAssigner(instruction->shape()) 204 .Run(target_parallel_task_count); 205 const int64 total_partition_count = 206 ShapePartitionAssigner::GetTotalPartitionCount(dim_partition_counts); 207 if (total_partition_count <= 1) { 208 // Feasible partition calculation resulting in no partitioning, so skip. 209 continue; 210 } 211 212 // Outline 'instruction' in 'computation' for parallel task assignment. 213 auto* call = module->OutlineExpressionFromComputation( 214 {instruction}, 215 tensorflow::strings::StrCat("parallel_", instruction->name()), 216 computation); 217 218 // Set assigned dimension partitioning to 'instruction'. 219 auto* new_root = call->to_apply()->root_instruction(); 220 new_root->set_outer_dimension_partitions(dim_partition_counts); 221 222 VLOG(2) << "Assigned parallel task count: " << total_partition_count 223 << " to instruction: " << new_root->name() 224 << " parent: " << new_root->parent()->name(); 225 changed = true; 226 } 227 return changed; 228 } 229 230 void ParallelTaskAssigner::ComputeTargetParallelTasks( 231 HloModule* module, HloToParallelTasks* hlo_to_parallel_tasks) { 232 ParallelTaskAssignment parallel_task_assignment(max_parallelism_, 233 shape_size_function_, module); 234 235 // Compute parallel task counts for all instructions in 'module'. 236 for (auto* computation : module->computations()) { 237 if (computation->IsFusionComputation()) { 238 continue; 239 } 240 for (auto* instruction : computation->instructions()) { 241 // Query ParallelTaskAssignment for target parallel task count. 242 const int64 target_parallel_task_count = 243 parallel_task_assignment.GetTargetParallelTaskCount(instruction); 244 if (target_parallel_task_count > 1) { 245 hlo_to_parallel_tasks->insert( 246 {instruction, target_parallel_task_count}); 247 } 248 } 249 } 250 } 251 252 } // namespace cpu 253 } // namespace xla 254