/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/parallel_task_assignment.h"

#include "tensorflow/compiler/xla/service/cpu/dot_op_emitter.h"
#include "tensorflow/compiler/xla/service/cpu/ir_emission_utils.h"
#include "tensorflow/compiler/xla/service/cpu/shape_partition.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_instruction.h"
#include "tensorflow/compiler/xla/service/hlo_opcode.h"

namespace xla {
namespace cpu {

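// SimpleCostModel derives the task count from the instruction's output size
// alone: it divides the shape's byte size by a 256KB (L2-sized) per-thread
// budget and clamps the result to [1, max_parallelism_]. For example, an
// instruction with a ~4MB output yields 4MB / 256KB = 16 tasks before
// clamping.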
class SimpleCostModel : public ParallelCostModel {
 public:
  SimpleCostModel(const int64 max_parallelism,
                  const HloCostAnalysis::ShapeSizeFunction& shape_size)
      : max_parallelism_(max_parallelism), shape_size_(shape_size) {}
  ~SimpleCostModel() override {}

  int64 GetParallelTaskCount(HloInstruction* instruction) override {
    // Simple cost model based on HLO size and typical L2 cache size.
    const int64 instruction_cost = shape_size_(instruction->shape());
    const int64 min_cost_per_thread = 256LL << 10;  // 256KB L2 Cache size.
    // Return target parallel task count in [1, max_parallelism_].
    return std::min(max_parallelism_,
                    std::max(1LL, instruction_cost / min_cost_per_thread));
  }

 private:
  const int64 max_parallelism_;
  const HloCostAnalysis::ShapeSizeFunction shape_size_;
};

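// DefaultCostModel refines the estimate using HloCostAnalysis: it classifies
// each instruction as I/O bound or compute bound by its flops-to-bytes ratio
// and picks the instruction cost and per-thread minimum accordingly.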
class DefaultCostModel : public ParallelCostModel {
 public:
  DefaultCostModel(const int64 max_parallelism,
                   const HloCostAnalysis::ShapeSizeFunction& shape_size,
                   std::unique_ptr<HloCostAnalysis> cost_analysis)
      : max_parallelism_(max_parallelism),
        shape_size_(shape_size),
        cost_analysis_(std::move(cost_analysis)) {}
  ~DefaultCostModel() override {}

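  // Illustrative calibration of the ratio below (not derived from this code):
  // an elementwise f32 add over 1M elements performs ~1M flops while touching
  // ~12MB (two inputs plus one output), for a flops-to-bytes ratio of ~0.08,
  // so it takes the I/O-bound branch; a large matmul, whose ratio is well
  // above 1, takes the compute-bound branch.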
  int64 GetParallelTaskCount(HloInstruction* instruction) override {
    // Parameters for parallel task count computation.
    int64 instruction_cost;
    int64 min_cost_per_thread;
    int64 max_parallelism;
    // Calculate flops-to-bytes-ratio for 'instruction'.
    const int64 bytes_accessed =
        std::max(1LL, cost_analysis_->bytes_accessed(*instruction));
    const float flops_to_bytes_ratio =
        cost_analysis_->flop_count(*instruction) /
        static_cast<float>(bytes_accessed);
    // Check for I/O bound instructions.
    if (flops_to_bytes_ratio <= 1.0) {
      // Limit max parallelism for I/O bound instructions by assuming a
      // sub-linear scaling function (fit based on empirical benchmark results).
      // TODO(29630486) Develop system bandwidth model.
      max_parallelism =
          std::ceil(std::sqrt(tensorflow::port::NumSchedulableCPUs()));
      // Use the shape size as the instruction cost and the L2 cache size as
      // the minimum per-thread cost.
      instruction_cost = shape_size_(instruction->shape());
      min_cost_per_thread = 256LL << 10;  // 256KB L2 Cache size.
    } else {
      // Use max parallelism for compute bound instructions.
      max_parallelism = max_parallelism_;
      // Calculate the instruction cost in cycles.
      // TODO(29630486) Improve on this linear cost model.
      // Consider making 'min_cost_per_thread' be a function of the target
      // bandwidth limit for instructions with low arithmetic complexity.
      instruction_cost =
          1 * cost_analysis_->flop_count(*instruction) +
          2 * cost_analysis_->transcendental_count(*instruction) +
          10 * cost_analysis_->bytes_accessed(*instruction);
      // Minimum per-thread cost is 100,000 cycles (~50us of work on a 2GHz
      // core).
      min_cost_per_thread = 100000;
    }
    // Return target parallel task count in [1, max_parallelism].
    return std::min(max_parallelism,
                    std::max(1LL, instruction_cost / min_cost_per_thread));
  }

 private:
  const int64 max_parallelism_;
  const HloCostAnalysis::ShapeSizeFunction shape_size_;
  const std::unique_ptr<HloCostAnalysis> cost_analysis_;
};

ParallelTaskAssignment::ParallelTaskAssignment(
    const int64 max_parallelism,
    const HloCostAnalysis::ShapeSizeFunction& shape_size, HloModule* module) {
  VLOG(1) << "ParallelTaskAssignment max_parallelism: " << max_parallelism;
  // Run cost analysis on 'module'.
  auto cost_analysis = MakeUnique<HloCostAnalysis>(shape_size);
  HloComputation* computation = module->entry_computation();
  Status status = computation->root_instruction()->Accept(cost_analysis.get());
  if (status.ok()) {
    // Set default cost model based on 'cost_analysis'.
    cost_model_.reset(new DefaultCostModel(max_parallelism, shape_size,
                                           std::move(cost_analysis)));
  } else {
    // Fall back to a simple cost model based on HLO size and L2 cache size.
    // Note that HloCostAnalysis can return an error status (likely because
    // HLOs like CustomCall are not yet implemented in HloCostAnalysis).
    cost_model_.reset(new SimpleCostModel(max_parallelism, shape_size));
  }
}

int64 ParallelTaskAssignment::GetTargetParallelTaskCount(
    HloInstruction* instruction) {
  // Currently, we do not assign parallel tasks to instructions with at least
  // one of the following properties:
  // *) Internal threading (library calls to kConv, kDot, kFft, kCustomCall).
  // *) Custom loop emission (kSelectAndScatter, FusionKind::kTransposeDot).
  // *) Tuple-shaped output.
  // TODO(b/27458679) Parallelize instructions which are skipped here.
  if (instruction->opcode() == HloOpcode::kParameter ||
      instruction->opcode() == HloOpcode::kConstant ||
      instruction->opcode() == HloOpcode::kCall ||
      instruction->opcode() == HloOpcode::kCustomCall ||
      instruction->opcode() == HloOpcode::kSelectAndScatter ||
      instruction->opcode() == HloOpcode::kGetTupleElement ||
      instruction->opcode() == HloOpcode::kBitcast ||
      instruction->opcode() == HloOpcode::kFft ||
      (instruction->opcode() == HloOpcode::kConvolution &&
       PotentiallyImplementedAsEigenConvolution(*instruction)) ||
      PotentiallyImplementedAsEigenDot(*instruction) ||
      (instruction->opcode() == HloOpcode::kFusion &&
       instruction->fusion_kind() != HloInstruction::FusionKind::kLoop) ||
      ShapeUtil::IsTuple(instruction->shape())) {
    return 1;
  }
  // Consult 'cost_model_' to compute the target parallel task count.
  return cost_model_->GetParallelTaskCount(instruction);
}

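// Runs the two-pass assignment over 'module': the first pass computes a
// target parallel task count for each instruction, and the second pass
// outlines profitable instructions and records their dimension partitioning.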
StatusOr<bool> ParallelTaskAssigner::Run(HloModule* module) {
  XLA_VLOG_LINES(2, "ParallelTaskAssigner ENTRY");
  XLA_VLOG_LINES(3, module->ToString());
  // Compute target parallel task counts for all instructions in 'module'.
  HloToParallelTasks hlo_to_parallel_tasks;
  ComputeTargetParallelTasks(module, &hlo_to_parallel_tasks);

  // Assign parallel tasks to target specific instructions in 'module'.
  // TODO(b/27458679) Support inter-op parallelism.
  bool changed = AssignParallelTasks(module, hlo_to_parallel_tasks);

  XLA_VLOG_LINES(2, "ParallelTaskAssigner EXIT");
  XLA_VLOG_LINES(3, module->ToString());
  return changed;
}

bool ParallelTaskAssigner::AssignParallelTasks(
    HloModule* module, const HloToParallelTasks& hlo_to_parallel_tasks) {
  return AssignParallelTasksHelper(module, module->entry_computation(),
                                   hlo_to_parallel_tasks);
}

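// Walks 'computation', recursing into while bodies and called computations,
// and outlines each instruction with a feasible multi-way partitioning into
// its own "parallel_<name>" call whose root carries the assigned
// outer-dimension partition counts.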
bool ParallelTaskAssigner::AssignParallelTasksHelper(
    HloModule* module, HloComputation* computation,
    const HloToParallelTasks& hlo_to_parallel_tasks) {
  bool changed = false;
  // Snapshot set of instructions because outlining modifies the set below.
  std::vector<HloInstruction*> instructions(computation->instructions().begin(),
                                            computation->instructions().end());
  for (auto* instruction : instructions) {
    // Assign parallel tasks to sub-computations for While and Call HLOs.
    // TODO(b/27458679) Evaluate alternative intra-op parallelism placement,
    // and support other callable computations like reduce.
    if (instruction->opcode() == HloOpcode::kWhile) {
      changed |= AssignParallelTasksHelper(module, instruction->while_body(),
                                           hlo_to_parallel_tasks);
      continue;
    } else if (instruction->opcode() == HloOpcode::kCall) {
      changed |= AssignParallelTasksHelper(module, instruction->to_apply(),
                                           hlo_to_parallel_tasks);
      continue;
    }
    // Skip if no parallel tasks were computed in the first pass.
    auto it = hlo_to_parallel_tasks.find(instruction);
    if (it == hlo_to_parallel_tasks.end()) {
      continue;
    }
    // Get target parallel task count computed for 'instruction'.
    const int64 target_parallel_task_count = (*it).second;
    // Assign feasible dimension partitions (based on actual dimension sizes).
    auto dim_partition_counts = ShapePartitionAssigner(instruction->shape())
                                    .Run(target_parallel_task_count);
    const int64 total_partition_count =
        ShapePartitionAssigner::GetTotalPartitionCount(dim_partition_counts);
    if (total_partition_count <= 1) {
      // The feasible partition calculation yielded no partitioning, so skip.
      continue;
    }

    // Outline 'instruction' in 'computation' for parallel task assignment.
    auto* call = module->OutlineExpressionFromComputation(
        {instruction},
        tensorflow::strings::StrCat("parallel_", instruction->name()),
        computation);

    // Set assigned dimension partitioning to 'instruction'.
    auto* new_root = call->to_apply()->root_instruction();
    new_root->set_outer_dimension_partitions(dim_partition_counts);

    VLOG(2) << "Assigned parallel task count: " << total_partition_count
            << " to instruction: " << new_root->name()
            << " parent: " << new_root->parent()->name();
    changed = true;
  }
  return changed;
}

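// Populates 'hlo_to_parallel_tasks' with an entry for every non-fusion
// instruction in 'module' whose target parallel task count exceeds one.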
void ParallelTaskAssigner::ComputeTargetParallelTasks(
    HloModule* module, HloToParallelTasks* hlo_to_parallel_tasks) {
  ParallelTaskAssignment parallel_task_assignment(max_parallelism_,
                                                  shape_size_function_, module);

  // Compute parallel task counts for all instructions in 'module'.
  for (auto* computation : module->computations()) {
    if (computation->IsFusionComputation()) {
      continue;
    }
    for (auto* instruction : computation->instructions()) {
      // Query ParallelTaskAssignment for target parallel task count.
      const int64 target_parallel_task_count =
          parallel_task_assignment.GetTargetParallelTaskCount(instruction);
      if (target_parallel_task_count > 1) {
        hlo_to_parallel_tasks->insert(
            {instruction, target_parallel_task_count});
      }
    }
  }
}

}  // namespace cpu
}  // namespace xla