Python RQ 背后的机制

yufei       6 年, 5 月 前       2110

在昨天的文章 Python Redis 任务队列库 RQ 中我们简单的介绍了下 RQ 和 RQ 的一般用法

今天我们就来深入了解 RQ 背后如何使用 Redis

首先,我们改良下昨天的代码

task.py

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return resp.text

main.py

from redis import Redis
from rq import Queue

import task

q = Queue(connection=Redis())

rs = q.enqueue(
          task.count_words_at_url, 'https://www.twle.cn')

print(rs)

也就是在 main.py 中添加了最后一句 print(rs)

运行

首先停止运行 rq workshell

运行 main.py 得到结果如下

<Job 01421493-ba12-4310-8dfa-bb9f72cc9068: task.count_words_at_url('https://www.twle.cn')>

想必经常使用 Python 的你对这个语法不会太陌生

我们一般都重写类的 __repr__ 函数来输出这种格式

  1. Job 表示这是一个 Job 类生成的实例
  2. 01421493-ba12-4310-8dfa-bb9f72cc9068 是当前作业任务的 ID
  3. task.count_words_at_url 表示要执行的函数
  4. 'https://www.twle.cn' 表示执行函数时要传递的参数

因为我们使用的是默认的 Redis,所以,RQ 会把所有的作业任务保存到 127.0.0.1:6379 上的第 0 个数据库中

我们使用 redis-cli 命令登录到 Redis 服务器上,然后使用 keys * 命令获取所有的键,返回结果如下

$ redis-cli
127.0.0.1:6379> keys *
1) "rq:queue:default"
2) "rq:job:01421493-ba12-4310-8dfa-bb9f72cc9068"
3) "rq:queues"

为啥键这么少,是因为教程需要,我运行 python main.py 前先用 flushall 命令清空了缓存

可以看到有三个键

  1. rq:queue:default

    rq:queue:default 是默认的任务队列,是一个列表型,保存了所有任务的 ID

    使用 type 命令可以看到 rq:queue:default 的数据类型是 list

    127.0.0.1:6379> type rq:queue:default
    list
    

    然后我们可以说使用 lrange 命令查看 rq:queue:default 中所有的作业任务 ID

    127.0.0.1:6379> LRANGE rq:queue:default 0 -1
    1) "01421493-ba12-4310-8dfa-bb9f72cc9068"
    
  2. rq:job:01421493-ba12-4310-8dfa-bb9f72cc9068

    rq:job:01421493-ba12-4310-8dfa-bb9f72cc9068 键保存了具体的作业任务,是一个哈希类型

    127.0.0.1:6379> type rq:job:01421493-ba12-4310-8dfa-bb9f72cc9068
    hash
    

    我们可以使用 hgetall 查看下这个哈希键值对的数据

    127.0.0.1:6379> hgetall rq:job:01421493-ba12-4310-8dfa-bb9f72cc9068
    1) "timeout"
    2) "180"
    3) "origin"
    4) "default"
    5) "description"
    6) "task.count_words_at_url('https://www.twle.cn')"
    7) "status"
    8) "queued"
    9) "enqueued_at"
    10) "2018-07-13T00:56:08.173963Z"
    11) "data"
    12) "x\x9ck`\x99j\xc9\x00\x01\x1a=\xe2%\x89\xc5\xd9z\xc9\xf9\xa5y%\xf1\xe5\xf9E)\xc5\xf1\x89%\xf1\xa5E9S\xfcz\x843JJ\n\x8a\xad\xf4\xf5\xcb\xcb\xcb\xf5J\xcasR\xf5\x92\xf3\xa6\xb4N\xa9\x9dR2E\x0f\x00\xdcL\x18*"
    13) "created_at"
    14) "2018-07-13T00:56:08.173413Z"
    

    因为 hgetall 命令中奇数是键,偶数是值,所以,一个作业任务有以下属性

    说明
    timeout 任务执行的超时时间 180
    origin 保存的作业任务队列 default
    description 描述 一般是该作业任务的 __repr__ 函数结果
    status 状态 queued
    enqueued_at 加入队列时间 2018-07-13T00:56:08.173963Z
    data 作业任务要执行的函数,pickle 序列化后的结果
    created_at 创建时间 2018-07-13T00:56:08.173963Z
  3. rq:queues

    rq:queues 保存了 RQ 的所有作业任务队列,是一个集合 (set) 型,保存的是所有作业任务的键值 key

    127.0.0.1:6379> type rq:queues
    set
    

    可以使用 SINTER 命令查看该集合中所有的元素

    127.0.0.1:6379> SINTER rq:queues
    1) "rq:queue:default"
    

正常情况下,RQ 在 Redis 中存储的数据大概就这些了

最后我们来看看一个作业任务有多少状态

作业任务的状态在 job.py 文件中定义

QUEUED='queued',
FINISHED='finished',
FAILED='failed',
STARTED='started',
DEFERRED='deferred'
说明
queued 处于队列中,等待被执行
finished 已经完成
failed 作业任务运行失败
started 作业任务开始执行
deferred 作业任务被延时执行
目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.