手记

@DelayBasedRocketMQ 让方法延时执行

1. 概览

在日常开发中,延时任务是一个无法规避的话题。也存在各种不同的方案,比如:

  1. 数据库轮询方案。建立一个调度任务,周期性从数据库中查询待执行的任务,如果满足延时要求,并执行对于的业务操作;

  2. 单机内存解决方案。可以使用 DelayQueue、ScheduledExecutorService、TimerWheel等数据结构在内存中对任务进行维护,并运行满足延时的任务;

  3. 分布式延时队列方案。可以使用基于 redis 的 redisson 延时任务,也可以使用 RocketMQ 延时队列。

当然,在所有的方案中,分布式延时队列方案是最佳方法,当然也是最复杂的方案。

1.1. 背景

在延时任务这个场景,分布式延时队列方案 是最优策略,所以,不少公司制定了相关规范,只能使用 RocketMQ 实现延时调度。
系统的稳定性有了一定的保障,但操作的复杂性抛给了下面的研发人员。

从一个 Leader 角度,我一直认为“只定规范,不提供工具,是极度不负责任的表现”。

1.2. 目标

期望框架能够提供:

  1. 不需要 Coding,快速使一个方法具有延时运行的能力;

  2. 可通过参数指定延时时长;

  3. 任务创建和消费分离,在不同的集群中完成,以更好的支持资源隔离;

2. 快速入门

框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。

2.1. 引入 RocketMQ

我们使用 rocketmq starter 完成基本配置。

首先,在 pom 中增加 rocketmq starter 依赖,具体如下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

其次,在 application.yml 中添加 rocketmq 配置,具体如下:

rocketmq:
  name-server: http://127.0.0.1:9876
  producer:
    group: async-demo

其中,name-server 根据具体情况进行配置。

配置完成,可以在项目中:

  1. 注入 RocketMQTemplate 进行消息发送;

  2. 使用 @RocketMQMessageListener 标记处理方法,进行消息消费;

2.2. 添加 lego-starter 依赖

为了方便与 spring-boot 项目集成,lego 提供 lego-starter,以完成快速接入。

在 pom 中增加 starter,具体如下:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>0.1.5-delay_task-SNAPSHOT</version>
</dependency>

其中,自动配置机制将完成:

  1. 注册 DelayMethodInterceptor,对 @DelayBasedRocketMQ 注解进行拦截,将请求发送至 RocketMQ;

  2. 构建并启动DelayConsumerContainer,监听 topice 的消息,用于消费消息

2.3. @DelayBasedRocketMQ

我们只需在方法上添加 @DelayBasedRocketMQ 注解,完成基础配置,该方法便具备延时处理的能力。具体如下:

@DelayBasedRocketMQ(
        topic = "${cancelOrder.delay.topic}",
        tag = "delayCancelOrder",
        consumerGroup = "${cancelOrder.delay.consumerGroup1}",
        delayLevel = 2
)
public void delayCancelOrder(Long orderId, String reason){
    DelayTask delayTask = new DelayTask(orderId, reason, null);
    log.info("Run Cancel Order {}", delayTask);
    this.tasks.add(delayTask);
}

@DelayBasedRocketMQ 定义如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayBasedRocketMQ {
    /**
     * RocketMQ topic
     * @return
     */
    String topic();

    /**
     * Tag
     * @return
     */
    String tag() default "*";

    /**
     * 延迟级别
     * @return
     */
    int delayLevel() default -1;

    /**
     * 延迟时间 SpEL 表达式
     * @return
     */
    String delayLevelSpEl() default "0";

    /**
     * nameServer 配置
     * @return
     */
    String nameServer() default "${rocketmq.name-server:}";

    /**
     * 消费者组信息
     * @return
     */
    String consumerGroup();

    /**
     * 消费者运行的 profile,主要用于发送和消费分离的场景
     * @return
     */
    String consumerProfile() default "";
}

在 application 文件中增加相关配置,具体如下:

cancelOrder:
  delay:
    topic: delay-task-test-topic
    consumerGroup1: delay-cancel-order-group1
    consumerGroup2: delay-cancel-order-group2

写一个简单的单测,代码如下:

@Test
void delayCancelOrder() throws Exception{
    Long orderId = RandomUtils.nextLong();
    String reason = "超时自动取消";

    this.delayService.delayCancelOrder(orderId, reason);

    Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));

    TimeUnit.SECONDS.sleep(4);

    Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));

    TimeUnit.SECONDS.sleep(6);

    Assertions.assertTrue(CollectionUtils.isNotEmpty(this.delayService.getTasks()));

}

运行单测,日志如下:

10:16:09.951  [           main] c.g.l.core.delay.DelayMethodInterceptor  : success to sent Delay Task to RocketMQ for [7973190633030104064, 超时自动取消]
10:16:14.964  [MessageThread_1] com.geekhalo.lego.delay.DelayService     : Run Cancel Order DelayService.DelayTask(orderId=7973190633030104064, reason=超时自动取消, timeOutLevel=null)
10:16:14.964  [MessageThread_1] c.g.l.c.s.AbstractConsumerContainer      : consume message 2408820718E04140899ECD9401476CBB137358644D46308D59D10000, cost: 8 ms

为了方便,对部分日志进行简化,但不影响分析结果。

从运行日志可以得出:

  1. 10:16:09 主线程 main 向 MQ 发送一条延时消息;

  2. 5秒以后(10:16:14)消费线程拉取到延时任务,并调用业务方法(DelayService#delayCancelOrder)

从日志上看,完全符合设计预期。

2.4. 动态设置延时级别

不同的场景需要不同的延时级别,在 @DelayBasedRocketMQ 直接指定死,不方便业务扩展。

如果需要动态指定延时级别,可以使用 @DelayBasedRocketMQ 的 delayLevelSpEl ,通过 SpEL 从上下文中读取配置,具体如下:

@DelayBasedRocketMQ(
        topic = "${cancelOrder.delay.topic}",
        tag = "delayCancelOrderForTimeout",
        consumerGroup = "${cancelOrder.delay.consumerGroup2}",
        delayLevelSpEl = "#timeOutLevel"
)
public void delayCancelOrderForTimeout(Long orderId, String reason, int timeOutLevel){
    DelayTask delayTask = new DelayTask(orderId, reason, timeOutLevel);
    log.info("Run Cancel Order {}", delayTask);
    this.tasks.add(delayTask);
}

其中,delayLevelSpEl = “#timeOutLevel” 含义为,将参数 timeOutLevel 的值作为 延时级别。

编写单元测试用例,具体如下:

@Test
void delayCancelOrder_DelayTime() throws Exception{
    Long orderId = RandomUtils.nextLong();
    String reason = "超时自动取消";


    this.delayService.delayCancelOrderForTimeout(orderId, reason, 3);

    Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));

    TimeUnit.SECONDS.sleep(9);

    Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));

    TimeUnit.SECONDS.sleep(11);

    Assertions.assertTrue(CollectionUtils.isNotEmpty(this.delayService.getTasks()));

}

运行测试用例,观察日志如下:

10:27:56.257  [           main] c.g.l.core.delay.DelayMethodInterceptor  : success to sent Delay Task to RocketMQ for [844282856080074752, 超时自动取消, 3]
10:28:06.281  [MessageThread_1] com.geekhalo.lego.delay.DelayService     : Run Cancel Order DelayService.DelayTask(orderId=844282856080074752, reason=超时自动取消, timeOutLevel=3)
10:28:06.282  [MessageThread_1] c.g.l.c.s.AbstractConsumerContainer      : consume message 2408820718E04140899ECD9401476CBB13D358644D46309820DB0000, cost: 13 ms

从日志上可见,延时时间已经成功调整为 10秒(10:27:56 发送任务,10:28:06 接收到任务),符合设计预期。

2.5. 任务创建和消费分离

为了更好的对资源进行隔离,有时需要单独部署一组集群,用于处理后台任务。

为支持该模式,@DelayBasedRocketMQ提供了 consumerProfile 配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。

3.设计&扩展

3.1. 核心设计

整体架构

在方法上添加注解后,框架自动完成:

  1. 增加 DelayMethodInterceptor 和 PointcutAdvisor Bean,用于对方法进行拦截,将请求转发至 MQ;

  2. 创建并启动 DelayConsumerContainer,通过 MQConsumer 监听消息变更,并调用 业务方法;

3.2. 核心流程

核心流程如下:

  1. 方法被调用,被 DelayMethodInterceptor 拦截;
  • 首先,对调用参数进行序列化;

  • 然后,将信息封装为 Message

  • 最后,向 RocketMQ 发送延时消息

  1. 消息在RocketMQ进行存储,当到达延时时间时,将 Message 投放至 Consumer;

  2. MQPushConsumer,监听到消息,并完成业务操作;

  • Consumer 获得 Message 信息

  • 将消息进行反序列化,获得调用参数

  • 使用调用参数调用业务方法

4. 项目信息

0人推荐
随时随地看视频
慕课网APP