celery学习笔记:celery安装,并运行第一个应用

tech2023-10-21  93

1、celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

2、Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。 作为中间人,通常用的是RabbitMQ或者Redis:

(1)、RabbitMQ是一个功能完备,稳定的并且易于安装的broker. 它是生产环境中最优的选择。使用RabbitMQ的细节参照以下链接: http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

如果我们使用的是Ubuntu或者Debian发行版的Linux,可以直接通过下面的命令安装RabbitMQ: sudo apt-get install rabbitmq-server 安装完毕之后,RabbitMQ-server服务器就已经在后台运行。如果您用的并不是Ubuntu或Debian, 可以在以下网址: http://www.rabbitmq.com/download.html 去查找自己所需要的版本软件。

(2)、Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。 关于是有那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

3、安装celery,很简单,使用下面命令即可:

pip install -U Celery

如果是在项目虚拟目录底下的话,只会安装在虚拟目录底下,不过还可以去官网下载压缩包:https://pypi.python.org/pypi/celery/

找到想安装的版本,解压后,通过python命令编译:

tar xvfz celery-0.0.0.tar.gz cd celery-0.0.0 python setup.py build python setup.py install

此时celery可就安装到本机的python环境目录底下了,而不是对应项目的虚拟目录底下。

4、创建一个项目,这里我先创建了一个flask项目,其中可能还要用使用命令:pip install flask安装下flask,如果没有celery的话,也要用pip install -U Celery安装下

5、创建应用,在那个项目底下新建一个tasks.py文件,代码如下:

from celery import Celery # 测试应用,使用redis作为broker(消息中间人) app = Celery('test', broker='redis://:332572@127.0.0.1/1') # 创建任务函数 @app.task def my_task(): print("任务函数正在执行。。。。。")

(1)、Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

(2)、现在我们在创建一个worker, 等待处理队列中的任务.打开终端,cd到tasks.py同级目录中,执行命令:

celery -A tasks worker --loglevel=info

(3)、发现报了这个错误:ImportError: Missing redis library (pip install redis)

(4)、于是要装下相关redis的库,使用命令:

pip install -U "celery[redis]"

(5)、安装成功后,运行第(2)点,创建一个worker的命令,发现:

原来是没启动redis的服务端,于是在redis安装目录执行命令:

redis-server.exe redis.windows.conf

接着又发现

(6)、好像说客户端认证,没有一个密码设置,redis的密码没设置的问题吗?于是查找资料说redis默认是不需要密码的,在配置文件里用#注释了,需要打开来,于是找到对应的配置文件redis.windows.conf的那一行,去掉#注释:

(7)、保存后,重新用命令redis-server.exe redis.windows.conf,启动下redis的服务端:

(8)、好像密码不对,结果发现是程序底下写的配置redis的密码是332572,redis的URL的配置:

redis://:password@hostname:port/db_number

URL 的所有配置都可以自定义配置的,默认使用的是 localhost 的 6379 端口中 0 数据库。( Redis 默认有 16 个数据库),当然我们上述代码中,是用的1数据库。

(9)、于是改下代码:

from celery import Celery # 测试应用,使用redis作为broker(消息中间人) app = Celery('test', broker='redis://:foobared@127.0.0.1/1') # 创建任务函数 @app.task def my_task(): print("任务函数正在执行。。。。。")

运行后如下图所示:

6、调用任务,任务加入到broker队列中,以便刚才我们创建的celery workder服务器能够从队列中取出任务并执行。如何将任务函数加入到队列中,可使用delay()。

(1)、进入python终端, 执行如下代码:

from tasks import my_task my_task.delay()

如下图所示:

(2)、回到创建worker执行的那个终端页面,发现执行好像有点问题,如下图所示:

(3)、找下解决办法,开始尝试,需要安装eventlet模块,执行命令:

pip install eventlet

(4)、安装成功后,然后启动worker的时候加一个参数,如下:(执行任务)

之前的:

celery -A tasks worker --loglevel=info

改为:

celery -A tasks worker --loglevel=info -P eventlet

执行后如下图所示:

(5)、最后再重新执行下my_task.delay()命令,回到执行worker的命令行页面,如下图所示:

我们通过worker的控制台,可以看到我们的任务被worker处理,并打印出了任务函数正在执行的字符串。调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

7、存储结果,如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

(1)、例子仍然使用Redis作为存储结果的方案,任务结果存储配置通过Celery的backend参数来设定。修改tasks.py的代码如下:

from celery import Celery # 测试应用,使用redis作为broker(消息中间人) app = Celery('test', backend='redis://:foobared@127.0.0.1:6379/2', broker='redis://:foobared@127.0.0.1/1') # 创建任务函数 @app.task def my_task(a, b): print("任务函数正在执行。。。。。") return a + b

我们给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

(2)、ctrl+c关掉之前的worker,重新执行下命令,再创建一个worker:

celery -A tasks worker --loglevel=info -P eventlet

(3)、重新进入python命令行,重新执行命令,调用任务:

from tasks import my_task my_task.delay(1,1)

(4)、回到创建worker执行的那个命令行窗口,如下图所示,发现把1加1的值打印出来了:

(5)、更多关于AsyncResult对象信息,可在这个网站查阅:http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result

8、配置,Celery使用简单,配置也非常简单。Celery有很多配置选项能够使得celery能够符合我们的需要,但是默认的几项配置已经足够应付大多数应用场景了。配置信息可以直接在app中设置,或者通过专有的配置模块来配置。

(1)、直接通过app来配置:

from celery import Celery app = Celery('test') # 增加配置 app.conf.update( result_backend='redis://:foobared@127.0.0.1:6379/2', broker_url='redis://:foobared@127.0.0.1:6379/1', )

(2)、专有配置文件,对于比较大的项目,我们建议配置信息作为一个单独的模块。我们可以通过调用app的函数来告诉Celery使用我们的配置模块。

        配置模块的名字我们取名为celeryconfig, 这个名字不是固定的,我们可以任意取名,建议这么做。我们必须保证配置模块能够被导入。 配置模块的名字我们取名为celeryconfig, 这个名字不是固定的,我们可以任意取名,建议这么做。我们必须保证配置模块能够被导入。

下面我们在tasks.py模块 同级目录下创建配置模块celeryconfig.py:

result_backend = 'redis://:foobared@127.0.0.1:6379/2' broker_url = 'redis://:foobared@127.0.0.1:6379/1'

tasks.py文件修改为:

from celery import Celery import celeryconfig # 我们这里案例使用redis作为broker app = Celery('test') # 从单独的配置模块中加载配置 app.config_from_object('celeryconfig')

(3)、更多配置,可参考文档: http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration

以上内容仅供参考学习,也是我参考网上的资料来的,后面遇到的问题解决之类的,但是我找不到出处,如知道的,请下方评论,并附上链接,我这边修改下,谢谢!

 

最新回复(0)