导航
导航
文章目录
  1. 0x00 介绍
  2. 0x01 简易入门
    1. 使用 Huey
    2. 获取任务执行的结果
    3. 设置任务的执行时间
    4. 异常重试
    5. 取消或暂停任务
    6. 任务链
    7. 加锁任务
  3. 0x02 Flask 与 Huey 的应用

任务队列调度系统 Huey 简易入门

0x00 介绍

Huey 是一个基于 Python 开发的简易任务队列调度系统。类似于 Celery, 虽然没有 Celery 那么强大,但是胜在简洁。任务队列调度系统该有的功能它都有。

Huey 支持:

  • 进程、线程、协程任务并发模型
  • 安排任务的调度时间
  • 定时任务
  • 异常重试机制
  • 定制任务优先级
  • 存储任务执行结果
  • 加锁任务
  • 任务管道及任务链

0x01 简易入门

使用 Huey

环境

1
2
3
Python 3.7
Huey 2.0.1
Redis

demo 地址

使用 Huey 需要有三个组件:

  • 一个生产者程序,例如 Web 应用
  • 一个消费者程序,用来执行队列中的任务
  • 存储任务的队列,例如 Redis

项目结构:

1
2
3
4
├── __init__.py
├── config.py
├── main.py
└── tasks.py

Huey 的 API 与 Celery 类似,需要先创建一个 Huey 实例

1
2
3
4
# config.py
from huey import RedisHuey

huey = RedisHuey('huey_demo', host='localhost', password='just4redis')

然后定义一些任务

1
2
3
4
5
6
7
# task.py

from config import huey

@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)

然后创建一个生产者程序

1
2
3
4
5
6
7
8
9
10
# main.py

from config import huey
from tasks import count_beans


if __name__ == '__main__':
beans = input('How many beans?\n')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)

运行 Huey 消费者程序

1
huey_consumer.py main.huey

再运行生产者程序

1
python main.py

获取任务执行的结果

有时候生产者程序在生产了一个任务之后,需要得到任务执行后的结果,只需要在定义任务的时候加上返回操作

1
2
3
4
5
6
7
# task.py
from config import huey

@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Count %s beans' % num

重启 Huey 消费者程序, 在解释器中执行任务

1
2
3
4
5
6
7
8
9
In [1]: from main import count_beans

In [2]: res = count_beans(100)

In [3]: res
Out[3]: <huey.api.TaskResultWrapper at 0x1058cdc88>

In [4]: res() # 获取结果
Out[4]: 'Count 100 beans'

设置任务的执行时间

有时候,我们不想立即执行任务,想要过一段时间再执行。比如,想要两个小时候发布一篇博客。我们可以这样执行

1
2
3
4
5
6
7
8
9
10
In [1]: from main import count_beans

In [2]: res = count_beans.schedule(args=(100,), delay=30)

In [3]: res() # 返回是 None, 任务还没有执行

In [4]: res()

In [5]: res(blocking=True)
Out[5]: 'Count 100 beans'

异常重试

有时候,任务在执行过程中,由于一些原因,导致执行失败,需要我们重新执行任务。 Huey 支持任务异常重试操作,在定义任务的时候指定 retries 参数,任务执行异常之后,会重新进入队列被消费者消费执行,直到指定的次数为止。

1
2
3
4
5
6
7
# task.py
from config import huey

@huey.task(retries=3)
def try_thrice():
print('tring...')
raise Exception('nope')

我们也可以指定每次重新执行的间隔时间

1
2
3
4
5
6
7
# task.py
from config import huey

@huey.task(retries=3, retry_delay=10)
def try_thrice():
print('tring...')
raise Exception('nope')

取消或暂停任务

有时候,我们想取消队列当中的任务执行,可以使用 revoke 操作,在使用这个操作前,实例化 Huey 类的时候,需要将 result_store 设置为 True.

1
2
3
4
5
res = count_beans(10000000)
res.revoke()

res = count_beans.schedule(args=(100000,) delay=100)
res.revoke()

任务链

Huey 支持多个任务串在一起链成一个任务执行。

1
2
3
4
5
@huey.task()
def add(a, b):
return a + b

res = add(1, 2)

上面的操作与下面的操作是等价的

1
2
task = add.s(1, 2)
result = huey.enqueue(task)

huey.task() 装饰的任务,会返回一个 TaskWrapper 对象,这个对象的 s() 方法会创建一个 QueueTask 实例,QueueTask 就是消息队列中的消息实例,它被序列化入队,然后出队反序列化被消费者执行。

我们可以用 QueueTask.then() 方法将所有的 QueueTask 链接起来。

1
2
3
4
5
6
7
8
9
10
11
12
add_task = add.s(1, 2)

pipline = (add_task
.then(add, 3)
.then(add, 4)
.then(add, 5))

res = huey.enqueue(pipline)

print([res.get(blocking=True) for result in results])

# [3, 6, 10, 15]

加锁任务

有时候,我们在执行一个任务的时候,需要加锁执行。可以使用 huey.lock_task() 方法,这个方法可以用装饰器或者上下文管理方式使用。如果是使用装饰器方法的话,需要装饰在方法的最内层。

1
2
3
4
5
6
7
8
9
10
11
12
@huey.task()
@huey.lock_task('reports-lock')
def generate_report();
run_something()


@huey.task()
def backup():
do_something()

with huey.lock_task('db-backup'):
db_db_backup()

0x02 Flask 与 Huey 的应用

在 Flask 中使用 Huey 的项目结构如下

1
2
3
4
5
6
7
8
9
├── __init__.py
├── autoapp.py
├── huey_worker.py
└── web
├── __init__.py
├── app.py
├── hueyapp.py
├── tasks.py
└── views.py

Demo 地址

需要注意的是 Flask 中有个应用上线文的概念,如果定义的 task 中 存在跟 Flask 应用上下文有关的操作,比如用 Flask-SQLAlachemy 操作数据库等,需要引入应用上下文环境。

1
2
3
4
5
6
7
8
9
10
# flask_huey_demo/web/tasks.py

@huey.task()
def count_beans(num):
from autoapp import app

with app_context():
# 一些 db 操作
print('-- counted %s beans --' % num)
return 'Count %s beans' % num