ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python – 如何连接不断生成和使用数据的asyncio.coroutines?

2019-06-09 14:43:19  阅读:277  来源: 互联网

标签:python python-3-4 python-asyncio


我正在尝试学习如何(惯用)使用Python 3.4的asyncio.我最大的绊脚石是如何“链接”连续消耗数据的协同程序,用它更新状态,并允许该状态被另一个协同程序使用.

我期望从这个示例程序中观察到的行为只是定期报告从子进程接收的数字总和.报告应该以与Source对象从子进程接收数字大致相同的速率发生.报告功能中的IO阻塞不应阻止从子进程读取.如果报告功能阻塞的时间长于从子进程读取的迭代次数,我不关心它是否会向前跳过或立即报告一堆;但是,在足够长的时间范围内,应该有尽可能多的reporter()迭代次数和expect_exact().

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()

if __name__ == '__main__':
    main()

这个例子需要从git安装pexpect;您可以轻松地将run()替换为:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value

但是我感兴趣的真正的子进程需要在pty中运行,我认为这意味着asyncio中提供的子进程传输/协议框架不足以实现这一点.关键是异步活动的源是一个可以与yield一起使用的协同程序.

请注意,此示例中的reporter()函数不是有效代码;我的问题是我不知道那里应该去做什么.理想情况下,我想将reporter()代码与run()分开;这个问题的关键在于如何使用asyncio中的组件将更复杂的程序分解为更小的代码单元.

有没有办法用asyncio模块构建这种行为?

解决方法:

asyncio中的锁定原语和队列本身提供了一些执行此操作的机制.

条件

asyncio.Condition()提供了一种通知条件的方法.如果丢弃某些事件并不重要,请使用此选项.

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here

队列

asyncio.Queue()允许您将数据放入队列(LIFO或FIFO)并从中读取其他内容.如果您绝对想要回复每个事件,即使您的消费者落后(及时),请使用此选项.请注意,如果限制队列的大小,如果您的消费者足够慢,您的生产者最终会阻止.

请注意,这允许我们将sum转换为局部变量.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here

请注意,Python 3.4.4将task_done()和join()方法添加到Queue中,以便在您知道使用者完成后(适用时)优雅地完成所有内容的处理.

标签:python,python-3-4,python-asyncio
来源: https://codeday.me/bug/20190609/1205438.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有