手记

使用Celery和Python构建一个样例项目

我们将介绍如何搭建Celery,创建任务(生产者),消费者(工作者),以及如何在不同队列中处理任务。

咱们开始吧。

先决条件

在开始之前,请确保你已经安装了所需的软件或工具。

- Python 3.7 或更高版本。
- Redis 或 RabbitMQ(作为消息队列。)

配置 Celery

1. 安装 Celery 和一个消息中间件(比如 Redis),运行如下命令:

    pip install celery[redis]  # 安装Celery并使用Redis作为消息代理

芹菜在本上下文中应指消息中间件,用来发送和接收消息。在本教程中,我们将使用 Redis,但你也可以选择使用 RabbitMQ。

2. 确保 Redis 已安装并运行。

redis-server
redis-server  
项目启动
  1. 为你的项目新建一个文件夹
    mkdir celery-sample-project  
    cd celery-sample-project   

现在我们创建一个名为celery-sample-project的目录,然后进入它。

  1. 搭建一个Python虚拟环境并激活它,
    python -m venv venv   
    source venv/bin/activate  # 在 Windows 上请使用:venv/Scripts/activate  

3. 在项目的根目录下创建一个 celery.py 文件作为所需的配置文件如下:

    # celery.py  
    from celery import Celery  

    # 创建Celery实例,broker使用本地Redis
    app = Celery('sample_project', broker='redis://localhost:6379/0')  

    # 配置Celery实例的参数
    app.conf.update(  
        # 结果后端使用本地Redis
        result_backend='redis://localhost:6379/0',  
        # 任务序列化格式为json
        task_serializer='json',  
        # 接受的内容类型为json
        accept_content=['json'],  
        # 结果序列化格式为json
        result_serializer='json',  
    )   
创建 Celery 任务(生成任务)

创建一个名为 tasks.py 的新文件,然后在里面加入以下代码:

    # tasks.py  
    from celery import shared_task  

    @shared_task  
    def send_email(to, subject, body):  
        # 模拟发送邮件的过程  
        print(f"向 {to} 发送邮件,主题:{subject}")  
        return f"邮件已发送至 {to}"  

    @shared_task  
    def send_sms(phone_number, message):  
        # 模拟发送短信的过程  
        print(f"向 {phone_number} 发送短信:{message}")  
        return f"短信已发送至 {phone_number}"  

    # 示例任务,根据类型发送通知:  
    @shared_task  
    def process_notification(notification_type, payload):  
        if notification_type == 'email':  
            return send_email(payload['to'], payload['subject'], payload['body'])  
        elif notification_type == 'sms':  
            return send_sms(payload['phone_number'], payload['message'])  
        else:  
            print("未知的通知类型:")  
            return "错误:未知的通知类型。"   

这个文件定义了三个 Celery 任务:send_emailsend_smsprocess_notification

这些任务包括模拟发送电子邮件和短信消息。

process_notification 任务会依照通知的类型来处理。

运行 Celery 消费者端

创建一个名为 worker.py 的新文件来运行 Celery 工作服务:

# worker.py (工作进程文件,一个工作进程脚本,负责处理发送邮件、短信和处理通知等任务)
from celery import Celery  
from tasks import send_email, send_sms, process_notification  

app = Celery('worker', broker='redis://localhost:6379/0')  

if __name__ == '__main__':  
    app.start()   

启动Celery worker,可以运行以下命令:

celery worker --app your_project_name -l info
celery -A tasks worker --loglevel=info   # 启动Celery worker并设置日志级别为信息级

此命令启动一个 Celery 工人,该工人监听并处理在 Redis 队列中的任务。

用 Python 发送任务指令(任务发送方)

要将任务发送到 Celery,请创建另一个名为 send_tasks.py 的 Python 脚本。

    # send_tasks.py
    from tasks import process_notification

    if __name__ == '__main__':
        # 示例:发送电子邮件通知如下
        email_payload = {
            'to': 'receiver@example.com',
            'subject': '示例邮件',
            'body': '这是一封示例电子邮件通知',
        }
        result = process_notification.delay('email', email_payload)
        print(f"任务ID: {result.id}")

        # 示例:发送短信通知如下
        sms_payload = {
            'phone_number': '1234567890',
            'message': '这是一条示例短信通知',
        }
        result = process_notification.delay('sms', sms_payload)
        print(f"任务ID: {result.id}")
``

这个脚本使用 `.delay()` 方法将任务发送给 Celery 工作节点。它发送邮件和短信通知。

## 运行示例程序(示例项目)

1. 启动 Redis 服务,并确保它正在运行:  
通过执行命令来确保 Redis 正在运行:
redis-server  

2\. 启动 Celery 工作节点:  
切换到您的项目目录,在单独的终端窗口中启动 Celery 工作节点:
celery -A tasks worker --loglevel=info   
此命令用于启动Celery工作进程,并设置日志级别为信息。

3\. 发送任务:  
在另一个终端窗口里运行 send_tasks.py 脚本以向工件发送任务:

运行 python send_tasks.py 脚本来发送任务.


你应该看到 Celery 任务进程正在处理邮件和短信任务,并显示输出信息。

[2024-10-18 14:20:00,123: 消息/主进程] 任务 tasks.send_email[abcd1234] 在 0.0100s 内成功执行: '邮件已成功发送至 receiver@example.com'
[2024-10-18 14:20:00,456: 消息/主进程] 任务 tasks.send_sms[efgh5678] 在 0.0050s 内成功执行: '短信已成功发送至 1234567890'



就这样吧。

你现在可以在Python应用程序中使用分布式任务队列了,这样可以异步处理后台任务和消息。

你可以了解 Celery 的其他功能,比如错误处理机制、任务重试功能和高级设置。

你还可以将这个例子扩展到处理更复杂的流程,包括计划任务和并行任务。

编程愉快💪
0人推荐
随时随地看视频
慕课网APP