ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

多进程

2022-07-06 18:33:34  阅读:132  来源: 互联网

标签:__ queue print time 进程 multiprocessing


multiprocessing通过使用子进程而非线程有效的绕过了全局解释器锁。multiprocessing可以利用cpu的多核性能。multiprocessing的Api与threading类似

Process类

开启子进程的方法

  1. spawn
    • 启动一个全新的python解释器进程,子进程不继承父进程的文件描述符或其它资源,只继承和run相关的资源。windows默认
  2. fork
    • 父进程使用os.fork()来开启一个子进程。子进程继承父进程的所有资源。Unix默认。
  3. forkserver
    • 使用forkserver时,会启动一个服务器进程来调用os.fork()来创建子进程。
  4. 通过上下文对象创建子进程
    使用multiprocessing.get_context()方法来获取上下文对象,上下文对象有和multiprocessing相似的Api。
    对象在不同上下文创建的进程可能不兼容,fork上下文创建的锁不能传递给spawn或forkserver启动方法启动的进程。
    multiprocessing.get_context(method=None):
    • 返回一个Context对象,具有和multiprocessing 模块相同的API。
    • 如果method为None,则返回默认上下文对象
    • method为fork、spawn或forkserver
import multiprocessing 
import time 
def fun(i):
    print(f"process{i} start at {time.strftime('%X')}")


if __name__ == "__main__":
    ctx = multiprocessing.get_context()  
    p1 = ctx.Process(target=fun, args=(1,))
    p2 = ctx.Process(target=fun, args=(2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Process

*class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
与threading.Thread的api类似。run()、start()、join()、name、is_alive()

  1. daemon
    • 守护进程的标志,当进程退出时,会尝试终止所有守护进程子进程。不允许在守护进程中创建子进程
  2. pid: 进程ID,start之前为None
  3. exitcode: 子进程退出代码。
    • 如果该进程尚未终止为None如果子进程run方法正常返回,退出代码将是0.如果它通过sys.exit()终止将返回一个N。
    • 如果时因为run内未捕获异常终止返回1
    • 如果由信号N终止,返回-N
  4. authkey: 进程的身份验证密钥(字节字符串)
    • multiprocessing初始化时,主进程使用os.urandom()分配一个随机字符串。
    • 创建Process对象时,它将继承父进程的身份验证密钥
  5. sentinel
    • 系统对象的数字句柄,当进程结束时变为ready
    • 如果要使用 multiprocessing.connection.wait() 一次等待多个事件,可以使用此值。否则调用 join() 更简单。
    • 在Windows上,这是一个操作系统句柄,可以与 WaitForSingleObject 和 WaitForMultipleObjects 系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自 select 模块的原语。
  6. terminate():终止进程
    • Unix上这是使用SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等。
    • 子进程的子进程不会被终止,它们会变成孤儿进程
  7. kill(): 与terminate相同,在Unix上使用SIGKILL信号
  8. close(): 关闭Process对象,释放与之关联的所有资源,如果底层进程仍在运行将会引发ValueError。

进程间交换对象

队列

multiprocessing的Queue类时queue.Queue的克隆,是一个线程安全的队列。put方法添加元素时如果队满会一直阻塞直到有空间放入元素。get方法获取元素时如果队空也会一直阻塞。
multiprocessing.Queue([maxsize])

  1. qsize():返回队列长度,但是由于多线程或多进程的上下文,数字不可靠。Unix平台会引起NotImplementedError
  2. empty():队列是否为空。因为多线程或多进程环境状态不可靠。
  3. put(obj, block=Ture, timeout=None):
    • 添加元素,block为True和timeout为None时会阻塞当前进程。直到有空的缓冲槽。
    • 如果timeout为正数,则会在超时后抛出queue.Full异常。如果block为False时,不会阻塞,会抛出queue.Full异常。
  4. put_nowait(obj): 等同于put(obj, block=False)
  5. get(block=True, timeout=None):
    • 获取元素。如果超时或者block为False会抛出queue.Empty异常。
  6. get_nowait(obj): 相当于get(False)
    get和put方法在队列关闭后会抛出ValueError(3.8)
  7. close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区的数据被写入管道之后,后台的线程会退出。
  8. join_thread():等待后台线程,再close方法之后调用,阻塞当前进程直到后台线程退出,确保缓冲区数据被写入管道。
  9. cancel_join_thread():防止join_thread方法阻塞当前进程。
    multiprocessing.SimpleQueue
    简化版的Queue
  10. close():关闭队列,释放内部资源。队列在被关闭后就不再被使用。不能再用get,put,empty方法
  11. empty()
  12. get()
  13. put(item)
    multiprocessing.JoinableQueue([maxsize])
    Queue子类额外添加了task_done和join方法
  14. task_done():
    • 支出之前进入队列的任务已经完成,由队列的消费者进程使用。每次调用get获取的任务,执行完成后调用task_done告诉队列该任务已经处理完成。
    • 如果join方法正在阻塞,则在所有对象都被处理完后返回。
  15. join(): 阻塞队列直到所有元素都被接受和处理完毕。
    • 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。
import multiprocessing 
import random 
import time 
import random 

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue 

    def run(self):
        for i in range(10):
            item = random.randint(0,256)
            self.queue.put(item)
            print(f"producer append {item} to queue")
            time.sleep(1)
            print(f"the size of queue is {self.queue.qsize()}")

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue 

    def run(self):
        while True:
            if self.queue.empty():
                print("queue is empty")
                break 
            else:
                time.sleep(2)
                item = self.queue.get()
                print(f"Consumer get {item}")
                time.sleep(1)


if __name__ =="__main__":
    queue = multiprocessing.Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

管道

multiprocessing.Pipe([duplex])
返回一对Connection对象,(con1,con2),分别表示管道两端。duplex默认为True,表示可以双向通信。如果False为单向的con1只能接收消息,con2只能发送。
multiprocessing.connection.Connection
连接对象,允许发送可序列化对象。

  1. send(obj):发生一个可序列化的对象
  2. recv():返回另一端使用send发送的对象,该方法会一直阻塞直到接收到对象,如果对端关闭了连接或没有东西可接收返回EOFerror
  3. fileno():返回由连接对象使用的描述符或者句柄
  4. close():关闭对象
    5.poll([timeout]):返回连接对象中释放有可以读取的数据。如果timeout是None那么将一直等待不会超时。
import multiprocessing 
import time 

def send(left, right):
    left.send(['left', time.strftime("%X")])
    print(left.recv())


def recv(left,right):
    right.send(['right', time.strftime("%X")])
    print(right.recv())


if __name__ == '__main__':
    left,right = multiprocessing.Pipe()
    s_p = multiprocessing.Process(target=send, args=(left,right))
    s_p.start()
    r_p = multiprocessing.Process(target=recv, args=(left,right))
    
    r_p.start()
    s_p.join()
    r_p.join()

共享内存

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存上创建的ctypes对象,默认情况下返回的对象实际上是经过了同步器包装过的,可以通过value属性访问对象本身。

  1. typecode_or_type指明了返回的对象类型。可能是ctype类型或array模块中每个类型对应的单字符长度的字符串。
  2. *args会传递给这个类的构造函数
  3. lock默认为True,将会新建一个递归式用于同步此值的访问操作。如果是Lock或RLock对象,那么这个传入的锁将会用于同步这个值的访问操作。如果为False,则这个对象的访问将没有锁保护,这个变量不是进程安全的。
from multiprocessing import Process, Value 

def f(v):
    with v.get_lock():  # += 类操作不具有原子性,使用对象内部关联锁
        v.value+=1

if __name__ == "__main__":
    v = Value('i',0)
    p1 = Process(target=f, args=(v,))
    p2 = Process(target=f, args=(v,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(v.value)

multiprocessing.Array(typecode_or_type, size_or_initializer,*,lock=True)

从共享内存申请并返回一个具有ctypes类型的数组对象,默认情况下返回值实际上是被同步器包装过的数组对象。

  1. size_or_initializer如果是整数,则表示数组长度,并且每个元素都会初始化为0,如果是一个序列,则会使用这个序列初始化数组中的每一元素,并且根据元素个数自动判断数组长度。
from multiprocessing import Process, Array 

def f(arr, i):
    arr[i]=i

if __name__ =="__main__":
    arr = Array('i', 10)
    processes = []
    for i in range(10):
        process = Process(target=f,args=(arr,i))
        processes.append(process)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(arr[:])

管理器multiprocessing.Manager

管理器维护一个用于管理共享对象的服务,其他进程可以通过代理访问这些共享对象。
multiprocessing.Manager()返回一个已启动的SyncManager管理器对象,可以用于在不同进程中共享数据。
支持 list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

from multiprocessing import Manager , Process

def f(mylist, i):
    mylist.append(i)

if __name__ =="__main__":
    manager = Manager()
    mylist = manager.list()
    processes = []
    for i in range(10):
        p = Process(target=f, args=(mylist, i))
        processes.append(p)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(mylist)

进程池

multiprocessing.Pool([processes[,initalizer[,initargs[,maxtaskperchild[,[context]]]]])
返回一个进程池对象,它控制可以提交作业的工作进程池,支持超时和回调的异步结果以及并行的map。
- processes进程数,如果为None,则使用os.cup_count()返回的值
- 如果initalizer不为None,则每个工作进程将会在启动时调用initalizer(*initargs)

  1. apply(func[,args[,kwds]])
    • 使用args参数以及kwds命名参数调用func,在返回结果前阻塞
  2. apply_async(func[,args[,kwds[,callback[,error_callback]]]])
    • appyly的变种返回AsyncResult对象
    • callback和error_callback是一个接受单格参数的可调用对象,执行成功调用callback,否则调用error_callback
    • 回调函数应该立即执行完成,否则会阻塞负责处理结果的线程
  3. map(func, iterable[,chunksize])
    • 内置map()函数的并行版本,会保持阻塞到获得结果,该方法会将可迭代对象分割为许多块,提交给进程池,可以将chunksize设置为一个正整数从而近似指定块的大小
  4. map_async(func,iterable[,chunksize[,callback[,error_callback]]])
    • map的变种,返回AsyncResult对象
  5. close(): 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
  6. terminate():不等待未完成任务,立即停止工作进程,进程池对象被垃圾回收时,会立即调用termainate
  7. join(): 必须在close或terminate后调用

multiprocessing.pool.AsyncResult
Pool.apply_async和pool.map_async()返回的对象所属的类。

  1. get([timeout]):获取执行结果
  2. wait([timeout]): 阻塞直到返回结果
  3. ready(): 返回执行状态,是否已经完成
  4. successful():判断是否已经完成并且未引发异常。如果还未获得结果将引发ValueError
from multiprocessing import Pool 

import time 

def f(x):
    time.sleep(1)
    return x**x 

def mf(x):
    time.sleep(0.5)
    return x*2

def initializer(*args):
    print(args, time.strftime("%X"))

if __name__ =="__main__":
    with Pool(processes=4, initializer=initializer,initargs=("init-",)) as pool:
        print(f"apply - start {time.strftime('%X')}")
        print(pool.apply(f,(10,)))  # 阻塞直到运行完成
        print(f"apply - end{time.strftime('%X')}")
        print(f"apply_async - start {time.strftime('%X')}")
        result = pool.apply_async(f,(10,))  # 异步执行不阻塞当前进程
        print(f"apply_async - end{time.strftime('%X')}")
        print(result.get())

        print(f"map - start {time.strftime('%X')}")
        print(pool.map(mf,[i for i in range(10)]))  # 阻塞直到运行完成
        print(f"map- end{time.strftime('%X')}")
        print(f"map_async - start {time.strftime('%X')}")
        result = pool.map_async(mf,[i for i in range(10)])  # 异步执行不阻塞当前进程
        print(f"mapy_async - end{time.strftime('%X')}")
        print(result.get())

标签:__,queue,print,time,进程,multiprocessing
来源: https://www.cnblogs.com/baiyutang7/p/16438893.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有