Python RQ 任务队列中的队列 ( Queue )

yufei       6 年, 6 月 前       5493

本来上一章节 Python RQ 的运行流程 要学习队列和初始化的问题,但是,上一章节偏着偏着就将运行流程去了

所以,无论如何,本章节我们必须将 RQ 中队列学习完

队列初始化

前几章节,我们使用的都是默认的队列,也就是除了 Redis 连接外,不传递额外参数来创建一个队列

q =  q = Queue(connection=Redis())

实际上,Queue 可以接受很多参数,第一个参数 name 用于标识一个队列

queue.py

def __init__(self, name='default', default_timeout=None, connection=None,
                 is_async=True, job_class=None, **kwargs):

但这个 name 参数除了用于识别队列外,其实没啥大作用

虽然,有时候,我们经常会命名为一些看起来很有意思的名字,比如 low  、 mediumhigh 。 但也仅仅是用于标识一个队列而已

例如我们要创建一个 low 的队列,可以使用下面的方法

q = Queue('low', connection=redis_conn)

入队

一般情况下我们都使用下面的方式将一个作业任务加入到队列中

q.enqueue(length_of_url, 'https://www.twle.cn/')

该函数的原型为

def enqueue(self, f, *args, **kwargs)

第一个参数用于指定作业任务函数,后面的 *args 参数用于指定一些函数调用需要用到的参数

此外,我们可以添加一些选项来作业修改加入队列的后的行为

默认情况下,这些选项通过 kwargs 这个参数来接收

下表列出了一些常见的选项

选项 说明
timeout 用于指定作业被中断并标记为失败之前的最大运行时间。默认单位是秒,可以是整数或表示整数的字符串 ( 例如,2,'2') 。此外,还可以是具有指定单位的字符串,包括小时,分钟,秒(例如'1h','3m','5s')
result_ttl 用于指定作业任务执行结果保存的时间
ttl 用于指定作业任务在队列中排队的最长时间,超过该时间后,该作业任务就会被取消。如果指定值 -1,则表示不限时间,也就是说会一直等待,知道该作业任务被执行
depends_on 用于指定该作业任务运行之前必须完成的另一个作业任务( 或作业 ID )
job_id 用于手动指定该作业任务的 id job_id
at_front 用于将该作业任务放置在队列的头部,而不是尾部,也就是说可以优先被执行
kwargs 或 args 使用字典或命名参数的方式指定上面提到的任何参数

针对最后一个参数使用的情况,使用 .enqueue() 的显式版本 .enqueue_call() 是一个更好的选择

q = Queue('low', connection=redis_conn)
q.enqueue_call(func=length_of_url,
               args=('https://www.twle.cn',),
               timeout=30)

作业入队时,针对无法访问后台工作进程 ( worker ) 中运行的源代码的情况( 即代码库 X 从代码库 Y 调用延迟函数)

我们可以将该函数作为字符串引用传递

q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)

队列

除了将作业入队的方法 enqueue() 外,Queues 还有一些有用的方法:

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

获取队列中作业任务的长度

print(len(q))

获取一个队列中的所有作业任务的 ID

queued_job_ids = q.job_ids

获取所有的作业任务 Job 实例

queued_jobs = q.jobs

获取一个 ID 为 my_id 的作业任务

job = q.fetch_job('my_id')

删除队列

q.delete(delete_jobs=True)

如果传递 delete_jobs=True 则同时会移除队列中的所有作业任务

该方法执行之后,该队列就不可再使用了,不能重新实例化了

背后的设计思维

通过之前的篇幅介绍,想必你也清楚,使用 RQ 时,是不需要预先设置任何队列,也不必指定任何通道,交换,路由规则或诸如此类的东西

我们可以将一个作业任务加入到任何想要的队列上

如果将作业任务加入到尚不存在的队列,这个队列就会被立即创建

RQ 不使用任何更加高级的中间件来实现路由功能。

你可能会认为这是一个很棒的设计,也可能认为是一个糟糕的障碍,当然了,取决于你正在解决的问题

最后,RQ 不使用任何便携式协议,只依赖于 pickle 来序列化作业任务,但也正因为如此,RQ 仅限于 Python 中使用

延迟返回的执行结果

当一个作业任务被加入队列时,queue.enqueue() 调用返回并不是作业任务的执行结果,而是一个 Job 类的实例

当然了,这个类也没有其它用途,仅仅是一个可用于检查实际作业任务结果的代理对象

为此,它有一个属性 result ,当作业尚未完成时将返回 None,或者当作业完成时返回非 None 值(前提是,假设作业首先具有返回值)

# 延时异步执行 length_of_url('https://www.twle.cn/')
job = q.enqueue(task.length_of_url, 'https://www.twle.cn/')
print(job.result)   # => None

# 然后,我们等待个几秒,直到作业任务被执行完成,就可以查看执行的结果
time.sleep(2)
print(job.result)   # => 889

@job 装饰器

为了简化将一个作业任务加入队列的代码,同时也为了兼容 Celery 中的 @task 装饰器

RQ 0.3 加入了一个新的特性: @job 装饰器,使用方法也很简单

from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print(job.result)

当然了,使用之前必须从 rq.decorators 模块中导入 job 装饰器

绕过工作进程 ( worker )

由于某些目的,比如测试,我们可能更希望将作业任务加入队列时就返回其执行的结果,而不是等待工作进程执行完成

RQ 0.3.1 新增了这项功能,可以在创建队列时通过传入 is_async=False 来达到这个目的

>>> q = Queue('low', is_async=False, connection=my_redis_conn)
>>> job = q.enqueue(fib, 8)
>>> job.result
21

这种情况下,任何加入 q 队列的作业任务都会立刻返回其执行结果,而不是异步等待 rq worker 的执行结果

这种特性,在 Celery 称之为 ALWAYS_EAGER

但请注意,我们仍需要与 Redis 实例建立连接,以存储与作业执行和完成相关的状态

作业任务相互依赖

RQ 0.4.0 新增加了一个功能,就是可以链式执行多个作业任务

一个作业任务还可以依赖另一个作业任务,为了添加这种依赖关系,将作业任务入队时可以使用 depends_on 来声明依赖关系

q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)

上面的代码中,执行 send_report 作业任务前,必须先执行 report_job 作业任务

这种作业任务依赖性使得我们可以将一个大型的作业任务分割成几个较小的作业任务,依赖另一个作业任务的作业任务仅在其依赖关系完成时才会执行

工作进程 ( worker )

如果想了解更多工作进程的信息,可以访问 Worker 文档

作业任务的限制因素

从技术上讲,我们可以将任何 Python 函数调用放在队列中

但,这并不意味着这样做总是明智的

在将作业任务放入队列之前需要考虑的一些因素

  1. 确保作业任务所在的模块 ( __module__ ) 可以被工作进程导入

    这意味着,无法将 __main__ 模块中声明的作业函数加入队列

    如果使用了 __main__ 模块则会报错,详情请参考 Python RQ 的运行流程t/38#reply0)

  2. 确保工作进程 ( worker ) 和作业任务生成器共享完全相同的源代码

    也就是说,必须调用相同的函数实现,不能生成作业任务时调用的是 a.py 中的 length_of_url() 函数,而工作进程调用的却是另一个 a.py 中的 length_of_url() 函数

  3. 确保函数调用不依赖于其上下文,只依赖传递的参数

    首先要说明的是,全局变量是邪恶的 ( 一如既往 ) ,因为谁都可以更改,谁都可以删除

    工作进程在处理作业任务时,函数所依赖的任何状态 ( 例如 「当前」 用户或 「当前 」 Web 请求)都不存在

    如果希望为 「当前」用户完成工作,则应将该用户解析为具体实例,并将对该用户对象的引用作为参数传递给作业

运行时限制

RQ 的工作进程 rq worker 只能运行在实现了 fork() 函数的系统上

也就是说,一般无法运行在 Windows 操作系统上,除非安装并使用 Windows 下 Linux 子系统 且使用 bash 作为 shell

目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.