考虑以下代码:
@Test(singleThreaded = true)
public class KafkaConsumerTest
{
private KafkaTemplate<String, byte[]> template;
private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory;
private static final KafkaEmbedded EMBEDDED_KAFKA;
static {
EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic");
try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); }
}
@BeforeMethod
public void setUp() throws Exception {
final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString());
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps);
this.template = new KafkaTemplate<>(pf);
this.template.setDefaultTopic("topic");
final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA);
this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer());
this.consumerFactory.setKeyDeserializer(new StringDeserializer());
}
我正在尝试向 a 发送消息KafkaTemplate并使用Consumer.poll(). 我使用的测试框架是TestNG。
发送作品,我已经验证使用我在网上找到的“常用”代码(在 上注册一个消息侦听器KafkaMessageListenerContainer)。
只是,我从来没有在消费者那里收到过任何东西。我已经针对“真实”的 Kafka 安装尝试了相同的序列 (create Consumer, poll()),并且它有效。
因此,我设置ConsumerFactory? 任何帮助将不胜感激!
相关分类