在文章 Python RQ 背后的机制 中我们详细介绍了 RQ 如何将数据数据保存到 Redis 上
我们知道如果使用下面的代码则会创建一个默认的队列
q = Queue(connection=Redis())
队列在 Redis 中的键为 rq:queue:default
是一个列表类型
但我们在继续介绍 RQ 中的队列之前,我们先来简单了解下 RQ 中的作业任务
RQ 作业任务 ( job )
RQ 作业任务 ( job ) 是一个可执行的 Python 对象,是一个被处于后台的工作进程( works ) 异步调用的函数
RQ 中只需将对函数及其参数加入到队列中,就可以异步调用任何 Python 函数,这种操作也称之为 入队
创建作业任务
因为 RQ 中的作业任务是一个可执行的对象,例如函数和实现了 __call__
方法的类的实例,所以创建一个作业任务和创建一个普通函数没啥区别
task.py
import requests def length_of_url(url): resp = requests.get(url) return len(resp.text)
从 Python RQ 背后的机制 章节中我们知道作业任务的执行结果有好几种状态,执行成功 ( finished) 和执行失败 (failed)
正常情况下,不管这个可调用的对象有没有返回值,都认为是执行成功的
但如果执行中抛出了异常,任何异常,都会被视为执行失败
注意
作业任务所在的文件有一个特殊要求,就是不能处于 __main__
入口文件中,否则会报错
比如下面的代码
from redis import Redis from rq import Queue import requests def length_of_url(url): resp = requests.get(url) return len(resp.text) q = Queue(connection=Redis()) rs = q.enqueue( length_of_url, 'https://www.twle.cn') print(rs)
运行结果会报错
$ python3 task.py Traceback (most recent call last): File "d.py", line 14, in <module> count_words_at_url, 'https://www.twle.cn') File "/Users/yufei/devops/python/demo/lib/python3.7/site-packages/rq/queue.py", line 279, in enqueue raise ValueError('Functions from the __main__ module cannot be processed ' ValueError: Functions from the __main__ module cannot be processed by workers (demo)
作业任务入队
刚刚我们已经创建了一个作业任务 length_of_url(url)
,现在,是时候把作业任务和所需参数加入到任务队列中了
将一个作业任务入队的方式很简单,短短几行代码就解决了
main.py
from rq import Queue from redis import Redis import task # 告诉 RQ 使用那个 Redis 服务 redis_conn = Redis() q = Queue(connection=redis_conn) # 没有传递任何参数则表示使用默认队列 # 延时异步执行 length_of_url('https://www.twle.cn/') job = q.enqueue(task.length_of_url, 'https://www.twle.cn/') print(job.result) # => None # 然后,我们等待个几秒,直到作业任务被执行完成,就可以查看执行的结果 time.sleep(2) print(job.result) # => 41039
打开一个 shell 并跳转到当前目录下,运行下面的命令开启后台工作进程
$ rq worker 10:17:57 RQ worker 'rq:worker:lie.16080' started, version 0.11.0 10:17:57 *** Listening on default... 10:17:57 Cleaning registries for queue: default
启动过程没啥亮点,唯一值得注意的就是 rq:worker:lie.16080
,这是当前工作进程的 ID
然后打开一个新的 shell
,跳转到当前目录下,运行 python main.py
查看结果
$ python main.py
None
41039
作业任务执行结果
回到 rq worker
的 shell,可以看到输出结果如下
$ rq worker 10:17:57 RQ worker 'rq:worker:lie.16080' started, version 0.11.0 10:17:57 *** Listening on default... 10:17:57 Cleaning registries for queue: default 10:20:03 default: task.length_of_url('https://www.twle.cn/') (c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36) 10:20:04 default: Job OK (c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36) 10:20:04 Result is kept for 500 seconds
日志输出没什么,但可以看到一个 500
数字,这个表示执行结果保存的时间,500 秒之后就会失效
如果我们在 redis-cli
中使用 keys *
命令,可以看到输出结果如下
127.0.0.1:6379> keys * 1) "rq:worker:lie.16080" 2) "rq:workers" 3) "rq:workers:default" 4) "rq:job:c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36" 5) "rq:finished:default" 6) "rq:queues"
可以看到多了几个键,分别是 rq:worker:lie.16080
、 rq:workers
、 rq:workers:default
还有 rq:finished:default
-
rq:finished:default
rq:finished:default 用于保存默认队列的执行结果,是一个
zset
类型127.0.0.1:6379> type rq:finished:default zset
保存的是执行成功的作业任务的 ID
127.0.0.1:6379> zrange rq:finished:default 0 -1 1) "c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36"
-
rq:job:c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36
既然任务已经执行完成,我们就看看执行完成的任务是啥样的
127.0.0.1:6379> hgetall rq:job:c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36 1) "description" 2) "task.length_of_url('https://www.twle.cn/')" 3) "timeout" 4) "180" 5) "started_at" 6) "2018-07-13T02:29:10.524589Z" 7) "enqueued_at" 8) "2018-07-13T02:29:10.510681Z" 9) "created_at" 10) "2018-07-13T02:29:10.510287Z" 11) "result" 12) "\x80\x04\x95\x04\x00\x00\x00\x00\x00\x00\x00MO\xa0." 13) "origin" 14) "default" 15) "ended_at" 16) "2018-07-13T02:29:10.926991Z" 17) "status" 18) "finished" 19) "data" 20) "x\x9ck`\x99j\xca\x00\x01\x1a=B%\x89\xc5\xd9z9\xa9y\xe9%\x19\xf1\xf9i\xf1\xa5E9S\xfczD2JJ\n\x8a\xad\xf4\xf5\xcb\xcb\xcb\xf5J\xcasR\xf5\x92\xf3\xf4\xa7\xb4N\xa9\x9dR2E\x0f\x00r\xe7\x16\x1c"
可以看到,新增加了几个键值对,同时 status 键的值已经改成了
finished
键 值 说明 started_at 2018-07-13T02:29:10.524589Z 开始执行任务的时间 ended_at 2018-07-13T02:29:10.926991Z 结束执行任务的时间 result "\x80\x04\x95\x04\x00\x00\x00\x00\x00\x00\x00MO\xa0." 执行的结果 -
rq:workers
和rq:workers:default
rq:workers
和rq:workers:default
不细说了,跟rq:queue
和rq:queue:default
是一样的 -
rq:worker:lie.16080
rq:worker:lie.16080 是我们的后台工作进程的 ID,而这个键保存的是这个工作进程的状态
127.0.0.1:6379> type rq:worker:lie.16080 hash 127.0.0.1:6379> hgetall rq:worker:lie.16080 1) "birth" 2) "2018-07-13T02:17:57.213198Z" 3) "last_heartbeat" 4) "2018-07-13T02:29:10.937285Z" 5) "queues" 6) "default" 7) "state" 8) "idle" 9) "successful_job_count" 10) "3" 11) "total_working_time" 12) "1421007"
键值对一看就懂,就不细说了