手记

Kafka学习分享(4)

1-再均衡
2-seek的使用
3-在flume中的使用

再均衡概念

再均衡是指分区的所属权从一个消费者转移到另一个消费者,它为消费组具备高可用性和伸缩性提供
保障,使我们方便安全的删除消费组内的消费者或者往消费组内添加消费者。

不过在发生再均衡期间,消费组内的消费者是无法读取消息的。消费组会变得不可用。

另外当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个
分区中的一部分消息时还没来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消
费组内的另一个消费组,原来被消费完的那部分消息又被重新消费了一遍,也就是发生了重复消费。

ConsumerRebalanceListener

再均衡监听器用来设定发生再均衡动作前后的一些准备或者收尾动作。ConsumerRebalanceListener是一个接口,包含两个方法:
void onPartitionsRevoked(Collection<TopicPartition> partitions)
这个方法会在在均衡开始之前和消费者停止读取消息之后被调用。可以通过这个方法来处理消费位
移的提交,以避免一些不必要的重复消费现象。
参数partitions表示再均衡前所分配到的分区

void onPartitionsAssigned(Collection<TopicPartition> partitions)
这个方法会在重新分配之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分
配到的分区。

监听器的简单使用
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                //提交消费位移
                consumer.commitSync(currentOffsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                //do nothing.
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    //消费消息
                    currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                consumer.commitAsync(currentOffsets, null);
            }
        } finally {
            consumer.close();
        }

seek的使用

一般再均衡的时候可以配合seek方法,指定位移消费。
KafkaConsumer提供了seek()方法:
public void seek(TopicPartition partition,long offset)
seek()方法中partition表示分区,而offset参数用来指定从分区中的哪个位置开始消费。
seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在poll()方法的调用过程中
实现的。
也就是说在执行seek()方法之前需要先执行一次poll()方法等到分配到分区之后才可以重置消费位
置。

 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        long start = System.currentTimeMillis();
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            //获取到消费者被分配到的分区的信息
            assignment = consumer.assignment();
        }
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        System.out.println(assignment);
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, 10);
        }

因此发生再均衡的时候我们可以在onPartitionsAssigned中使用seek获取到开始消费的位移,进行
设置

在flume中的使用

在flume中的KafkaSource中也进行了再均衡的处理。

doStart():
// Subscribe for topics by already specified strategy
subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));

具体的实现就是SourceRebalanceListener:
class SourceRebalanceListener implements ConsumerRebalanceListener {
  private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class);
  private AtomicBoolean rebalanceFlag;

  public SourceRebalanceListener(AtomicBoolean rebalanceFlag) {
    this.rebalanceFlag = rebalanceFlag;
  }
  // Set a flag that a rebalance has occurred. Then commit already read events to kafka.
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
      rebalanceFlag.set(true);
    }
  }
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
    }
  }
}

可以看到在flume中,如果发生了再均衡就会在上面提到的两个回调方法中执行对应的逻辑。
具体执行的内容大家可以阅读源码进行学习。

后续

Spark Streaming+kafka
0人推荐
随时随地看视频
慕课网APP