ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

07_线程池

2020-09-06 10:33:40  阅读:225  来源: 互联网

标签:num 07 future 线程 time print ThreadPoolExecutor


1.为什么用线程池

    1.启动一个新线程的消耗较高且涉及与操作系统的交互,尤其是程序中需要创建大量生存期很短暂的线程,而使用线程池可以很好地提升性能

    2.线程池则是创建指定线程数量等待执行事件,当该事件执行结束后该线程并不会死亡,而是回到线程池中变成空闲状态等待执行下一个事件

    3.当系统中包含有大量的并发线程时,会导致系统性能急剧下降甚至导致解释器崩溃,而使用线程池可以有效地控制系统中并发线程的数量

2.线程池的使用步骤

    1.threadpool 模块实现线程池不推荐继续使用,此处推荐是用 concurrent.futures 模块中的 ThreadPoolExecutor 类实现线程池

    2.调用 ThreadPoolExecutor 类的构造器创建一个线程池,定义一个普通函数作为线程任务

    3.调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池

3.语法概述

from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(max_workers=5)  # 创建指定进程数量进程池并返回进程池对象
 future = thread_pool.submit(fn, *args, **kwargs)  # 将fn函数提交给线程池,并返回一个 future 对象
    参数:
        *args 代表传给 fn 函数的参数
        *kwargs 代表以关键字参数的形式为 fn 函数传入参数
# 该函数类似于内建函数 map(func, *iterables) 只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理
thread_pool.map(func, *iterables, timeout=None, chunksize=1)  # 提交多个任务给池中,等效for + submit
thread_pool.shutdown(wait=True)  # 等待池内所有任务执行完毕后关闭线程池

future.cancel()  # 取消该future代表的线程任务,如果该任务正在执行不可取消则该方法返回False,否则程序会取消该任务并返回True
future.cancelled()  # 返回future代表的线程任务是否被成功取消
future.running()  # 如果该future代表的线程任务正在执行不可被取消,该方法返回True
future.done()  # 如果该funture代表的线程任务被成功取消或执行完成,则该方法返回True
future.result(timeout=None) # 获取该future代表的线程任务最后返回的结果,如果future代表的线程任务还未完成该方法将会阻塞当前线程
    参数: timeout 指定最多阻塞等待多少秒
future.exception(timeout=None)  # 获取该future代表的线程任务所引发的异常,如果该任务成功完成没有异常则该方法返回None
future.add_done_callback(fn)  # 为该future代表的线程任务注册一个回调函数,当该任务成功完成时程序会自动触发该fn函数

4.线程池的基本使用

import time
from concurrent.futures import ThreadPoolExecutor


def func(num):
    time.sleep(1)
    print("num is %s" % num)
    return num * num


def main():
    ret_list = list()
    executor = ThreadPoolExecutor(max_workers=5)  # 指定线程池中的线程数量
    for i in range(5):
        future = executor.submit(func, i)  # 异步提交任务,返回一个未来对象future
        ret_list.append(future)
    executor.shutdown(True)  # 等待池内所有任务执行完毕回收完资源后才继续

    for ret in ret_list:
        ret = ret.result()  # 获取该future代表的线程任务最后返回的结果
        print(ret)


if __name__ == "__main__":
    main()
"""执行结果
    num is 1
    num is 0
    num is 3
    num is 2
    num is 4
    0
    1
    4
    9
    16
"""

5.线程池的多任务提交

from concurrent.futures import ThreadPoolExecutor
import time


def func(num):
    sum = 0
    for i in range(num):
        sum += i ** 2
    print(sum)


t = ThreadPoolExecutor(20)
start = time.time()
t.map(func, range(1000))  # 提交多个任务给池中,等效于 for + submit
t.shutdown()
print(time.time() - start)

6.线程池的返回值

from concurrent.futures import ThreadPoolExecutor
import time


def func(num):
    sum = 0
    # time.sleep(5)
    # print(num)  # 异步的效果
    for i in range(num):
        sum += i ** 2
    return sum


t = ThreadPoolExecutor(20)

# 下列代码是用map的方式提交多个任务,对应拿结果的方法是__next__()返回的是一个生成器对象
res = t.map(func, range(1000))
t.shutdown()
print(res.__next__())
print(res.__next__())
print(res.__next__())
print(res.__next__())

# 下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是result
t = ThreadPoolExecutor(20)
res_l = []
for i in range(1000):
    re = t.submit(func, i)
    res_l.append(re)
t.shutdown()
[print(i.result()) for i in res_l]
# 在Pool进程池中拿结果,是用get方法,在ThreadPoolExecutor里边拿结果是用result方法

7.线程池中子线程调用回调函数

from threading import current_thread
from concurrent.futures import ThreadPoolExecutor
# 在线程池中,回调函数是子线程调用的,和父线程没有关系
from concurrent.futures import ProcessPoolExecutor
# 不管是ProcessPoolExecutor的进程池 还是Pool的进程池,回调函数都是父进程调用的,和子进程没有关系
import os


def func(num):
    sum = 0
    for i in range(num):
        sum += i ** 2
    print('这是在子线程中', current_thread())  # current_thread()查看线程标识,类似于进程中的getpid()
    return sum


def call_back_fun(res):
    # print(res.result(), os.getpid())
    print('这是在回调函数中', res.result(), current_thread())
    # print(os.getpid())


if __name__ == '__main__':
    print(os.getpid())
    t = ThreadPoolExecutor(20)  # 线程池
    # t = ProcessPoolExecutor(20)  # 进程池
    for i in range(10):
        t.submit(func, i).add_done_callback(call_back_fun)
    t.shutdown()
    print('这是在主线程中', current_thread())

8.进程池和线程池效率对比

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool

# concurrent.futures 这个模块是异步调用的机制
# concurrent.futures 提交任务都是用submit
# for + submit 多个任务的提交
# shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务

# from multiprocessing import Pool.apply / apply_async
import time


def func(num):
    sum = 0
    for i in range(num):
        for j in range(i):
            for x in range(j):
                sum += x ** 2
    # print(sum)


if __name__ == '__main__':
    # pool的进程池的效率演示
    p = Pool(5)
    start = time.time()
    for i in range(100):
        p.apply_async(func, args=(i,))
    p.close()
    p.join()
    print('Pool进程池的效率时间是%s' % (time.time() - start))  # 0.51

    # 多进程的效率演示
    tp = ProcessPoolExecutor(5)
    start = time.time()
    for i in range(100):
        tp.submit(func, i)
    tp.shutdown()  # 等效于进程池中的 close + join
    print('ProcessPoolExecutor进程池的消耗时间为%s' % (time.time() - start))  # 0.49

    # 多线程的效率
    tp = ThreadPoolExecutor(20)
    start = time.time()
    for i in range(100):
        tp.submit(func, i)
    tp.shutdown()  # 等效于 进程池中的 close + join
    print('ThreadPoolExecutor线程池的消耗时间为%s' % (time.time() - start))  # 1.40

# 结果: 针对计算密集的程序来说
#     不管是Pool的进程池还是ProcessPoolExecutor()的进程池,执行效率相当
#     ThreadPoolExecutor 的效率要差很多
#     所以当计算密集时,使用多进程

标签:num,07,future,线程,time,print,ThreadPoolExecutor
来源: https://www.cnblogs.com/tangxuecheng/p/13620899.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有