
The concurrent.futures Module in Python




I. Overview

  The concurrent.futures module provides a high-level, well-encapsulated interface for asynchronous calls.

  ThreadPoolExecutor: a thread pool that provides asynchronous calls.

  ProcessPoolExecutor: a process pool that provides asynchronous calls.

  Both implement the same interface, which is defined by the abstract Executor class.

II. Basic Methods

  submit(fn, *args, **kwargs): submit a task asynchronously.

  map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit calls.

  shutdown(wait=True): the equivalent of the process pool's pool.close() + pool.join(). With wait=True it blocks until every task in the pool has finished and resources have been reclaimed; with wait=False it returns immediately without waiting for the tasks to finish. Either way, the program as a whole still waits for all tasks to complete. submit and map must be called before shutdown.

  result(timeout=None): get the result.

  add_done_callback(fn): add a callback function (see the sketch below).
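
A minimal sketch of these methods, using a trivial squaring task that is not part of the original post; note that the with statement calls shutdown(wait=True) automatically on exit:

from concurrent.futures import ThreadPoolExecutor

def square(n):
    # trivial illustrative task
    return n ** 2

if __name__ == '__main__':
    # submit + result
    with ThreadPoolExecutor(max_workers=3) as executor:   # __exit__ calls shutdown(wait=True)
        future = executor.submit(square, 4)                # submit the task asynchronously
        print(future.result())                             # 16; blocks until the task finishes

    # map replaces for + submit and yields results in input order
    with ThreadPoolExecutor(max_workers=3) as executor:
        for result in executor.map(square, range(5)):
            print(result)                                  # 0, 1, 4, 9, 16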

# Introduction
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

# Usage
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)

    futures = []
    for i in range(11):
        future = executor.submit(task, i)   # submit the task asynchronously and get back a Future
        futures.append(future)
    executor.shutdown(wait=True)            # block until every task in the pool has finished
    print('+++>')
    for future in futures:
        print(future.result())              # results are read back in submission order

ProcessPoolExecutor
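
Not covered in the original post, but a common companion to the pattern above: concurrent.futures.as_completed yields each Future as soon as it finishes, instead of waiting for shutdown and then reading results in submission order. A minimal sketch reusing the same task function:

from concurrent.futures import ProcessPoolExecutor, as_completed
import os, time, random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(11)]
        for future in as_completed(futures):   # yields each future as it completes
            print(future.result())             # order depends on which task finished first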
# Introduction
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

# Usage
Same as ProcessPoolExecutor.

ThreadPoolExecutor
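
A small sketch of the thread_name_prefix argument mentioned above; the 'demo' prefix and the who_am_i helper are made up for illustration:

from concurrent.futures import ThreadPoolExecutor
import threading

def who_am_i(n):
    # worker threads are named <prefix>_0, <prefix>_1, ...
    return '%s handled %s' % (threading.current_thread().name, n)

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix='demo') as executor:
        for line in executor.map(who_am_i, range(4)):
            print(line)   # e.g. demo_0 handled 0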
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random

def task(n):
    print('%s is running' % os.getpid())   # every worker thread lives in the same process, so the pid is identical
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':

    executor = ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future = executor.submit(task, i)

    executor.map(task, range(1, 12))   # map replaces for + submit

Usage of map
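
The example above discards map's return value. map actually returns an iterator that yields each task's result in the order of the inputs, so the results can be collected directly; a minimal sketch with the same task function:

from concurrent.futures import ThreadPoolExecutor
import time, random

def task(n):
    time.sleep(random.random())
    return n ** 2

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(task, range(1, 12))   # lazy iterator, results in input order
        print(list(results))                          # [1, 4, 9, ..., 121]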
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<process %s> get %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}

def parse_page(res):
    res = res.result()   # the callback receives a Future; .result() returns get_page's return value
    print('<process %s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p = Pool(3)
    # for url in urls:
    #     p.apply_async(get_page, args=(url,), callback=parse_page)
    # p.close()
    # p.join()

    p = ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page, url).add_done_callback(parse_page)   # parse_page receives a Future object; call .result() on it to get the result

Callback functions
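
One detail worth noting: future.result() inside the callback re-raises any exception raised by get_page (for example a requests connection error), and get_page returns None for non-200 responses. A callback that should survive failed downloads therefore needs a guard; a minimal sketch, not part of the original post:

def parse_page_safe(future):
    try:
        res = future.result()              # re-raises any exception from get_page
    except Exception as exc:
        print('download failed: %s' % exc)
        return
    if res is None:                        # non-200 responses
        return
    print('parsed %s, size %s' % (res['url'], len(res['text'])))

# used the same way: p.submit(get_page, url).add_done_callback(parse_page_safe)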

Reposted from: https://www.cnblogs.com/DoingBe/p/9545066.html

Source: https://www.cnblogs.com/bbjs/p/16154372.html
