在某些使用场景中,可能需要在作业任务函数内部访问当前作业 ID 或实例,或者在作业任务中存储任意数据
好在随着 RQ 的不断完善,这些功能都渐渐的添加上了
访问 「当前」的作业任务
这一特性在 0.3.3
版本中被实现
由于作业任务函数是一个常规的 Python 函数
因此如果我们要获取当前作业任务的 ID ( 如果有) ,则必须向 RQ 询问当前作业任务的 ID
RQ 提供了 get_current_job()
函数用于在作业任务函数内访问当前作业 ID
from rq import get_current_job def add(x, y): job = get_current_job() print 'Current job: %s' % (job.id,) return x + y
在作业任务实例上存储任意数据
这一特性在 0.8.0
版本中实现
如果想要在作业任务实例上添加/更新自定义状态信息,可以使用 meta
属性
该属性允许我们在作业任务实例上存储任意可使用 pickle
模块序列化的数据
import socket def add(x, y): job = get_current_job() job.meta['handled_by'] = socket.gethostname() job.save_meta() # do more work time.sleep(1) return x + y
队列中的作业任务过期时间
默认情况下,队列中的作业任务是有过期时间的,这一特性在 0.4.7
版本中被添加
任何一个作业任务都有两个 过期时间 ( TTL ),一个用于设置作业任务执行结果的过期时间,一个用户作业任务本身
什么意思呢? 也就是说,如果在指定的时间内该作业任务没有被执行,那么它将被设置为超时
当然了,RQ 也提供了两个机制可以手动设置作业任务本身的过期时间
-
创建一个作业并设置过期时间
job = Job.create(func=say_hello, ttl=43)
-
加入队列时设置过期时间
job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)
执行失败的任务
如果一个作业任务执行失败且抛出了一个异常,那么工作进程会将该作业任务放到 失败队列 中,并且将该作业任务的属性 is_failed
设置为 True
如果想要获取所有的执行失败的作业任务,可以扫描 get_failed_queue()
返回的失败队列
from redis import StrictRedis from rq import push_connection, get_failed_queue, Queue from rq.job import Job con = StrictRedis() push_connection(con) def div_by_zero(x): return x / 0 job = Job.create(func=div_by_zero, args=(1, 2, 3)) job.origin = 'fake' job.save() fq = get_failed_queue() fq.quarantine(job, Exception('Some fake error')) assert fq.count == 1 fq.requeue(job.id) assert fq.count == 0 assert Queue('fake').count == 1
目前尚无回复