【Django】 Celery 进程执行异步任务(生产者与消费者)

tech2023-01-05  105

我们在项目中可能会遇到需要多任务执行的情况,例如给用户发送短信验证码、邮件等,但是可能会因为网络问题导致进程阻塞,这时候我们可以开启其他进程。有了 Celery,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程

celery简单可靠,还可以在多台服务器上运行,单个celery进程每分钟可以处理数以百万计的任务

1.安装celery

我们进入到虚拟环境中安装celery

pip install Celery -i https://pypi.tuna.tsinghua.edu.cn/simple

2.创建实例和配置

2.1 新增一个任务包

在Django项目外层目录中新建一个celery_task包,里面分别新建main.py主文件、config.py配置文件、任务包(这里的包名叫sms),结构如下

2.2 配置数据库

这里以Redis数据库为例

config.py

# 设置中间人,这里是redis三号库 broker_url = "redis://127.0.0.1:6379/3" # 若是使用rabbitmq # broker_url = "amqp://用户名:密码@ip地址:5672"
2.3 设置任务

我们需要在任务包(这里是sms)里 新建一个tasks.py文件,这个文件名是固定的,不要用其他名字

然后在tasks.py文件里设置任务 tasks.py

from celery_tasks.main import celery_app # 这是在main.py里定义的对象名 # 使用装饰器装饰要执行的函数名,之后可以使用delay()方法 @celery_app.task(name='ccp_send_sms_code') def ccp_send_sms_code(mobile, sms_code): '''该函数就是一个任务, 用于发送短信''' result = CCP().send_template_sms(mobile,[sms_code, 5],1) return result
2.4 main文件

我们要在主文件里实例化Celery对象,并且使用上面的配置和指定任务文件

from celery import Celery celery_app = Celery('main') celery_app.config_from_object('celery_tasks.config') # 指定任务所在包的位置,它会自动捕捉执行 celery_app.autodiscover_tasks(['celery_tasks.sms'])
2.5 调用任务

因为我们给函数加上了装饰器,所以我们只要使用原来的函数名加delay()运行,它就会自动开启异步执行了(需要开启celery服务)

from celery_tasks.sms.tasks import ccp_send_sms_code # 原来的写法: # CCP().send_template_sms(mobile, [sms_code, 5], 1) # 改为现在的写法,注意使用delay()方法 ccp_send_sms_code.delay(mobile, sms_code)

3.开启服务

我们要开启celery服务才能使用异步 我们进入项目所在文件夹(这里是处于虚拟环境),然后使用celery命令开启服务

celery -A celery_tasks.main worker -l info

参数说明

参数说明-A启动文件,即上面创建的main.pyworker启动的对象-l日志级别,如info–concurrency每个CPU开启的进程数

如果想以协程的方式运行,可以下载eventlet

# 安装 eventlet 模块 $ pip install eventlet -i https://pypi.tuna.tsinghua.edu.cn/simple # 启用 Eventlet 池 $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000
最新回复(0)