ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

celery的分布式任务

2021-12-07 21:00:59  阅读:144  来源: 互联网

标签:py django celery 任务 import 分布式


介绍

celery分布式任务可以处理页面上那种数据读写或高密度计算的任务。这种任务如果频繁的执行的话,会造成网页卡死。
并且定时任务也是一种特殊的分布式任务。本例子讲的是从视图函数中直接写入数据,一旦运行到指定页面,后台自动更新数据库。类似于表单上传更改写入数据上传到某种页面,而该页面就是用分布式任务函数所处理。

分布式任务所需的模块

5个包

在settings.py中配置分布式任务和定时任务

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'index',
    # 添加分布式任务功能
    'django_celery_results',
    # 添加定时任务功能
    'django_celery_beat'
]

# 设置存储Celery任务队列的Redis数据库
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# 设置存储Celery任务结果的数据库
CELERY_RESULT_BACKEND = 'django-db'

# 设置定时任务相关配置
CELERY_ENABLE_UTC = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

定义数据模型来演示分布式任务

from django.db import models

class PersonInfo(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=20)
    age = models.IntegerField()
    hireDate = models.DateField()
    def __str__(self):
        return self.name
    class Meta:
        verbose_name = '人员信息'

分布式任务具体流程

在主项目文件夹下新建个celery.py文件

import os
from celery import Celery
# 获取settings.py的配置信息
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyDjango.settings')
# 定义Celery对象,并将项目配置信息加载到对象中。
# Celery的参数一般以项目名称命名
app = Celery('MyDjango')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

然后编写init.py文件实现该实例化app(celery.py)对象与django的绑定

# celery官方文档
# 参考文档http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
from .celery import app as celery_app
__all__ = ['celery_app'] # celery框架的实例化对象app与Mydjango对象进行绑定
# 目的是使得Django运行的时候能自动加载Celery框架的实例化对象app

接着在具体应用中编写分布式任务函数,也要新建文件夹task.py

from celery import shared_task
from .models import *
import time

 # 分布式任务
@shared_task
def updateData(id, info): # updateData分布式任务
    try: # 分别代表主键id和模型字段的修改内容。一般没有要求函数设定返回值,若要有返回值,就作为分布式任务的执行结果
        # 否则将执行结果设为None
        # 所有的结果都会被记录在数据表django_celery_results_taskresult的result字段中
        PersonInfo.objects.filter(id=id).update(**info) # 更新数据表中的字段
        return 'Done'
    except: # 分布式任务的触发条件被设置在了urls.py中,当用户访问特定的路由时,视图函数将调用编写的分布式函数
        return 'Fail'

# 定时任务
@shared_task
def timing(): # 定时任务,须在后台设置,不知怎么他把celery许多的数据表单把注册到后台了,并没有使用django的admin
    now = time.strftime("%H:%M:%S")
    with open("d:\\output.txt", "a") as f:
        f.write("The time is " + now)
        f.write("\n") # 不过页真耗内存
        f.close()

index应用下的路由信息urls.py


from django.urls import path
from .views import *
urlpatterns = [
    path('', index, name='index'),
]

编写视图处理函数views.py


from django.http import HttpResponse
from .tasks import updateData


def index(request):
    # 传递参数并执行异步任务
    id = request.GET.get('id', 1) # 这里是手动写入数据,在views.py中,并没有从页面(表单)或者路由(get请求)中写入
    info = dict(name='May', age=20, hireDate='2019-10-10')
    # 调用分布式函数updateData,在调用delay方法创建任务队列,并将分布式函数设有的参数传入
    updateData.delay(id, info) # delay应该时celery框架自带的方法
    return HttpResponse("Hello Celery")

启动celery框架和定时任务的方法

 celery -A  MyDjango worker -1 info -P eventlet
 这个命令报错
 好吧,1和l真的很像,命令行输入时很容易弄错
 celery -A MyDjango worker -l info -P eventlet 分布式任务
# 启动定时任务就必须保证django和celery框架是运行的
celery -A MyDjango worker -l info -S django 定时任务 

标签:py,django,celery,任务,import,分布式
来源: https://www.cnblogs.com/wkhzwmr/p/15658799.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有