ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Celery

2021-08-30 01:31:48  阅读:193  来源: 互联网

标签:task redis Celery 任务 result print


Celery简介

Celery是什么

  1. Celery是python中使用比较多的并行分布式框架
  2. Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
  3. Celery专注于实时处理的异步任务队列
  4. Celery同时也支持任务调

Celery使用场景

celery是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)定时任务(crontab)

  • 异步任务: 将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
  • 定时任务: 定时执行某件事情,比如每天数据统计

Celery工作流程

Celery核心组件

Celery的架构由三部分组成,消息中间件(Broker),任务执行单元(Worker)和任务执行结果存储(Result)组成。

  • 消息中间件(Broker)
    Broker负责创建任务队列,根据一些路由规则将任务分派到任务队列,然后将任务从任务队列交付给worker
  • 任务执行单元(Worker)
    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中,运行后台作业的进程
  • 任务结果存储(Result)
    Result用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等

另外: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

其他: Celery还支持不同的并发、序列化和压缩的手段

  • 并发:prefork、eventlet、gevent、threads
  • 序列化:pickle、json、yaml、msgpack 等
  • 压缩:zlib,、bzip2

Celery安装相关软件

备注:我是在windows10环境下做的,Linux应该大同小异

Celery安装

pip install celery

其他安装

这里有个坑.win10系统启动worker报错ValueError: not enough values to unpack (expected 3, got 0),解决办法:

pip install eventlet

Django中使用的时候,报错 AttributeError: ‘str’ object has no attribute ‘items’。redis版本太高,降低版本 pip install redis==2.10.6

pip install redis==2.10.6

Celery代码实现

Celery基本使用方法

备注:在这里,我使用的是redis作为消息中间件

目录结构

代码实现

# app.py
from task import add

if __name__ == '__main__':
    print("Start Task ...")
    result = add.delay(2, 8)
    print("result:",result)             # 存到redis之后,返回的id
    print("result_id:",result.id)       # 存到redis之后,返回的id
    print("result:", result.get())      # 方法返回值
    print("End Task ...")
# task.py

import time
from celery import Celery

# 实例化一个Celery
broker = 'redis://ip:6379/1'
backend = 'redis://ip:6379/2'


# 参数1 自动生成任务名的前缀
# 参数2 broker 是我们的redis的消息中间件
# 参数3 backend 用来存储我们的任务结果的
app = Celery('my_task', broker=broker, backend=backend)


# 加入装饰器变成异步的函数
@app.task
def add(x, y):
    print('Enter call function ...')
    time.sleep(4)
    return x + y


if __name__ == '__main__':
    # 这里生产的任务不可用,导入的模块不能包含task任务。会报错
    print("Start Task ...")
    result = add.delay(2, 8)
    print("result:", result)
    print("End Task ...")

终端启动服务

celery -A task worker -l info -P eventlet
  • A :参数指定celery对象的位置
  • l :参数指定worker的日志级别

服务启动后的展示

运行代码,app.py

备注:不要运行task.py,会报错

这个时候可以看终端,看看请求

检验这个id的值

新增检查文件 check.py

# check.py

from celery.result import AsyncResult
from task import app

async_result=AsyncResult(id="455d6ad7-39cc-4e94-9fa9-456ae49cdd97", app=app)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

标签:task,redis,Celery,任务,result,print
来源: https://www.cnblogs.com/dongye95/p/15204258.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有