# Understanding TensorFlow's Distributed Computing Mechanism: A Focus on Data Parallelism

TensorFlow is a popular open-source library for numerical computation, most commonly used for training neural networks. In this framework, computation is expressed as a data flow graph, which provides great flexibility for changing the structure and placement of operations. TensorFlow allows multiple workers to compute in parallel, which is beneficial for neural networks that must be trained on large amounts of data. Moreover, if the model is large enough, this parallelization can sometimes be necessary. In this article, we explore TensorFlow's distributed computing mechanism.

*Example of a TensorFlow computation graph*

## Data Parallelism in TensorFlow

```python
# single GPU (baseline)
import tensorflow as tf

# place the initial data on the cpu
with tf.device('/cpu:0'):
    input_data = tf.Variable(
        [[1., 2., 3.],
         [4., 5., 6.],
         [7., 8., 9.],
         [10., 11., 12.]])
    b = tf.Variable([[1.], [1.], [2.]])

# compute the result on the 0th gpu
with tf.device('/gpu:0'):
    output = tf.matmul(input_data, b)

# create a session and run
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(output))
```
```python
# in-graph replication
import tensorflow as tf

num_gpus = 2

# place the initial data on the cpu
with tf.device('/cpu:0'):
    input_data = tf.Variable(
        [[1., 2., 3.],
         [4., 5., 6.],
         [7., 8., 9.],
         [10., 11., 12.]])
    b = tf.Variable([[1.], [1.], [2.]])

# split the data into chunks for each gpu
inputs = tf.split(input_data, num_gpus)
outputs = []

# loop over available gpus and pass input data
for i in range(num_gpus):
    with tf.device('/gpu:' + str(i)):
        outputs.append(tf.matmul(inputs[i], b))

# merge the results of the devices
with tf.device('/cpu:0'):
    output = tf.concat(outputs, axis=0)

# create a session and run
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(output))
```
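The correctness of this split-and-merge scheme rests on a simple fact: a matrix product can be computed row-block by row-block, so each device can multiply its chunk independently and the concatenated result equals the single-device result. A minimal NumPy sketch of the same computation (no TensorFlow required) makes this concrete:

```python
import numpy as np

# the same data as in the TensorFlow examples above
input_data = np.array([[1., 2., 3.],
                       [4., 5., 6.],
                       [7., 8., 9.],
                       [10., 11., 12.]])
b = np.array([[1.], [1.], [2.]])

# "in-graph replication" by hand: split the rows into two chunks,
# multiply each chunk independently (as each GPU would), then merge
chunks = np.split(input_data, 2)
outputs = [chunk @ b for chunk in chunks]
merged = np.concatenate(outputs, axis=0)

# the merged result matches the single-device matmul exactly
print(np.array_equal(merged, input_data @ b))  # True
```

This is why `tf.split` along axis 0 followed by `tf.concat` along axis 0 is a safe parallelization for this matmul: the split axis (rows of `input_data`) does not participate in the reduction.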

```python
# between-graph replication
import sys
import tensorflow as tf

# specify the cluster's architecture
cluster = tf.train.ClusterSpec({
    'ps': ['192.168.1.1:1111'],
    'worker': ['192.168.1.2:1111', '192.168.1.3:1111']
})

# parse command line to specify machine
job_type = sys.argv[1]       # job type: "worker" or "ps"
task_idx = int(sys.argv[2])  # index of this job in the worker or ps list,
                             # as defined in the ClusterSpec

# create a TensorFlow Server. This is how the machines communicate.
server = tf.train.Server(cluster, job_name=job_type, task_index=task_idx)

# the parameter server is updated by remote clients.
# it will not proceed beyond this if statement.
if job_type == 'ps':
    server.join()
else:
    # workers only
    with tf.device(tf.train.replica_device_setter(
            worker_device='/job:worker/task:' + str(task_idx),
            cluster=cluster)):
        # build your model here as if you were only using a single machine
        pass
    with tf.Session(server.target):
        # train your model here
        pass
```
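To bring up this cluster, the same script is launched once per machine, with the job type and task index passed on the command line. A hypothetical launch sequence (assuming the script above is saved as `trainer.py` on each host listed in the ClusterSpec; the filename is illustrative, not from the original):

```shell
# on 192.168.1.1 -- the parameter server (blocks in server.join())
python trainer.py ps 0

# on 192.168.1.2 -- first worker
python trainer.py worker 0

# on 192.168.1.3 -- second worker
python trainer.py worker 1
```

Each process builds an identical graph (hence "between-graph replication"), and `tf.train.replica_device_setter` transparently places the shared variables on the parameter server while keeping per-worker ops on the local worker device.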