如何在反序列化过程中不使用无限循环的情况下编写kafka消费者?

如何在java中编写kafka消费者而不使用无限循环进行轮询?

在处理传入记录函数中编写了 while(true) 循环,其中轮询新事件。如果我在我的项目中使用它,除了这个我无法做任何其他事情。有没有办法避免使用这种无限循环来获取新事件?

 public static void main(String[] str) throws InterruptedException {

    System.out.println("Starting  AtMostOnceConsumer ...");

    execute();

}

private static void execute() throws InterruptedException {

    KafkaConsumer<String, Event> consumer = createConsumer();

    // Subscribe to all partition in that topic. 'assign' could be used here

    // instead of 'subscribe' to subscribe to specific partition.

    consumer.subscribe(Arrays.asList("topic"));

    processRecords(consumer);

}

private static KafkaConsumer<String, Event> createConsumer() {

    Properties props = new Properties();

    String consumeGroup = "group_id";

    props.put("group.id", consumeGroup);

    props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");

    props.put("client.id", "clientId");

    props.put("security.protocol", "SASL_SSL");


    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");

    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + "username" + " password="" + "password";");

    props.put("enable.auto.commit", "true");

    // Auto commit interval, kafka would commit offset at this interval.

    props.put("auto.commit.interval.ms", "101");

    // This is how to control number of records being read in each poll

    props.put("max.partition.fetch.bytes", "135");

    // Set this if you want to always read from beginning.

    // props.put("auto.offset.reset", "earliest");

    props.put("heartbeat.interval.ms", "3000");

    props.put("session.timeout.ms", "6001");

}

有人可以帮我修改这个,以便我可以避免while(true)循环并且可以只监听我传入的事件吗?


慕桂英4014372
浏览 124回答 3
3回答

至尊宝的传说

你可以尝试这样的事情:public class ConsumerDemoWithThread {private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());private String bootstrapServers = "127.0.0.1:9092";private String groupId = "my-first-application";private String topic = "first-topic";KafkaConsumer consumer = createConsumer(bootstrapServers, groupId, topic);private void pollForRecords() {    ExecutorService executor = Executors.newSingleThreadExecutor();    executor.submit(() -> processRecords());}private KafkaConsumer createConsumer(String bootstrapServers, String groupId, String topic) {    Properties properties = new Properties();    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    // create consumer    KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);    // subscribe consumer to our topic(s)    consumer.subscribe(Arrays.asList(topic));    return consumer;}private void processRecords() {    try {        while (true) {            ConsumerRecords<String, String> records =                    consumer.poll(Duration.ofMillis(100));            for (ConsumerRecord<String, String> record : records) {                logger.info("Key: " + record.key() + ", Value: " + record.value());                logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());            }        }    } catch (WakeupException e) {        logger.info("Received shutdown signal!");    } finally {        consumer.close();    }}public static void main(String[] args) {    ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();    consumerDemoWithThread.pollForRecords();}}

宝慕林4294392

您可以使用@KafkaListener,然而,它也会以无限循环进行轮询,因为这就是 Kafka 的设计方式——它不是一个队列,而是一个存储一段时间记录的事件总线。没有通知其消费者的机制。轮询不同的线程并以优雅的方式退出循环。

慕村225694

如果您希望能够在代码中同时执行多项操作,则需要后台线程。为了更轻松地做到这一点,您可以使用更高级别的 Kafka 库,例如 Spring(已回答)、Vert.x或Smallrye这是一个 Vert.x 示例,首先创建一个KafkaConsumer,然后分配处理程序并订阅您的主题consumer.handler(record -> {  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +    ",partition=" + record.partition() + ",offset=" + record.offset());});// subscribe to a single topicconsumer.subscribe("a-single-topic");
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java