Python 任务队列 RQ 中的作业任务 ( Job )

yufei       6 年, 6 月 前       1485

在某些使用场景中,可能需要在作业任务函数内部访问当前作业 ID 或实例,或者在作业任务中存储任意数据

好在随着 RQ 的不断完善,这些功能都渐渐的添加上了

访问 「当前」的作业任务

这一特性在 0.3.3 版本中被实现

由于作业任务函数是一个常规的 Python 函数

因此如果我们要获取当前作业任务的 ID ( 如果有) ,则必须向 RQ 询问当前作业任务的 ID

RQ 提供了 get_current_job() 函数用于在作业任务函数内访问当前作业 ID

from rq import get_current_job

def add(x, y):
    job = get_current_job()
    print 'Current job: %s' % (job.id,)
    return x + y

在作业任务实例上存储任意数据

这一特性在 0.8.0 版本中实现

如果想要在作业任务实例上添加/更新自定义状态信息,可以使用 meta 属性

该属性允许我们在作业任务实例上存储任意可使用 pickle 模块序列化的数据

import socket

def add(x, y):
    job = get_current_job()
    job.meta['handled_by'] = socket.gethostname()
    job.save_meta()

    # do more work
    time.sleep(1)
    return x + y

队列中的作业任务过期时间

默认情况下,队列中的作业任务是有过期时间的,这一特性在 0.4.7 版本中被添加

任何一个作业任务都有两个 过期时间 ( TTL ),一个用于设置作业任务执行结果的过期时间,一个用户作业任务本身

什么意思呢? 也就是说,如果在指定的时间内该作业任务没有被执行,那么它将被设置为超时

当然了,RQ 也提供了两个机制可以手动设置作业任务本身的过期时间

  1. 创建一个作业并设置过期时间

    job = Job.create(func=say_hello, ttl=43)
    
  2. 加入队列时设置过期时间

    job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)
    

执行失败的任务

如果一个作业任务执行失败且抛出了一个异常,那么工作进程会将该作业任务放到 失败队列 中,并且将该作业任务的属性 is_failed 设置为 True

如果想要获取所有的执行失败的作业任务,可以扫描 get_failed_queue() 返回的失败队列

from redis import StrictRedis
from rq import push_connection, get_failed_queue, Queue
from rq.job import Job


con = StrictRedis()
push_connection(con)

def div_by_zero(x):
    return x / 0

job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
fq = get_failed_queue()
fq.quarantine(job, Exception('Some fake error'))
assert fq.count == 1

fq.requeue(job.id)

assert fq.count == 0
assert Queue('fake').count == 1
目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.