ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

python 多进程详解(Multiprocessing模块)

2020-08-05 10:33:14  阅读:391  来源: 互联网

标签:__ python multiprocessing 进程 详解 time print import Multiprocessing


python 多进程(MultiProcess)

1.Process

创建进程的类Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组,kwargs表示调用对象的字典,name为别名,group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,start()启动某个进程。join(timeout),使主调进程阻塞,直至被调用子进程运行结束或超时(如指定timeout)。
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

1.1 创建函数并作为单个进程

import multiprocessing
import time


def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

结果

p.pid: 2460
p.name: Process-1
p.is_alive: True
The time is Tue Aug  4 17:31:02 2020
The time is Tue Aug  4 17:31:05 2020
The time is Tue Aug  4 17:31:08 2020
The time is Tue Aug  4 17:31:11 2020
The time is Tue Aug  4 17:31:14 2020

1.2 创建函数并作为多个进程

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

结果

The number of CPU is:8
child   p.name:Process-2	p.id2897
child   p.name:Process-1	p.id2896
child   p.name:Process-3	p.id2898
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

1.3 daemon程序对比结果(不加daemon属性)

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("end!")

结果

end!
work start:Tue Aug  4 17:37:02 2020
work end:Tue Aug  4 17:37:05 2020

1.4 daemon程序对比(设置daemon为True)

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print("end!")

结果

end!

子进程设置了daemon属性,主进程结束,它们就随着结束了。

1.5 设置daemon执行完结束的方法

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

结果

work start:Tue Aug  4 17:41:11 2020
work end:Tue Aug  4 17:41:14 2020
end!

2.Lock

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

import multiprocessing
import sys


def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()


def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()  # 声明一个锁
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

结果

Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with

3.Semaphore

信号量Semaphore是一个计数器,控制对公共资源或者临界区域的访问量,信号量可以指定同时访问资源或者进入临界区域的进程数。每次有一个进程获得信号量时,计数器-1,若计数器为0时,其他进程就停止访问信号量,一直阻塞直到其他进程释放信号量。
常用方法和属性
acquire(blocking = True, timeout=None)
请求一个信号量
release()
释放一个信号量

import time, random
from multiprocessing import Process, Semaphore

def ktv(i, sem):
    sem.acquire()
    print('%s 走进ktv' %i)
    time.sleep(random.randint(1, 5))
    print('%s 走出ktv' %i)
    sem.release()

if __name__ == "__main__":
    sem = Semaphore(4)
    for i in range(5):
        p = Process(target=ktv, args=(i, sem))
        p.start()

结果

0 走进ktv
3 走进ktv
4 走进ktv
1 走进ktv
1 走出ktv
2 走进ktv
2 走出ktv
0 走出ktv
3 走出ktv
4 走出ktv

4.Event

Python 多进程中 Event 是用来实现进程间同步通信的(当然多线程中也可以用event)。事件event运行的机制是:全局定义了一个Flag,如果Flag值为 False,当程序执行event.wait()方法时就会阻塞,如果Flag值为True时,程序执行event.wait()方法时不会阻塞继续执行。
常用方法和属性
wait(time)方法:等待 time 时间后,执行下一步。或者在调用 event.set() 后立即执行下一步。
set()方法:将Flag的值改成True。
clear()方法:将Flag的值改成False。
is_set()方法:判断当前的Flag的值。

import time
import random
from multiprocessing import Process
from multiprocessing import Event


def now():
    return str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))


def traffic_light(e):  # 红绿灯
    print(now() + ' \033[31m红灯亮\033[0m')  # Flag 默认是False
    while True:
        if e.is_set():  # 如果是绿灯
            time.sleep(2)  # 2秒后
            print(now() + ' \033[31m红灯亮\033[0m')  # 转为红灯
            e.clear()  # 设置为False

        else:  # 如果是红灯
            time.sleep(2)  # 2秒后
            print(now() + ' \033[32m绿灯亮\033[0m')  # 转为绿灯
            e.set()  # 设置为True

def people(e, i):
    if not e.is_set():
        print(now() + ' people %s 在等待' % i)
        e.wait()
        print(now() + ' people %s 通过了' % i)


if __name__ == '__main__':
    e = Event()  # 默认为 False,红灯亮
    p = Process(target=traffic_light, args=(e,))  # 红绿灯进程
    p.daemon = True
    p.start()
    process_list = []
    for i in range(6):  # 6人过马路
        time.sleep(random.randrange(0, 4, 2))
        p = Process(target=people, args=(e, i))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

结果

2020-08-04 17:53:34 红灯亮
2020-08-04 17:53:36 people 0 在等待
2020-08-04 17:53:36 绿灯亮
2020-08-04 17:53:36 people 0 通过了
2020-08-04 17:53:38 红灯亮
2020-08-04 17:53:39 people 3 在等待
2020-08-04 17:53:40 people 2 在等待
2020-08-04 17:53:40 绿灯亮
2020-08-04 17:53:40 people 2 通过了
2020-08-04 17:53:40 people 3 通过了

5.Queue

常用方法和属性
Queue([maxsize])
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。
block用于控制阻塞行为,默认为True阻塞进程. 如果设置为False,不阻塞但将引发Queue.Empty异常(定义在Queue模块中)。
timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。
block控制阻塞行为,默认为True阻塞。如果设置为False,不阻塞但将引发Queue.Empty异常(定义在Queue库模块中)。
timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

# encoding: utf-8

import os
import time
from multiprocessing import Queue, Process, freeze_support


def inputQ(queue):
    info = str(os.getpid()) + "(put):" + str(time.asctime())
    queue.put(info)


def outputQ(queue):
    info = queue.get()
    print('%s%s \033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))


if __name__ == '__main__':
    freeze_support()
    record1 = []  # store input process
    record2 = []  # stroe output process
    queue = Queue(3)

    # 输入进程
    for i in range(10):
        process = Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)
    # 输出进程
    for i in range(10):
        process = Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()
    for p in record2:
        p.join()

结果

7647(get): 7641(put):Tue Aug  4 17:56:15 2020
7649(get): 7637(put):Tue Aug  4 17:56:15 2020
7648(get): 7639(put):Tue Aug  4 17:56:15 2020
7646(get): 7636(put):Tue Aug  4 17:56:15 2020
7651(get): 7642(put):Tue Aug  4 17:56:15 2020
7650(get): 7640(put):Tue Aug  4 17:56:15 2020
7654(get): 7638(put):Tue Aug  4 17:56:15 2020
7652(get): 7643(put):Tue Aug  4 17:56:15 2020
7655(get): 7645(put):Tue Aug  4 17:56:15 2020
7653(get): 7644(put):Tue Aug  4 17:56:15 2020

6.Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

import multiprocessing
import time


def proc1(pipe):
    while True:
        for i in range(10000):
            print("send: %s" % (i))
            pipe.send(i)
            time.sleep(1)


def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)





if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

结果

send: 0
proc2 rev: 0
send: 1
proc2 rev: 1
send: 2
proc2 rev: 2进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己, 而信号量是一堆进程等待着去执行一段逻辑代码.

信号量不能控制创建多少个进程, 但是可以控制同时多少个进程能够执行.
进程池能控制可以创建多少个进程.
send: 3
proc2 rev: 3
send: 4
proc2 rev: 4
send: 5
proc2 rev: 5
send: 6
proc2 rev: 6
...

7.Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己, 而信号量是一堆进程等待着去执行一段逻辑代码.
信号量不能控制创建多少个进程, 但是可以控制同时多少个进程能够执行.
进程池能控制可以创建多少个进程.

7.1 非阻塞

# coding: utf-8
import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply_async(func, (msg, ))

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

结果

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
Sub-process(es) done.

7.2 阻塞

#coding: utf-8
import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg, ))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

结果

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

7.3 使用进程池,关注结果

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" % (i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

结果

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

8.数据共享

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的。
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此。

8.1 list

# -*-encoding:utf-8-*-
from multiprocessing import Process, Manager
from time import sleep


def thread_a_main(sync_data_pool):  # A 进程主函数,存入100+的数
    for ix in range(100, 105):
        sleep(1)
        sync_data_pool.append(ix)


def thread_b_main(sync_data_pool):  # B 进程主函数,存入300+的数
    for ix in range(300, 309):
        sleep(0.6)
        sync_data_pool.append(ix)


def _test_case_000():  # 测试用例
    manager = Manager()  # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例
    sync_data_pool = manager.list()  # 利用 SyncManager 的实例来创建同步数据池
    Process(target=thread_a_main, args=(
        sync_data_pool, )).start()  # 创建并启动 A 进程
    Process(target=thread_b_main, args=(
        sync_data_pool, )).start()  # 创建并启动 B 进程
    for ix in range(6):  # C 进程(主进程)中实时的去查看数据池中的数据
        sleep(1)
        print(sync_data_pool)


if '__main__' == __name__:  # 将测试用例单独列出
    _test_case_000()

结果

[300]
[300, 100, 301, 302]
[300, 100, 301, 302, 101, 303]
[300, 100, 301, 302, 101, 303, 304, 102, 305]
[300, 100, 301, 302, 101, 303, 304, 102, 305, 103, 306, 307]
[300, 100, 301, 302, 101, 303, 304, 102, 305, 103, 306, 307, 104, 308]

8.2 dict

from multiprocessing import Manager, Lock, Process
import time


def worker(d, key, value):
    print(key, value)
    d[key] = value


if __name__ == '__main__':
    mgr = Manager()
    d = mgr.dict()
    jobs = [Process(
        target=worker, args=(d, i, i*2)) for i in range(10)]

    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:')
    print(d)

结果

6 12
7 14
9 18
0 0
8 16
2 4
5 10
1 2
3 6
4 8
Results:
{6: 12, 7: 14, 9: 18, 0: 0, 8: 16, 2: 4, 5: 10, 1: 2, 3: 6, 4: 8}

标签:__,python,multiprocessing,进程,详解,time,print,import,Multiprocessing
来源: https://www.cnblogs.com/buyizhiyou/p/13438251.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有