ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

python多进程并发和数据共享(使用队列、大数组通信)

2022-06-02 13:01:44  阅读:231  来源: 互联网

标签:python 数据共享 队列 https time 进程 input multiprocessing proc2


建议和原则:

1. 进程间(甚至是机器间)数据共享用Manager,数据交换用 Pipe或Queue

2. 进程之间默认是不能共享全局变量的 (子进程不能改变主进程中全局变量的值)。

3. 共享全局变量:需要用(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(range(5)),列表)。

4. 进程间通信(进程之间传递数据)的方式有:队列(multiprocessing.Queue(),单向通信),管道( multiprocessing.Pipe() ,双向通信)等。

 

一些思路:

一、少量数据通信:通过subprocess.Popen()的stdin, stdout输入输出。或者通过它的communicate

详细介绍subprocess的api:https://docs.python.org/zh-cn/3/library/subprocess.html#subprocess.Popen.communicate

方法1:stdin, stdout: https://blog.csdn.net/nescafe1111/article/details/15028739

方法2:communicate:https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064

二、大数组传递:全局变量?共享内存?queue?pipe?

方法1:全局变量:

方法2:共享内存:sharedmem,http://www.voidcn.com/article/p-wfnoaoho-bsx.html

方法3:queue: https://blog.csdn.net/brucewong0516/article/details/85796073

方法4:pipe: https://blog.csdn.net/mixintu/article/details/102073990

方法5:先变成一维,再通过multiprocessing.Array()传递,接收后再变为多维

 

实测:

1. 进程间共享或者访问处理少量数据,用Manager的list或者dict是可以的。

2. 而大数组之类的数据交换和访问不要用Manager,否则速度会很慢!!!!!(猜测它们底层还是用RawArray等函数实现一维和多维的转换的,所以速度不行!!!)这种情况下还是得用Multiprocessing.Pipe或者Multiprocessing.Queue

 

需求:

1,运行主函数,并行开启子进程1和子进程2

2,实时接收子进程1的输出

3,实时监听子进程2的忙闲状态,在子进程2空闲时,将子进程1的输出传给子进程2处理;

在子进程2忙时,暂存缓冲,等子进程2空闲时再传递给它处理

4,最终主函数的输出顺序按照 子进程1的输出传递给子进程2的顺序 进行输出

import multiprocessing as mp
from multiprocessing import Process, Queue

time_cost=[]

# 子进程1,producer
def proc1(input_queue):
input_queue.put({k:v})

# 子进程2,consumer
def proc2(input_queue):
global time_cost
while True:
'''
block=True, 阻塞方式运行。队列数据满或者为空时,让生产或消费者等待,等timeout时间到达后抛出异常
block=False,非阻塞,队列数据满或为空时,直接抛出异常
默认阻塞方式,timeout=None. timeout为None,无限阻塞(允许阻塞的事件没有限制)
'''
try:
t1 = time.time() * 1000 # 单位:毫秒

# input_dict =input_q.get(block=True, timeout=2) # 阻塞方式运行,proc1和proc2同时只能运行1个,交替运行,相当于还是串行。
input_dict = input_q.get(block=False) # 非阻塞方式运行,proc1和proc2同时运行,consumer始终保持运行,producer一产生结果放入到队列里,马上就进行处理。真正的并行
[(k, v)] = input_dict.items()

if k == 'False':
break

print('proc2 result!')

t2 = time.time()-t1
time_cost.append(t2)

execpt:
pass
print('avg. time cost:', sum(time_cost) / len(time_cost))

# 主函数
if __name__ == '__main__':
t_start = time.time() * 1000 # 程序启动
print('start:', t_start)

input_q = mp.Queue()
s_proc1 = Process(target=proc1, args=(input_q, ))
s_proc2 = Process(target=proc2, args=(input_q, ))

s_proc1.start()
s_proc2.start()

s_proc1.join() # 强制执行完毕
s_proc2.join()

t_stop = time.time() * 1000 # 程序结束
print('end:', t_stop - t_start)
其他:

# Poll方法查看子进程的状态:
0 正常结束
1 sleep
2 子进程不存在
-15 kill
None 在运行

# 经实测,此方法不能成为proc2中while循环的判断条件
使用大数组(如果数组较大的话,不建议使用此方法,因为速度慢!):

>>> import numpy as np

SHAPE=(3,3,3)

# 要保存的单个数据元素
>>> d = np.array([[[1,2,3], [3,4,5], [4,5,6], [1,2,3], [3,4,5], [4,5,6], [1,2,3], [3,4,5], [4,5,6],]])
>>> d = d.reshape((SHAPE))

# 进程间传递数据的载体:大数组
>>> send_arr = []
>>> send_arr.append(d)
>>> send_arr.append(d)

# 查看传递的内容
>>> send_arr
[array([[[1, 2, 3],
[3, 4, 5],
[4, 5, 6]],

[[1, 2, 3],
[3, 4, 5],
[4, 5, 6]],

[[1, 2, 3],
[3, 4, 5],
[4, 5, 6]]]), array([[[1, 2, 3],
[3, 4, 5],
[4, 5, 6]],

[[1, 2, 3],
[3, 4, 5],
[4, 5, 6]],

[[1, 2, 3],
[3, 4, 5],
[4, 5, 6]]])]

# 扁平化之后才能传递。这一步比较耗时!! 其他扁平化方法未测试:squeeze, reshape(-1)..
>>> e_share = mp.RawArray('d', send_arr.ravel())

# 接收到之后解包
>>> e_new = np.frombuffer(e_share, dtype=np.double)
>>> k = e_new.reshape((2,9,3)) # 大数组包含的元素个数,单个元素包含的子元素个数,单个元素的列数
>>> recv_arr = []
>>> for i in k:
... i = np.array(i)
... i = i.reshape((3,3,3))
... recv_arr.append(i)
...

# 查看接收到的内容
>>> recv_arr
[array([[[1., 2., 3.],
[3., 4., 5.],
[4., 5., 6.]],

[[1., 2., 3.],
[3., 4., 5.],
[4., 5., 6.]],

[[1., 2., 3.],
[3., 4., 5.],
[4., 5., 6.]]]), array([[[1., 2., 3.],
[3., 4., 5.],
[4., 5., 6.]],

[[1., 2., 3.],
[3., 4., 5.],
[4., 5., 6.]],

[[1., 2., 3.],
[3., 4., 5.],
[4., 5., 6.]]])]

---------------------------------------------------------------------------------------------------------

整理一下,有机会应该每个都实现一遍。网上的例程最大的问题就是逻辑和数据假设的太简单,包括运行环境。导致一放到实际中就不能用。

 

完整的介绍python多进程:https://www.cnblogs.com/kaituorensheng/p/4445418.html

完整的介绍长时间运行的多个进程之间的相互通信:https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/

如何优雅地停止多进程:https://blog.csdn.net/qxqxqzzz/article/details/105642875

 

其他参考:

进程间通信(IPC): https://cloud.tencent.com/developer/article/1496658
参考:https://blog.csdn.net/houyanhua1/article/details/78236514

 

其他致谢:

https://blog.csdn.net/faihung/article/details/90516180

标签:python,数据共享,队列,https,time,进程,input,multiprocessing,proc2
来源: https://www.cnblogs.com/zhukaijian/p/16337388.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有