我正在尝试运行 2 个订阅了 2 个不同主题的消费者。两个消费者程序每次运行一个时都运行正常,但同时运行时,其中一个消费者总是显示异常:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
我遵循了建议max.pool.size,将 2设置为session.timeout.ms30000,1000heartbeat.interval.ms
下面是我的消费者函数,这两个文件的函数是相同的,只是主题名称更改为Test2,并且我在同时运行的 2 个不同类中运行这两个函数。
public void consume()
{
//Kafka consumer configuration settings
List<String> topicNames = new ArrayList<String>();
topicNames.add("Test1");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "1000");
props.put("max.poll.records", "2");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topicNames);
由于此错误,记录不会在Kafka主题中提交。我该如何克服这个错误?
倚天杖
相关分类