手记

Splitter 助力解决“大集合参数”这个性能杀手

1. 概览

1.1. 背景

有一次,一个伙伴问我:“MySQL 主键查询那么慢吗,需要几秒才返回?” 对此我也很好奇,从理论上来讲不大可能,主键查询是最快的查询,没有之一。

带着疑问,查看系统日志,大多数请求非常快,基本都在 1、2 ms 内,个别请求可能超过 500ms,甚至有请求超过 3s,整体响应时间非常不均衡。

问题可能出现在哪呢?

  1. 发生了类型转换,导致全表扫描?

  2. 数据库压力过大,影响全局性能?

查看代码,是一个非常简单的 “select * from t where id in (…)” 语句,其中 id 为 Long 类型,无需进行类型转换。但,稍等 in 了多少,程序中没做限制,直接将参数进行拼接,这可能就是问题所在。

完善日志后,继续观察,果然,in 后的参数可能高达几万,甚至十几万,这就太过分了。随后,对其进行调整,将超限参数进行拆分,提升调用频次,降低入参数量,核心代码如下:

private int maxSize = 1000;
public List<Entity> getByIds(List<Long> ids){
    List<List<Long>> splittedIds = Lists.partition(ids, maxSize);
    List<Entity> entities = Lists.newArrayListWithCapacity(ids.size());
    for (List<Long> ids2Use : splittedIds){
        List<Entity> entities1 = this.dao.getByIds(ids2Use);
        entities.addAll(entities1);
    }
    return entities;
}

自此,伙伴们就 get 到了新技能,主动对大的参数进行拆分处理。随后公司制定了相应规范,对数据库参数进行限制,不允许过大参数的存在。

但,好景不长,一处小小的 bug 险些造成线上事故。

具体代码如下:

private int maxSize = 1000;
public List<Entity> getByIds(List<Long> ids){
    List<List<Long>> splittedIds = Lists.partition(ids, maxSize);
    List<Entity> entities = Lists.newArrayListWithCapacity(ids.size());
    for (List<Long> ids2Use : splittedIds){
        // 在调用方法时,没有使用拆分后的新参数,直接使用拆分前参数
        // 不仅没有解决大参数问题,而且对大参数进行了放大
        // 每遇到一个大参数,内存承压巨大,甚至引起 OOM
        List<Entity> entities1 = this.dao.getByIds(ids);
        entities.addAll(entities1);
    }
    return entities;
}

这种case,很难通过正常测试覆盖;由于过于细节,Code Review 也容易忽略,该怎么从根源上杜绝呢?

1.2. 目标

能力声明式,在不 Coding 的情况下,通过在方法上增加声明式注解,使其具备自动拆分的能力。

目标很明确,拒绝编码,只在方法中增加注解,在方法调用时,使其具备自动拆分和合并的能力。

这就是 splitter 的由来,如果你也遇到过相似问题,可以直接使用。

2. 快速入门

以 spring-boot 项目为例。

2.1. 添加 starter

首先在spring-boot 项目的pom中增加 splitter-starter,坐标如下:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter-splitter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

2.2. 为方法增加 @Split 注解

splitter 提供多种使用方式,可以根据方法签名进行选择。具体如下:

2.2.1. 单集合拆分

这是最简单的方式,其中 @Split 注解:

  1. sizePrePartition。每个分区的参数数量;

  2. taskPreThread。每个线程执行的任务数;

@Split(sizePrePartition = 2, taskPreThread = 2)
public List<Long> splitByList(List<Long> params){
    return convert(params);
}

2.2.2. 多参数集合拆分

如果存在多个入参,要根据其中一个入参进行拆分,需使用 @SplitParam 对要拆分的参数进行标注。

@Split(sizePrePartition = 2, taskPreThread = 2)
public List<Long> splitByList(@SplitParam List<Long> params, Long other){
    Preconditions.checkArgument(other != null);
    return convert(params);
}

2.2.3. 参数对象拆分

如果使用的是 Param Object 模式(使用一个对象对所有入参进行封装),直接在需要拆分的属性上增加 @SplitParam 即可。

拆分方法如下:

@Split(sizePrePartition = 2, taskPreThread = 2)
public List<Long> splitByParam(AnnBasedInputParam param){
    Preconditions.checkArgument(param.getOther() != null);
    return convert(param.getNumbers());
}

AnnBasedInputParam 示例如下:

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class AnnBasedInputParam {
    @SplitParam
    private List<Long> numbers;
    private  Long other;
}

2.2.4. SplittableParam 参数对象

对于复杂的 ParamObject 模式,splitter 提供了 SplittableParam 进行扩展。

拆分方法如下:

@Split(sizePrePartition = 2, taskPreThread = 2)
public List<Long> splitByParam(SplittableInputParam param){
    Preconditions.checkArgument(param.getOther() != null);
    return convert(param.getNumbers());
}

SplittableParam 定义如下:

public interface SplittableParam<P extends SplittableParam<P>> {
    List<P> split(int maxSize);
}

SplittableInputParam 示例如下:

@Value
@Builder
public class SplittableInputParam implements SplittableParam<SplittableInputParam> {
    private final List<Long> numbers;
    private final Long other;

    @Override
    public List<SplittableInputParam> split(int maxSize) {
        List<List<Long>> partition = Lists.partition(this.numbers, maxSize);
        return partition.stream()
                .map(ns -> SplittableInputParam.builder()
                        .numbers(ns)
                        .other(other)
                        .build()
                ).collect(toList());
    }
}

2.3. 运行效果

测试代码如下:

@Test
@Timeout(3)
public void splitByList() {
    List<Long> params = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
    List<Long> longs = this.splitTestService.splitByList(params);
    Assertions.assertEquals(8, longs.size());
}

运行结果如下:

2022-07-24 23:17:23.237  INFO 13309 --- [           main] c.g.lego.splitter.SplitTestService       : Thread main run with [1, 2]
2022-07-24 23:17:23.237  INFO 13309 --- [ecutor-Thread-1] c.g.lego.splitter.SplitTestService       : Thread Default-Split-Executor-Thread-1 run with [5, 6]
2022-07-24 23:17:24.245  INFO 13309 --- [           main] c.g.lego.splitter.SplitTestService       : Thread main run with [3, 4]
2022-07-24 23:17:24.245  INFO 13309 --- [ecutor-Thread-1] c.g.lego.splitter.SplitTestService       : Thread Default-Split-Executor-Thread-1 run with [7, 8]

从日志中可以看出,框架不仅仅对参数进行拆分,还是用多线程技术,并行执行任务,大大提升系统的响应时间。

3. 设计

3.1. 核心流程

splitter 核心流程如下:

核心设计

核心流程包括三个步骤:

  1. 拆分,对入参进行拆封,将一个大入参拆分为多个小参数

  2. 执行,以拆分后的小参数作为入参,执行业务逻辑,获取执行结果;

  3. 合并,对多个执行结果进行合并,获得最终结果。

3.2. 核心组件

与操作步骤对应,核心组件包括:

  1. ParamSplitter 拆分器,完成入参的拆分;

  2. MethodExecutor 执行器,执行业务逻辑;

  3. ResultMerger 合并器,对结果进行合并;

  4. DefaultSplitService 拆分服务,基于以上三个组件,完成拆分流程;

3.2.1. ParamSplitter

ParamSplitter 接口定义如下:

public interface ParamSplitter<P> {
    /**
     * 将 param 按照 maxSize 进行拆分
     * @param param 原输入参数
     * @param maxSize 拆分后,每个分区的最大元素个数
     * @return
     */
    List<P> split(P param, int maxSize);
}

SmartParamSplitter 是 ParamSplitter 的一个重要子类,根据类型完成组件装配,其定义如下:

public interface SmartParamSplitter<P> extends ParamSplitter<P>{
    /**
     * 是否能支持特定类型
     * @param paramType 参数类型
     * @return < br/>
     * 1. true 能支持 paramType 的拆分
     * 2. false 不能支持 paramType 的拆分
     */
    boolean support(Class<P> paramType);
}

系统内置实现如下:

ParamSplitter类图

涉及的类包括:

含义
AbstractParamSplitter ParamSpltter 公共父类,用于封装一些通用行为
AbstractFixTypeParamSplitter 固定类型拆分器的父类,从泛型中获取类型信息,并实现 support 方法
AnnBasedParamSplitter 实现带有 @SplitParam 注解的 Param Object 的拆分
SplittableParamSplitter 实现 SplittableParam 子类的拆分
SetParamSplitter 实现对 Set 的拆分
ListParamSplitter 实现对 List 的拆分
InvokeParamsSplitter 实现对 InvokeParams 的拆分
3.2.2. MethodExecutor

MethodExecutor 接口定义如下:

public interface MethodExecutor {
    /**
     * 执行函数,并返回结果
     * @param function 待执行的函数
     * @param ps 执行函数所需的参数
     * @param <P> 入参
     * @param <R> 返回值
     * @return
     * 所有的执行结果
     */
    <P,R> List<R> execute(Function<P, R> function, List<P> ps);
}

核心实现包括:

ParamSplitter类图

涉及的类有:

含义
AbstractMethodExecutor 抽象父类,实现通用逻辑
SerialMethodExecutor 串行执行器,所有任务在主线程中串行执行
ParallelMethodExecutor 并行执行器,任务在主线程和线程池中并行执行
3.2.3. ResultMerger

ResultMerger 接口定义如下:

public interface ResultMerger<R> {

    /**
     * 对多个执行结果进行合并处理
     * @param rs 执行结果
     * @return 合并之后的最终结果
     */
    R merge(List<R> rs);
}

与 ParamSplitter 类似,存在一个 SmartResultMerger 根据类型完成组件装配,其定义如下:

public interface SmartResultMerger<R> extends ResultMerger<R>{
    /**
     * 是否能支持特定结果的合并
     * @param resultType 结果类型
     * @return
     */
    boolean support(Class<R> resultType);
}

核心实现包括:

ParamSplitter类图

涉及的类有:

含义
AbstractResultMerger 公共父类,对通用逻辑进行封装
AbstractFixTypeResultMerger 固定类型 合并器 公共父类,通过泛型获取类型信息,并实现 support 方法
IntResultMerger 对 int 进行合并,将结果进行 sum 处理
LongResultMerger 对 ling 进合并,将结果进行 sum 处理
ListResultMerger 对 List 进行合并
SetResultMerger 对 Set 进行合并
3.2.4. DefaultSplitService

DefaultSplitService 基于以上三个组件,完成整个拆分流程,核心代码如下:

/**
 * 请求处理流程如下: <br />
 * 1. 对参数 P 进行拆分 <br />
 * 2. 用拆分结果分别调用 function 获取执行结果 <br />
 * 3. 将多个执行结果进行合并,并返回 <br />
 * @param function 执行方法,入参为 P,返回值为 R
 * @param p 调用函数入参
 * @param maxSize 每批次最大数量

 * @return
 */
@Override
public R split(Function<P, R> function, P p, int maxSize) {
    Preconditions.checkArgument(function != null);
    Preconditions.checkArgument(maxSize > 0);

    // 入参为 null,直接调用函数
    if (p == null){
        return function.apply(p);
    }

    // 对参数进行拆分
    List<P> params = this.paramSplitter.split(p, maxSize);

    //没有拆分结果,直接调用函数
    if (CollectionUtils.isEmpty(params)){
        return function.apply(p);
    }

    // 拆分结果为 1,使用拆分直接调用函数
    if (params.size() == 1){
        return function.apply(params.get(0));
    }

    // 基于执行器 和 拆分结果 执行函数
    List<R> results = this.methodExecutor.execute(function, params);

    // 对执行结果进行合并处理
    R result = this.resultMerger.merge(results);

    return result;
}

3.3. Spring 集成

与Spring 集成,核心设计如下:

Spring集成

其中,包括几个核心组件:

  1. SplitInvokerProcessor。基于 BeanPostProcessor 扩展对 Spring 的托管 Bean 进行扫描,将 Invoker 注册至 Registry 中;

  2. SplitInvokerRegistry。用于维护 Method 和 SplitInvoker 的映射关系,是拦截器与处理器间的桥梁;

  3. SplitInterceptor。拆分拦截器,拦截 Proxy 调用,将请求转发至 SplitInvoker,执行拆分逻辑;

由于涉及的组件比较多,为了方便使用,使用 spring-boot 的自动装配机制进行集成,无需关注细节,只需引入对应的 starter 依赖即可。

核心配置类详见 SplitterAutoConfiguration,该配置类将完成:

  1. PointcutAdvisor, 拦截器配置

  2. BeanProcessor,处理器配置,Spring 启动时,对 @Split 注解进行处理

  3. SplitInvokerRegistry,InvokerRegistry 使用单例特性,共享注册信息

  4. ParallelMethodExecutor,多线程执行器

  5. ParamSplitter,预制参数拆分器

  6. ResultMerger,预制结果合并器

3.4. 功能扩展

一般情况下,系统预设功能已经能够满足大多数需求,如有特殊情况,可以对功能进行扩展。

功能扩展主要分两个步骤:

  1. 定义扩展组件,并将其在 Spring 中进行注册;

  2. @Split 注解上手工指定所使用的bean;

3.4.1. @Split 注解

Split 注解定义如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Split {
    int sizePrePartition() default 20;

    int taskPreThread() default 3;

    String paramSplitter() default "";

    String executor() default "defaultSplitExecutor";

    String resultMerger() default "";
}

配置含义详见:

配置 含义
sizePrePartition 拆解后,每一个分区最大的元素个数
taskPreThread 每一个线程执行的任务数
paramSplitter 参数拆分器名称(spring bean name),默认通过 smart 组件自动查找
executor 执行器名称(spring bean name),defaultSplitExecutor 并发执行器
resultMerger 结果合并器名称(spring bean name),默认通过 smart 组件自动组装
3.4.2. ParamSplitter 扩展

扩展流程为:

  1. 编写 ParamSplitter 实现类;

  2. 将其注册到 spring 容器;

  3. @Split 中通过 paramSplitter 属性指定 bean name

3.4.3. MethodExecutor 扩展

扩展流程为:

  1. 根据需求在 spring 中注册新的 MethodExecutor(比如调整线程池数量)

  2. @Split 中通过 executor 属性指定 bean name

3.4.4. ResultMerger 扩展

扩展流程为:

  1. 编写 ResultMerger 实现类;

  2. 将其注册到 spring 容器;

  3. @Split 中通过 resultMerger 属性指定 bean name

4. 最后

0人推荐
随时随地看视频
慕课网APP