我正在尝试为我的 Kafka 使用者编写集成测试。我已经遵循了官方参考文档,但是当我开始测试时,我只看到这个重复的广告无限期:
-2019-04-03 15:47:34.002 WARN 13120 --- [主] 组织.apache.kafka.clients.Network客户端 : [消费者客户端 Id=消费者-1,groupId=我的-组] 无法建立与节点 -1 的连接。经纪人可能不可用。
我做错了什么?
我正在使用 JUnit5、弹簧靴和 .spring-kafkaspring-kafka-test
我有我的班级的注释。@EnableKafka@Configuration
我的测试类是这样的:
@ExtendWith(SpringExtension::class)
@SpringBootTest(classes = [TestKafkaConfig::class])
@DirtiesContext
@EmbeddedKafka(
partitions = 1,
topics = [KafkaIntegrationTest.MY_TOPIC])
class KafkaIntegrationTest {
@Autowired
private lateinit var embeddedKafka: EmbeddedKafkaBroker
@Test
fun test() {
val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)
val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))
template.defaultTopic = KafkaIntegrationTest.MY_TOPIC
template.sendDefault("foo")
}
}
我看起来像这样:application.yml
kafka:
consumer:
group-id: my-group
bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
specific.avro.reader: true
我也尝试过设置一个,但我得到完全相同的重复消息。(这就是我尝试设置的方式):MockSchemaRegistryClientMockSchemaRegistryClient
@TestConfiguration
@Import(TestConfig::class)
class TestKafkaConfig {
@Autowired
private lateinit var props: KafkaProperties
@Bean
fun schemaRegistryClient() = MockSchemaRegistryClient()
@Bean
fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())
@Bean
fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())
我做错了什么?请注意,我正在使用融合模式注册表,并尝试从Avro反序列化。
炎炎设计
相关分类