在上一章节 Python RQ 任务队列中的工作进程 ( 一 ) 章节中我们讲述了 RQ 工作进程的日常使用的一些知识
本章节我们我们来做一些深入了解,也借此机会一窥工作进程的全貌
工作进程
默认情况下,rq worker
shell 脚本是一个简单的 fetch-fork-execute
循环
如果我们的作业任务需要进行冗长的设置,或者都依赖于同一组模块时,你每次运行一个作业任务时就要消耗大量的时间。因为这些配置或模块只有在 fork
之后才进行了导入
从某些方面说,这很干净,因为 RQ 不会以这种方式泄漏内存
但也会变慢
可用于提高这类作业的吞吐量性能的方式是 在 fork
之前导入必要的模块
但 RQ 并没有提供相关的选项用于配置此设置,不过,我们可以自定义脚本,并在开始工作循环之前自己完成此操作
为此,我们需要提供自己的工作脚本 ( 而不是使用 rq worker
)
一个简单的实现范例
#!/usr/bin/env python import sys from rq import Connection, Worker # 预先载入的模块 import library_that_you_want_preloaded # 使用一个队列名作为参数来来监听该队列 # 类似于 rq worker with Connection(): qs = sys.argv[1:] or ['default'] w = Worker(qs) w.work()
工作进程名称
工作进程在注册到系统时需要使用一个名字
默认情况下,这个名字由 rq:worker:
、当前的主机名和当前的进程名组成,例如
rq:worker:lie.16080
当然了,我们可以自定义这个名称,只需要在启动工作进程时使用 --name
选项即可
rq worker --name rq:worker:lie:1
获取工作进程信息
这一特性在 0.10.0
版本中被实现
工作进程会将它们的运行时信息存储在 Redis 中,我们可以使用下面的代码来获取这些信息
from redis import Redis from rq import Queue, Worker # 返回该连接中注册的所有工作进程 redis = Redis() workers = Worker.all(connection=redis) # 返回监听某个队列的所有工作进程 ( 版本 0.10.0 中新增 ) queue = Queue('queue_name') workers = Worker.all(queue=queue)
处于监控目的,如果我们只想知道有多少工作进程,使用 Worker.count()
函数会更高效
from redis import Redis from rq import Worker redis = Redis() # 统计该 Redis 连接中注册的所有工作进程 workers = Worker.count(connection=redis) # 统计监听某个队列的所有工作进程 queue = Queue('queue_name', connection=redis) workers = Worker.all(queue=queue)
工作进程统计信息
这一特性在 0.9.0
中被实现
为了检查队列的利用率,工作进程会存储一些有用的信息
from rq.worker import Worker worker = Worker.find_by_key('rq:worker:name') worker.successful_job_count # 成功执行的作业任务数 worker.failed_job_count. # 当前工作进程执行失败的作业任务数 worker.total_working_time # 执行作业任务的总耗时
下线工作进程
无论何时,只要工作进程接收到 SIGINT
信号 ( 通过 Ctrl + C
) 或 SIGTERM
信号 ( 通过 kill
) ,则工作进程会等待当前正在运行的任务完成,然后停止循环并优雅地注册自己的死亡
如果在此下线阶段,工作进程再次收到 SIGINT
或 SIGTERM
,工作进程将强制终止子进程 ( 发送 SIGKILL
) ,但仍会尝试注册自己的死亡
也就是俗称两次 CTRL + C
退出进程
使用配置文件
这一特性在 0.3.2
版本中被实现
如果我们想通过配置文件而不是通过命令行参数配置 rq worker
可以通过创建类似 settings.py
的 Python
文件来完成此操作
REDIS_URL = 'redis://localhost:6379/1' # You can also specify the Redis DB to use # REDIS_HOST = 'redis.example.com' # REDIS_PORT = 6380 # REDIS_DB = 3 # REDIS_PASSWORD = 'very secret' # Queues to listen on QUEUES = ['high', 'normal', 'low'] # If you're using Sentry to collect your runtime exceptions, you can use this # to configure RQ for it in a single step # The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410 SENTRY_DSN = 'sync+http://public:secret@example.com/1'
这个配置文件显示了当前支持的所有选项
需要注意的是 QUEUES
和 REDIS_PASSWORD
设置从 0.3.3
版本开始才被支持
创建完配置文件后,我们就可以使用 -c
参数来指定工作进程启动时使用的配置文件
$ rq worker -c settings
自定义工作进程类
这一特性在 0.4.0
版本中被实现
比如有时候我们需要自定义工作进程的行为
到目前为止,需要这么做的情况一般是
- 在运行作业任务前管理数据库连接
- 使用自定义的作业执行模型,而不是使用
os.fork
- 使用不同的并发模型,例如
multiprocessing
或gevent
不管处于何种目的,我们都可以通过 -w
选项来自定义工作进程类
$ rq worker -w 'path.to.GeventWorker'
自定义作业任务类和队列类
这是一个还未实现的特性,据说下一个版本会实现
简单来说,就是在启动 rq worker
时通过选项 --job-class
和/或 --queue-class
告诉 worker
使用自定义的作业类和队列类
$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'
当然,如果使用了自定义的作业类和队列类,那么在将作业任务加入队列时也必须使用自定义的类
例如
from rq import Queue from rq.job import Job class CustomJob(Job): pass class CustomQueue(Queue): job_class = CustomJob queue = CustomQueue('default', connection=redis_conn) queue.enqueue(some_func)
自定义异常处理器
上一章节提到工作进程启动参数的时候,我们说过可以使用 --exception-handler
选项来自定义异常处理器
这个特性在 0.5.5
版本中被添加
如果我们需要针对不同类型的作业以不同方式处理错误,或者只是想要自定义 RQ
的默认错误处理行为,可以使用 --exception-handler
选项运行 rq worker
$ rq worker --exception-handler 'path.to.my.ErrorHandler'
RQ 还支持多个异常处理器,使用方式也简单,就是多次使用 --exception-handler
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'