一篇文章搞定Celery消息队列配置定时任务
liuian 2024-12-01 00:59 52 浏览
介绍
celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。
定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。
要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
版本和要求
Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.环境准备
安装rabbitMQ或Redis
安装celery
pip3 install celery快速上手
s1.py
s1.pyimport time
from celery import Celery
app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
@app.task
def xxxxxx(x, y):
time.sleep(10)
return x + ys2.py
from s1 import func
# func,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)s3.py
from celery.result import AsyncResult
from s1 import app
async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
pip3 install eventlet
# 后期运行修改为:
celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
python3 s2.py
# 执行 s3.py ,检查任务状态并获取结果:
python3 s3.py多任务结构
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
celery = Celery('func',
broker='redis://192.168.111.111:6379',
backend='redis://192.168.111.111:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsepro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def func(*args, **kwargs):
time.sleep(5)
return "任务结果"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery.result import AsyncResult
from celery_tasks.celery import celery
async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')pro_cel/send_task.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks
# 立即告知celery去执行func任务,并传入两个参数
result = celery_tasks.tasks.func.delay(4, 4)
print(result.id)更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
设定时间让celery执行一个任务
import datetime
from celery_tasks.tasks import func
"""
from datetime import datetime
v1 = datetime(2020, 4, 11, 3, 0, 0)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10
# 使用apply_async并设定时间
result = func.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)类似于contab的定时任务
"""
celery beat -A proj
celery worker -A proj -l info
"""
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='amqp://147.918.134.86:5672', backend='amqp://147.918.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'proj.s1.add1',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
'add-every-12-seconds': {
'task': 'proj.s1.add1',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': (16, 16)
},
}注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。
Flask中应用Celery
pro_flask_celery/
├── app.py
├── celery_tasks
├── celery.py
└── tasks.pyapp.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
from celery.result import AsyncResult
from celery_tasks import tasks
from celery_tasks.celery import celery
app = Flask(__name__)
TASK_ID = None
@app.route('/')
def index():
global TASK_ID
result = tasks.func.delay()
TASK_ID = result.id
return "任务已经提交"
@app.route('/result')
def result():
global TASK_ID
result = AsyncResult(id=TASK_ID, app=celery)
if result.ready():
return result.get()
return "xxxx"
if __name__ == '__main__':
app.run()celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab
celery = Celery('func',
broker='redis://192.168.110.148:6379',
backend='redis://192.168.110.148:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsecelery_task/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def hello(*args, **kwargs):
print('执行hello')
return "hello"
@celery.task
def func(*args, **kwargs):
print('执行func')
return "func"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"记录
为了定时调用任务,你必须添加记录到打点列表中:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10秒调用 test('hello') .
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每30秒调用 test('world')
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# 每周一上午7:30执行
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。
add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:
例子: 每30秒运行 tasks.add .
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'一般会使用配置文件进行配置,如下
celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}程序里使用
app.config_from_object('celeryconfig')
注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)
可使用的属性
task:要执行的任务名字
schedule:执行的频率[可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口]
args:位置参数 (list 或 tuple)
kwargs:键值参数 (dict)
options:执行选项 (dict)[这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等]
relative:如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔[默认relative是false,频率不会近似,会相对于celery的启动时间]
Crontab 表达式语法
开启调度
开启celery定时服务
celery -A proj beat可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用
celery -A proj worker -B定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule#Python##每天学python##Python入门推荐#
相关推荐
- MySQL慢查询优化:从explain到索引,DBA手把手教你提升10倍性能
-
数据库性能是应用系统的生命线,而慢查询就像隐藏在系统中的定时炸弹。某电商平台曾因一条未优化的SQL导致订单系统响应时间从200ms飙升至8秒,最终引发用户投诉和订单流失。今天我们就来系统学习MySQL...
- 一文读懂SQL五大操作类别(DDL/DML/DQL/DCL/TCL)的基础语法
-
在SQL中,DDL、DML、DQL、DCL、TCL是按操作类型划分的五大核心语言类别,缩写及简介如下:DDL(DataDefinitionLanguage,数据定义语言):用于定义和管理数据库结构...
- 闲来无事,学学Mysql增、删,改,查
-
Mysql增、删,改,查1“增”——添加数据1.1为表中所有字段添加数据1.1.1INSERT语句中指定所有字段名语法:INSERTINTO表名(字段名1,字段名2,…)VALUES(值1...
- 数据库:MySQL 高性能优化规范建议
-
数据库命令规范所有数据库对象名称必须使用小写字母并用下划线分割所有数据库对象名称禁止使用MySQL保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来)数据库对象的命名要能做到见名识意,...
- 下载工具合集_下载工具手机版
-
迅雷,在国内的下载地位还是很难撼动的,所需要用到的地方还挺多。缺点就是不开会员,软件会限速。EagleGet,全能下载管理器,支持HTTP(S)FTPMMSRTSP协议,也可以使用浏览器扩展检测...
- mediamtx v1.15.2 更新详解:功能优化与问题修复
-
mediamtxv1.15.2已于2025年10月14日发布,本次更新在功能、性能优化以及问题修复方面带来了多项改进,同时也更新了部分依赖库并提升了安全性。以下为本次更新的详细内容:...
- 声学成像仪:泄露监测 “雷达” 方案开启精准防控
-
声学成像仪背景将声像图与阵列上配装的摄像实所拍的视频图像以透明的方式叠合在一起,就形成了可直观分析被测物产生状态。这种利用声学、电子学和信息处理等技术,变换成人眼可见的图像的技术可以帮助人们直观地认识...
- 最稳存储方案:两种方法将摄像头接入威联通Qu405,录像不再丢失
-
今年我家至少被4位邻居敲门,就是为了查监控!!!原因是小区内部监控很早就停止维护了,半夜老有小黄毛掰车门偷东西,还有闲的没事划车的,车主损失不小,我家很早就配备监控了,人来亮灯有一定威慑力,不过监控设...
- 离岗检测算法_离岗检查内容
-
一、研发背景如今社会许多岗位是严禁随意脱离岗位的,如塔台、保安室、监狱狱警监控室等等,因为此类行为可能会引起重大事故,而此类岗位监督管理又有一定困难,因此促生了智能视频识别系统的出现。二、产品概述及工...
- 消防安全通道占用检测报警系统_消防安全通道占用检测报警系统的作用
-
一、产品概述科缔欧消防安全通道占用检测报警系统,是创新行业智能监督管理方式、完善监管部门动态监控及预警预报体系的信息化手段,是实现平台远程监控由“人为监控”向“智能监控”转变的必要手段。产品致力于设...
- 外出住酒店、民宿如何使用手机检测隐藏的监控摄像头
-
最近,一个家庭在他们的民宿收到了一个大惊喜:客厅里有一个伪装成烟雾探测器的隐藏摄像头,监视着他们的一举一动。隐藏摄像头的存在如果您住在酒店或民宿,隐藏摄像头不应再是您的担忧。对于民宿,房东应报告所有可...
- 基于Tilera众核平台的流媒体流量发生系统的设计
-
曾帅,高宗彬,赵国锋(重庆邮电大学通信与信息工程学院,重庆400065)摘要:设计了一种基于Tilera众核平台高强度的流媒体流量发生系统架构,其主要包括:系统界面管理模块、服务承载模块和流媒体...
- 使用ffmpeg将rtsp流转流实现h5端播放
-
1.主要实现rtsp转tcp协议视频流播放ffmpeg下载安装(公认业界视频处理大佬)a、官网地址:www.ffmpeg.org/b、gitHub:github.com/FFmpeg/FFmp…c、推...
- 将摄像头视频流从Rtsp协议转为websocket协议
-
写在前面很多通过摄像头拿到的视频流格式都是Rtsp协议的,比如:海康威视摄像头。在现代的浏览器中,已经不支持直接播放Rtsp视频流,而且,海康威视提供的本身的webSdk3.3.0视频插件有很多...
- 华芸科技推出安全监控中心2.1 Beta测试版
-
全球独家支持hdmi在线实时监看摄像机画面,具单一、循环或同时监看四频道视频影像,可透过华芸专用红外线遥控器、airemote或是键盘鼠标进行操作,提供摄像机频道增购服务,满足用户弹性扩增频道需...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
