ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

celery框架

2020-01-10 19:56:30  阅读:251  来源: 互联网

标签:task 框架 app redis celery 任务 import


[toc]

celery框架:

介绍:

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

  Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

	celery 组成: broker   |   worker    |   backend

    消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

  任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

    
使用场景:
	Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。
    还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。
    延时任务:解决延迟任务
    

注意: RabbitMQ : 异步的消息队列 (线上使用)

celery + redis :

环境搭建:

配置:
	pip install celery
    pip install django-redis
    # Windows中还需要安装以下模块,用于任务执行单元
    pip install eventlet
    
redis-- settings.py:
    CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}


任务结构:

创建文件目录结构:
	pro_cel
    ├── celery_task# celery相关文件夹
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── tasks1.py    #  所有任务函数
    │   └── tasks2.py    #  所有任务函数
    ├── check_result.py # 检查结果
    └── send_task.py    # 触发任务
    
 注意,检查结果与触发任务的模块不能写在celery_task模块中,不然会报导入celery的错误。

celery框架工作流程
    1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
    2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
    3)完成提供的任务的定时配置app.conf.beat_schedule
    4)启动celery服务,运行worker,执行任务
    5)启动beat服务,运行beat,添加任务

任务实现:

基本使用:
	1. 创建环境: celery 环境搭建
	2.创建app  + 任务
    	(创建: broker  + app  + backend)
	3.执行任务:
    	# 非windows
        celery worker -A celery_task -l info
        # windows:
        pip3 install eventlet
        celery worker -A celery_task -l info -P eventlet
   4.添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

   5.获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本

eg:
    from celery import Celery
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
task.py:
    from .celery import app
    @app.task
    def add(n, m)
    	pass

定时任务

celery.py:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0'  # 无密码
# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'

# 生成celery对象,'task'相当于key,用于区分celery对象(任意名)
# include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=[
    'celery_task.add_task',
    'celery_task.send_email'
])

# 时区 (可设置任意一个)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 定时执行
app.conf.beat_schedule = {
    # 名字随意命名
    'add-every-5-seconds': {
        # 执行add_task下的addy函数
        'task': 'celery_task.add_task.add',
        # 每10秒执行一次
        'schedule': timedelta(seconds=10),
        # add函数传递的参数
        'args': (1, 2)
    },
    'add-every-10-seconds': {
        'task': 'celery_task.add_task.add',
        # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
        'schedule': crontab(minute=5),
        'args': (1, 2)
    }
}

send_msg.py:

#项目配置

# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '1504703554@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = '授权码'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '2333<'1504703554@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True


import os
if __name__ == "celery_task.send_email":
    # 因为需要用到django中的内容,所以需要配置django环境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "do_celery.settings")
    import django
    django.setup()
    # 导入celery对象app
    from celery_task.celery import app
    from app01 import models
    # 导入django自带的发送邮件模块
    from django.core.mail import send_mail
    import threading


	@app.task
    def send_email1(id):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
        # 此处的id由用户注册的视图函数中传入
        user_obj = models.UserInfo.objects.filter(pk=id).first()
        email = user_obj.email
        # 启用线程发送邮件,此处最好加线程池
        t = threading.Thread(target=send_mail, args=(
            "激活邮件,点击激活账号",  # 邮件标题
            '点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效
            settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
            [email],  # 接收邮件的邮件地址,可以写多个
            ),
            # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
            kwargs={'html_message': "<a href='http://127.0.0.1:8000/active_user/?id=%s'>点击激活gogogo</a>" % id}
        )
        t.start()

check_result.py:

from celery.result import AsyncResult
from celery_task.celery import app

def check_result(task_id):
    async1 = AsyncResult(id=task_id, app=app)

    if async1.successful():
        result = async1.get()
        print(result)
        return result
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
	 elif async1.failed():
        print('执行失败')
        return '执行失败'
    elif async1.status == 'PENDING':
        print('任务等待中被执行')
        return '任务等待中被执行'
    elif async1.status == 'RETRY':
        print('任务异常后正在重试')
        return '任务异常后正在重试'
    elif async1.status == 'STARTED':
        print('任务已经开始被执行')
        return '任务已经开始被执行'

执行:

启用任务执行单元worker(以windows为例):

celery worker -A celery_task -l info  -P  eventlet

app.conf.beat_schdule定时任务时,还需要启用beat,用于定时朝消息队列提交任务:

celery beat -A celery_task -l info

延时任务:

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间,这里是10秒后执行任务
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

django + celery:

from datetime import timedelta
from celery import Celery 
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒执行一次
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 传递参数
        'args': ('test',)
    },
}

执行:
    # 启动一个beat
    celery beat -A celery_task -l info

    # 启动work执行
    celery worker -A celery_task -l info -P  eventlet

标签:task,框架,app,redis,celery,任务,import
来源: https://www.cnblogs.com/shaozheng/p/12177855.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有