ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

分布式tensorflow介绍1:实现最简单的ps-work工作模式

2022-01-29 12:31:05  阅读:227  来源: 互联网

标签:ps 机器 10 work worker 0.4 tf tensorflow float32


tensorflow的Parameter server架构(PS架构),集群中的节点被分为两类:参数服务器(parameter server)和工作服务器(worker)。其中参数服务器存放模型的参数,而工作服务器负责计算参数的梯度。在每个迭代过程,工作服务器从参数服务器中获得参数,然后将计算的梯度返回给参数服务器,参数服务器聚合从工作服务器传回的梯度,然后更新参数,并将新的参数广播给工作服务器。下面给一个简单的例子来说明,在一台机器上构建ps和worker(ip地址相同,端口号不同即可实现,和两台机器实际上是一样的),ps端和worker端代码如下:

ps:

#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
#coding=utf-8
#多台机器,每台机器有一个显卡、或者多个显卡,这种训练叫做分布式训练
from time import sleep
import tensorflow.compat.v1 as tf
#现在假设我们有A、B、C、D四台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
    "127.0.0.1:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
    ],
    "ps": [
    "127.0.0.1:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
isps=True
if isps:
    server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
    server.join()
else:
    server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
    with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
        w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
        b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
        addwb=w+b
        mutwb=w*b
        divwb=w/b
saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(init_op=init_op, summary_op=summary_op, saver=saver)
with sv.managed_session(server.target) as sess:
    while 1:
        print(sess.run([addwb,mutwb,divwb]))

worker:

#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
from time import sleep
import tensorflow.compat.v1 as tf
#现在假设我们有A、B台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
        "127.0.0.1:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
    ],
    "ps": [
        "127.0.0.1:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})
 
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
 
isps=False
if isps:
	server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
	server.join()
else:
	server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
	with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
		w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
		b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
		addwb=w+b
		mutwb=w*b
		divwb=w/b
 
saver = tf.train.Saver()
summary_op = tf.summary.merge_all()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(logdir="./simple_log", init_op=init_op, summary_op=summary_op, saver=saver)
# Supervisor会自动判断在logdir模型是否存在,去找checkpoint,如果存在的话会自动读取模型,不用显示地调用restore。
with sv.managed_session(server.target) as sess:
    count = 0
    while 1:
        count = count + 1
        print(str(count) + "th compute result is :")
        print(sess.run([addwb,mutwb,divwb]))
        print("***********************************")
        sleep(1)
        if count >= 5:
            break

先执行ps,ps会在server.join()处等待worker传回参数,之后执行worker打印结果如下:

1th compute result is :
[array([[7., 7.],
       [7., 7.]], dtype=float32), array([[10., 10.],
       [10., 10.]], dtype=float32), array([[0.4, 0.4],
       [0.4, 0.4]], dtype=float32)]
***********************************
2th compute result is :
[array([[7., 7.],
       [7., 7.]], dtype=float32), array([[10., 10.],
       [10., 10.]], dtype=float32), array([[0.4, 0.4],
       [0.4, 0.4]], dtype=float32)]
***********************************
3th compute result is :
[array([[7., 7.],
       [7., 7.]], dtype=float32), array([[10., 10.],
       [10., 10.]], dtype=float32), array([[0.4, 0.4],
       [0.4, 0.4]], dtype=float32)]
***********************************
4th compute result is :
[array([[7., 7.],
       [7., 7.]], dtype=float32), array([[10., 10.],
       [10., 10.]], dtype=float32), array([[0.4, 0.4],
       [0.4, 0.4]], dtype=float32)]
***********************************
5th compute result is :
[array([[7., 7.],
       [7., 7.]], dtype=float32), array([[10., 10.],
       [10., 10.]], dtype=float32), array([[0.4, 0.4],
       [0.4, 0.4]], dtype=float32)]
***********************************

标签:ps,机器,10,work,worker,0.4,tf,tensorflow,float32
来源: https://blog.csdn.net/fangfanglovezhou/article/details/122741122

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有