使用Celery实现Python分布式任务处理
Celery是一个任务队列,它可以实现跨进程和机器的分布式任务处理。任务队列的输入端会输入各种任务(task),这些任务会在输出端由worker进行处理,这些任务会由客户端通过发送消息的方式交给broker,随后broker把任务分发给worker。
安装组件
本文使用到的组件版本
组件 | 版本 |
---|---|
Python | 2.7.16 |
Celery | 4.4.7 |
Redis | 6.2.4 |
redis-py | 3.2.1 |
首先我们需要安装celery和Redis的依赖包
pip install celery==4.4.7
pip install redis==3.2.1
Celery支持多种类型的broker,在这里我们主要使用Redis作为Celery的broker,关于Redis的安装和使用可以参考我之前的文章Redis failover。
构建应用
我们首先创建如下的目录结构(本文的示例代码都放在了GitHub上面)
.
├── run.py
└── search
├── __init__.py
├── config.py
└── tasks.py
创建celery应用
search/config.py
包含了一些celery的配置文件,具体配置如下
1 | # 设置任务模块 |
在search/__init__.py
中我们利用如上的配置信息初始化一个celery的app
1 | from __future__ import absolute_import |
创建celery任务
如上我们创建了一个celery的app,该app使用Redis作为broker和backend,之后我们在search/tasks.py
中创建任务
1 | import requests |
启动celery的worker
写完代码之后我们在项目根目录执行如下命令启动一个celery的worker
celery -A search worker -Q queue_1,celery -l info
其中,-A
是--app
的简写,代表启动的应用;worker
表示当前命令是要启动一个celery的worker;-Q queue_1,celery
表示当前worker监听queue_1
和celery
队列,不指定的话默认使用一个名叫celery
的队列;-l info
表示日志的级别。启动后输出如下内容,代表celery的worker已经成功启动了
-------------- celery@Mac v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Darwin-19.6.0-x86_64-i386-64bit 2022-04-15 14:30:06
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: search:0x104604090
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
.> queue_1 exchange=queue_1(direct) key=queue_1
[tasks]
. search.tasks.add
. search.tasks.search_url
. search.tasks.sort_list
[2022-04-15 14:30:06,838: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2022-04-15 14:30:06,855: INFO/MainProcess] mingle: searching for neighbors
[2022-04-15 14:30:07,930: INFO/MainProcess] mingle: all alone
[2022-04-15 14:30:07,983: INFO/MainProcess] celery@Mac ready.
发送任务
之后我们在run.py
中发送celery任务并交给worker执行
1 | import logging |
只需要在原有的方法基础上添加一个delay
方法,就可以实现任务的发送并交给worker执行,非常简单。delay
是apply_async
方法的简化版,在apply_async
方法中我们还可以指定该任务的发送队列,以及一些其它的配置。
apply_async
方法的返回值是一个AsyncResult类型,该类型的对象可以获取任务的信息,例如successful()
和failed()
方法可以获取到该任务是否执行成功,id
和state
属性可以获取到该任务的id和状态。如上所示,get()
方法可以获取到该任务的返回值,为了避免卡死可以在执行时添加get(timeout)
方法的超时时间。
通过设置队列实现路由功能
celery可以通过设置队列来实现任务的路由。假设我们有三个任务,它们发送任务的队列设置分别为q1,q2,q3。同时我们还有三个worker,它们的队列设置分别为q1,q1,q2,q3。那么任务1将会发送到worker1或者worker2上执行,而任务2和任务3都会在worker3上执行,通过队列就可以实现把任务发送到指定的worker上执行的功能。
worker | queue | task |
---|---|---|
1 | 1 | 1 |
2 | 1 | 1 |
3 | 2,3 | 2,3 |
由上可见,task是和queue绑定的,一个task只能发送到一个指定的queue。而一个worker既可以监听多个queue,也可以多个worker监听一个queue,前者可以实现worker能力的扩展,后者可以实现任务的多负载均衡。
celery的监控
Flower是一个celery的网页监控和管理工具,使用前需要先安装
pip install flower==0.9.2
之后我们可以启动它
celery -A search flower --port=5555 -Q queue_1,celery -l info
随后访问http://127.0.0.1:5555就可以查看celery的监控信息了,我们也可以在网页上对celery进行一些管理操作。
参考
https://docs.celeryq.dev/en/v4.4.7/index.html
https://github.com/RitterHou/celery_demo