ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

celery实际应用例子

2020-05-12 18:54:13  阅读:292  来源: 互联网

标签:priority task max celery length 例子 应用 test


Celery例子

目录

 

 setting

'''
celery配置
'''
task_acks_late = True
worker_prefetch_multiplier = 1
# 限制最大使用内存,限制celery执行10个任务,就销毁重建
worker_max_memory_per_child = 150000
task_reject_on_worker_lost = True
broker_pool_limit = 300
timezone = "Asia/Shanghai"
broker_url = 'amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0'

# 优先级参数必须加
celery_acks_late = True
celeryd_prefetch_multiplier = 1

task文件

import time
from celery import Celery
from celery1 import celery_setting
from kombu import Exchange, Queue

app = Celery('celery1.my_task')
app.config_from_object(celery_setting)
app.conf.update(
broker_url="amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0".format(vhost="test")
)

# 1) x-max-length 提供一个非负整数值来设置最大消息条数。
# 2) x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
# 3) x-overflow 提供字符串值来设置。可能的值是:
# drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息
# reject-publish:最近发布的消息将被丢弃,即保存前n条消息。
# 4) x-max-priority 提供一个非负整数值来设置最大优先级。

app.conf.task_queues = [
Queue('priority_test_1', Exchange('default', type='direct'), routing_key='default', durable=False,
queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
"x-overflow": "drop-head", "durable": False}),
Queue('priority_test_2', Exchange('default', type='direct'), routing_key='default', durable=False,
queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
"x-overflow": "drop-head"}),

]


@app.task(bind=True,
queue='priority_test_1', # 指定队列名
max_retries=10, # 最大重试
default_retry_delay=600, # 重试间隔时间
autoretry_for=(TypeError, KeyError) # 指定重试错误
)
def priority_test_1(self, data):
# print(self)
print(data)


# celery worker -A celery1.my_task -n priority_test_1 -Q priority_test_1 -c 1 -l info -P gevent

@app.task(bind=True,
queue='priority_test_2',
)
def priority_test_2(self, data):
try:
print(data["1"])
time.sleep(2)
except (TypeError, KeyError) as exc:
raise self.retry(exc=exc, countdown=60 * 5, max_retries=5)

# celery worker -A celery1.my_task -n priority_test_2 -Q priority_test_2 -c 1 -l info -P gevent

添加任务文件

from celery1.my_task import priority_test_1, priority_test_2

for i in range(20):
    priority_test_1.delay({"name": f"{i}"})

 

标签:priority,task,max,celery,length,例子,应用,test
来源: https://www.cnblogs.com/clbao/p/12877990.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有