百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

一篇文章搞定Celery消息队列配置定时任务

liuian 2024-12-01 00:59 24 浏览

介绍

celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。

定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。

要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。


  • user:用户程序,用于告知celery去执行一个任务。
  • broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
  • worker:执行任务
  • celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

    版本和要求

    Celery version 4.0 runs on
            Python ?2.7, 3.4, 3.5?
            PyPy ?5.4, 5.5?
        This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
    
        If you’re running an older version of Python, you need to be running an older version of Celery:
    
            Python 2.6: Celery series 3.1 or earlier.
            Python 2.5: Celery series 3.0 or earlier.
            Python 2.4 was Celery series 2.2 or earlier.
    
        Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

    环境准备

    安装rabbitMQ或Redis

    安装celery

    pip3 install celery

    快速上手

    s1.py

    s1.pyimport time
    from celery import Celery
    
    app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
    
    
    @app.task
    def xxxxxx(x, y):
        time.sleep(10)
        return x + y

    s2.py

    from s1 import func
    
    # func,并传入两个参数
    result = xxxxxx.delay(4, 4)
    print(result.id)

    s3.py

    from celery.result import AsyncResult
    from s1 import app
    
    async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    # 执行 s1.py 创建worker(终端执行命令):
    celery worker -A s1 -l info
    # PS:Windows系统上执行命令时出错解决方法
        pip3 install eventlet
    # 后期运行修改为:
        celery worker -A s1 -l info -P eventlet
    # 执行 s2.py ,创建一个任务并获取任务ID:
        python3 s2.py
    
    # 执行 s3.py ,检查任务状态并获取结果:
        python3 s3.py

    多任务结构

    pro_cel
        ├── celery_tasks# celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件
        │   └── tasks.py    #  所有任务函数
        ├── check_result.py # 检查结果
        └── send_task.py    # 触发任务

    pro_cel/celery_tasks/celery

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from celery import Celery
    
    celery = Celery('func',
                    broker='redis://192.168.111.111:6379',
                    backend='redis://192.168.111.111:6379',
                    include=['celery_tasks.tasks'])
    
    # 时区
    celery.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery.conf.enable_utc = False

    pro_cel/celery_tasks/tasks.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    from .celery import celery
    
    
    @celery.task
    def func(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    
    
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"

    pro_cel/check_result.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from celery.result import AsyncResult
    from celery_tasks.celery import celery
    
    async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    pro_cel/send_task.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import celery_tasks.tasks
    
    # 立即告知celery去执行func任务,并传入两个参数
    result = celery_tasks.tasks.func.delay(4, 4)
    
    print(result.id)

    更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

    定时任务

    设定时间让celery执行一个任务

    import datetime
    from celery_tasks.tasks import func
    """
    from datetime import datetime
     
    v1 = datetime(2020, 4, 11, 3, 0, 0)
    print(v1)
     
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
     
    """
    ctime = datetime.datetime.now()
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
     
    s10 = datetime.timedelta(seconds=10)
    ctime_x = utc_ctime + s10
     
    # 使用apply_async并设定时间
    result = func.apply_async(args=[1, 3], eta=ctime_x)
    print(result.id)

    类似于contab的定时任务

    """
    celery beat -A proj
    celery worker -A proj -l info
     
    """
    from celery import Celery
    from celery.schedules import crontab
     
    app = Celery('tasks', broker='amqp://147.918.134.86:5672', backend='amqp://147.918.134.86:5672', include=['proj.s1', ])
    app.conf.timezone = 'Asia/Shanghai'
    app.conf.enable_utc = False
     
    app.conf.beat_schedule = {
        # 'add-every-10-seconds': {
        #     'task': 'proj.s1.add1',
        #     'schedule': 10.0,
        #     'args': (16, 16)
        # },
        'add-every-12-seconds': {
            'task': 'proj.s1.add1',
            'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
            'args': (16, 16)
        },
    }

    注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。

    Flask中应用Celery

    pro_flask_celery/
    ├── app.py
    ├── celery_tasks
        ├── celery.py
        └── tasks.py

    app.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from flask import Flask
    from celery.result import AsyncResult
    
    from celery_tasks import tasks
    from celery_tasks.celery import celery
    
    app = Flask(__name__)
    
    TASK_ID = None
    
    
    @app.route('/')
    def index():
        global TASK_ID
        result = tasks.func.delay()
        TASK_ID = result.id
    
        return "任务已经提交"
    
    
    @app.route('/result')
    def result():
        global TASK_ID
        result = AsyncResult(id=TASK_ID, app=celery)
        if result.ready():
            return result.get()
        return "xxxx"
    
    
    if __name__ == '__main__':
        app.run()

    celery_tasks/celery.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from celery import Celery
    from celery.schedules import crontab
    
    celery = Celery('func',
                    broker='redis://192.168.110.148:6379',
                    backend='redis://192.168.110.148:6379',
                    include=['celery_tasks.tasks'])
    
    # 时区
    celery.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery.conf.enable_utc = False

    celery_task/tasks.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    from .celery import celery
    
    
    @celery.task
    def hello(*args, **kwargs):
        print('执行hello')
        return "hello"
    
    
    @celery.task
    def func(*args, **kwargs):
        print('执行func')
        return "func"
    
    
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"

    记录

    为了定时调用任务,你必须添加记录到打点列表中:

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每10秒调用 test('hello') .
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # 每30秒调用 test('world') 
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # 每周一上午7:30执行
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)

    用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。

    add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:

    例子: 每30秒运行 tasks.add .

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'

    一般会使用配置文件进行配置,如下
    celeryconfig.py:

    broker_url = 'pyamqp://'
    result_backend = 'rpc://'
    
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'Europe/Oslo'
    enable_utc = True
    beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }

    程序里使用

    app.config_from_object('celeryconfig')
    
    注意
    如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。

    时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)

    可使用的属性

    task:要执行的任务名字

    schedule:执行的频率[可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口]

    args:位置参数 (list 或 tuple)

    kwargs:键值参数 (dict)

    options:执行选项 (dict)[这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等]

    relative:如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔[默认relative是false,频率不会近似,会相对于celery的启动时间]

    Crontab 表达式语法


    开启调度

    开启celery定时服务

    celery -A proj beat

    可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用

    celery -A proj worker -B

    定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置

    celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

    #Python##每天学python##Python入门推荐#




    相关推荐

    GANs为何引爆机器学习?这篇基于TensorFlow的实例教程为你解惑!

    「机器人圈导览」:生成对抗网络无疑是机器学习领域近三年来最火爆的研究领域,相关论文层出不求,各种领域的应用层出不穷。那么,GAN到底如何实践?本文编译自Medium,该文作者以一朵玫瑰花为例,详细阐...

    高丽大学等机构联合发布StarGAN:可自定义表情和面部特征

    原文来源:arXiv、GitHub作者:YunjeyChoi、MinjeChoi、MunyoungKim、Jung-WooHa、SungKim、JaegulChoo「雷克世界」编译:嗯~...

    TensorFlow和PyTorch相继发布最新版,有何变化

    原文来源:GitHub「机器人圈」编译:嗯~阿童木呀、多啦A亮Tensorflow主要特征和改进在Tensorflow库中添加封装评估量。所添加的评估量列表如下:1.深度神经网络分类器(DNNCl...

    「2022 年」崔庆才 Python3 爬虫教程 - 深度学习识别滑动验证码缺口

    上一节我们使用OpenCV识别了图形验证码躯壳欧。这时候就有朋友可能会说了,现在深度学习不是对图像识别很准吗?那深度学习可以用在识别滑动验证码缺口位置吗?当然也是可以的,本节我们就来了解下使用深度...

    20K star!搞定 LLM 微调的开源利器

    LLM(大语言模型)微调一直都是老大难问题,不仅因为微调需要大量的计算资源,而且微调的方法也很多,要去尝试每种方法的效果,需要安装大量的第三方库和依赖,甚至要接入一些框架,可能在还没开始微调就已经因为...

    大模型DeepSeek本地部署后如何进行自定义调整?

    1.理解模型架构a)查看深度求索官方文档或提供的源代码文件,了解模型的结构、输入输出格式以及支持的功能。模型是否为预训练权重?如果是,可以在预训练的基础上进行微调(Fine-tuning)。是否需要...

    因配置不当,约5000个AI模型与数据集在公网暴露

    除了可访问机器学习模型外,暴露的数据还可能包括训练数据集、超参数,甚至是用于构建模型的原始数据。前情回顾·人工智能安全动态向ChatGPT植入恶意“长期记忆”,持续窃取用户输入数据多模态大语言模型的致...

    基于pytorch的深度学习人员重识别

    基于pytorch的深度学习人员重识别Torchreid是一个库。基于pytorch的深度学习人员重识别。特点:支持多GPU训练支持图像的人员重识别与视频的人员重识别端到端的训练与评估简单的re...

    DeepSeek本地部署:轻松训练你的AI模型

    引言:为什么选择本地部署?在AI技术飞速发展的今天,越来越多的企业和个人希望将AI技术应用于实际场景中。然而,对于一些对数据隐私和计算资源有特殊需求的用户来说,云端部署可能并不是最佳选择。此时,本地部...

    谷歌今天又开源了,这次是Sketch-RNN

    前不久,谷歌公布了一项最新技术,可以教机器画画。今天,谷歌开源了代码。在我们研究其代码之前,首先先按要求设置Magenta环境。(https://github.com/tensorflow/magen...

    Tensorflow 使用预训练模型训练的完整流程

    前面已经介绍了深度学习框架Tensorflow的图像的标注和训练数据的准备工作,本文介绍一下使用预训练模型完成训练并导出训练的模型。1.选择预训练模型1.1下载预训练模型首先需要在Tensorf...

    30天大模型调优学习计划(30分钟训练大模型)

    30天大模型调优学习计划,结合Unsloth和Lora进行大模型微调,掌握大模型基础知识和调优方法,熟练应用。第1周:基础入门目标:了解大模型基础并熟悉Unsloth等工具的基本使用。Day1:大模...

    python爬取喜马拉雅音频,json参数解析

    一.抓包分析json,获取加密方式1.抓包获取音频界面f12打开抓包工具,播放一个(非vip)视频,点击“媒体”单击打开可以复制URL,发现就是我们要的音频。复制“CKwRIJEEXn-cABa0Tg...

    五、JSONPath使用(Python)(json数据python)

    1.安装方法pipinstalljsonpath2.jsonpath与Xpath下面表格是jsonpath语法与Xpath的完整概述和比较。Xpathjsonpath概述/$根节点.@当前节点...

    Python网络爬虫的时候json=就是让你少写个json.dumps()

    大家好,我是皮皮。一、前言前几天在Python白银交流群【空翼】问了一个Python网络爬虫的问题,提问截图如下:登录请求地址是这个:二、实现过程这里【甯同学】给了一个提示,如下所示:估计很多小伙伴和...