# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# pylint: disable=g-short-docstring-punctuation
     16 """## Communicating Between Processes with MPI
     17 
     18 TensorFlow natively provides inter-device communication through send and
     19 receive ops and inter-node communication through Distributed TensorFlow, based
     20 on the same send and receive abstractions. On HPC clusters where Infiniband or
     21 other high-speed node interconnects are available, these can end up being
     22 insufficient for synchronous data-parallel training (without asynchronous
     23 gradient descent). This module implements a variety of MPI ops which can take
     24 advantage of hardware-specific MPI libraries for efficient communication.

In order to use this module, TensorFlow must be built with an MPI library,
which can be provided to the `./configure` script at build time. As a user of
TensorFlow, you will need to build TensorFlow yourself to select the MPI
library to use; to do so, follow the [instructions for building TensorFlow from
source](https://www.tensorflow.org/get_started/os_setup#installing_from_sources).

### Utility Ops

In addition to reductions and gathers, this module provides utility operations
for detecting the running MPI configuration.

Example:

```python
import tensorflow.contrib.mpi_collectives as mpi

# Use `mpi.Session` instead of `tf.Session`
with mpi.Session() as session:
    rank = session.run(mpi.rank())
    print("My MPI Rank:", rank)

    if rank == 0:
        print("MPI Size:", session.run(mpi.size()))
```

@@init
@@size
@@rank
@@local_rank

### Ring Allreduce and Allgather

When summing or averaging tensors across many processes, communication can
easily become a bottleneck. A naive implementation will send all the tensor
values to the same process, perform the reduction, and then broadcast the
values back to all other processes, effectively creating a synchronous
parameter server in one process. However, the process responsible for
performing the reduction will have to receive and send a massive amount of data
which scales with the number of processes *and* the number of parameters in the
model.

Instead of centralizing the reduction and having one primary reducer, we can
implement a distributed allreduce or allgather. A bandwidth-optimal allreduce
will end up sending 2(N - 1) values for every value in the input tensor,
and can be implemented with a ring allreduce [1]. (Intuitively, a linear reduce
requires at least (N - 1) sends between the different nodes, and a broadcast of
the result also requires (N - 1) sends, for a total of 2 (N - 1); these two
steps cannot be combined in a clever way to reduce the number of required
sends.) This module implements bandwidth-optimal ring allreduce and ring
allgather operations using MPI; by choosing a hardware-appropriate MPI
implementation (such as OpenMPI with CUDA-IPC support), you can train large
models with synchronous gradient descent with minimal communication overhead.

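As a rough illustration of where the 2(N - 1) figure comes from, the sketch
below simulates the two phases of a ring allreduce (reduce-scatter followed by
allgather) for a handful of in-process "ranks" using plain NumPy. It only
models the communication pattern; it is not the implementation used by the MPI
ops in this module, which are implemented as TensorFlow ops on top of the MPI
library.

```python
import numpy as np

def simulated_ring_allreduce(rank_tensors):
  """Sums equally-shaped 1-D arrays, one per simulated rank, over a ring."""
  n = len(rank_tensors)
  # Each simulated rank works on its own copy, split into N chunks.
  chunks = [np.array_split(t.astype(np.float64), n) for t in rank_tensors]

  # Phase 1: reduce-scatter. Each chunk travels N - 1 hops around the ring;
  # after the loop, rank r holds the fully reduced chunk (r + 1) % n.
  for step in range(n - 1):
    # All ranks send simultaneously, so snapshot the outgoing chunks first.
    sends = [(r, (r - step) % n, chunks[r][(r - step) % n].copy())
             for r in range(n)]
    for src, idx, payload in sends:
      chunks[(src + 1) % n][idx] += payload

  # Phase 2: allgather. Each reduced chunk travels another N - 1 hops so
  # that every rank ends up with the complete result.
  for step in range(n - 1):
    sends = [(r, (r + 1 - step) % n, chunks[r][(r + 1 - step) % n].copy())
             for r in range(n)]
    for src, idx, payload in sends:
      chunks[(src + 1) % n][idx] = payload

  return [np.concatenate(c) for c in chunks]

# Three simulated ranks, each contributing a different tensor.
inputs = [np.arange(8.0) * (r + 1) for r in range(3)]
# Every simulated rank now holds the same sum: [0, 6, 12, ..., 42].
print(simulated_ring_allreduce(inputs)[0])
```
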
In addition to the `allreduce` and `allgather` functions, a convenience
`DistributedOptimizer` wrapper is provided to simplify using these functions
for reducing model gradients.

Example:

```python
import tensorflow as tf
from tensorflow.contrib import mpi_collectives as mpi

# Construct a simple linear regression model to optimize.
W = tf.get_variable("W", shape=[20, 1], dtype=tf.float32)
B = tf.get_variable("B", shape=[1, 1], dtype=tf.float32)
inputs = tf.placeholder(tf.float32, shape=[None, 20], name="Inputs")
outputs = tf.placeholder(tf.float32, shape=[None, 1], name="Outputs")
loss = tf.nn.l2_loss(tf.matmul(inputs, W) + B - outputs)

# Train using MPI allreduce via DistributedOptimizer.
optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer())
train = optimizer.minimize(loss)

# Average loss over all ranks, for printing.
# Do not pass this to an optimizer!
avg_loss = mpi.allreduce(loss)

# On different ranks, feed different input data.
with mpi.Session() as session:
    session.run(tf.global_variables_initializer())
    rank = session.run(mpi.rank())
    batch_inputs, batch_outputs = construct_batch_for_rank(rank)
    feed_dict = {inputs: batch_inputs, outputs: batch_outputs}
    _, l = session.run([train, avg_loss], feed_dict=feed_dict)
    print("Average Loss:", l)
```

[1] Patarasuk, Pitch and Yuan, Xin. "Bandwidth Optimal All-reduce Algorithms
for Clusters of Workstations".

@@Session
@@DistributedOptimizer
@@allreduce
@@allgather
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf

from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import init
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import size
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import rank
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import local_rank
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import allgather
from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import _allreduce


def allreduce(tensor, average=True):
  """Perform an MPI allreduce on a tf.Tensor or tf.IndexedSlices.

  Arguments:
    tensor: tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce.
      The shape of the input must be identical across all ranks.
    average: If True, computes the average over all ranks.
      Otherwise, computes the sum over all ranks.

  This function performs a bandwidth-optimal ring allreduce on the input
  tensor. If the input is a tf.IndexedSlices, the function instead does an
  allgather on the values and the indices, effectively doing an allreduce on
  the represented tensor.
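
  Example (an illustrative sketch; assumes MPI has already been initialized,
  for instance by running inside an `mpi.Session`):

    grad = tf.constant([1.0, 2.0, 3.0])
    avg_grad = allreduce(grad)                  # element-wise mean over ranks
    sum_grad = allreduce(grad, average=False)   # element-wise sum over ranks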
  """
  if isinstance(tensor, tf.IndexedSlices):
    # For IndexedSlices, do two allgathers instead of an allreduce.
    mpi_size = tf.cast(size(), tensor.values.dtype)
    values = allgather(tensor.values)
    indices = allgather(tensor.indices)

    # To make this operation into an average, divide all gathered values by
    # the MPI size.
    new_values = tf.div(values, mpi_size) if average else values
    return tf.IndexedSlices(new_values, indices,
                            dense_shape=tensor.dense_shape)
  else:
    mpi_size = tf.cast(size(), tensor.dtype)
    summed_tensor = _allreduce(tensor)
    new_tensor = (tf.div(summed_tensor, mpi_size)
                  if average else summed_tensor)
    return new_tensor


class DistributedOptimizer(tf.train.Optimizer):
  """An optimizer that wraps another tf.Optimizer, using an MPI allreduce to
  average gradient values before applying gradients to model weights."""

  def __init__(self, optimizer, name=None, use_locking=False):
    """Construct a new DistributedOptimizer, which uses another optimizer
    under the hood for computing single-process gradient values and
    applying gradient updates after the gradient values have been averaged
    across all the MPI ranks.

    Args:
      optimizer: Optimizer to use for computing gradients and applying updates.
      name: Optional name prefix for the operations created when applying
        gradients. Defaults to "Distributed" followed by the provided
        optimizer type.
      use_locking: Whether to use locking when updating variables. See
        Optimizer.__init__ for more info.
    """
    if name is None:
      name = "Distributed{}".format(type(optimizer).__name__)

    self._optimizer = optimizer
    super(DistributedOptimizer, self).__init__(
        name=name, use_locking=use_locking)

  def compute_gradients(self, *args, **kwargs):
    """Compute gradients of all trainable variables.

    See Optimizer.compute_gradients() for more info.

    In DistributedOptimizer, compute_gradients() is overridden to also
    allreduce the gradients before returning them.
    """
    gradients = (super(DistributedOptimizer, self)
                 .compute_gradients(*args, **kwargs))
    # Variables without a gradient (gradient is None) are passed through
    # unchanged; all other gradients are allreduced across the MPI ranks.
    return [(allreduce(gradient), var) if gradient is not None
            else (None, var)
            for (gradient, var) in gradients]

  def _apply_dense(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_dense(*args, **kwargs)

  def _apply_sparse(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse(*args, **kwargs)

  def _apply_sparse_duplicate_indices(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse_duplicate_indices(*args, **kwargs)

  def _prepare(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._prepare(*args, **kwargs)

  def _create_slots(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._create_slots(*args, **kwargs)

  def _valid_dtypes(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._valid_dtypes(*args, **kwargs)

  def _finish(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._finish(*args, **kwargs)


class Session(tf.Session):
  """A class for running TensorFlow operations, with copies of the same graph
  running distributed across different MPI nodes.

  The primary difference between `tf.Session` and
  `tf.contrib.mpi_collectives.Session` is that the MPI `Session` ensures that
  the `Session` options are correct for use with `tf.contrib.mpi`, and
  initializes MPI immediately upon the start of the session.
  """

  def __init__(self, target='', graph=None, config=None):
    """Creates a new TensorFlow MPI session.

    Unlike a normal `tf.Session`, an MPI Session may only use a single GPU,
    which must be specified before the session is initialized. In addition,
    it only uses a single graph evaluation thread, and initializes MPI
    immediately upon starting.

    If no `graph` argument is specified when constructing the session,
    the default graph will be launched in the session. If you are
    using more than one graph (created with `tf.Graph()`) in the same
    process, you will have to use different sessions for each graph,
    but each graph can be used in multiple sessions. In this case, it
    is often clearer to pass the graph to be launched explicitly to
    the session constructor.

    Args:
      target: (Optional.) The execution engine to connect to.
      graph: (Optional.) The `Graph` to be launched (described above).
      config: (Optional.) A `ConfigProto` protocol buffer with configuration
        options for the session.
    """
    super(Session, self).__init__(target, graph, config=config)

    # Initialize MPI on the relevant device.
    # TODO: Move this to library load and eliminate mpi.Session()
    if graph is None:
      graph = tf.get_default_graph()
    with graph.as_default():
      self.run(init())
    276