一篇文章搞定Celery消息队列配置定时任务
liuian 2024-12-01 00:59 28 浏览
介绍
celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。
定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。
要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
版本和要求
Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
环境准备
安装rabbitMQ或Redis
安装celery
pip3 install celery
快速上手
s1.py
s1.pyimport time
from celery import Celery
app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
@app.task
def xxxxxx(x, y):
time.sleep(10)
return x + y
s2.py
from s1 import func
# func,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)
s3.py
from celery.result import AsyncResult
from s1 import app
async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
pip3 install eventlet
# 后期运行修改为:
celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
python3 s2.py
# 执行 s3.py ,检查任务状态并获取结果:
python3 s3.py
多任务结构
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务
pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
celery = Celery('func',
broker='redis://192.168.111.111:6379',
backend='redis://192.168.111.111:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
pro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def func(*args, **kwargs):
time.sleep(5)
return "任务结果"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"
pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery.result import AsyncResult
from celery_tasks.celery import celery
async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
pro_cel/send_task.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks
# 立即告知celery去执行func任务,并传入两个参数
result = celery_tasks.tasks.func.delay(4, 4)
print(result.id)
更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
设定时间让celery执行一个任务
import datetime
from celery_tasks.tasks import func
"""
from datetime import datetime
v1 = datetime(2020, 4, 11, 3, 0, 0)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10
# 使用apply_async并设定时间
result = func.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)
类似于contab的定时任务
"""
celery beat -A proj
celery worker -A proj -l info
"""
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='amqp://147.918.134.86:5672', backend='amqp://147.918.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'proj.s1.add1',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
'add-every-12-seconds': {
'task': 'proj.s1.add1',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': (16, 16)
},
}
注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。
Flask中应用Celery
pro_flask_celery/
├── app.py
├── celery_tasks
├── celery.py
└── tasks.py
app.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
from celery.result import AsyncResult
from celery_tasks import tasks
from celery_tasks.celery import celery
app = Flask(__name__)
TASK_ID = None
@app.route('/')
def index():
global TASK_ID
result = tasks.func.delay()
TASK_ID = result.id
return "任务已经提交"
@app.route('/result')
def result():
global TASK_ID
result = AsyncResult(id=TASK_ID, app=celery)
if result.ready():
return result.get()
return "xxxx"
if __name__ == '__main__':
app.run()
celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab
celery = Celery('func',
broker='redis://192.168.110.148:6379',
backend='redis://192.168.110.148:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
celery_task/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def hello(*args, **kwargs):
print('执行hello')
return "hello"
@celery.task
def func(*args, **kwargs):
print('执行func')
return "func"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"
记录
为了定时调用任务,你必须添加记录到打点列表中:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10秒调用 test('hello') .
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每30秒调用 test('world')
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# 每周一上午7:30执行
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。
add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:
例子: 每30秒运行 tasks.add .
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
一般会使用配置文件进行配置,如下
celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
程序里使用
app.config_from_object('celeryconfig')
注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。
时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)
可使用的属性
task:要执行的任务名字
schedule:执行的频率[可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口]
args:位置参数 (list 或 tuple)
kwargs:键值参数 (dict)
options:执行选项 (dict)[这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等]
relative:如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔[默认relative是false,频率不会近似,会相对于celery的启动时间]
Crontab 表达式语法
开启调度
开启celery定时服务
celery -A proj beat
可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用
celery -A proj worker -B
定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
#Python##每天学python##Python入门推荐#
相关推荐
- 使用Assembly打包和部署Spring Boot工程
-
SpringBoot项目的2种部署方式目前来说,SpringBoot项目有如下2种常见的部署方式一种是使用docker容器去部署。将SpringBoot的应用构建成一个docke...
- java高级用法之:调用本地方法的利器JNA
-
简介JAVA是可以调用本地方法的,官方提供的调用方式叫做JNI,全称叫做javanativeinterface。要想使用JNI,我们需要在JAVA代码中定义native方法,然后通过javah命令...
- Linux中如何通过Shell脚本来控制Spring Boot的Jar包启停服务?
-
SpringBoot项目在为开发者带来方便的同时,也带来了一个新的问题就是Jar包如何启动?在一般情况下我们都是采用了最为经典的java-jar命令来进行启动。然后通过ps命令找到对应的应用线程通...
- 牛逼!自己手写一个热加载(人民币手写符号一个横还是两个横)
-
热加载:在不停止程序运行的情况下,对类(对象)的动态替换JavaClassLoader简述Java中的类从被加载到内存中到卸载出内存为止,一共经历了七个阶段:加载、验证、准备、解析、初始化、使用、...
- java 错误: 找不到或无法加载主类?看看怎么解决吧!
-
问题扫述:项目名称调整,由原来的com.mp.qms.report.biz调整为com.mp.busicen.mec.qms.report.biz后。项目在IDEA直接运行,但打包部署到服务器...
- 如何将 Spring Boot 工程打包成独立的可执行 JAR 包
-
导语:通过将SpringBoot项目打包成独立的可执行JAR包,可以方便地在任何支持Java环境的机器上运行项目。本文将详细介绍如何通过Maven构建插件将SpringBoot...
- class 增量发包改造为 jar 包方式发布
-
大纲class增量发包介绍项目目录结构介绍jar包方式发布落地方案class增量发包介绍当前项目的迭代修复都是通过class增量包来发版本的将改动的代码class增量打包,如下图cla...
- Jar启动和IDE里启动Sprintboot的区别
-
想聊明白这个问题,需要补充一些前提条件,比如Fatjar、类加载机制等1、Fatjar我们在开发业务程序的时候,经常需要引用第三方的jar包,最终程序开发完成之后,通过打包程序,会把自己的代码和三...
- Java 20年,以后将往哪儿走?(java还能流行多久)
-
在今年的Java20周年的庆祝大会中,JavaOne2015的中心议题是“Java的20年”。甲骨文公司Java平台软件开发部的副总裁GeorgesSaab的主题演讲就将关注点放在了java...
- Spring Boot Jar 包秒变 Docker 镜像实现多环境部署
-
你是否在互联网大厂后端开发工作中,遇到过这样的困扰?当完成一个SpringBoot项目开发,准备将Jar包部署到不同环境时,却发现各个环境依赖不同、配置复杂,部署过程繁琐又容易出错,不仅耗费...
- 从0开始,让你的Spring Boot项目跑在Linux服务器
-
1搭建Linux服务器1.1购买阿里云服务器或安装虚拟机这里建议是CentOS7.X或CentOS8.X,当然其他的Linux如deepin、Ubuntu也可以,只是软件环境的安装包和安装方式...
- 【技术】Maven 上传第三方jar包到私服
-
通过nexus后台上传私服以NexusRepositoryManagerOSS2.14.5-02为例。登录nexus后台。定义Maven坐标Maven坐标有两种方式:1.自定义参数;2....
- JVM参数、main方法的args参数使用
-
一、前言我们知道JVM参数分为自定义参数、JVM系统参数,Javamain方法的参数。今天就谈谈怎么使用吧。二、查看jvm参数定义自定义参数我们打开cmd窗口,输入java,就能看到自定义参数的格式...
- Maven项目如何发布jar包到Nexus私服
-
Maven项目发布jar包到Nexus私服在编码过程中,有些通用的代码模块,有时候我们不想通过复制粘贴来粗暴地复用。因为这样不仅体现不了变化,也不利于统一管理。这里我们使用mavendeploy的方...
- 干货丨Hadoop安装步骤!详解各目录内容及作用
-
Hadoop是Apache基金会面向全球开源的产品之一,任何用户都可以从ApacheHadoop官网下载使用。今天,播妞将以编写时较为稳定的Hadoop2.7.4版本为例,详细讲解Hadoop的安...
- 一周热门
-
-
Python实现人事自动打卡,再也不会被批评
-
【验证码逆向专栏】vaptcha 手势验证码逆向分析
-
Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
-
一个解决支持HTML/CSS/JS网页转PDF(高质量)的终极解决方案
-
再见Swagger UI 国人开源了一款超好用的 API 文档生成框架,真香
-
网页转成pdf文件的经验分享 网页转成pdf文件的经验分享怎么弄
-
C++ std::vector 简介
-
系统C盘清理:微信PC端文件清理,扩大C盘可用空间步骤
-
10款高性能NAS丨双十一必看,轻松搞定虚拟机、Docker、软路由
-
python使用fitz模块提取pdf中的图片
-
- 最近发表
-
- 使用Assembly打包和部署Spring Boot工程
- java高级用法之:调用本地方法的利器JNA
- Linux中如何通过Shell脚本来控制Spring Boot的Jar包启停服务?
- 牛逼!自己手写一个热加载(人民币手写符号一个横还是两个横)
- java 错误: 找不到或无法加载主类?看看怎么解决吧!
- 如何将 Spring Boot 工程打包成独立的可执行 JAR 包
- class 增量发包改造为 jar 包方式发布
- Jar启动和IDE里启动Sprintboot的区别
- Java 20年,以后将往哪儿走?(java还能流行多久)
- Spring Boot Jar 包秒变 Docker 镜像实现多环境部署
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- table.render (33)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)