ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

理解python中的yield、yield在协程中的作用以及实现一个简单的事件循环

2022-06-29 17:31:20  阅读:200  来源: 互联网

标签:task 协程 python self yield future result ._ def


Future 和 Task对象

import uuid


class Future:
    def __init__(self, loop):
        self._result = None
        self._done = False
        self._callbacks = []
        self._loop = loop

    # 给_result 属性赋值,_result 的值结束耗时操作返回的数据
    def set_result(self, data):
        if self._done:
            raise RuntimeError("Future 对象不能重复设置值")
        self._done = True
        self._result = data

        if isinstance(data, Future):
            self._result = data._result

        for callback in self._callbacks:
            self._loop.add_ready_task(callback)

    # 获取 future对象中的result 值
    def result(self):
        if self._done:
            return self._result
        raise RuntimeError("Future对象 值结果还没就绪")

    # await 等待
    def __await__(self):
        # yield 在异步协程中的作用就是:当执行到调用系统发起io操作后,暂停函数的执行,
        # 将当前 future 对象返回,并让出执行权
        yield self
        return self._result

    # 添加回调事件,当 set_result方法被调用的时候,将协程的回调对象放到事件循环中进行执行
    def add_done_callback(self, callback, *args):
        self._callbacks.append(callback)


class Task(Future):
    def __init__(self, core, loop):
        super(Task, self).__init__(loop)
        # core 就是一个协程任务
        self.core = core
        self._task_id = uuid.uuid4()
        self._future = None

    # run方法相当于启动器,启动协程任务函数,io耗时操作都必须与future对象进行关联,当执行到 await future对象的时候
    # await 触发future 对象中的 __await__ 方法,yield 暂停函数执行,并返回当前future对象,
    #  t = self.core.send(Node)执行结束, 此时 future 是执行io操作的Future 对象
    def run(self):
        try:
            print(f"{self._task_id} 任务 开始执行")
            future = self.core.send(None)
        except StopIteration:
            self.set_result(self._future)
            print(f"{self._task_id} 协程任务 执行结束")
            print("-" * 50)
        # 当 self.core 第一次send的时候不会出现报错,并将执行io操作中的future对象返回回来,
        # future 对象中执行io操作的地方与系统进行交换,当io操作执行完成后会调用future 对象中的 set_result 方法,
        # set_result 方法 将io结果挂到future 属性中,并将回调函数重新放到事件循环中进行执行
        else:
            print(f"{self._task_id} 任务 执行到io耗时操作,将执行权让出去,设置io回调通知")
            print("-" * 50)
            future.add_done_callback(self)
            self._future = future

EventLoop 事件循环对象

import collections
import heapq
import time
from random import random, randint
from threading import Thread

from async_future_task import Future, Task


class EventLoop:
    loop = None

    # 单例,事件循环只能有一个
    def __new__(cls, *args, **kwargs):
        if not cls.loop:
            cls.loop = super().__new__(cls)
        return cls.loop

    def __init__(self):
        # 已经准备好可以运行的任务队列
        self._ready_que = collections.deque()

        # 延时任务列表
        self._scheduled = []

        self.stop = False

    # 创建协程任务对象,并添加到可执行队列中
    def create_task(self, core, *args):
        task = Task(core, self)
        self._ready_que.append(task)
        return task

    # 添加任务到延时任务队列中
    def add_delay_task(self, delay, callback, *args):
        t = time.time() + delay
        heapq.heappush(self._scheduled, (t, callback, args))

    # 添加可执行的任务到任务队列中, 这个函数主要是给future对象进行添加回调任务
    def add_ready_task(self, task, *args):
        self._ready_que.append(task)

    def run_forever(self):
        while True:
            self.exec_task()
            if self.stop and len(self._scheduled) == 0 and len(self._ready_que) == 0:
                break

    def stop_exit(self):
        self.stop = True

    # 执行任务
    def exec_task(self):
        t = time.time()
        len_scheduled = len(self._scheduled)

        for i in range(len_scheduled):
            task = heapq.heappop(self._scheduled)
            if task[0] <= t:
                self._ready_que.append((task[1], task[2]))
            else:
                heapq.heappush(self._scheduled, task)
                break

        len_ready = len(self._ready_que)
        for i in range(len_ready):
            task = self._ready_que.popleft()
            # 如果是task 是 Task 对象的话就执行 run方法
            if isinstance(task, Task):
                task.run()
            # 如果不是Task对象的话 就把task当做函数来执行
            else:
                task[0](*task[1])


# 这是用户层, 用户只需要 await 框架的异步方法就可以了,
# 不需要关系框架底部是如何实现的
async def get_baidu():
    # 在调用fake_io 后等待future 对象,此时会触发 future 对象中的 __await__ 方法,又因为 __await__
    # 方法中有 yield , 它会暂停函数的执行,返回future本身对象
    data = await aiohttp_request_url()
    print("异步任务结束, io操作获取到的值是: ", data)
    return data


# aiohttp_request_url 模拟的是异步 http请求,
# 该方法模拟的是框架封装好的、执行调用系统io的步骤
async def aiohttp_request_url():
    # 创建future 等待对象
    future = Future(loop)

    # 执行io耗时操作,此时并不等待,只调用,不等待,将耗时操作托管给系统,
    # 系统执行完io耗时操作,自动回调future set_result 方法, fake_io 模拟调用系统发起io操作,系统自动回调结果
    fake_io(future)

    data = await future
    # 可以在await 获取到data 或进行一些数据的处理

    return data


def fake_io(future):
    def sleep():
        global task_run_time

        # 随机休眠 0-1秒
        task_time = random()
        task_run_time += task_time
        time.sleep(task_time)
        # io耗时操作执行完成,模拟系统回调 set_result 方法,给future对象设置随机值
        data = randint(1, 10)
        future.set_result(data)

    Thread(target=sleep).start()


loop = EventLoop()

start_time = time.time()
task_run_time = 0

for _ in range(1000):
    loop.create_task(get_baidu())

loop.add_delay_task(2, loop.stop_exit)
loop.run_forever()
print(f"所有任务执行时间:{task_run_time}, 实际执行时间{time.time() - start_time}")

理解图

请添加图片描述
参考b站大佬 DavyCloud asyncio系列教程

标签:task,协程,python,self,yield,future,result,._,def
来源: https://www.cnblogs.com/tnan/p/16424272.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有