如何在java中编写kafka消费者而不使用无限循环进行轮询?
在处理传入记录函数中编写了 while(true) 循环,其中轮询新事件。如果我在我的项目中使用它,除了这个我无法做任何其他事情。有没有办法避免使用这种无限循环来获取新事件?
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting AtMostOnceConsumer ...");
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, Event> consumer = createConsumer();
// Subscribe to all partition in that topic. 'assign' could be used here
// instead of 'subscribe' to subscribe to specific partition.
consumer.subscribe(Arrays.asList("topic"));
processRecords(consumer);
}
private static KafkaConsumer<String, Event> createConsumer() {
Properties props = new Properties();
String consumeGroup = "group_id";
props.put("group.id", consumeGroup);
props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
props.put("client.id", "clientId");
props.put("security.protocol", "SASL_SSL");
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + "username" + " password="" + "password";");
props.put("enable.auto.commit", "true");
// Auto commit interval, kafka would commit offset at this interval.
props.put("auto.commit.interval.ms", "101");
// This is how to control number of records being read in each poll
props.put("max.partition.fetch.bytes", "135");
// Set this if you want to always read from beginning.
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
}
有人可以帮我修改这个,以便我可以避免while(true)循环并且可以只监听我传入的事件吗?
至尊宝的传说
宝慕林4294392
慕村225694
相关分类