RQ (Redis Queue) is a lightweight task queue for Python. This post records its basic usage.

First, install RQ (the Python version used here is 3.8.0):

pip install rq==1.10.1

Then create the following files:

.
├── __init__.py
├── jobs.py
└── run.py

In __init__.py we connect to redis-server and create two queues, default and queue_1:

from redis import Redis
from rq import Queue

redis_conn = Redis('127.0.0.1', db=0)
rq_default_queue = Queue('default', connection=redis_conn)
rq_queue_1 = Queue('queue_1', connection=redis_conn)
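Under the hood, RQ keeps each queue as a Redis list stored under a predictable key (rq:queue:&lt;name&gt; in RQ 1.x), which is why the queue names above show up directly among the Redis keys later in this post. A minimal sketch of that naming scheme; the queue_key helper below is illustrative, not part of RQ's public API:

```python
# Illustrative helper mirroring RQ 1.x's internal key naming for queues.
# This is not a public RQ API, just a sketch of the convention.
def queue_key(name: str) -> str:
    return f"rq:queue:{name}"

print(queue_key("default"))   # rq:queue:default
print(queue_key("queue_1"))   # rq:queue:queue_1
```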

Next, define the tasks to be executed in jobs.py:

import requests
from redis import Redis

redis_cli = Redis('127.0.0.1', db=0)

# Callback invoked when a job finishes successfully
def report_success(job, connection, result, *args, **kwargs):
    print(result)

# Job that counts the words on a web page
def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

# Job that fetches all keys from Redis
def get_redis_keys():
    return redis_cli.keys()
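The job functions are ordinary Python functions, so their logic can be sanity-checked locally before routing them through the queue. For instance, the word count in count_words_at_url is plain whitespace splitting; the count_words helper below is a stand-in for illustration, assuming a text string in place of the HTTP response:

```python
# Stand-in for the splitting rule count_words_at_url applies to resp.text
def count_words(text: str) -> int:
    return len(text.split())

print(count_words("RQ is a lightweight task queue"))  # 6
```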

With the queues and the jobs in place, we can start a worker. Here we have it listen on both queues, queue_1 and default:

rq worker queue_1 default --with-scheduler

Now we can enqueue jobs for the worker to execute from run.py:

from datetime import timedelta
import time
from rq import Retry
from rq_demo import rq_default_queue, rq_queue_1
from jobs import count_words_at_url, get_redis_keys, report_success

# Enqueue a job with a success callback; on failure it is retried up to
# 3 times, waiting 1, 3 and 6 seconds before the respective retries
rq_default_queue.enqueue(count_words_at_url, 'https://www.jd.com', on_success=report_success, retry=Retry(max=3, interval=[1, 3, 6]))

# job.result stays None until the worker has finished the job
job1 = rq_default_queue.enqueue(count_words_at_url, 'http://nvie.com')
print(job1.result)
time.sleep(2)
print(job1.result)

# enqueue_in schedules the job to run 5 seconds from now
# (this requires the worker to be started with --with-scheduler)
job2 = rq_queue_1.enqueue_in(timedelta(seconds=5), get_redis_keys)
print(job2.result)
time.sleep(6)
print(job2.result)
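The delayed job above is simply stamped with a run time of now + 5 seconds; the scheduler started via --with-scheduler moves it onto the queue once that moment passes, which is why sleeping 6 seconds is enough to see the result. A small standard-library sketch of that arithmetic:

```python
from datetime import datetime, timedelta

# enqueue_in(timedelta(seconds=5), ...) computes the moment the job
# becomes eligible to run, equivalent to:
delay = timedelta(seconds=5)
scheduled_at = datetime.now() + delay
print(delay.total_seconds())  # 5.0
```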

The output of run.py is as follows:

None
330
None
[b'rq:workers:queue_1', b'rq:worker:93618c280dbb445d92380fc26a33bc93', b'rq:job:909c68e7-a517-469a-ad33-c7f350e4f1dd', b'rq:scheduler-lock:default', b'rq:failed:default', b'rq:wip:queue_1', b'rq:job:f098f9f7-82b2-4a4f-b482-6d4cffd9b09e', b'rq:job:e006b2b5-69cf-4240-a0f2-202d31d71ad9', b'rq:scheduler-lock:queue_1', b'rq:finished:default', b'rq:queues', b'rq:job:aaaaf0e1-3673-4652-8109-20c46ebb89e0', b'rq:job:75b3839e-b8bb-45d8-bfe0-64ce63a502ab', b'rq:job:3c013ebb-7e68-4af2-9291-2abc7f8f0ba3', b'rq:job:8a40b7af-6b10-46a7-9b20-c9af734c0e26', b'rq:clean_registries:default', b'rq:job:43db8266-c1d0-4256-b355-00135e6a67c6', b'rq:clean_registries:queue_1', b'rq:job:e0f68454-c599-45db-b572-a7364f911b4f', b'rq:job:f9adc85e-400a-4e33-95cc-6f68c4d8a0d2', b'rq:workers', b'rq:workers:default']

The worker's output:

➜  rq_demo git:(master) ./rq.sh
17:28:32 Worker rq:worker:93618c280dbb445d92380fc26a33bc93: started, version 1.10.1
17:28:32 Subscribing to channel rq:pubsub:93618c280dbb445d92380fc26a33bc93
17:28:32 *** Listening on queue_1, default...
17:28:32 Trying to acquire locks for default, queue_1
17:28:32 Cleaning registries for queue: queue_1
17:28:32 Cleaning registries for queue: default
17:28:38 default: jobs.count_words_at_url('https://www.jd.com') (f098f9f7-82b2-4a4f-b482-6d4cffd9b09e)
3700
17:28:39 default: Job OK (f098f9f7-82b2-4a4f-b482-6d4cffd9b09e)
17:28:39 Result is kept for 500 seconds
17:28:39 default: jobs.count_words_at_url('http://nvie.com') (43db8266-c1d0-4256-b355-00135e6a67c6)
17:28:40 default: Job OK (43db8266-c1d0-4256-b355-00135e6a67c6)
17:28:40 Result is kept for 500 seconds
17:28:45 queue_1: jobs.get_redis_keys() (75b3839e-b8bb-45d8-bfe0-64ce63a502ab)
17:28:45 queue_1: Job OK (75b3839e-b8bb-45d8-bfe0-64ce63a502ab)
17:28:45 Result is kept for 500 seconds

As we can see, the jobs were executed. We can also start multiple workers to consume jobs concurrently, which improves throughput.

All of the code in this post is available on my GitHub.