ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Celery任务调度模板

2021-06-15 12:57:25  阅读:159  来源: 互联网

标签:celery task 10 Celery add result print 任务调度 模板


Celery任务调度使用:https://docs.celeryproject.org/en/stable/

目录路结构:
在这里插入图片描述
其中celerybeat-schedule.bak、celerybeat-schedule.dat、celerybeat-schedule.dir是任务调度过程中产生过的文件。
setting是整个架构的配置项。
start_celery是启动任务调度等待队列,等待任务分发过来。
tasks是任务分发过来后的具体任务执行内容。
other_test是任务调度过程中产生的一些数据和状态。
start_task是启动任务,把消息给celery进行分发。

setting:

# 官方文档:https://docs.celeryproject.org/en/stable/userguide/routing.html
### 基础配置 ###

# 使用Redis作为消息代理
BROKER_URL = 'redis://:@127.0.0.1:6379/1'
# BROKER_URL = 'redis://:123456@127.0.0.1:6379/1'
# 把任务结果存在了Redis
CELERY_RESULT_BACKEND = 'redis://:@127.0.0.1:6379/0'
# CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/0'
# 任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'msgpack'
# 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'
# 任务过期时间
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 指定接受的内容类型
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# 指定时间为utc
TIMEZONE = 'asia/shanghai'
ENABLE_UTC = False
# 配置路由
TASK_ROUTES = {'queue': 'my_queue'}



start_celery:

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab


# celery -A [文件名] worker -l [日志等级] -P [eventlet/gevent等多任务] -c [指定同时任务数量] -Q 指定队列[默认celery]
# 使用celery help来获取更多的参数说明
# celery -A tasks worker -l info -P eventlet -c 8 -Q my_queue,celery 运行任务队列
# 实例化celery,参数是对于的文件名(好像不是很严格),include是对应的任务文件
app = Celery(__name__, include=["myCelery.tasks", ])
# 导入Celery相关配置
app.config_from_object('myCelery.setting')
# 定时任务
# app.conf.beat_schedule = {
#     "each10s_task": {
#         "task": "myCelery.tasks.add",
#         "schedule": timedelta(seconds=10),  # 每10秒钟执行一次
#         "args": (10, 10)
#     },
#     "each1m_task": {
#         "task": "tasks.add",
#         "schedule": crontab(minute=1),  # 每1分钟执行一次
#         "args": (10, 10)
#     },
#     "each1hours_task": {
#         "task": "tasks.add",
#         "schedule": crontab(hour=1),  # 每1小时执行一次
#         "args": (10, 10)
#     },
# }

tasks:

import time
import sys
sys.path.append("..")
from myCelery.start_celery import app
from celery.utils.log import get_task_logger


# 日志模块
logger = get_task_logger(__name__)


# 任务一
@app.task(name="add.task")
def add(x, y):
    z = x + y
    # 假设这里在执行下载任务花时久
    time.sleep(5)
    logger.info('***add result is {0} !'.format(z))
    return z


# 任务二
@app.task(name='del.task')
def delete(s):
    # 假设这里在执行删除任务花时久
    time.sleep(5)
    logger.info('***{0} already delete !'.format(s))
    return "%s already delete !" % s

other_test:

from myCelery.tasks import add


# 根据任务ID获取想要的信息
task_id = "ba8fb2bb-0ea8-49c9-a7a0-3daddda5ddb9"
print(add.AsyncResult(task_id).get())  # 用propagate来覆盖异常
print(add.AsyncResult(task_id).status)
print(add.AsyncResult(task_id).traceback)
print(add.AsyncResult(task_id).children)

start_task:

# coding=utf-8
import time
from myCelery.tasks import add
from threading import Thread
from celery import group
from celery import chain


# 借助线程实现异步
def func_async(f):
    def wrapper(*args, **kwargs):
        thr = Thread(target=f, args=args, kwargs=kwargs)
        thr.start()
    return wrapper


def get_result(result):
    print('Task Done:')
    print('  -->failed: {0}'.format(result.failed()))
    print('  -->successful: {0}'.format(result.successful()))
    print('  -->result: {0}'.format(result.get(propagate=False)))  # 获取结果,propagate=False如果异常不传递异常
    print('  -->result: {0}'.format(result.result))
    print('  -->status: {0}'.format(result.status))
    print('  -->traceback: {0}'.format(result.traceback))
    print('  -->children: {0}'.format(result.children))
    print('  -->task_id: {0}'.format(result.task_id))
    return result.ready()


@func_async
def by_chain():
    # 可以将任务链接在一起,以便在一个任务返回后又调用另一个任务,串行
    res = chain(add.s(10) | add.s(20))(10)  # add(add(10, 10), 20)
    print("**********链式处理结果:{0}***********".format(res.get()))
    print("**********链式处理父的结果:{0}***********".format(res.parent.get()))  # 一个parent对应一个父结果


@func_async
def by_group():
    # 并行调用任务列表,它返回一个特殊的结果实例,该实例使您可以将结果作为一个组进行检查,并按顺序检索返回值。
    res_list = group(add.s(i, i) for i in range(3, 10))().get()  # 分组处理
    print("########分组处理结果:{0}#########".format(res_list))


@func_async
def listen(result):
    print(result.task_id)
    while not result.ready():
        time.sleep(1)
    else:
        return get_result(result)


if __name__ == '__main__':
    # 启动数个任务,异步启动,不影响主程序
    # r1 = add.delay(1, 1)
    r2 = add.apply_async((2, 2), queue='my_queue', countdown=5)  # 指定任务队列和延后运行时间
    # d1 = delete.delay("task1")
    # d2 = delete.s("task2")
    # res = d2.delay()
    # d3 = delete.apply_async(("task3", ), queue='my_queue', countdown=10)
    # task_list = [r1, r2, d1, d3]
    # # 程序异步执行监听以上任务
    # for ts in task_list:
    #     listen(ts)
    # # 异步调用分组处理
    # by_group()
    # # 代表程序异步执行主逻辑
    # for i in range(10):
    #     time.sleep(2)
    #     print("程序继续走!!!")
    # print("-->signature result: {0}".format(res.get()))
    # ##异步调用链条处理,要和分组处理错开
    # # by_chain()

标签:celery,task,10,Celery,add,result,print,任务调度,模板
来源: https://blog.csdn.net/xiaoxin_OK/article/details/117922052

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有