分别运行具有不同主题的 2 个消费者时出现 Kafka CommitFailedException

我正在尝试运行 2 个订阅了 2 个不同主题的消费者。两个消费者程序每次运行一个时都运行正常,但同时运行时,其中一个消费者总是显示异常:


org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

我遵循了建议max.pool.size,将 2设置为session.timeout.ms30000,1000heartbeat.interval.ms


下面是我的消费者函数,这两个文件的函数是相同的,只是主题名称更改为Test2,并且我在同时运行的 2 个不同类中运行这两个函数。


    public void consume()

    {

        //Kafka consumer configuration settings

        List<String> topicNames = new ArrayList<String>();

        topicNames.add("Test1");

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");

        props.put("group.id", "test");

        props.put("enable.auto.commit", "false");

        props.put("session.timeout.ms", "30000");

        props.put("heartbeat.interval.ms", "1000");

        props.put("max.poll.records", "2");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(topicNames);


由于此错误,记录不会在Kafka主题中提交。我该如何克服这个错误?


汪汪一只猫
浏览 98回答 1
1回答

倚天杖

在您的情况下,您需要为消费者分配不同的组 ID。您正在使用相同的组 ID 创建两个消费者(这是可以的),但是调用 subscribe 两次是不行的。您可以一次运行一个消费者,因为您只调用 subscribe 一次。如果您需要任何进一步的帮助,请告诉我。很高兴能帮助你。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java