手记

kafka-08-SpringBoot Kafka实战

注意:
当前的 Kafka 版本无法保证每个消息“只被保存一次”。现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。应用程序需要可以做到消息的“幂等”,也就是说,即使出现了重复消息,也不会对处理结果的正确性造成负面影响。

整合SpringBoot kafka,加入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

1. 新建或者更新主题

KafkaConfig.java

/**
 * Description: kafka相关配置
 *
 * @author Xander
 * datetime: 2021-01-08 17:40
 */
@Configuration
public class KafkaConfig {
    /**
     * 主题
     */
    public static final String TOPIC_SPRING_KAFKA = "SpringKafka";

    /**
     * 新建或者更新Topic并设置分区数为3,分区副本数为1,
     * 这里设置仅仅时测试使用,主题的分区数和每个分区的副本数,需要根据业务场景和硬件条件去考虑
     * <p>
     * 我们也可以不手动创建topic,因为kafka server.properties 配置文件中 auto.create.topics.enable 默认为 true,
     * 表示如果主题不存在,则自动创建主题,
     * 分区数量由kafka server.properties 配置文件中 num.partitions 指定,默认是 1
     * 所以如果是自动创建主题,则默认的分区数为1,分区副本数为1
     *
     * @return
     */
    @Bean
    public NewTopic newOrUpdateTopic() {
        // 通过TopicBuilder新建或者update Topic,
        // 注意:主题的分区只能新增,不能减少分区
        return TopicBuilder.name(TOPIC_SPRING_KAFKA).replicas(1).partitions(3).build();
    }

}

我们也可以不手动创建主题,因为kafka server.properties 配置文件中 auto.create.topics.enable 默认为 true,表示如果主题不存在,则自动创建主题,分区数量由kafka server.properties 配置文件中 num.partitions 指定,默认是 1,所以如果这里不手动创建主题的话,kafka如果检查到主题不存在,会自动新建分区数和副本数都为1的主题。

注意: 如果主题已存在,NewTopic如果要update已存在的主题,分区数只能大于等于已有的分区数,不能减少分区。

2. SpringBoot kafka 配置

常用的生产者和消费者相关的配置都列出来,并表明了注释。
application.yml

spring:
  kafka:
#    kafka集群broker列表 host1:por1,host2:port2,host3:port3
    bootstrap-servers: docker01:9092

########生产者配置########
    producer:
#     compression-type 消息的压缩算法
#     默认情况下是 none,消息发送时不会被压缩。 该参数可以设置为 none, gzip, snappy, lz4, zstd
      compression-type: none
#     acks 有多少个分区副本收到消息,生产者才会认为消息写入是成功的,只能选(0、1、all)
      acks: all
#     bufferMemory 生产者内存缓冲区的大小,下面是32MB
      bufferMemory: 33554432
#     retries 发生临时性的错误(比如分区找不到首领)重试次数,
#      默认情况下,生产者会在每次重试之间等待 100 ms,可以通过 retry.backoff.ms 参数来改变这个时间间隔
      retries: 3
#      key和value 的序列化器,这两个默认是 StringSerializer.class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#      batch-size 批次大小,按照字节数计算
      batch-size: 1024
      properties:
#        自定义分区器
#        partitioner:
#          class: com.xander.kafka.partitioner.XdPartitioner
#        request.timeout.ms 在发送数据时等待服务器返回响应的时间
        request:
          timeout:
            ms: 1000
#        发送批次之前等待更多消息加入批次的时间
#        linger.ms为0,表示生产者每条消息都直接提交给kafka,不等待批次,这时候batch-size其实就没用了
        linger:
          ms: 100
#      retry.backoff.ms  每次重试之间的时间间隔,默认是100ms,这里配置50ms
        retry:
          backoff:
            ms: 50
#   max.in.flight.requests.per.connection 在收到服务器响应之前可以发送多少个消息,如果不需要保证消息顺序性的场景,建议不用配置该属性
#  把它设为 1 可以保证消息在同一个生产者的某一个分区上,是按照发送的顺序写入服务器的,即使发生了重试。但是会降低Kafka的吞吐量
        max:
          in:
            flight:
              requests:
                per:
                  connection: 1
#  max.block.ms 缓冲区满时的最大阻塞时间,在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
          block:
            ms: 200

########### 消费者配置 ###############
    consumer:
#  auto-offset-reset: 没有偏移量的分区或者偏移量无效时如何处理
# earliest: 消费者将从起始位置读取分区的记录
# latest: 消费者将从最新的记录开始读取数据
# none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: earliest
# group-id 默认的消费者群组
      group-id: defaultGroup
# enable.auto.commit 是否自动提交偏移量,
      enableAutoCommit: true
#      自动提交偏移量的间隔时间,100ms
      autoCommitInterval: 100ms
#  单次请求能够返回的记录数量
      max-poll-records: 3
#  fetch.max.wait.ms 指定获取记录的最大等待时间,这里是100ms
      fetchMaxWait: 100ms
#      key和value 的反序列化器,这两个默认是 StringSerializer.class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
#        fetch.min.bytes 从服务器获取记录的最小字节数
        fetch:
          min:
            bytes: 102400
#        request.timeout.ms 消费者请求超时时间
        request:
          timeout:
            ms: 1000
#        会话过期时间
        session:
          timeout:
            ms: 120000
#       向协调器发送心跳的频率
        heartbeat:
          interval:
            ms: 40000
# 如果需要批量消费,则需要修改 spring.kafka.listener.type = batch,默认是 single,单次消费单条消息
#    listener:
#      type: batch
#   手动提交偏移量时:消费者消息确认模式改为手动确认
#    listener:
#      ack-mode: manual

3. 生产者向kafka写数据

3.1 发送并忽略结果

我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。因为我们会忽略返回值,所以无法知道消息是否发送成功。
如果允许丢失一小部分消息,并且不关心发送结果,那么可以使用这种发送方式。这种方式可以达到最大的响应速度和吞吐性能。

3.2 同步发送

返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 Kafka 响应。如果服务器返回错误, get() 方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的主题、分区和偏移量等信息。

3.3 异步发送

在异步发送消息方式中生产者提供了回调支持,可以在回调中处理异常和获取消息的主题、分区和偏移量等信息

Kafka生产者

/**
 * Description: Kafka生产者
 *
 * @author Xander
 * datetime: 2021-01-10 10:29
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // 发送消息
    @GetMapping("/{msg}")
    public void send(@PathVariable String msg) throws ExecutionException, InterruptedException {
        long start = Instant.now().toEpochMilli();
        this.logger.info("------start");
        // 发送并忽略结果
        // this.sendAndForget(msg);
        // 同步发送
        this.sendSync(msg);
        // 异步发送
        // this.sendAsync(msg);
        this.logger.info("------end: " + (Instant.now().toEpochMilli() - start));
    }

    /**
     * 发送并忽略结果
     *
     * @param msg
     */
    private void sendAndForget(String msg) {
        kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg);
    }

    /**
     * 同步发送
     *
     * @param msg
     */
    private void sendSync(String msg) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg);
        SendResult<String, String> sendResult = future.get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        this.logger.info("发送成功:" + recordMetadata.topic() + "--" + recordMetadata.partition() + "---" + recordMetadata.offset());
    }

    /**
     * 异步发送
     *
     * @param msg
     */
    private void sendAsync(String msg) {
        kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {

            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata recordMetadata = sendResult.getRecordMetadata();
                logger.info("发送成功:" + recordMetadata.topic() + "--" + recordMetadata.partition() + "---" + recordMetadata.offset());
            }
        });
    }
}

3.1.2 发送成功

发送下面的请求:
http://localhost:8080/kafka/123

kafka-console-consumer.sh 工具订阅 SpringKafka 主题,可以看到消息 ‘123’ 发送成功

[root@docker01 ~]# /usr/local/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.31:9092 --topic SpringKafka
123

4. 消费者从kafka读数据

**说明:**这里只演示每次消费单条记录的案例,如果要批量消费记录,需要修改 spring.kafka.listener.type = batch,默认是 single (单次消费单条消息)。
批量消费,请参考 Springboot kafka参考文档: docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation

Springboot kafka参考文档的批量消费举例

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

4.1 自动提交偏移量

如果消费者属性 enable.auto.commit 被设为 true ,那么每过 auto.commit.interval.ms (提交时间间隔,默认值是 5s ),消费者会自动把上一次轮询接收到的最大偏移量提交上去。
自动提交是在轮询里进行的,消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

自动提交会可能会导致消息重复消费
假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。

每次消费单个记录,并在轮询中自动提交偏移量

/**
 * Description: Kafka消费者
 *
 * @author Xander
 * datetime: 2021-01-10 10:32
 */
@Component
public class KafkaConsumer {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 每次消费单个记录
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA})
    public void onListen(ConsumerRecord<?, ?> record) {
        this.logger.info("消费单个记录----- 主题:" + record.topic() + "-分区:" + record.partition() + "-key:" + record.key()
                + "-value:" + record.value() + "-偏移量:" + record.offset());
    }


}

4.2 手动提交偏移量

消费者也可以手动提交偏移量,在每处理成功一条消息后就手动提交一次偏移,这能够保证已经处理的消息都被准确的提交。但是在前面说过,当前的 Kafka 版本无法保证每个消息“只被保存一次”,例如:当生产者发送消息到broker,broker发送响应的时候,因为网络关系,生产者没有接收到正确的响应,这时候,会发生重试,再次发送消息,这时,就可能产生重复的消息。

防止消息重复消费:
建议在生产中对消息添加唯一标识,在消费者消费消息的时候,对唯一标识进行判断,是否已经消费了该消息,如果已经消费过,则不做任何处理,从而达到防止消息重复消费的目的。

手动提交偏移量,需要配置 enable.auto.commit = false 取消自动提交,并且 spring.kafka.listener.ack-mode = manual 消费者消息确认模式改为手动确认

/**
 * Description: Kafka消费者,手动提交偏移量,
 * 需要配置 enable.auto.commit = false 取消自动提交,并且 spring.kafka.listener.ack-mode = manual 消费者消息确认模式改为手动确认
 *
 * 提示:手动提交偏移量,能够最大程度减少重复消费消息,但是在消息未处理完成,提前提交偏移量,也可能导致消息丢失
 * 关于提交偏移量,请参考下面文章的第6节
 * [CSDN同步:kafka-05-消费者] https://blog.csdn.net/qq_20633779/article/details/112335534
 *
 * @author Xander
 * datetime: 2021-01-10 10:32
 */
@Component
public class KafkaConsumerWithAck {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 每次消费单个记录,并且手动提交偏移量
     *
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA})
    public void onListenWithAck(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException {
        this.logger.info("消费单个记录----- 主题:" + record.topic() + "-分区:" + record.partition() + "-key:" + record.key()
                + "-value:" + record.value() + "-偏移量:" + record.offset());
        //模拟业务逻辑处理。。。
        this.logger.info("业务处理中...");
        TimeUnit.SECONDS.sleep(10);
        // 手动提交偏移量,表示这个偏移量之前的所有记录已经被处理
        ack.acknowledge();
    }
}

4.3 模拟MQ的死信队列

在遇到可重试错误时,把错误写入一个独立的主题, 一个独立的消费者群组负责从该主题上读取错误消息,并进行重试,这种模式有点像其他消息系统里的 dead-letter-queue

/**
 * Description: Kafka消费者: 消息转发
 * 在遇到可重试错误时,把错误写入一个独立的主题, 一个独立的消费者群组负责从该主题上读取错误消息,并进行重试,这种模式有点像其他消息系统里的 `dead-letter-queue`
 *
 * @author Xander
 * datetime: 2021-01-10 10:32
 */
@Component
public class KafkaConsumerSendTo {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 消息转发
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA}, groupId = "sendToGroupId")
    @SendTo("test")
    public String onListen(ConsumerRecord<String, String> record) {
        this.logger.info("转发消息到test主题 ----- 主题:" + record.topic() + "-分区:" + record.partition() + "-key:" + record.key()
                + "-value:" + record.value() + "-偏移量:" + record.offset());
        // return的数据就是转发到 test 主题的消息
        return record.value();
    }


}

5. 自定义分区器

发送消息时候,kafkaTemplate会通过传入的 主题topic、分区partition、键key、值value,其中分区partition和键key是可选的,创建一个 ProducerRecord 对象。

  • 如果在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。

  • 如果没有指定分区 ,那么分区器会根据 key 来选择一个分区 。
    选好分区以后 ,生产者就知道该往哪个主题和分区发送这条记录了。

  • 如果 key 为 null , 并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用 轮询(Round Robin )算法 将消息均衡地分布到各个分区上。

  • 如果键不为空,并且使用了默认的分区器,那么 Kafka 会 对键进行散列,然后根据散列值把消息映射到特定的分区上。这里的关键之处在于 ,同一个键总是被映射到同一个分区上 ,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区 。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。

上面说的是默认的分区器,我们也可以根据业务场景自定义分区器。
新建一个 org.apache.kafka.clients.producer.Partitioner 接口的实现类 com.xander.kafka.partitioner.XdPartitioner,然后配置
spring.kafka.producer.properties.partitioner.class= com.xander.kafka.partitioner.XdPartitioner

自定义Kafka分区器,每条消息都发送到分区0

/**
 * Description: 自定义Kafka分区器,每条消息都发送到分区0
 *
 * @author Xander
 * datetime: 2021-01-13 19:41
 */
public class XdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //这里可以根据业务场景将消息路由到不同的分区
        // return 0 表示每条消息都发送到分区0
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

通过日志打印可以看到,发送的消息都被路由到分区0上了,打印的日志格式是 “发送成功:主题–分区—偏移量”。

2021-01-13 19:55:27.957  INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController  : ------start
2021-01-13 19:55:28.192  INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController  : 发送成功:SpringKafka--0---0
2021-01-13 19:55:28.193  INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController  : ------end: 236
2021-01-13 19:55:34.935  INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController  : ------start
2021-01-13 19:55:35.037  INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController  : 发送成功:SpringKafka--0---1
2021-01-13 19:55:35.038  INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController  : ------end: 103
2021-01-13 19:55:38.163  INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController  : ------start
2021-01-13 19:55:38.268  INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController  : 发送成功:SpringKafka--0---2
2021-01-13 19:55:38.268  INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController  : ------end: 105

6. 业务场景举例

一个应用程序在很多情况下需要往 Kafka 写入消息 :

  • 记录用户的活动(用于审计和分析)、
  • 记录度量指标、
  • 保存日志消息、
  • 记录智能家电的信息、
  • 与其他应用程序进行异步通信、
  • 缓冲即将写入到数据库的数据,等等。

多样的使用场景意味着多样的需求:

  • 是否每个消息都很重要?
  • 是否允许丢失一小部分消息?
  • 偶尔出现重复消息是否可以接受?
  • 是否有严格的延迟和吞吐量要求?

6.1 不允许的消息丢失或消息重复,允许一点点的延迟

在信用卡事务处理系统里,消息丢失或消息重复是不允许的,可以接受的延迟最大为 500ms ,对吞吐量要求较高 我们希望每秒钟可以处理一百万个消息。

这种情况下,实现方案: 建议生产者端可以使用 同步发送 解决消息丢失问题,同时给消息 添加唯一标识,来解决消息的重复消费问题。

6.2 允许丢失少量的消息或出现少量的消息重复,追求高响应和高吞吐

保存网站的点击信息是另一种使用场景。在这个场景里,允许丢失少量的消息或出现少量的消息重复,只要不影响用户体验就行,在数以千万计的点击量中,丢失少量的消息并不会有什么影响。

这种情况下,实现方案: 建议生产者使用 发送并忘记的方式 来发送消息,如果系统要对发送失败的消息进行处理,则可以使用 异步发送 的方式,在回调中处理异常,以追求最大的吞吐量。

代码:
github.com/wengxingxia/kafka-springboot.git

0人推荐
随时随地看视频
慕课网APP