当前位置:网站首页>Celery进阶_任务优先级分配
Celery进阶_任务优先级分配
2022-08-09 02:54:00 【Loganer】
Background
书接上回Celery入门,简单使用了Celery的基本功能使我们的任务能够“并发”的跑起来,但是当所有的worker都被占用时我们想让一些任务优先的执行,这个时候我们应该怎么做呢?
- 设置任务的优先级(有时会不生效,具体原因未知)
- 设置消息队列的优先级
PS:本文使用的Celery版本为5.0.2,不同版本会有差异。
参数说明
CELERYD_PREFETCH_MULTIPLIER:
预取消息数量,默认会取4*并发数,在上面的例子中则会最多预取40个消息,
如果设置为1,则表示禁止预取。
CELERY_ACKS_LATE:
默认为FALSE,这个参数的理解需要先了解下rabbitMQ的ACK机制,
简单说就是如果设置True则会在任务执行完成后才会对消息队列发送确认,表明消息已被消费,
如果任务执行中发生了异常情况,未发送确认消息,则消息队列会继续保留此消息,直到下一个worker取走并成功执行。
此时衍生出了一个问题,需要保证此消息是幂等的,
也就是无论执行多少次结果都一样,否则可能会得到一些预料之外的结果。
Code
# @Time : 2022/8/5 10:31
# @Author :
# @FileName: my_task.py
# @Description:
import time
from celery import Celery, signature, group
from celery.exceptions import TimeoutError
from celery.result import AsyncResult
from kombu import Queue, Exchange
CONFIG = {
# 优先级需设置这些参数
'CELERY_ACKS_LATE': True,
'CELERYD_PREFETCH_MULTIPLIER': 2,
# 设置时区
'CELERY_TIMEZONE': 'Asia/Shanghai',
# 默认为true,UTC时区
'CELERY_ENABLE_UTC': True,
# broker
'BROKER_URL': 'redis://10.11.ip.ip:6379/1',
# backend配置,注意指定redis数据库
'CELERY_RESULT_BACKEND': 'redis://10.11.ip.ip:6379/2',
# worker最大并发数
'CELERYD_CONCURRENCY': 1,
# 如果不设置,默认是celery队列,此处使用默认的直连交换机,routing_key完全一致才会调度到celery_demo队列
# 此处注意,元组中只有一个值的话,需要最后加逗号
'CELERY_QUEUES': (
Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", priority=3),
Queue("celery_demo_high", Exchange("celery_demo_high"), routing_key="celery_demo_high", priority=9)
)
}
app = Celery()
app.config_from_object(CONFIG)
@app.task(name='demo_task')
def demo_task(x, y):
print(f"这是一个demo任务,睡了2秒,并返回了{
x}+{
y}的结果。")
time.sleep(2)
return x + y
if __name__ == '__main__':
# call()
l1 = list()
for i in range(1, 50):
# sig = demo_task.s(i, i)
# sig.apply_async(priority=9)
l1.append(signature("demo_task", args=(i, i), app=app))
task_group = group(l1).apply_async(priority=9)
# task_group = group(l1).apply_async(queue='celery_demo')
task_group.get()
# sig = demo_task.s(3, 3)
# sig.apply_async(priority=9)
# sig = demo_task.s(4, 4)
# sig.apply_async(priority=9)
# sig = demo_task.s(5, 5)
# sig.apply_async(priority=9)
# sig = demo_task.s(6, 6)
# sig.apply_async(priority=6)
# sig = demo_task.s(7, 7)
# sig.apply_async(priority=6)
# sig = demo_task.s(8, 8)
# sig.apply_async(priority=6)
# sig = demo_task.s(9, 9)
# sig.apply_async(priority=6)
# sig = demo_task.s(10, 10)
# sig.apply_async(priority=6)
## 另起一群任务
if __name__ == '__main__':
# call()
l2 = list()
for i in range(101, 120):
# sig = demo_task.s(i, i)
# sig.apply_async(priority=6)
l2.append(signature("demo_task", args=(i, i), app=app))
task_group = group(l2).apply_async(priority=6)
task_group.get()
效果

TIPS

官方文档中使用Redis做消息队列时priority数字越大优先级越低,而RabbitMQ正好相反。摊手.jpg
引用
https://docs.celeryq.dev/en/v5.0.2/userguide/calling.html
http://allynh.com/blog/flask-asynchronous-background-tasks-with-celery-and-redis/
https://segmentfault.com/a/1190000039939846
https://juejin.cn/post/6844904133514756109
https://www.celerycn.io/v/4.4.0/yong-hu-zhi-nan/lu-you-ren-wu-routing-tasks
边栏推荐
- CI/CD:持续集成/持续部署(难舍难分)
- Take you do interface test from zero to the first case summary
- 三箭资本濒临破产?市场陷入1907年恐慌之中?加密监管不可避免
- MVVM项目开发(商品管理系统二)
- opencv学习入门
- Solve the Final Fantasy 13-2 Clock Puzzle with DFS
- ERROR:Module not found: Error: Can‘t resolve ‘core-js/modules/es.promise.js‘ in ‘address‘
- C专家编程 第9章 再论数组 9.7 轻松一下---软件/硬件平衡
- 【es6】教程 Symbol数据以及迭代器和生成器
- 关于eBPF与可观测性,你想知道的都在这里
猜你喜欢

Building PO layered architecture of automated testing framework from 0

笔算开2次方根、3次方根详细教程

Jenkins environment deployment, (packaging, publishing, deployment, automated testing)

Likou Brush Question Record 1.5-----367. Valid perfect squares

普通人如何增加收入

工作小计 rtcp的length和网络字节序

并查集相关知识点

redis集群详解

加密公司集体裁员 以应对加密寒冬和通货膨胀?现加密总市值低于1万亿美元

【图像去噪】基于边缘增强扩散 (cEED) 和 Coherence Enhancing Diffusion (cCED) 滤波器实现图像去噪附matlab代码
随机推荐
从0开始搭建自动化测试框架之PO分层架构
【扫雷--1】
使用TensorRT对AlphaPose模型进行加速
JS 将对象拆开拼接成 URL
What aspects should we start with for interface security testing?
独立机器连接cdh的spark集群,远程提交任务(绝对可以成功,亲测了n遍)
【剑指offer】二进制中1的个数&&2的幂
【图像去噪】基于边缘增强扩散 (cEED) 和 Coherence Enhancing Diffusion (cCED) 滤波器实现图像去噪附matlab代码
1261. 在受污染的二叉树中查找元素
Jenkins configuration nail notification
grafana的panel点击title,没有反应,没有出现edit选项
C专家编程 第9章 再论数组 9.6 C语言的多维数组
Jenkins配置钉钉通知
VS2019编译boost_1_79,生成32位和64位静态库
机器学习入门
加密公司集体裁员 以应对加密寒冬和通货膨胀?现加密总市值低于1万亿美元
2022年最流行的自动化测试工具有哪些?全网最全最细都在这里了
多御安全浏览安卓版升级尝鲜,新增下载管理功能
20220527动态规划:零钱兑换
C专家编程 第8章 为什么程序员无法分清万圣节和圣诞节 8.10 轻松一下---国际C语言混乱代码大赛