ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Celery--介绍与使用

2022-03-09 01:32:41  阅读:244  来源: 互联网

标签:celery task -- app py 介绍 Celery 任务 print


1 Celery介绍与架构

# Celery:芹菜(跟翻译没有任何关系),分布式异步任务框架,框架(跟其他web框架无关)

# 官方不支持windows: 
    Celery is a project with minimal funding, so we don’t support Microsoft Windows. 
    Please don’t open any issues related to that platform.

# 架构:
    -broker:任务中间件,用户提交的任务,存在这个里面(redis,rabbitmq)
    -worker:任务执行者,消费者,真正执行任务的进程(真正干活的人)
    -backend:任务结果存储,任务执行后的结果(redis,rabbitmq)
	
# celery服务为为其他项目服务提供异步解决任务需求的

# celery能够做的事:
    -异步任务(区分同步任务)
    -延迟任务(其他框架也可以做 apschedule)
    -定时任务(其他框架也可以做 apschedule)
    
实质:三种任务,跟worker执行没有关系(worker反正是拿到任务就执行),只是提交任务时的不同,
      异步任务:立即提交执行;延迟任务:延迟提交执行;定时任务:间隔循环提交执行
    
# 更好的理解celery
注:会有两个服务同时运行,一个是项目服务(django服务),一个是celery服务,
    项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
    正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
    人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

2 celery简单使用

# 安装:pip install celery==5.1.2

2.1 制造任务+初始配置

# celery_task.py:

from celery import Celery

# backend='redis://:密码@127.0.0.1:6379/1'  格式:redis协议://:密码@ip:端口/库
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址

# 1 实例化得到celery对象
# app=Celery('test',)
app = Celery(__name__, backend=backend, broker=broker)


# 2 写一堆任务函数(计算a+b,挖井,砍树), 再使用装饰器包裹任务(监听函数)
@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

2.2 提交任务+启动worker

import celery_task

# 1 同步执行
res = celery_task.add(2, 3)  # 普通的同步执行任务 (直接调任务函数执行就行)
print(res)

# 2 异步执行:

# 第一步:提交(方式:任务函数名.apply_async(参数))    返回值是任务的id号,唯一标识这个任务
# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})  # 参数可以列表或关键字形式
print(res)  # abab1ad3-0e58-4faa-bc05-14d157dc8217

# 第二步:通过命令启动worker,让worker执行任务--->结果存到redis
# 位置:需要来到,app对象所在的文件(celery_task)目录 启动


# 非windows
# 5.x之前
:celery worker -A celery_task -l info   # '-l info' 表示日志打印级别
# 5.x以后
:celery -A celery_task worker  -l info

# windows:需要按照第三方模块eventlet  pip3 install eventlet
# 5.x之前
: celery worker -A celery_task -l info -P eventlet
# 5.x以后
:celery -A celery_task worker  -l info -P eventlet    # celery_task 是app对象所在的文件

# 第三步:查看任务执行结果

2.3 查看任务结果

# 可以在任意 需要查看结果 的位置执行

from celery_task import app
from celery.result import AsyncResult


id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'  # id: 提交任务时,返回的任务id号


if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

3 celery包结构使用(常用)

# 目录结构
    -celery_task              # 包名
    	__init__.py
    	celery.py             # app所在py文件  (名字必须叫celery,不然 命令启动worker时,找不到)
    	course_task.py        # 任务
    	order_task.py         # 任务
    	user_task.py          # 任务
    提交任务.py                # 提交任务
    查看结果.py                # 查看结果

3.1 celery_task /celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一个列表,放被管理的任务 task的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
    'celery_task.course_task',   # 注意格式:包名.任务 (不用加后缀)
    'celery_task.order_task',
    'celery_task.user_task',
])

# 原来,任务写在这个py文件中

# 后期任务非常多,可能有用户相关任务,课程相关任务,订单相关任务。。。

3.2 celery_task /任务.py

# user_task.py
import time
from .celery import app

# 发送短信任务
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模拟发送短信延迟
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'



# order_task.py
from .celery import app
# 生成订单任务
@app.task()
def make_order():
    with open(r'D:\py18\luffy_api\script\2 celery的包结构\celery_task\order.txt', 'a', encoding='utf-8') as f:
        f.write('生成一条订单\n')
    return True



# course_task.py
from .celery import  app

@app.task()
def add(a,b):
    return a+b

3.3 启动worker

# 位置:需要来celery_task包所在的目录,不需要走到里面app对象所在的文件(celery),会自动找到包里的celery文件

celery -A celery_task worker  -l info -P eventlet    # celery_task 是包含celery文件的包名字

3.4 提交任务.py

from celery_task import user_task,order_task

# 提交一个发送短信任务
res = user_task.send_sms.apply_async(args=['18972374345', '8888'])
print(res)


# 提交一个生成订单任务
res=order_task.make_order.apply_async()
print(res)

3.5 查看结果.py

from celery_task.celery import app
from celery.result import AsyncResult

id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    print(a.conf)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

4 celery异步任务与延迟任务

4.1 执行异步任务

# 方式一:不写时间,就表示立即执行
user_task.send_sms.apply_async(args=('12345566677', '8888'))

# 方式二:
res=user_task.send_sms.delay('12345566677', '8888')

4.2 执行延迟任务

# 注意
    1 本质:跟异步任务,只是提交任务时,有所不同,跟worker没关系
    2 时间:需要使用东1区(utc)的时间
    
from datetime import datetime, timedelta

# datetime.utcnow()  获取当前的utc时间
eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc时间

# 提交 发送短信 的任务,50s后执行
res=user_task.send_sms.apply_async(args=(200, 50), eta=eta)  # 加上参数 eta=具体时间
print(res)

5 celery定时任务

5.1 第一步:celey.py中写入

# 第一步,在包(celery_task)下的celey.py中写入

# 修改celery的配置信息    app.conf是整个celery类app对象的配置信息
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False


from datetime import timedelta
from celery.schedules import crontab      # schedule: n.日程安排 ;  crontab:n.定时任务
# 配置定时任务
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {
        'task': 'celery_task.user_task.send_sms',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('18953675221', '8888'),
    },
    'make_order_every_5_seconds': {
        'task': 'celery_task.order_task.make_order',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=5),
    },
    'add_every_1_seconds': {
        'task': 'celery_task.course_task.add',  # 指定执行的是哪个任务
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 5),
    },
}

5.2 第二步:启动worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet

# 如果beat没有启动,worker是没有活干的,需要启动beat,worker才能干活,和beat启动顺序无先后

5.3 第三步:启动beat

# 本质:启动了一个beat进程,定时的去自动提交任务给 worker

celery beat -A celery_task -l 

6 django中集成celery

# 0 了解
    -django-celery  # 第三方把django和celery集成起来,方便我们使用,但是,第三方写的包版本,跟celery和django版本必须完全对应,就很麻烦
    	
    -我们自己使用包结构集成到django中

# 第一步,把写好的包,直接复制到项目根路径
# 第二步,在视图类中(函数中)提交任务
from celery_task.user_task import send_sms
def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步发送假设3分钟,异步发送,直接就返回id了,是否成功不知道,后期通过id查询
    print(res)

    return HttpResponse(res)

7 双写一致性问题

# redis缓存和mysql数据不同步
# 缓存更新策略
    -先更新数据库,再更新缓存(可靠性高一些)
    -先更新数据库,再删缓存(可靠性高一些)
    
    -先删缓存,再更新数据库(缓存删了,数据库还没更新,来了一个请求,缓存了老数据)
    
    -定时更新(对实时性要求不高)
    	-每隔12个小时,更新一下缓存    
        
        
回答:
    先解释下什么是双写一致性问题?就是redis缓存和mysql数据出现了不同步。
    怎么解决?有很多缓存更新策越,一些实时性不重要的就弄一个定时更新缓存,
    实时要求高的:一开始是第三种,后面绝对不是很可靠,又用的 先更新数据库,在更新缓存就行了。

标签:celery,task,--,app,py,介绍,Celery,任务,print
来源: https://www.cnblogs.com/Edmondhui/p/15983330.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有