猿问

具有相同组 ID 的 Kafka 消费者线程使用相同的记录

我需要在多个线程中使用来自 Kafka 分区的记录,每个线程上都有唯一的记录进行处理。我有以下代码,我不知道是什么错误


public class ConsumerThread implements Runnable {

    public String name;

    public ConsumerThread(String name){

        this.name = name;

    }

    public Properties getDefaultProperty(){

        Properties prop = new Properties();

        prop.setProperty("group.id", "4");

        prop.put("enable.auto.commit", "false");

        prop.put("auto.offset.reset", "earliest");

        prop.setProperty("bootstrap.servers", "localhost:9092");

        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        prop.setProperty("max.poll.records","150");

        return prop;

    }

    public void run() {

        TopicPartition tp = new TopicPartition("my.topic", 0);

        KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());

        ArrayList tpList = new ArrayList<TopicPartition>();

        tpList.add(tp);

        consumer.assign(tpList);

        ConsumerRecords poll = consumer.poll(1000);

        Iterator it = poll.iterator();

        consumer.commitAsync();

        while(it.hasNext()){

            ConsumerRecord cr = (ConsumerRecord) it.next();

            System.out.println("From "+this.name+" : "+cr.value());

        }

        consumer.close();

        System.out.println("Thread Exiting "+this.name);

    }

}

结果


From Thread1 : produced_0

From Thread1 : produced_1

From Thread1 : produced_2

From Thread1 : produced_3

.

.

.

From Thread1 : produced_136

From Thread2 : produced_0

From Thread2 : produced_1

From Thread2 : produced_2

From Thread2 : produced_3

.

.

.


预期的 :


From Thread1 : produced_0

From Thread1 : produced_1

From Thread1 : produced_2

From Thread1 : produced_3

.

.

.

From Thread1 : produced_136

From Thread2 : produced_4

From Thread2 : produced_5

From Thread2 : produced_6

From Thread2 : produced_137


繁花不似锦
浏览 260回答 2
2回答

波斯汪

就像 Lior Chaga 在他的评论中所说的那样,您正在手动将主题分区分配给您的消费者。这不是执行此操作的推荐方法。最重要的是,您的所有消费者似乎都在使用完全相同的 groupID。利用这种结构,有两个线程消费,如果消费者的至少一个有一个特定的消息,没有其他线程会得到一个。如果您希望所有的消费者线程都获得自己的“一组”消息,而不会相互中断,那么您需要给它们不同的group.ids。要订阅主题以便它为您处理自动重新平衡,然后消费,您应该执行以下操作(取自下面链接的 KafkaConsumer javadoc):&nbsp;consumer.subscribe(Arrays.asList("foo", "bar"));&nbsp;while (true) {&nbsp; &nbsp; &nbsp;ConsumerRecords<String, String> records = consumer.poll(100);&nbsp; &nbsp; &nbsp;for (ConsumerRecord<String, String> record : records)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());&nbsp;}Kafka 官方 javadocs 有更详细的解释:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

大话西游666

只有使用kafka 消费者的subscribe方法才能将分区自动分配给消费者组。但是,您使用assign特定主题分区,因此您承担将特定分区分配给不同消费者的责任(但您始终使用相同的分区0,因此所有消费者都从同一主题分区消费)。
随时随地看视频慕课网APP

相关分类

Java
我要回答