Multi-GPU Parallel Training with TensorFlow


Overview

Deep learning frameworks generally support multi-GPU parallel computation, which mainly comes in two forms: data parallelism and model parallelism. The approach used by TensorFlow here is data parallelism.
The principle of data parallelism is illustrated below:

From the figure, the multi-GPU parallel training process is:

  1. A replica of the model is defined on each GPU
  2. Each GPU fetches a batch of data from the CPU, runs the forward pass to compute the loss, and then computes the gradients
  3. The CPU receives the gradients from all GPUs, averages them, and applies the parameter update (a minimal sketch of this loop follows the list)

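As a rough illustration of these three steps, here is a minimal NumPy sketch of one synchronous data-parallel update. The grad_fn and batches below are hypothetical placeholders standing in for the loss gradient and the per-GPU input batches; they are not part of the CIFAR-10 example:

import numpy as np

def data_parallel_step(params, batches, grad_fn, lr=0.1):
    # Steps 1-2: every "GPU" holds the same params and computes a
    # gradient on its own batch (done sequentially here, for clarity)
    tower_grads = [grad_fn(params, batch) for batch in batches]
    # Step 3: average the gradients across towers and update once
    avg_grad = np.mean(tower_grads, axis=0)
    return params - lr * avg_grad

# Toy usage with f(w) = w^2, so grad = 2w on every "GPU":
w = data_parallel_step(np.float64(3.0), [None, None], lambda w, b: 2 * w)
print(w)  # 3.0 - 0.1 * 6.0 = 2.4
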
There are also a few points to keep in mind when using GPUs, all of which show up in the code below:

  1. Not every operation has a GPU implementation, so the session is created with allow_soft_placement=True to let such ops fall back to the CPU
  2. The model parameters are created once and then shared across all GPUs via tf.get_variable_scope().reuse_variables()
  3. gpu_options=tf.GPUOptions(allow_growth=True) makes TensorFlow allocate GPU memory on demand instead of reserving it all up front

Example Walkthrough

The walkthrough below is based on the example from the official repository: GITHUB
The code is organized into four parts:

  1. cifar10_input.py: reads and preprocesses the CIFAR-10 data
  2. cifar10.py: builds the model, the loss, and the training ops
  3. cifar10_train.py / cifar10_multi_gpu_train.py: single-GPU and multi-GPU training scripts
  4. cifar10_eval.py: evaluates a trained model

Here is a detailed walkthrough of the multi-GPU training script:

from datetime import datetime
import os.path
import re
import time

import numpy as np
from six.moves import xrange
import tensorflow as tf
import cifar10


FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('train_dir', './cifar10_train',
                           """Directory where to write event logs """
                           """and checkpoint.""")
tf.app.flags.DEFINE_integer('max_steps', 1000000,
                            """Number of batches to run.""")
tf.app.flags.DEFINE_integer('num_gpus', 1,
                            """How many GPUs to use.""")
tf.app.flags.DEFINE_boolean('log_device_placement', False,
                            """Whether to log device placement.""")


def tower_loss(scope, images, labels):
    """
    计算当前tower的损失
    这里的损失包括最后的损失和weight的L2正则损失 具体可以
    :param scope: 当前空间名
    :param images: 输入的图像
    :param labels: 图像的label
    :return: 总的loss
    """
    logits = cifar10.inference(images)
    _ = cifar10.loss(logits, labels)
    # Fetch every loss recorded in the 'losses' collection for this tower
    losses = tf.get_collection('losses', scope)
    # Sum them to get the total loss
    total_loss = tf.add_n(losses, name='total_loss')
    # Record each loss (and the total) as a scalar summary, stripping
    # the tower prefix from the op name
    for l in losses + [total_loss]:
        loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name)
        tf.summary.scalar(loss_name, l)
    return total_loss

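For context, cifar10.py fills the 'losses' collection roughly like this. The snippet below is a simplified, self-contained sketch of that pattern, not the actual library code:

import tensorflow as tf

# Weight-decay terms join the collection when the variables are created
weights = tf.get_variable('weights_demo', [10, 10])
weight_decay = tf.multiply(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
tf.add_to_collection('losses', weight_decay)

# ...and the cross-entropy loss lands in the same collection
# (a constant stands in for it here)
cross_entropy_mean = tf.constant(1.0, name='cross_entropy')
tf.add_to_collection('losses', cross_entropy_mean)

# tower_loss then simply sums everything in the collection
total_loss = tf.add_n(tf.get_collection('losses'), name='total_loss')
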

def average_gradients(tower_grads):
    """
    梯度平均
    :param tower_grads: 所有tower的梯度
    :return: 每一个变量的平均梯度
    """
    average_grads = []
    # 枚举所有的变量 计算变量在所有GPU下梯度的平均值
    for grad_and_vars in zip(*tower_grads):
        grads = []
        for g, _ in grad_and_vars:
            # Add a leading "tower" dimension so the per-tower
            # gradients can be concatenated
            expanded_g = tf.expand_dims(g, 0)
            grads.append(expanded_g)

        # Concatenate along the tower dimension and average over it
        grad = tf.concat(axis=0, values=grads)
        grad = tf.reduce_mean(grad, 0)
        # Variables are shared across towers, so the copy from the first
        # tower suffices; index 1 of the (gradient, variable) tuple is
        # the variable
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads

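As a quick sanity check of average_gradients (a toy example, not part of the original script): if two towers report gradients 2.0 and 4.0 for the same variable, the averaged gradient should come out to 3.0.

def _check_average_gradients():
    with tf.Graph().as_default(), tf.Session() as sess:
        v = tf.Variable(1.0)
        # Two towers, one shared variable, gradients 2.0 and 4.0
        tower_grads = [[(tf.constant(2.0), v)],
                       [(tf.constant(4.0), v)]]
        avg = average_gradients(tower_grads)
        print(sess.run(avg[0][0]))  # prints 3.0
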

def train():
    # The basic (non-tower) operations are all placed on the CPU
    with tf.Graph().as_default(), tf.device('/cpu:0'):
        # Global step counter; not a trainable variable
        global_step = tf.get_variable(
            'global_step', [],
            initializer=tf.constant_initializer(0), trainable=False)
        # Number of batches per epoch
        num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
                                 FLAGS.batch_size)
        # Number of steps between learning-rate decays
        decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)
        # Learning rate with exponential decay
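        # For reference, with staircase=True exponential_decay computes:
        #   lr = INITIAL_LEARNING_RATE *
        #        LEARNING_RATE_DECAY_FACTOR ** floor(global_step / decay_steps)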
        lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
                                        global_step,
                                        decay_steps,
                                        cifar10.LEARNING_RATE_DECAY_FACTOR,
                                        staircase=True)
        # Plain gradient descent optimizer
        opt = tf.train.GradientDescentOptimizer(lr)
        # Build the input pipeline; one prefetch queue feeds all GPUs
        images, labels = cifar10.distorted_inputs()
        batch_queue = tf.contrib.slim.prefetch_queue.prefetch_queue(
            [images, labels], capacity=2 * FLAGS.num_gpus)

        tower_grads = []
        with tf.variable_scope(tf.get_variable_scope()):
            for i in xrange(FLAGS.num_gpus):
                with tf.device('/gpu:%d' % i):
                    with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
                        image_batch, label_batch = batch_queue.dequeue()
                        loss = tower_loss(scope, image_batch, label_batch)
                        # After the first tower has created the variables,
                        # switch on reuse so that every GPU shares the same
                        # set of parameters. Note that tf.name_scope does
                        # not affect the namespace seen by tf.get_variable.
                        tf.get_variable_scope().reuse_variables()
                        summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)
                        # Compute the gradients for this tower
                        grads = opt.compute_gradients(loss)
                        tower_grads.append(grads)
        # Average the gradients over all towers
        grads = average_gradients(tower_grads)
        summaries.append(tf.summary.scalar('learning_rate', lr))

        for grad, var in grads:
            if grad is not None:
                summaries.append(tf.summary.histogram(var.op.name + '/gradients', grad))
        # Apply the averaged gradients.
        # Each step now consumes num_gpus batches, so the timing and
        # throughput figures below must account for the number of GPUs.
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)

        for var in tf.trainable_variables():
            summaries.append(tf.summary.histogram(var.op.name, var))

        # Maintain a moving average of the trainable variables
        variable_averages = tf.train.ExponentialMovingAverage(
            cifar10.MOVING_AVERAGE_DECAY, global_step)
        variables_averages_op = variable_averages.apply(tf.trainable_variables())

        # Each step applies the gradients and refreshes the moving averages
        train_op = tf.group(apply_gradient_op, variables_averages_op)

        saver = tf.train.Saver(tf.global_variables())
        summary_op = tf.summary.merge(summaries)

        init = tf.global_variables_initializer()

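        # allow_soft_placement lets ops without a GPU kernel fall back to
        # the CPU; allow_growth allocates GPU memory on demand instead of
        # reserving it all up front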
        sess = tf.Session(config=tf.ConfigProto(
            allow_soft_placement=True,
            gpu_options=tf.GPUOptions(allow_growth=True),
            log_device_placement=FLAGS.log_device_placement))
        sess.run(init)

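        # Launch the threads that fill the input queues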
        tf.train.start_queue_runners(sess=sess)

        summary_writer = tf.summary.FileWriter(FLAGS.train_dir, sess.graph)

        for step in xrange(FLAGS.max_steps):
            start_time = time.time()
            _, loss_value = sess.run([train_op, loss])
            duration = time.time() - start_time

            assert not np.isnan(loss_value), 'Model diverged with loss = NaN'

            if step % 10 == 0:
                # Throughput statistics must account for the number of
                # GPUs: each step processes batch_size examples per GPU
                num_examples_per_step = FLAGS.batch_size * FLAGS.num_gpus
                examples_per_sec = num_examples_per_step / duration
                # Per-batch time is the step time divided by the number
                # of GPUs, since the batches run in parallel
                sec_per_batch = duration / FLAGS.num_gpus

                format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
                              'sec/batch)')
                print(format_str % (datetime.now(), step, loss_value,
                                    examples_per_sec, sec_per_batch))

            if step % 100 == 0:
                summary_str = sess.run(summary_op)
                summary_writer.add_summary(summary_str, step)

            if step % 1000 == 0 or (step + 1) == FLAGS.max_steps:
                checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
                saver.save(sess, checkpoint_path, global_step=step)


def main(argv=None):
    cifar10.maybe_download_and_extract()
    if tf.gfile.Exists(FLAGS.train_dir):
        tf.gfile.DeleteRecursively(FLAGS.train_dir)
    tf.gfile.MakeDirs(FLAGS.train_dir)
    train()

if __name__ == '__main__':
    tf.app.run()

Running the program shows the multi-GPU training speed:

(screenshot: multi-GPU training speed output)
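To reproduce this, launch the script with the num_gpus flag defined above. Assuming it is saved as cifar10_multi_gpu_train.py, the filename used in the official repository:

python cifar10_multi_gpu_train.py --num_gpus=2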
