ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

异步组件Celery

2021-07-29 10:33:12  阅读:192  来源: 互联网

标签:异步 task celery send Celery print 任务 组件


文章目录

1. Celery概述

Celery是一个简单灵活且可靠的,用于处理大量消息的分布式系统。专注于实时处理的异步任务队列,同时也支持任务调度。

分布式系统:简单理解是指系统应用(例如网站),涉及到相关组件,如web服务器、web应用、数据库、消息中间件等等。将系统应用的相关组件架构在不同的服务器上,在不同的服务器上的不同组件之间通过消息通信的方式来实现协调工作的这种模式就是分布式系统。但是一定要做到,当用户访问分布式系统(多台服务器)的时候如访问一台服务器的用户感知是一样的。分布式系统可以实现负载均衡、避免单点故障。

异步任务:异步对应同步,同步是阻塞。当发送一个请求遇到IO操作的时候,如果是同步请求,必须得等待着。当IO结束之后,主程序继续向下执行,同步请求需要等待IO操作完成,浪费了资源。异步请求发送之后遇到IO操作,不需要等待,不需要返回值,直接向下执行其他的业务操作。IO操作什么时间完成,就把结果存到数据库中或者文件中,没有结果就算了。当主进程需要用到IO操作异步请求的结果时到数据库中取出来。异步请求极大程度地利用资源,把IO操作的时间节省下来。


2. Celery组成结构

Celery 由 message broker(消息中间件)、worker (任务执行清单) 和 task result stroe (任务执行结果存储) 三部分组成!

(1)消息中间件:Celery本身不提供消息服务,但是可以很方便地和第三方提供的消息中间件集成,包括RabbitMQ和Redis等

Redis 在一定程度上可以充当消息中间件,只不过没有RabbitMQ持久稳定,官方推荐使用RabbitMQ。

如果没有 Celery 可以使用并发技术,并发技术有线程、进程、协程、IO多路复用!可以自己写,但是偏于复杂,偏底层操作!使用Celery就可以实现,只需要知道Celery的接口调用方式。

(2)任务执行单元:worker 是 Celery 提供的任务执行单元,worker 并发地运行在分布式系统节点中

(3)任务结果存储:用来存储 worker 执行的任务结果,Celery 支持不同的并发和序列化方式的存储任务结果,包括 AMQP,Redis 等

并发:Prefork,Eventlet,gevent,threads/single threaded
序列化:pickle,json,yaml,msgpack,zlib,bzip2 compression,Cryptographic message signing 等等

在这里插入图片描述


3. Celery使用场景

Celery是一个 强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab):

(1)异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

(2)定时任务:定时执行某件事情,比如 每天数据统计


4. Celery的优点

(1)简单:Celery 使用和维护都非常简单,并且 不需要配置文件

(2)高可用:Worker和Client会在网络连接丢失或者失败时自动进行重试,并且 有的Brokers也支持“双主”或者“主从”的方式实现高可用

(3)快速:单个的Celery进程每分钟可以处理百万级的任务,并且只需要 毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

应用了我们几乎能够利用的所有并发技术!

(4)灵活:Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等


5. Celery的安装

$ pip install -U celery -i https://mirrors.aliyun.com/pypi/simple

6. 异步任务之简单任务结构

消费者进程:celery_task.py

import celery
import time

"""
消费者进程
"""
backend = 'redis://localhost:6379/1'  # 异步的结果
broker = 'redis://localhost:6379/2'  # 消息中间件
cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_email(name):
    print(f'向{name}发送邮件!')
    time.sleep(5)
    print(f'向{name}发送邮件!')
    return 'ok'

@cel.task
def send_msg(name):
    print(f'向{name}发送短信!')
    time.sleep(5)
    print(f'向{name}发送短信!')
    return 'ok'

执行命令用于监听队列:

$ celery worker -A celery_task -l info

这条命令会使用Celery连接消息中间件(Redis),创建一个队列并监听这个队列,启动多个Worker监听任务。

-------------- celery@thanlon v4.4.6 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-42-generic-x86_64-with-glibc2.29 2020-07-29 11:44:44
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         test:0x7fd3d1ac72b0
- ** ---------- .> transport:   redis://localhost:6379/2
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
-------------- [queues]
                .> celery           exchange=celery(direct) key=celery
[tasks]
  . celery_task.send_email
  . celery_task.send_msg

[2020-07-29 11:44:44,735: INFO/MainProcess] Connected to redis://localhost:6379/2
[2020-07-29 11:44:44,743: INFO/MainProcess] mingle: searching for neighbors
[2020-07-29 11:44:45,764: INFO/MainProcess] mingle: all alone
[2020-07-29 11:44:45,803: INFO/MainProcess] celery@thanlon ready.

生产者进程:produce_task.py

from celery_task import send_email, send_msg  # 导入异步任务函数

# 调用delay方法,就会帮助生产者去连接消息中间中创建好的队列,有什么数据直接插入这个队列
ret1 = send_email.delay('erics')  # 结果不会被返回,ret1不会是返回的结果ok
print(ret1.id)  # 会把返回的值ok放在数据库中,每一步异步请求都会返回唯一的id值,在任何时刻都可以到数据库中拿这个结果
ret2 = send_msg.delay('erics')
print(ret2.id)
"""
aaa30e8a-e9c6-4195-ba36-b5455fd48f42
e063cbde-5b30-4f17-a755-751c5e796a83
"""
[2020-07-29 12:09:05,649: WARNING/ForkPoolWorker-1] 向erics发送短信!
[2020-07-29 12:09:05,649: WARNING/ForkPoolWorker-8] 向erics发送邮件!
[2020-07-29 12:09:10,653: WARNING/ForkPoolWorker-1] 向erics发送短信!
[2020-07-29 12:09:10,654: WARNING/ForkPoolWorker-8] 向erics发送邮件!

获取结果:get_result.py

from celery_task import cel
from celery.result import AsyncResult

async_result = AsyncResult(id='f0ef6bfe-063c-4d01-be92-bb66de9f60bd', app=cel)
if async_result.successful():
    ret = async_result.get()
    print(ret)  # ok
    # ret.forget()  # 将结果删除

elif async_result.failed():
    print('任务执行失败!')
elif async_result.status == 'PENDING':
    print('任务等待被执行中!')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试!')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行!')
"""
ok
"""

7. 异步任务之多任务结构

在这里插入图片描述
消费者进程:celery.py

from celery import Celery

cel = Celery('celery_demo', backend='redis://localhost:6379/1', broker='redis://localhost:6379/2',
             include=['celery_tasks.task01', 'celery_tasks.task02', ])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否适用UTC
cel.conf.enable_utc = False

task01.py

from celery_tasks.celery import cel
import time

@cel.task
def send_email(name):
    print(f'完成向{name}发送邮件的任务!')
    time.sleep(5)
    return '完成发送邮件!'

task02.py

from celery_tasks.celery import cel
import time

@cel.task
def send_msg(name):
    print(f'完成向{name}发送短信的任务!')
    time.sleep(5)
    return '完成发送短信!'

执行 Celery 命令监听消息队列:

$ celery worker -A celery_tasks -l info -P eventlet

生产者进程:produce_task.py

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg

ret1 = send_email.delay('erics')
print(ret1.id)
ret2 = send_msg.delay('erics')
print(ret2.id)

获取结果:check_result.py

from celery_tasks.celery import cel
from celery.result import AsyncResult

async_result = AsyncResult(id='ddeeb879-068d-4174-97da-e8c5681a3912', app=cel)
if async_result.successful():
    ret = async_result.get()
    print(ret)  # ok
    # ret.forget()  # 将结果删除。执行完成,结果不会删除
    # async_result.revoke(terminate=True)  # 无论现在是什么时候,都要终止
    # async_result.revoke(terminate=False)  # 如果任务还没有开始执行,那么就可以终止

elif async_result.failed():
    print('任务执行失败!')
elif async_result.status == 'PENDING':
    print('任务等待被执行中!')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试!')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行!')

9. 定时任务简单任务结构

执行定时任务只需要修改生产者部分,定时调度任务就可以,方式1是 produce_task.py:

from celery_task import send_email  # 导入异步任务函数
from datetime import datetime

# 方式1
v1 = datetime(2020, 7, 29, 20, 51, 00)
print(v1)
# 先把日期对象转换成时间戳,然后使用utcfromtimestamp把时间戳转换成国标的时间
v2 = datetime.utcfromtimestamp(v1.timestamp())  # v2是国标日期对象,差8个小时
print(v2)
result = send_email.apply_async(args=['erics', ], eta=v2)  # apply_async可以接纳更多参数, 如果没有eta与delay方法相同
print(result)

方式2:

from celery_task import send_email  # 导入异步任务函数
from datetime import datetime, timedelta

ctime = datetime.now()
print(ctime)
# 默认使用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
result = send_email.apply_async(args=['erics', ], eta=task_time)
print(result.id)

10. 定时任务之多任务结构

在这里插入图片描述
celery.py:

from celery import Celery
from datetime import timedelta

cel = Celery('celery_demo', backend='redis://localhost:6379/1', broker='redis://localhost:6379/2',
             include=['celery_tasks.task01', 'celery_tasks.task02', ])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否适用UTC
cel.conf.enable_utc = False
# beat_schedule是定时任务相关的调度器,每一个键值对就是一个定时任务,celery beat命令会读这部分信息
cel.conf.beat_schedule = {
    # 名字随意命名(增加一个每10s执行的任务)
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_tasks.task01.send_email',
        'schedule': 2.0,  # 每隔2秒执行一次
        # 'schedule': crontab(minute="*/1"),# 每一分钟
        # 'schedule': timedelta(seconds=10),  # 与'schedule': 6同,但是更丰富
        'args': ('Erics',)  # 传递参数
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_tasks.task01.send_email',
    #     每年4月11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('张三',)
    # },
}

使用 Redis 存放在 Redis 链 ( list )

执行 Celery 命令监听消息队列:

$ celery -A celery_tasks worker -l info -c 10

如果任务比较多,没有设置并发数,Celery会开多个进程执行任务。

定时任务的多任务结构,向消息队列中添加任务不需要使用生产者,只需要使用命令就可以了:

$ celery beat -A celery_tasks

每隔n秒中插入一次任务,那么监听的时候每隔n秒中执行一次任务。保持平衡的状态,队列中也不会存在遗留的任务。如果停止监听执行任务,但是不影响celery beat命令想消息队列插入任务。


11. Django中使用Celery

在这里插入图片描述
config.py:

broker_url = 'redis://127.0.0.1:6379/1'
result_backend = 'redis://127.0.0.1:6379/2'

main.py:

# 主程序
import os
from celery import Celery

# 创建celery实例对象
app = Celery("sms")

# 把celery和django进行组合,识别和加载django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')

# 通过app对象加载配置
app.config_from_object("mycelery.config")

# 加载任务
# 参数必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms", ]) # 不需要带task.py,默认是这个文件

# 启动Celery的命令
# 强烈建议切换目录到mycelery根目录下启动
# celery -A mycelery.main worker --loglevel=info

task.py:

# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from mycelery.main import app
import time

import logging

log = logging.getLogger("django")


@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms(mobile):
    """发送短信"""
    print("向手机号%s发送短信成功!" % mobile)
    time.sleep(5)

    return "send_sms OK"


@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms2(mobile):
    print("向手机号%s发送短信成功!" % mobile)
    time.sleep(5)

    return "send_sms2 OK"

views.py:

from django.shortcuts import render, HttpResponse
from mycelery.sms.tasks import send_sms, send_sms2
from datetime import timedelta, datetime


def test(request):
    ############### 异步任务 #################

    # 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决
    # send_sms.delay("110")
    # send_sms2.delay("119")
    # send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容

    ############### 定时任务 #################

    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    result = send_sms.apply_async(["911", ], eta=task_time)
    print(result.id)

    return HttpResponse('ok')

由于是异步调度,所以不影响视图显示,立刻能够显示。如果是同步需要 10s 后才能显示。

如果生产者和消费者在同一台服务器,只需要导入即可。如果生产者和消费者不在同一台服务器上,必须拷贝,一份是生产者调用、一份用于消费者监听。要保证生产者所在的服务器上要有消费者的一份代码,消费者所在的服务器上要有生产者的一份代码。

参考:https://www.cnblogs.com/pyedu/p/12461819.html

标签:异步,task,celery,send,Celery,print,任务,组件
来源: https://blog.csdn.net/Thanlon/article/details/107351363

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有