使用 poll() 的消费者单元测试永远不会收到任何东西

考虑以下代码:


@Test(singleThreaded = true)

public class KafkaConsumerTest

{

  private KafkaTemplate<String, byte[]> template;

  private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory;

  private static final KafkaEmbedded EMBEDDED_KAFKA;

  static {

      EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic");

      try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); }

    }


  @BeforeMethod

  public void setUp() throws Exception {

    final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString());

    senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps);

    this.template = new KafkaTemplate<>(pf);

    this.template.setDefaultTopic("topic");

    final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA);

    this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);

    this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer());

    this.consumerFactory.setKeyDeserializer(new StringDeserializer());

  }


我正在尝试向 a 发送消息KafkaTemplate并使用Consumer.poll(). 我使用的测试框架是TestNG。


发送作品,我已经验证使用我在网上找到的“常用”代码(在 上注册一个消息侦听器KafkaMessageListenerContainer)。


只是,我从来没有在消费者那里收到过任何东西。我已经针对“真实”的 Kafka 安装尝试了相同的序列 (create Consumer, poll()),并且它有效。


因此,我设置ConsumerFactory? 任何帮助将不胜感激!


三国纷争
浏览 172回答 1
1回答
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java