Home | History | Annotate | Download | only in cpu
      1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/compiler/xla/service/cpu/runtime_fork_join.h"
     17 
     18 #define EIGEN_USE_THREADS
     19 
     20 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
     21 #include "tensorflow/compiler/xla/executable_run_options.h"
     22 #include "tensorflow/core/lib/core/blocking_counter.h"
     23 #include "tensorflow/core/platform/logging.h"
     24 #include "tensorflow/core/platform/types.h"
     25 
     26 using tensorflow::int32;
     27 using tensorflow::int64;
     28 using tensorflow::uint64;
     29 
     30 using ComputeFunctionType = void (*)(void*, const void*, const void**, void**,
     31                                      int64*, uint64*);
     32 
     33 // Dispatches 'num_partitions - 1' calls to 'function_ptr' in parallel.
     34 // Calls 'function_ptr' for first partition inline.
     35 // Uses blocking counter to synchonize threads after parallel calls complete.
     36 //
     37 // The 'partitions' array has a total number of elements equal to
     38 // 'num_partitions * num_partitioned_dims * 2' (the '2' is necessary to specify
     39 // dimension start and limit indices).
     40 //
     41 // The 'partitions' array layout stores array elements in memory with dimension
     42 // start limit as the most-minor dimension, followed by dimension, then
     43 // partition.
     44 //
     45 // EX: Layout of 'partitions' array with 'num_partitions = 2', and
     46 //     'num_partitioned_dims = 3'
     47 //
     48 //   [partition0_dim0_start]
     49 //   [partition0_dim0_limit]
     50 //   [partition0_dim1_start]
     51 //   [partition0_dim1_limit]
     52 //   [partition0_dim2_start]
     53 //   [partition0_dim2_limit]
     54 //   [partition1_dim0_start]
     55 //   [partition1_dim0_limit]
     56 //   [partition1_dim1_start]
     57 //   [partition1_dim1_limit]
     58 //   [partition1_dim2_start]
     59 //   [partition1_dim2_limit]
     60 //
     61 void __xla_cpu_runtime_ParallelForkJoin(
     62     void* result_ptr, const void* run_options_ptr, const void** params,
     63     void** temps, uint64* prof_counters, int32 num_partitions,
     64     int64* partitions, int32 num_partitioned_dims, void* function_ptr) {
     65   VLOG(2) << "ParallelForkJoin ENTRY"
     66           << " num_partitions: " << num_partitions
     67           << " num_partitioned_dims: " << num_partitioned_dims;
     68   CHECK_GT(num_partitions, 1);
     69   CHECK_GT(num_partitioned_dims, 0);
     70   const xla::ExecutableRunOptions* run_options =
     71       static_cast<const xla::ExecutableRunOptions*>(run_options_ptr);
     72   ComputeFunctionType function =
     73       reinterpret_cast<ComputeFunctionType>(function_ptr);
     74   // Compute partition stride in 'partitions' array.
     75   const int64 stride = 2 * num_partitioned_dims;
     76 
     77   // Dispatch 'num_partitions - 1' compute functions to run in parallel.
     78   tensorflow::BlockingCounter bc(num_partitions - 1);
     79   for (int32 i = 1; i < num_partitions; ++i) {
     80     const int64 offset = i * stride;
     81     run_options->intra_op_thread_pool()->enqueueNoNotification(
     82         [i, function, result_ptr, run_options_ptr, params, temps, prof_counters,
     83          partitions, offset, &bc]() {
     84           function(result_ptr, run_options_ptr, params, temps,
     85                    &partitions[offset], prof_counters);
     86           bc.DecrementCount();
     87           VLOG(3) << "ParallelForkJoin partition " << i << " done.";
     88         });
     89   }
     90 
     91   // Call first compute function inline.
     92   function(result_ptr, run_options_ptr, params, temps, &partitions[0],
     93            prof_counters);
     94   VLOG(3) << "ParallelForkJoin partition 0 done.";
     95   bc.Wait();
     96   VLOG(2) << "ParallelForkJoin EXIT";
     97 }
     98