我需要在多个线程中使用来自 Kafka 分区的记录,每个线程上都有唯一的记录进行处理。我有以下代码,我不知道是什么错误
public class ConsumerThread implements Runnable {
public String name;
public ConsumerThread(String name){
this.name = name;
}
public Properties getDefaultProperty(){
Properties prop = new Properties();
prop.setProperty("group.id", "4");
prop.put("enable.auto.commit", "false");
prop.put("auto.offset.reset", "earliest");
prop.setProperty("bootstrap.servers", "localhost:9092");
prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("max.poll.records","150");
return prop;
}
public void run() {
TopicPartition tp = new TopicPartition("my.topic", 0);
KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());
ArrayList tpList = new ArrayList<TopicPartition>();
tpList.add(tp);
consumer.assign(tpList);
ConsumerRecords poll = consumer.poll(1000);
Iterator it = poll.iterator();
consumer.commitAsync();
while(it.hasNext()){
ConsumerRecord cr = (ConsumerRecord) it.next();
System.out.println("From "+this.name+" : "+cr.value());
}
consumer.close();
System.out.println("Thread Exiting "+this.name);
}
}
结果
From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_0
From Thread2 : produced_1
From Thread2 : produced_2
From Thread2 : produced_3
.
.
.
预期的 :
From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_4
From Thread2 : produced_5
From Thread2 : produced_6
From Thread2 : produced_137
波斯汪
大话西游666
相关分类