手记

【金秋打卡】第7天 Go中间件集成学习-go使用kafka

课程名称海量数据高并发场景,构建Go+ES8企业级搜索微服务

课程章节:6-4

课程讲师少林码僧

课程内容

★如何处理消息丢失问题? 

    先说一个前置设置:当生产者向集群生产消息的时候,有三种模式。

    acks = 0 的时候,是生产者不管集群有没有返回ack响应,都只管发自己的

    acks = 1 , 至少要等待集群的leader写入成功,无需等待follower写入成功。但是当leader已经宕掉,新的leader还未选举出来,生产者就会收到错误信息。生产者可以根据设置,重发消息,但是如果新的leader节点(raft算法选举)还没有同步到你发的消息,依然是报错的。这时候,可以采用回调的方式

    acks = -1/all, 此模式要保证除了leader收到消息,还要保证 min.insync.replicas=2 这个配置中设定的副本数量收到消息,比如2就是至少两个2副本。一般只有金融系统才需要这么严格。

    

  1. 如果业务对消息丢失人容忍度很低,可以设置acks=-1/all

  2. 设置发送重试的次数,次数越多丢的几率越小。比如在go kafka的sdk中设置 config.Producer.Retry.Max = 5 //默认是3

  3. 不允许落后的broker成为新的leader

    Broker配置文件中设置:unclean.leader.election.enable:false

    这个配置是为了限制kafka选举的时候,谁有资格成为leader。当该配置为false的时候,落后于leader的follower将没有资格选为leader。因为在raft算法中,如果一个节点被选举为leader,那么其他节点都要和这个节点进行同步,如果比该节点先进,那么他将放弃他领先于leader的部分,这在kafka上就成为了丢数据

  4. 增加消息副本

    以空间来换取可靠性。Broker配置文件中配置 replocation.factor:3,就是包含主分区在内所有副本为3,每次新消息将被保存为3份。

  5. 设置至少2个副本写入成功

    Broker配置文件中设置:min.insync.replicas:2

    需要注意的是,副本数量必须大于该值,replication.factor 要设置大于 min.insync.replicas,如果相同,那么一个副本挂掉就全挂了。


如何避免重复消费的问题?

    前置知识:

    ▲消息传递的三种质量标准

        ●至多一次(At most once)    允许消息丢失,不允许消息重复

        ●至少一次(At least once)     允许消息重复,不允许消息丢失

        ●只有一次(Exactly once)      不允许消息重复,也不允许消息丢失

    市面上的队列,包括kafka ,rabbitMQ 等都是默认的  至少一次 送达,那么避免重复消费就有以下几个解决方式:

  1. 通过幂等性来处理重复消费问题

    使用业务主键来做ES的文档_id

    ▲利用数据库的主键或者唯一索引来实现幂等性

    ▲通过Token机制或者全局唯一ID的机制记录消息的消费状态来实现,分布式中实现起来比较困难,需要加分布式锁,耗费资源较大


如何处理消息乱序的问题?

    不同应用服务中,多个消费者消费一个topic的时候,并不能保证消费的顺序。

  1. 设置分区策略为Hash

    在go中,只需要指定消息的key即可,在kafka中,如果有key,那么会对key进行hash,然后对Partition数量进行取模来确定要去哪个Partition、

★哪些情况会触发kafka消费者的rebalance

        ▲ 什么是消费者的reblance

            Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。

            例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。

       ▲触发条件

  1. 消费组中的消费者加入或者离开,这也是发生最多的情况

  2. partion 数量的变化

  3. 对topic的订阅发生变化

        ▲go的kafka的sdk一些坑

主流的go的sdk由于在设计上出现一些缺陷(可以选用其他的sdk),导致修改完topic配置,新增了partion后,go如果不重启消费者和生产者,有可能一直无法识别新的partion,7天后新分区上的msg可能就丢了

        ▲如何避免消费者rebalance

  • 将消费者代码尽可能隔离出来,拆分成单独的微服务 

    消费者所在的服务如果出现panic,会导致整个服务挂掉重启,这就会造成消费者的rebalance

  • 合理设置这个max.poll.interval.ms这个参数

    消费者消费时间过长,导致间隔过大,长时间不通知就会被kafka判定为死亡,踢出了消费组。根据业务处理最长时间来设置

  • 合理设置 session.timeout.ms 和 heartbeat.interval.ms        

    

            



0人推荐
随时随地看视频
慕课网APP