Kafka Java Consumer SDK 长拉而不使用while

我尝试使用 Kafka Java SDK 来实现消费者,但是我看到的大多数消费者示例都使用while(true)循环并在循环调用consume方法中获取消息。


while (true) {

            final ConsumerRecords<Long, String> consumerRecords =

                    consumer.poll(1000);

            if (consumerRecords.count()==0) {

                noRecordsCount++;

                if (noRecordsCount > giveUp) break;

                else continue;

            }

            consumerRecords.forEach(record -> {

                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",

                        record.key(), record.value(),

                        record.partition(), record.offset());

            });

            consumer.commitAsync();

        }

我想知道是否有任何优雅的方法可以在不使用while类似于 RabbitMQ 实现的循环的情况下处理此问题:


Consumer consumer = new DefaultConsumer(channel) {

      @Override

      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

          throws IOException {

        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");

      }

    };

    channel.basicConsume(QUEUE_NAME, true, consumer);


侃侃无极
浏览 292回答 2
2回答

Qyouu

您可以尝试使用Spring-kafkawhich 带有@KafkaListener注释并制作收听主题的方法,有关更多信息,请点击此处因为在 apache-kafka 中没有优雅的方法来使方法作为主题的侦听器,因为消费者需要在特定时间间隔内轮询记录,需要循环中的代码@KafkaListener(topics = "topicName", group = "foo")public void listen(String message) {System.out.println("Received Messasge in group foo: " + message);}

Cats萌萌

轮询循环是在 Kafka 中消费消息的唯一方式。处理消息的优雅代码应该在循环中。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java