在开始讨论之前,我想强调一下 Azure 服务总线和 Celery 之间的差异。Azure服务总线:Microsoft Azure 服务总线是完全托管的企业集成消息代理。芹菜 :分布式任务队列。Celery是一个基于分布式消息传递的异步任务队列/作业队列。对于你的情况我可以想到两种可能性:您希望将 Service Bus 与 Celery 一起使用来代替其他消息代理。用服务总线替换 Celery1:您希望将 Service Bus 与 Celery 一起使用来代替其他消息代理。2:完全用Service Bus替换Celery 以满足您的要求:考虑消息发送者是生产者消息接收者是消费者这是您必须处理的两个不同的应用程序。解释 :每次您想要执行操作时,您都可以从生产者客户端向主题发送消息。消费者客户端 - 正在侦听的应用程序将接收消息并处理该消息。您可以将自定义流程附加到它 - 这样,只要消费者客户端收到消息,您的自定义流程就会执行。以下是接收客户端的示例:from azure.servicebus.aio import SubscriptionClientimport asyncioimport nest_asyncionest_asyncio.apply() Receiving = True#Topic 1 receiver : conn_str= "<>"name="Allmessages1"SubsClient = SubscriptionClient.from_connection_string(conn_str, name)receiver = SubsClient.get_receiver()async def receive_message_from1(): await receiver.open() print("Opening the Receiver for Topic1") async with receiver: while(Receiving): msgs = await receiver.fetch_next() for m in msgs: print("Received the message from topic 1.....") ##### - Your code to execute when a message is received - ######## print(str(m)) ##### - Your code to execute when a message is received - ######## await m.complete() loop = asyncio.get_event_loop()topic1receiver = loop.create_task(receive_message_from1())下面一行之间的部分是每次收到消息时将执行的指令。##### - Your code to execute when a message is received - ########