一篇文章搞定Celery消息队列配置定时任务
liuian 2024-12-01 00:59 66 浏览
介绍
celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。
定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。
要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
版本和要求
Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.环境准备
安装rabbitMQ或Redis
安装celery
pip3 install celery快速上手
s1.py
s1.pyimport time
from celery import Celery
app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
@app.task
def xxxxxx(x, y):
time.sleep(10)
return x + ys2.py
from s1 import func
# func,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)s3.py
from celery.result import AsyncResult
from s1 import app
async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
pip3 install eventlet
# 后期运行修改为:
celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
python3 s2.py
# 执行 s3.py ,检查任务状态并获取结果:
python3 s3.py多任务结构
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
celery = Celery('func',
broker='redis://192.168.111.111:6379',
backend='redis://192.168.111.111:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsepro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def func(*args, **kwargs):
time.sleep(5)
return "任务结果"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery.result import AsyncResult
from celery_tasks.celery import celery
async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')pro_cel/send_task.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks
# 立即告知celery去执行func任务,并传入两个参数
result = celery_tasks.tasks.func.delay(4, 4)
print(result.id)更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
设定时间让celery执行一个任务
import datetime
from celery_tasks.tasks import func
"""
from datetime import datetime
v1 = datetime(2020, 4, 11, 3, 0, 0)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10
# 使用apply_async并设定时间
result = func.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)类似于contab的定时任务
"""
celery beat -A proj
celery worker -A proj -l info
"""
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='amqp://147.918.134.86:5672', backend='amqp://147.918.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'proj.s1.add1',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
'add-every-12-seconds': {
'task': 'proj.s1.add1',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': (16, 16)
},
}注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。
Flask中应用Celery
pro_flask_celery/
├── app.py
├── celery_tasks
├── celery.py
└── tasks.pyapp.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
from celery.result import AsyncResult
from celery_tasks import tasks
from celery_tasks.celery import celery
app = Flask(__name__)
TASK_ID = None
@app.route('/')
def index():
global TASK_ID
result = tasks.func.delay()
TASK_ID = result.id
return "任务已经提交"
@app.route('/result')
def result():
global TASK_ID
result = AsyncResult(id=TASK_ID, app=celery)
if result.ready():
return result.get()
return "xxxx"
if __name__ == '__main__':
app.run()celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab
celery = Celery('func',
broker='redis://192.168.110.148:6379',
backend='redis://192.168.110.148:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsecelery_task/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def hello(*args, **kwargs):
print('执行hello')
return "hello"
@celery.task
def func(*args, **kwargs):
print('执行func')
return "func"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"记录
为了定时调用任务,你必须添加记录到打点列表中:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10秒调用 test('hello') .
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每30秒调用 test('world')
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# 每周一上午7:30执行
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。
add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:
例子: 每30秒运行 tasks.add .
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'一般会使用配置文件进行配置,如下
celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}程序里使用
app.config_from_object('celeryconfig')
注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)
可使用的属性
task:要执行的任务名字
schedule:执行的频率[可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口]
args:位置参数 (list 或 tuple)
kwargs:键值参数 (dict)
options:执行选项 (dict)[这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等]
relative:如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔[默认relative是false,频率不会近似,会相对于celery的启动时间]
Crontab 表达式语法
开启调度
开启celery定时服务
celery -A proj beat可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用
celery -A proj worker -B定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule#Python##每天学python##Python入门推荐#
相关推荐
- 手机电脑app安卓下载(手机电脑下载官方下载)
-
apk,这是一种后缀名形式,指的是安卓手机上可运行的文件。但apk文件里面同样会有其他的文件,像jpg文件,文字文件。要是少了一个这些文件,这个apk可能就会出问题。pc,这是一个简称,指的是电脑的意...
- 电脑电源供电不足表现(长按开机键10秒,释放电脑静电)
-
1、电脑主机等电源供电不足,在运行过程中会死机,蓝屏,黑屏,会关机,会断电重启,会死机,时断时续,风扇转速不够,也会支撑不起而导致严重的问题,主要就是对机械硬盘损害严重,别的就是主板,内存,主机本身的...
- dell官网客服电话(dell官网服务电话)
-
戴尔售后服务的官网是:www.dell-wx.com,戴尔售后服务顾问会与您全程合作帮助您规划和建议,包括订单状态、技术支持、戴尔续保服务限时特惠和在线查询产品配置及保修。戴尔中国的官方网站http:...
- win10虚拟桌面(win10虚拟桌面怎么关)
-
方法/步骤新建虚拟桌面可以点击做面左下角的图标或者按快捷键win+Tab然后点击右下角的新建桌面或者直接按快捷键win+Ctrl+D,这时直接出现一个没有打开任何窗口的桌面,桌面图标还在任务栏上打开的...
- 电脑开机密码忘了怎么办解除
-
在登录界面,先随便输入字符并按下回车键,然后窗口会先显示重置密码选项,点击进入密码找回界面,利用提示问题来找回。使用管理员账户登录电脑,然后鼠标右击计算机图标,进入计算机管理界面,在用户选项下,右击用...
- 机械硬盘坏了如何修复(机械硬盘坏了怎么找回数据)
-
1、逻辑坏道修复当机械硬盘出现逻辑坏道,可以将硬盘格式化并重新安装系统,但在之前一定要将硬盘中的重要文件提前备份,以防文件丢失。2、磁头损坏当确认机械硬盘是由于磁头损坏导致识别不出数据的话,如果不是专...
- win7设置屏幕保护(windows7怎么设置屏幕保护)
-
要在Windows7中设置屏幕保护,您可以按照以下步骤进行操作:鼠标右键点击桌面上的空白区域,选择“个性化”。在打开的窗口中,点击左侧导航栏中的“屏幕保护程序”。在屏幕保护程序设置窗口中,您可以选择...
- 笔记本电脑截图是什么键(笔记本截图键是哪个键)
-
1.第一个,通过键盘上的截图键来截取全屏,键盘上都有一个printscreen键,这个键就是用来截图的,只需要按一下这个键,然后再打开word文档,然后按一下ctrl+v键,就可以把这个截图,粘贴...
- 苹果系统下载的文件怎么找(苹果下载的文件在哪)
-
打开文件打开苹果手机自带的文件软件,在打开首页找到右下方浏览选项,点击进入。点击下载项在浏览页面,找到上方个人收藏下的下载项选项,点击进入。查看下载在下载项页面,可以看到手机下载的文件了。1.搜索文件...
- 360安全云盘(360安全云盘免费版下载)
-
关于这个问题,360安全云盘默认的文件存放位置是在本地的“360安全云盘”文件夹中,但是可以通过以下步骤进行设置:1.打开360安全云盘客户端,点击左上角的“设置”按钮;2.在弹出的菜单中选择“常...
- melogin路由器手机设置密码(melogin路由器手机设置密码192.168.1.1)
-
如果是无线路由器,可以用手机无线连接这个无线路由器。然后打开浏览器,输入路由器的地址(不同的路由器的地址并不相同,可以查看路由器的说明书或者到官网下载电子说明书查看)。打开路由器的管理界面后就可以设置...
- 电脑处理器cpu天梯图(电脑cpu处理器性能天梯图)
-
第一名:Intel酷睿i54590 这一款处理器的核心数量为四核,主频为3.3GHz,带有6M的三级缓存,运行的速度很快,接口类型为LGA1150,性价比较高,市面上的价格为1254元。 第二...
- office200732位下载(office2007版下载)
-
我们可以选择一个可靠连接,进入下载界面,点击下载,可以直接选择运行不需要保存,然后下载完成后开始安装。进入安装向导,勾选同意协议,点击继续。系统开始配置,兼容包将自动嵌入office文件包,等待进度完...
- iphone官网查询真伪入口(苹果官网查真伪查询)
-
每台iPhone在外包装盒底部可看到序列号。进入手机设置-通用-关于本机,也能找到序列号,记下它,点击链接,进入苹果官网https://checkcoverage.apple.com/cn/zh,把序...
- 电脑完全黑屏了但是能开机(电脑黑屏但还是开机状态咋办)
-
1.目前造成计算机黑屏的原因主要有两个,一是硬件的故障,二是软件的冲突,而二者的区别主要在于发生黑屏的位置,即是在开机时发生黑屏,还是在正常启动机器后,在使用的过程中出现黑屏。 2.在开机后突然...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
