/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_
#define TENSORFLOW_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_

#include "tensorflow/core/kernels/typed_conditional_accumulator_base.h"

namespace tensorflow {

/**
 * An aggregation object for adding sparse gradients, represented as a tuple of
 * indices, values, and a (possibly empty) shape.
 *
 * The two main methods of this class are TryApplyGrad and TryTakeGrad.
 *
 * TryApplyGrad tries to add a gradient to the accumulator. The attempt is
 * successful if local_step >= global_step, i.e., if the gradient is not stale,
 * having been computed using up-to-date information. Otherwise, the gradient is
 * silently dropped.
 *
 * TryTakeGrad logs an attempt to read the average gradient. The attempt is
 * blocked until the number of gradients accumulated (via TryApplyGrad) equals
 * or exceeds the number requested by TryTakeGrad.
 * Once this condition is satisfied, the following actions are taken:
 * (1) the value of the average gradient is returned
 * (2) the count of accumulated gradients is reset to 0
 * (3) the internal global_step value (current_global_step_) is incremented by 1
 *
 * SparseConditionalAccumulator is the datatype-dependent templated sub-class of
 * ConditionalAccumulatorBase. It implements the virtual arithmetic methods that
 * are used for aggregating, averaging, allocating, and returning indexed
 * slices.
 */
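// A minimal usage sketch, not taken from the real op kernels: "CPUDevice" is a
// hypothetical alias for Eigen::ThreadPoolDevice and "accum_name" is an
// illustrative resource name; the kernels that actually construct and drive
// this class live elsewhere in TensorFlow.
//
//   typedef Eigen::ThreadPoolDevice CPUDevice;
//   auto* accum = new SparseConditionalAccumulator<CPUDevice, float>(
//       DT_FLOAT, PartialTensorShape({-1, 128}), "accum_name");
//   // Gradients are then fed in through TryApplyGrad and read back with
//   // TryTakeGrad, both provided by the accumulator base classes.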
template <typename Device, typename T>
class SparseConditionalAccumulator
    : public TypedConditionalAccumulatorBase<
          std::tuple<const Tensor*, const Tensor*, const Tensor*>> {
 public:
  SparseConditionalAccumulator(const DataType& dtype,
                               const PartialTensorShape& shape,
                               const string& name)
      : TypedConditionalAccumulatorBase<
            std::tuple<const Tensor*, const Tensor*, const Tensor*>>(
            dtype, shape, name) {
    accum_idx_vec_ = nullptr;
    count_element_ = nullptr;
    accum_val_ = nullptr;
    accum_val_persistent_ = new PersistentTensor();
  }

  ~SparseConditionalAccumulator() override {
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    if (count_element_ != nullptr) delete count_element_;
    if (accum_val_persistent_ != nullptr) delete accum_val_persistent_;
    // Do not delete accum_val_! Will be automatically garbage collected
  }

 protected:
  std::vector<int64>* accum_idx_vec_ = nullptr;
  std::vector<int>* count_element_ = nullptr;

  Tensor* accum_val_ = nullptr;
  PersistentTensor* accum_val_persistent_ = nullptr;

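  // 1-D Eigen views over a single row of a rank-2 flattening of the value
  // tensors (see the flat_outer_dims<T>() calls below); they let rows be
  // copied, summed, and scaled in place without allocating temporaries.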
  typedef Eigen::TensorMap<Eigen::Tensor<T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceT;
  typedef Eigen::TensorMap<Eigen::Tensor<const T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceConstT;

  Status ValidateShape(
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* tensor,
      bool has_known_shape) EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const Tensor* tensor_idx = std::get<0>(*tensor);
    const Tensor* tensor_val = std::get<1>(*tensor);
    const Tensor* tensor_shape = std::get<2>(*tensor);
    int64 grad_val_dims = tensor_val->dims();
    int64 grad_dims = grad_val_dims;

    // Compare with provided shape
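    // For example (hypothetical shapes): with shape_ == [-1, 2], a provided
    // gradient_shape of [100, 2] passes, while [100, 3] fails on dimension 1.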
    if (has_known_shape) {
      if (shape_.dims() > tensor_shape->NumElements()) {
        return errors::InvalidArgument(
            "Shape mismatch: expected shape rank at least ", shape_.dims(),
            ", got ", tensor_shape->NumElements());
      }
      const auto tensor_shape_flat = tensor_shape->flat<int64>();
      for (int64 i = 0; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_shape_flat(i)) {
          return errors::InvalidArgument("Shape mismatch: expected shape dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_shape_flat(i));
        }
      }
    }
    // Check that indices are within limits
    if (shape_.dims() > 0 && shape_.dim_size(0) != -1 &&
        tensor_idx->dims() > 0) {
      for (int64 i = 0; i < tensor_idx->dim_size(0); i++) {
        if (tensor_idx->vec<int64>()(i) >= shape_.dim_size(0)) {
          return errors::InvalidArgument(
              "Shape mismatch: index of slice ", i,
              " exceeds limits of shape; index is ",
              tensor_idx->vec<int64>()(i), ", limit is ", shape_.dim_size(0));
        }
      }
    }

    // Check values compatibility with accumulated gradient if available
    if (counter_ > 0) {
      int64 accum_val_dims = accum_val_->dims();
      if (accum_val_dims != grad_val_dims) {
        return errors::InvalidArgument("Shape mismatch: expected values rank ",
                                       accum_val_dims, ", got ", grad_val_dims);
      }
      for (int64 i = 1; i < accum_val_dims; i++) {
        if (accum_val_->dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", accum_val_->dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    } else {
      // If there are no accumulated gradients, check against shape_
      if (shape_.dims() > grad_dims) {
        return errors::InvalidArgument(
            "Shape mismatch: expected values rank at least ", shape_.dims(),
            ", got ", grad_dims);
      }
      // Check that values have correct dimensions
      for (int64 i = 1; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    }

    return Status::OK();
  }

  void AllocateAndAssignToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64 nnz = grad_idx->dim_size(0);

    // Assign indices
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    accum_idx_vec_ = new std::vector<int64>();
    accum_idx_vec_->reserve(nnz);
    for (int i = 0; i < nnz; i++) {
      accum_idx_vec_->push_back(grad_idx->vec<int64>()(i));
    }

    // Assign values to accum_val_tensor
    // TODO(b/32704451): Don't just ignore the ::tensorflow::Status object!
    ctx->allocate_persistent(dtype_, grad_val->shape(), accum_val_persistent_,
                             &accum_val_)
        .IgnoreError();
    accum_val_->flat<T>().device(ctx->template eigen_device<Device>()) =
        grad_val->flat<T>();

    // Assign count_element_
    if (count_element_ != nullptr) {
      delete count_element_;
    }
    count_element_ = new std::vector<int>(nnz, 1);

    // Do not need shape; assume that the op has checked that the shapes match,
    // so grad's shape == shape_
  }

  void AddToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    // Modeled after third_party/tensorflow/core/kernels/sparse_add_op

    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64 accum_nnz = accum_idx_vec_->size();
    const int64 grad_nnz = grad_idx->dim_size(0);

    // Source enumerates the origin of a non-zero element: whether it is from
    // the new gradient, the accumulated gradient, or the sum of both.
    enum Source { from_accum, from_grad, from_accum_and_grad };

    // (1) do a pass over inputs, and append values and indices to vectors
    std::vector<std::tuple<Source, int64, int64>> entries_to_copy;
    entries_to_copy.reserve(accum_nnz + grad_nnz);

    // Pass over all non-zero elements of both the gradient and the accumulated
    // value, to identify where each non-zero element of the sum comes from.
    // The input and output indexed slices are assumed to be ordered along
    // increasing dimension number.
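    // For example (hypothetical indices): merging accumulated indices
    // {0, 3, 5} with incoming gradient indices {3, 4} yields merged indices
    // {0, 3, 4, 5}, tagged from_accum, from_accum_and_grad, from_grad, and
    // from_accum respectively, so sum_nnz == 4.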
    int64 i = 0, j = 0;
    int64 sum_nnz = 0;
    while (i < accum_nnz && j < grad_nnz) {
      sum_nnz++;
      switch (cmp(accum_idx_vec_, grad_idx, i, j)) {
        case -1:
          entries_to_copy.emplace_back(from_accum, i, -1);
          ++i;
          break;
        case 0:
          entries_to_copy.emplace_back(from_accum_and_grad, i, j);
          ++i;
          ++j;
          break;
        case 1:
          entries_to_copy.emplace_back(from_grad, -1, j);
          ++j;
          break;
      }
    }

    // Handle leftovers
    while (i < accum_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_accum, i, -1);
      ++i;
    }
    while (j < grad_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_grad, -1, j);
      ++j;
    }

    // (2) Copy or sum the non-zero elements into sum_indices and sum_tensor
    std::vector<int64>* sum_indices_vec = new std::vector<int64>();
    sum_indices_vec->reserve(sum_nnz);

    std::vector<int>* sum_counts = new std::vector<int>();
    sum_counts->reserve(sum_nnz);

    Tensor* sum_tensor = nullptr;
    PersistentTensor* tensor_sum_persistent = new PersistentTensor();

    TensorShape sum_shape = grad_val->shape();
    sum_shape.set_dim(0, sum_nnz);

    OP_REQUIRES_OK(
        ctx, ctx->allocate_persistent(dtype_, sum_shape, tensor_sum_persistent,
                                      &sum_tensor));
    auto sum_flat = sum_tensor->flat_outer_dims<T>();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    auto grad_flat = grad_val->flat_outer_dims<T>();

    const int64 num_col = grad_flat.dimension(1);

    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(num_col);

    for (i = 0; i < sum_nnz; ++i) {
      const Source src = std::get<0>(entries_to_copy[i]);
      const int64 idx_a = std::get<1>(entries_to_copy[i]);
      const int64 idx_b = std::get<2>(entries_to_copy[i]);
      T* sum_slice_ptr = &sum_flat(i, 0);
      SliceT sum_slice(sum_slice_ptr, slice_shape);
      if (src == from_accum) {
        // Element comes from accumulator; directly copy data structures over
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = accum_slice;
        sum_counts->push_back(count_element_->at(idx_a));
      } else if (src == from_accum_and_grad) {
        // Element is a sum of accumulated value and new gradient;
        // compute sum here
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = grad_slice + accum_slice;
        sum_counts->push_back(count_element_->at(idx_a) + 1);
      } else if (src == from_grad) {
        // Element comes from new gradient; make a copy of indices and values
        sum_indices_vec->push_back(grad_idx->vec<int64>()(idx_b));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        sum_slice = grad_slice;
        sum_counts->push_back(1);
      }
    }

    // (3) Keep output, i.e., switch pointers to point to new data structures
    // representing the sum
    // Indices
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    accum_idx_vec_ = sum_indices_vec;
    // Values
    accum_val_ = sum_tensor;
    delete accum_val_persistent_;
    accum_val_persistent_ = tensor_sum_persistent;
    // Counts
    if (count_element_ != nullptr) delete count_element_;
    count_element_ = sum_counts;

    // No need to copy shape, since shape remains the same after sum.
  }

  void DivideAccumGradByCounter(OpKernelContext* ctx) override
      EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const int64 nnz = count_element_->size();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    std::vector<T> count_typet;
    std::transform(count_element_->begin(), count_element_->end(),
                   std::back_inserter(count_typet),
                   TypeConverter<T, int>::ConvertUToT);

    // Option 1: divide all by counter
    /*
    std::transform(
        &accum_flat(0,0), &accum_flat(nnz,0), &accum_flat(0,0),
        std::bind2nd(std::divides<T>(),
                     TypeConverter<T, int>::ConvertUToT(this->counter_)));
    */

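    // For example (hypothetical counts): with counter_ == 4 and a row that
    // appeared in only 2 of the applied gradients, Option 1 would divide that
    // row by 4, whereas Option 2 below divides it by its own count of 2.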
    // Option 2: average element-wise
    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(accum_flat.dimension(1));
    for (int64 i = 0; i < nnz; i++) {
      T* accum_slice_ptr = &accum_flat(i, 0);
      SliceT accum_slice(accum_slice_ptr, slice_shape);
      accum_slice.device(ctx->template eigen_device<Device>()) =
          accum_slice / count_typet[i];
    }
  }

  bool SetOutput(OpKernelContext* ctx) override {
    bool is_successful = true;
    if (is_successful) is_successful = ReturnIdxTensor(ctx);
    if (is_successful) is_successful = ReturnValTensor(ctx);
    if (is_successful) is_successful = ReturnShapeTensor(ctx);
    return is_successful;
  }

  bool GetAndValidateTensorInputForApplyGrad(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>** tensor) override
      EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    // TODO(xinghao, jmchen): The roundabout way of getting attr from
    // OpKernelContext (instead of OpKernelConstruction) is a hack, and should
    // be fixed if it affects efficiency.
    bool has_known_shape = false;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, GetNodeAttr(ctx->op_kernel().def(), "has_known_shape",
                         &has_known_shape));

    // Get input gradient tensors
    const Tensor* grad_idx_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_indices", &grad_idx_tensor));
    const Tensor* grad_val_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_values", &grad_val_tensor));
    const Tensor* grad_shape_tensor = nullptr;
    if (has_known_shape) {
      OP_REQUIRES_OK_BOOLEAN(ctx,
                             ctx->input("gradient_shape", &grad_shape_tensor));
    }

    // Checks
    OP_REQUIRES_BOOLEAN(
        ctx, TensorShapeUtils::IsVector(grad_idx_tensor->shape()),
        errors::InvalidArgument(
            "Input indices should be vector but received shape: ",
            grad_idx_tensor->shape().DebugString()));
    const int64 nnz = grad_idx_tensor->dim_size(0);
    OP_REQUIRES_BOOLEAN(
        ctx, grad_val_tensor->dims() > 0,
        errors::InvalidArgument("Values cannot be 0-dimensional."));
    OP_REQUIRES_BOOLEAN(ctx, grad_val_tensor->dim_size(0) == nnz,
                        errors::InvalidArgument("Expected ", nnz,
                                                " non-empty input values, got ",
                                                grad_val_tensor->dim_size(0)));

    *tensor = new std::tuple<const Tensor*, const Tensor*, const Tensor*>(
        grad_idx_tensor, grad_val_tensor, grad_shape_tensor);

    OP_REQUIRES_OK_BOOLEAN(ctx, this->ValidateShape(*tensor, has_known_shape));

    return true;
  }

  void CleanUpGradTensor(std::tuple<const Tensor*, const Tensor*,
                                    const Tensor*>* tensor) override {
    if (tensor != nullptr) delete tensor;
  }

 private:
  inline int cmp(std::vector<int64>* a_idx, const Tensor* b_idx,
                 const int64 a_row, const int64 b_row) {
    const int64 a = a_idx->at(a_row);
    const int64 b = b_idx->vec<int64>()(b_row);
    if (a < b) {
      return -1;
    } else if (a > b) {
      return 1;
    }
    return 0;
  }

  inline bool ReturnIdxTensor(OpKernelContext* ctx) {
    Tensor* idx_tensor;
    const int64 nnz = accum_idx_vec_->size();
    OP_REQUIRES_OK_BOOLEAN(ctx, ctx->allocate_output(0, {nnz}, &idx_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false
    auto idx_tensor_vec = idx_tensor->vec<int64>();
    for (int i = 0; i < nnz; ++i) {
      idx_tensor_vec(i) = accum_idx_vec_->at(i);
    }
    return true;
  }

  inline bool ReturnValTensor(OpKernelContext* ctx) {
    ctx->set_output(1, *accum_val_);
    return true;
  }

  inline bool ReturnShapeTensor(OpKernelContext* ctx) {
    int64 accum_val_dims = accum_val_->dims();
    Tensor* shape_tensor;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, ctx->allocate_output(2, {accum_val_dims}, &shape_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false

    // First dim of shape is defined by shape_, others by accum_val_->shape
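    // For example (hypothetical shapes): with shape_ == [100, -1] and an
    // accumulated value of shape [5, 8], the returned shape tensor is
    // [100, 8].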
    shape_tensor->flat<int64>()(0) =
        (shape_.dims() > 0) ? shape_.dim_size(0) : -1;
    for (int64 i = 1; i < accum_val_dims; i++) {
      shape_tensor->flat<int64>()(i) = accum_val_->dim_size(i);
    }
    return true;
  }

  TF_DISALLOW_COPY_AND_ASSIGN(SparseConditionalAccumulator);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_