ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

第五篇 asynico ,IO模型

2021-05-02 13:01:59  阅读:202  来源: 互联网

标签:await asynico 第五篇 IO print import def asyncio


  • asynico
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 # learn:https://pythonav.com/wiki/detail/6/91/
     5 
     6 # Python3.8之后 @asyncio.coroutine 装饰器就会被移除,推荐使用async & awit 关键字实现协程代码。
     7 
     8 import asyncio
     9 async def func1():
    10     print(1)
    11     await asyncio.sleep(2)
    12     print(2)
    13 async def func2():
    14     print(3)
    15     await asyncio.sleep(2)
    16     print(4)
    17 tasks = [
    18     asyncio.ensure_future(func1()),
    19     asyncio.ensure_future(func2())
    20 ]
    21 loop = asyncio.get_event_loop()
    22 loop.run_until_complete(asyncio.wait(tasks))
    23 
    24 
    25 """
    26 import asyncio
    27 @asyncio.coroutine
    28 def func1():
    29     print(1)
    30     yield from asyncio.sleep(2)  # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    31     print(2)
    32 @asyncio.coroutine
    33 def func2():
    34     print(3)
    35     yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    36     print(4)
    37 tasks = [
    38     asyncio.ensure_future( func1() ),
    39     asyncio.ensure_future( func2() )
    40 ]
    41 loop = asyncio.get_event_loop()
    42 loop.run_until_complete(asyncio.wait(tasks))
    43 """
    asynico_text
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 import time
     5 from functools import wraps
     6 def time_cost(func):
     7     @wraps(func)
     8     def inner(*args, **kwargs):
     9         start_time = time.time()
    10         ret = func(*args, **kwargs)
    11         end_time = time.time()
    12         print('总用时:%s 秒' % (end_time - start_time))
    13 
    14     return inner
    15 
    16 """
    17 
    18 # 方式一:同步编程实现
    19 import requests
    20 
    21 
    22 def download_image(url):
    23     print("开始下载:", url)
    24     # 发送网络请求,下载图片
    25     response = requests.get(url)
    26     print("下载完成")
    27     # 图片保存到本地文件
    28     file_name = url.rsplit('_')[-1]
    29     with open('img/' + file_name, mode='wb') as file_object:
    30         file_object.write(response.content)
    31 
    32 
    33 @time_cost
    34 def start_downloads(url_list):
    35     for item in url_list:
    36         download_image(item)
    37 
    38 
    39 if __name__ == '__main__':
    40     url_lists = [
    41         'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
    42         'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
    43         'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    44     ]
    45     start_downloads(url_list=url_lists)
    46 
    47 """
    48 
    49 # 方式二:基于协程的异步编程实现 (请提前安装:pip3 install aiohttp)
    50 import aiohttp
    51 import asyncio
    52 async def fetch(session, url):
    53     print("发送请求:", url)
    54     async with session.get(url, verify_ssl=False) as response:
    55         content = await response.content.read()
    56         file_name = url.rsplit('_')[-1]
    57         with open('img/' + file_name, mode='wb') as file_object:
    58             file_object.write(content)
    59 
    60 async def main():
    61     async with aiohttp.ClientSession() as session:
    62         url_list = [
    63             'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
    64             'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
    65             'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    66         ]
    67         tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
    68         await asyncio.wait(tasks)
    69 if __name__ == '__main__':
    70     @time_cost
    71     def run():
    72         asyncio.run(main())
    73     run()
    asynico_爬虫示例
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 import asyncio
     4 async def func():
     5     print("协程内部代码")
     6 # 调用协程函数,返回一个协程对象。
     7 result = func()
     8 # 方式一
     9 # loop = asyncio.get_event_loop() # 创建一个事件循环
    10 # loop.run_until_complete(result) # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。
    11 # 方式二
    12 # 本质上方式一是一样的,内部先 创建事件循环 然后执行 run_until_complete,一个简便的写法。
    13 # asyncio.run 函数在 Python 3.7 中加入 asyncio 模块,
    14 asyncio.run(result)
    asynico_协程函数
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 # await
     5 # await是一个只能在协程函数中使用的关键字,用于遇到IO操作时挂起 当前协程(任务),当前协程(任务)挂起过程中 事件循环可以去执行其他的协程(任务),当前协程IO处理完成时,可以再次切换回来执行await之后的代码。代码如下:
     6 
     7 # # 示例1:
     8 
     9 # import asyncio
    10 # async def func():
    11 #     print("执行协程函数内部代码")
    12 #     # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    13 #     # 当前协程挂起时,事件循环可以去执行其他协程(任务)。
    14 #     response = await asyncio.sleep(2)
    15 #     print("IO请求结束,结果为:", response)
    16 # result = func()
    17 # asyncio.run(result)
    18 
    19 # 示例2:
    20 # import asyncio
    21 # async def others():
    22 #     print("start")
    23 #     await asyncio.sleep(2)
    24 #     print('end')
    25 #     return '返回值'
    26 # async def func():
    27 #     print("执行协程函数内部代码")
    28 #     # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
    29 #     response = await others()
    30 #     print("IO请求结束,结果为:", response)
    31 # asyncio.run( func() )
    32 
    33 # 示例3:
    34 import asyncio
    35 async def others():
    36     print("start")
    37     await asyncio.sleep(2)
    38     print('end')
    39     return '返回值'
    40 async def func():
    41     print("执行协程函数内部代码")
    42     # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
    43     response1 = await others()
    44     print("IO请求结束,结果为:", response1)
    45     response2 = await others()
    46     print("IO请求结束,结果为:", response2)
    47 asyncio.run( func() )
    awit
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 import asyncio
     5 async def func():
     6     print(1)
     7     await asyncio.sleep(2)
     8     print(2)
     9     return "返回值"
    10 async def main():
    11     print("main开始")
    12     # 创建协程,将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    13     # 在调用
    14     task_list = [
    15         asyncio.create_task(func()),
    16         asyncio.create_task(func())
    17     ]
    18     print("main结束")
    19     # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    20     # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
    21     # 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
    22     done, pending = await asyncio.wait(task_list, timeout=None)
    23     print(done, pending)
    24 
    25 asyncio.run(main())
    tasks
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 import time
     5 from concurrent.futures import Future
     6 from concurrent.futures.thread import ThreadPoolExecutor
     7 from concurrent.futures.process import ProcessPoolExecutor
     8 def func(value):
     9     time.sleep(1)
    10     print(value)
    11 pool = ThreadPoolExecutor(max_workers=5) # 创建线程池
    12 # 或 pool = ProcessPoolExecutor(max_workers=5) 3 创建进程池
    13 for i in range(10):
    14     fut = pool.submit(func, i)
    15     print(fut)
    future.Future对象
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 
     5 # 此种对象通过定义 __aenter__() 和 __aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。
     6 import asyncio
     7 class AsyncContextManager:
     8     def __init__(self):
     9         self.conn = None
    10     async def do_something(self):
    11         # 异步操作数据库
    12         return 666
    13     async def __aenter__(self):
    14         # 异步链接数据库
    15         self.conn = await asyncio.sleep(1)
    16         return self
    17     async def __aexit__(self, exc_type, exc, tb):
    18         # 异步关闭数据库链接
    19         await asyncio.sleep(1)
    20 async def func():
    21     async with AsyncContextManager() as f:
    22         result = await f.do_something()
    23         print(result)
    24 asyncio.run(func())
    异步上下文管理器
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 # pip3 install uvloop
     5 # 在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做就行。
     6 
     7 import asyncio
     8 import uvloop
     9 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    10 # 编写asyncio的代码,与之前写的代码一致。
    11 # 内部的事件循环自动化会变为uvloop
    12 asyncio.run(...)
    提高效率uvloop
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 # 安装Python异步操作redis模块
     5 # pip3 install aioredis
     6 
     7 # 示例1:异步操作redis。
     8 
     9 #!/usr/bin/env python
    10 # -*- coding:utf-8 -*-
    11 
    12 # import asyncio
    13 # import aioredis
    14 # async def execute(address, password):
    15 #     print("开始执行", address)
    16 #     # 网络IO操作:创建redis连接
    17 #     redis = await aioredis.create_redis(address, password=password)
    18 #     # 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}}
    19 #     await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    20 #     # 网络IO操作:去redis中获取值
    21 #     result = await redis.hgetall('car', encoding='utf-8')
    22 #     print(result)
    23 #     redis.close()
    24 #     # 网络IO操作:关闭redis连接
    25 #     await redis.wait_closed()
    26 #     print("结束", address)
    27 # asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))
    28 
    29 
    30 # 示例2:连接多个redis做操作(遇到IO会切换其他任务,提供了性能)。
    31 
    32 import asyncio
    33 import aioredis
    34 async def execute(address, password):
    35     print("开始执行", address)
    36     # 网络IO操作:先去连接 47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93.4.198:6379
    37     redis = await aioredis.create_redis_pool(address, password=password)
    38     # 网络IO操作:遇到IO会自动切换任务
    39     await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    40     # 网络IO操作:遇到IO会自动切换任务
    41     result = await redis.hgetall('car', encoding='utf-8')
    42     print(result)
    43     redis.close()
    44     # 网络IO操作:遇到IO会自动切换任务
    45     await redis.wait_closed()
    46     print("结束", address)
    47 
    48 task_list = [
    49     execute('redis://47.93.4.197:6379', "root!2345"),
    50     execute('redis://47.93.4.198:6379', "root!2345")
    51 ]
    52 asyncio.run(asyncio.wait(task_list))
    异步redis
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 # 当通过python去操作MySQL时,连接、执行SQL、关闭都涉及网络IO请求,使用asycio异步的方式可以在IO等待时去做一些其他任务,从而提升性能。
     5 #
     6 # 安装Python异步操作redis模块
     7 # pip3 install aiomysql
     8 
     9 # # 示例1:
    10 #
    11 # import asyncio
    12 # import aiomysql
    13 # async def execute():
    14 #     # 网络IO操作:连接MySQL
    15 #     conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
    16 #     # 网络IO操作:创建CURSOR
    17 #     cur = await conn.cursor()
    18 #     # 网络IO操作:执行SQL
    19 #     await cur.execute("SELECT Host,User FROM user")
    20 #     # 网络IO操作:获取SQL结果
    21 #     result = await cur.fetchall()
    22 #     print(result)
    23 #     # 网络IO操作:关闭链接
    24 #     await cur.close()
    25 #     conn.close()
    26 # asyncio.run(execute())
    27 
    28 # 示例2:
    29 
    30 #!/usr/bin/env python
    31 # -*- coding:utf-8 -*-
    32 import asyncio
    33 import aiomysql
    34 async def execute(host, password):
    35     print("开始", host)
    36     # 网络IO操作:先去连接 47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379
    37     conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    38     # 网络IO操作:遇到IO会自动切换任务
    39     cur = await conn.cursor()
    40     # 网络IO操作:遇到IO会自动切换任务
    41     await cur.execute("SELECT Host,User FROM user")
    42     # 网络IO操作:遇到IO会自动切换任务
    43     result = await cur.fetchall()
    44     print(result)
    45     # 网络IO操作:遇到IO会自动切换任务
    46     await cur.close()
    47     conn.close()
    48     print("结束", host)
    49 task_list = [
    50     execute('47.93.40.197', "root!2345"),
    51     execute('47.93.40.197', "root!2345")
    52 ]
    53 asyncio.run(asyncio.wait(task_list))
    异步mysql
     1 #!\Users\Local\Programs\Python37
     2 # -*- coding: utf-8 -*-
     3 
     4 # 爬虫
     5 # 在编写爬虫应用时,需要通过网络IO去请求目标数据,这种情况适合使用异步编程来提升性能,接下来我们使用支持异步编程的aiohttp模块来实现。
     6 # 安装aiohttp模块
     7 # pip3 install aiohttp
     8 # 示例:
     9 
    10 import aiohttp
    11 import asyncio
    12 async def fetch(session, url):
    13     print("发送请求:", url)
    14     async with session.get(url, verify_ssl=False) as response:
    15         text = await response.text()
    16         print("得到结果:", url, len(text))
    17 async def main():
    18     async with aiohttp.ClientSession() as session:
    19         url_list = [
    20             'https://python.org',
    21             'https://www.baidu.com',
    22             'https://www.pythonav.com'
    23         ]
    24         tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
    25         await asyncio.wait(tasks)
    26 if __name__ == '__main__':
    27     asyncio.run(main())
    爬虫

     

  • IO 模型
    阻塞io
     1 from socket import *
     2 
     3 client=socket(AF_INET,SOCK_STREAM)
     4 client.connect(('127.0.0.1',8080))
     5 
     6 
     7 while True:
     8     msg=input('>>: ').strip()
     9     if not msg:continue
    10     client.send(msg.encode('utf-8'))
    11     data=client.recv(1024)
    12     print(data.decode('utf-8'))
    13 
    14 client.close()
    客户端
     1 from socket import *
     2 from threading import Thread
     3 
     4 def communicate(conn):
     5     while True:
     6         try:
     7             data = conn.recv(1024)
     8             if not data: break
     9             conn.send(data.upper())
    10         except ConnectionResetError:
    11             break
    12 
    13     conn.close()
    14 
    15 
    16 
    17 server = socket(AF_INET, SOCK_STREAM)
    18 server.bind(('127.0.0.1',8080))
    19 server.listen(5)
    20 
    21 while True:
    22     print('starting...')
    23     conn, addr = server.accept()
    24     print(addr)
    25 
    26     t=Thread(target=communicate,args=(conn,))
    27     t.start()
    28 
    29 server.close()
    服务端

    非阻塞io

     1 from socket import *
     2 
     3 client=socket(AF_INET,SOCK_STREAM)
     4 client.connect(('127.0.0.1',8083))
     5 
     6 
     7 while True:
     8     msg=input('>>: ').strip()
     9     if not msg:continue
    10     client.send(msg.encode('utf-8'))
    11     data=client.recv(1024)
    12     print(data.decode('utf-8'))
    13 
    14 client.close()
    客户端
     1 from socket import *
     2 
     3 server = socket(AF_INET, SOCK_STREAM)
     4 server.bind(('127.0.0.1',8083))
     5 server.listen(5)
     6 server.setblocking(False)
     7 print('starting...')
     8 
     9 
    10 rlist=[]
    11 wlist=[]
    12 while True:
    13 
    14     try:
    15         conn, addr = server.accept()
    16         rlist.append(conn)
    17         print(rlist)
    18     except BlockingIOError:
    19         # print('干其他的活')
    20 
    21         #收消息
    22         del_rlist = []
    23         for conn in rlist:
    24             try:
    25                 data=conn.recv(1024)
    26                 if not data:
    27                     del_rlist.append(conn)
    28                     continue
    29                 wlist.append((conn,data.upper()))
    30             except BlockingIOError:
    31                 continue
    32             except Exception:
    33                 conn.close()
    34                 del_rlist.append(conn)
    35 
    36         #发消息
    37         del_wlist=[]
    38         for item in wlist:
    39             try:
    40                 conn=item[0]
    41                 data=item[1]
    42                 conn.send(data)
    43                 del_wlist.append(item)
    44             except BlockingIOError:
    45                 pass
    46 
    47         for item in del_wlist:
    48             wlist.remove(item)
    49 
    50         for conn in del_rlist:
    51             rlist.remove(conn)
    52 
    53 
    54 server.close()
    服务端

    多路复用IO (select的使用)

     1 from socket import *
     2 
     3 client=socket(AF_INET,SOCK_STREAM)
     4 client.connect(('127.0.0.1',8083))
     5 
     6 
     7 while True:
     8     msg=input('>>: ').strip()
     9     if not msg:continue
    10     client.send(msg.encode('utf-8'))
    11     data=client.recv(1024)
    12     print(data.decode('utf-8'))
    13 
    14 client.close()
    客户端
     1 from socket import *
     2 import select
     3 
     4 server = socket(AF_INET, SOCK_STREAM)
     5 server.bind(('127.0.0.1',8083))
     6 server.listen(5)
     7 server.setblocking(False)
     8 print('starting...')
     9 
    10 rlist=[server,]
    11 wlist=[]
    12 wdata={}
    13 
    14 while True:
    15     rl,wl,xl=select.select(rlist,wlist,[],0.5)
    16     print('rl',rl)
    17     print('wl',wl)
    18 
    19     for sock in rl:
    20         if sock == server:
    21             conn,addr=sock.accept()
    22             rlist.append(conn)
    23         else:
    24             try:
    25                 data=sock.recv(1024)
    26                 if not data:
    27                     sock.close()
    28                     rlist.remove(sock)
    29                     continue
    30                 wlist.append(sock)
    31                 wdata[sock]=data.upper()
    32             except Exception:
    33                 sock.close()
    34                 rlist.remove(sock)
    35 
    36     for sock in wl:
    37         data=wdata[sock]
    38         sock.send(data)
    39         wlist.remove(sock)
    40         wdata.pop(sock)
    41 
    42 server.close()
    服务端

     

标签:await,asynico,第五篇,IO,print,import,def,asyncio
来源: https://www.cnblogs.com/huahuawang/p/14725537.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有