Celery是一个任务队列,它可以实现跨进程和机器的分布式任务处理。任务队列的输入端会输入各种任务(task),这些任务会在输出端由worker进行处理,这些任务会由客户端通过发送消息的方式交给broker,随后broker把任务分发给worker。

安装组件

本文使用到的组件版本

组件 版本
Python 2.7.16
Celery 4.4.7
Redis 6.2.4
redis-py 3.2.1

首先我们需要安装celery和Redis的依赖包

pip install celery==4.4.7
pip install redis==3.2.1

Celery支持多种类型的broker,在这里我们主要使用Redis作为Celery的broker,关于Redis的安装和使用可以参考我之前的文章Redis failover

构建应用

我们首先创建如下的目录结构(本文的示例代码都放在了GitHub上面

.
├── run.py
└── search
    ├── __init__.py
    ├── config.py
    └── tasks.py

创建celery应用

search/config.py包含了一些celery的配置文件,具体配置如下

1
2
3
4
5
6
7
8
# 设置任务模块
include = ['search.tasks']

# 对指定的任务使用一个特定的队列进行路由,该任务在发送时会被发送到该指定队列中
# 未指定队列的任务默认发送到一个名叫celery的队列
task_routes = {
'search.tasks.sort_list': {'queue': 'queue_1'}
}

search/__init__.py中我们利用如上的配置信息初始化一个celery的app

1
2
3
4
5
6
from __future__ import absolute_import
from celery import Celery
from . import config

app = Celery('search', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/1')
app.config_from_object(config)

创建celery任务

如上我们创建了一个celery的app,该app使用Redis作为broker和backend,之后我们在search/tasks.py中创建任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import requests
from . import app

@app.task
def search_url(url):
r = requests.get(url)
return r.status_code

@app.task
def add(x, y):
return x + y

@app.task
def sort_list(data):
return sorted(data)

启动celery的worker

写完代码之后我们在项目根目录执行如下命令启动一个celery的worker

celery -A search worker -Q queue_1,celery -l info

其中,-A--app的简写,代表启动的应用;worker表示当前命令是要启动一个celery的worker;-Q queue_1,celery表示当前worker监听queue_1celery队列,不指定的话默认使用一个名叫celery的队列;-l info表示日志的级别。启动后输出如下内容,代表celery的worker已经成功启动了

-------------- celery@Mac v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Darwin-19.6.0-x86_64-i386-64bit 2022-04-15 14:30:06
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         search:0x104604090
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
-------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                .> queue_1          exchange=queue_1(direct) key=queue_1

[tasks]
. search.tasks.add
. search.tasks.search_url
. search.tasks.sort_list

[2022-04-15 14:30:06,838: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2022-04-15 14:30:06,855: INFO/MainProcess] mingle: searching for neighbors
[2022-04-15 14:30:07,930: INFO/MainProcess] mingle: all alone
[2022-04-15 14:30:07,983: INFO/MainProcess] celery@Mac ready.

发送任务

之后我们在run.py中发送celery任务并交给worker执行

1
2
3
4
5
6
7
8
9
10
import logging
from search.tasks import search_url, add, sort_list, get_redis_keys

if __name__ == '__main__':
logging.info(search_url.delay('https://www.jd.com').get(5))
logging.info(add.delay(1.5, 3.5).get(5))
data = [5, 86, 59, 17, 24, 92, 38, 95, 13, 89, 63, 3, 4, 60, 6]
logging.info(sort_list.delay(data).get(5))
# 将任务发送到指定队列,如果该队列没有worker监听,则此任务不会执行,5秒后超时
logging.info(add.apply_async((1.5, 3.5), queue='queue_2').get(5))

只需要在原有的方法基础上添加一个delay方法,就可以实现任务的发送并交给worker执行,非常简单。delayapply_async方法的简化版,在apply_async方法中我们还可以指定该任务的发送队列,以及一些其它的配置。

apply_async方法的返回值是一个AsyncResult类型,该类型的对象可以获取任务的信息,例如successful()failed()方法可以获取到该任务是否执行成功,idstate属性可以获取到该任务的id和状态。如上所示,get()方法可以获取到该任务的返回值,为了避免卡死可以在执行时添加get(timeout)方法的超时时间。

通过设置队列实现路由功能

celery可以通过设置队列来实现任务的路由。假设我们有三个任务,它们发送任务的队列设置分别为q1,q2,q3。同时我们还有三个worker,它们的队列设置分别为q1,q1,q2,q3。那么任务1将会发送到worker1或者worker2上执行,而任务2和任务3都会在worker3上执行,通过队列就可以实现把任务发送到指定的worker上执行的功能。

worker queue task
1 1 1
2 1 1
3 2,3 2,3

由上可见,task是和queue绑定的,一个task只能发送到一个指定的queue。而一个worker既可以监听多个queue,也可以多个worker监听一个queue,前者可以实现worker能力的扩展,后者可以实现任务的多负载均衡。

celery的监控

Flower是一个celery的网页监控和管理工具,使用前需要先安装

pip install flower==0.9.2

之后我们可以启动它

celery -A search flower --port=5555 -Q queue_1,celery -l info

随后访问http://127.0.0.1:5555就可以查看celery的监控信息了,我们也可以在网页上对celery进行一些管理操作。

参考

https://docs.celeryq.dev/en/v4.4.7/index.html
https://github.com/RitterHou/celery_demo