MassTransit RabbitMq 调度消息未发送给消费者

我想使用 MassTransit + RabbitMq 总线来调度来自总线的消息。我编写了两个 C# 控制台应用程序,一个用于消息创建者并将消息发送到调度程序,另一个用于消息使用者。


以下代码用于在总线中进行调度,以便每秒向调度程序发送一条消息,然后调度程序以 10 秒的延迟发送给消费者。我的问题是在rabbitMq 客户端没有消息发送到消费者或消费者队列。我的错误在哪里?


注意: UseInMemoryScheduler 工作正常,但 UseMessageScheduler 不起作用。


总线消息创建器


class Program

{

    public static void Main(string[] args)

    {

        MainAsync(args).GetAwaiter().GetResult();


        Console.ReadKey();

    }


    static async Task MainAsync(string[] args)

    {

        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>

        {

            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>

            {

                settings.Username("guest");

                settings.Password("guest");

            });


            //rabbit.UseInMemoryScheduler(); // This works

            rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));// This doesn't work,

        });


        busControl.Start();


        var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));


        for (int i = 0; i < 1000000; i++)

        {

            await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"), 

                DateTime.Now.AddSeconds(10), 

                new MessageCreated()

                {

                    Text = $"message {i}"

                });


            Thread.Sleep(1000);

        }


        Console.ReadKey();


        busControl.Stop();

    }

}

消息消费者。


class Program

{

    static void Main(string[] args)

    {

        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>

        {

            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>

            {

                settings.Password("guest");

                settings.Username("guest");

            });


            rabbit.ReceiveEndpoint(host, "publisher", conf =>

            {

                conf.Consumer<Consumer>();

            });

        });


        busControl.Start();

        Console.ReadKey();

        busControl.Stop();

    }

}

慕沐林林
浏览 156回答 2
2回答

杨__羊羊

你快到了,只需用这个更新逻辑:busControl.Start();scheduler.JobFactory = new MassTransitJobFactory(busControl);scheduler.Start().Wait();Console.ReadKey();busControl.Stop();

Cats萌萌

所以你没有提到你是否有第三个消费者正在运行(在第三个控制台应用程序中)。为了调度使用 Quartz,您需要专门用于 Quartz 的第三个使用者。它必须正在运行,在这种情况下,石英接收端点将侦听“石英”队列。[更新]这是第三个控制台应用程序(石英服务)所需的配置示例:var scheduler = CreateScheduler();configurator.ReceiveEndpoint("quartz", e =>{&nbsp; &nbsp; configurator.UseMessageScheduler(e.InputAddress);&nbsp; &nbsp; e.Consumer(() => new ScheduleMessageConsumer(scheduler));&nbsp; &nbsp; e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));});...private static IScheduler CreateScheduler(){&nbsp; &nbsp; ISchedulerFactory schedulerFactory = new StdSchedulerFactory();&nbsp; &nbsp; var scheduler = schedulerFactory.GetScheduler();&nbsp; &nbsp; return scheduler;}并且您还需要为我们配置石英存储(如果您想在内存中进行测试,则为 SQLite、MSSql、RAM)。请在此处查看示例配置。[/更新][更新2]有人在群里发过类似的问题。幸运的是,他们提供了一个示例 github,其中包含一系列不同的 MT 功能,其中之一是单独的调度程序。请看一下,那里应该有你需要的一切。[更新2]如果您想在不运行完整的石英调度程序的情况下进行测试,那么您可以使用InMemory 调度程序。但这仅用于测试目的,您不应该在生产中使用它。此外,在您的第一个代码片段中,您不需要获取调度程序的发送端点: var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));因为,在总线配置中,rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));表明无论何时您调用 ScheduleSend(从 ConsumeContext 或 IBus/IBusControl),它将始终使用该 /quartz 地址。最后,这一行await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),,您可以更改为 busControl.ScheduleSend(...)
打开App,查看更多内容
随时随地看视频慕课网APP