加入队列的作业任务是延迟执行的函数调用。这意味着我们正在解决一个问题,但也正在创造另一些问题
处理作业任务执行结果
Python 中的函数可能有返回值,因此作业任务执行结果也可能有返回值
如果作业任务返回非 None
的值,那么工作进程会将该返回值保存到 Redis 中该作业任务相关的键值对中的 result
键中
任务作业运行完成且成功后,作业任务在 Redis 中的键值对也会在 500
秒后失效
作业任务入队操作也会返回一个作业任务 Job
实例,这个 Job
对象是与作业任务 ID
相关联的代理对象
我们可以通过该 Job
实例来获取作业任务的执行结果
# 延时异步执行 length_of_url('https://www.twle.cn/') job = q.enqueue(task.length_of_url, 'https://www.twle.cn/') print(job.result) # => None # 然后,我们等待个几秒,直到作业任务被执行完成,就可以查看执行的结果 time.sleep(2) print(job.result) # => 889
当然了,作业任务的执行结果在 Redis
中也是有过期时间的,一旦到达了过期时间,该作业任务键值对就会被删除
这种机制,仅仅是为了避免不断增长的 Redis
数据库
当然,这个 500
秒的过期时间也不是固定的
从 0.3.1
版本开始,可以在调用 enqueue()
和 enqueue_call()
函数时传递 result_ttl
关键字参数指定作业结果的 TTL
值
我们也可以完全禁用过期时间,也就是结果永远有效,但这意味这需要我们自己负责清理工作,使用时务必小心
下面的代码用于自定义结果过期时间
q.enqueue(foo) # 默认情况下,过期时间设置为 500 秒 q.enqueue(foo, result_ttl=86400) # 指定过期时间为 1 天 q.enqueue(foo, result_ttl=0) # 指定即刻过期 q.enqueue(foo, result_ttl=-1) # 禁用过期时间,需要自己清理已完成的作业任务
当然了,对于没有返回值的作业任务,在任务完成后会被立即删除
为了改变这种默认的机制,我们可以使用 result_ttl
参数设置该作业任务的过期时间
q.enqueue(func_without_rv, result_ttl=500) # 明确指定作业任务过期时间
异常处理
作业任务执行并不总是成功,也可能是会失败并抛出异常,这是现实生活中的事实
对于作业任务执行失败并抛出异常的情况,RQ 采用了如下的处理机制
- 作业任务执行失败是如此的重要,但又往往不被人注意,鉴于这种情况,作业任务的返回值永远不会过期
- 此外,应该可以重试失败的作业任务。 通常,这需要手动来判断。因为没有自动或可靠的方法让 RQ 判断哪些任务是否可以安全地重试
当作业任务执行期间抛出了异常,这些异常会被 工作进程 ( worker ) 捕获,序列化并存储在作业任务键值对中的 exc_info
键下。同时会将作业任务的 ID 放到 failed
队列中
这种情况下,作业任务本身具有一些有用的属性,可用于辅助检查
- 当前作业任务的创建时间
- 当前作业任务被加入队列的时间
- 当前作业任务被加入的队列的名称
- 一段用户描述当前执行函数的简单文本
- 异常信息
通过这些信息,我们可以手动检查和判断问题,并在可能的情况下重新提交作业任务
中断处理
当工作进程以优雅的方式被结束时 ( CTRL + C
或 kill
命令 ) ,工作进程会尽最大努力尝试完成当前还在执行中的作业任务。
当前正在执行的作业任务完成后,工作进程就不会再尝试获取下一个作业任务,而是结束自己
这种机制使得作业任务总能得到执行完成
但是,工作进程也可以通过 kill -9
强行结束,这种情况下,根本就不会有时间留给工作进程来优雅地完成工作或将工作放在失败的队列上
因此,强行结束一个工作任务可能会导致很多预想不到的结果
作业任务超时处理
默认情况下,作业任务应该在 180
秒内执行完毕,否则工作进程会主动结束作业任务并将它加入到执行失败队列中,并指示该作业任务失败的原因是超时
如果一个作业任务需要更多(或更少)的时间来完成,可以正将其加入队列时使用 timeout
参数来放松(或收紧)默认超时期限
q = Queue() q.enqueue(mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 分钟
我们还可以在创建队列时通过参数 default_timeout
一次性修改加入该队列的所有作业任务的默认超时时间
# 高权重作业任务应该在 8 秒内结束 # 地权重作业任务可以在 10 分钟内结束 high = Queue('high', default_timeout=8) # 8 秒 low = Queue('low', default_timeout=600) # 10 分钟 # 单个作业仍可以覆盖这些默认值 low.enqueue(really_really_slow, timeout=3600) # 1 小时