Kafka consumer unit test fails with Avro Schema Registry

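I am writing a consumer that listens to a Kafka topic and consumes messages as they become available. I tested the logic/code by running Kafka locally, and it works fine.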


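When I write unit/component test cases, however, they fail with an Avro Schema Registry URL error. I have tried the different options available on the internet but could not find anything that works. I am not sure whether my approach is even correct. Please help.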


Listener class


    @KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
        try {
            GenericRecord generic = consumerRecord.value();
            Object obj = generic.get("metadata");

            // Map the "metadata" field onto the Header class
            ObjectMapper mapper = new ObjectMapper();
            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);

            System.out.println("Received payload :   " + consumerRecord.value());

            // Call backend with details in GenericRecord
        } catch (Exception e) {
            System.out.println("Exception while reading message from Kafka " + e);
        }
    }

Kafka configuration


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(genericConsumerFactory());
        return factory;
    }

    public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        return new DefaultKafkaConsumerFactory<>(config);
    }


红糖糍粑
4 answers

泛舟湖上清波郎朗

I investigated a bit and found that the problem lies in the CachedSchemaRegistryClient used by KafkaAvroSerializer/Deserializer. It is used to fetch schema definitions from the Confluent Schema Registry.

You already have the schema definition locally, so you do not need to go to the Schema Registry for it (at least not in your tests).

I had a similar problem and solved it by creating a custom KafkaAvroSerializer/KafkaAvroDeserializer.

Here is an example of the KafkaAvroSerializer. It is quite simple: you just need to extend the provided KafkaAvroSerializer and tell it to use a MockSchemaRegistryClient.

    public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
        public CustomKafkaAvroSerializer() {
            super();
            super.schemaRegistry = new MockSchemaRegistryClient();
        }

        // The client passed in is ignored on purpose; a mock is always used
        public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
            super(new MockSchemaRegistryClient());
        }

        public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
            super(new MockSchemaRegistryClient(), props);
        }
    }

Here is an example of the KafkaAvroDeserializer. When the deserialize method is called, you need to tell it which schema to use.

    public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
        @Override
        public Object deserialize(String topic, byte[] bytes) {
            this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);
            return super.deserialize(topic, bytes);
        }

        // Returns the same local schema for every schema ID
        private static SchemaRegistryClient getMockClient(final Schema schema$) {
            return new MockSchemaRegistryClient() {
                @Override
                public synchronized Schema getById(int id) {
                    return schema$;
                }
            };
        }
    }

The last step is to tell Spring to use the serializer/deserializer you created:

    spring.kafka.producer.properties.schema.registry.url = not-used
    spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer
    spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.group-id = showcase-producer-id

    spring.kafka.consumer.properties.schema.registry.url = not-used
    spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer
    spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.group-id = showcase-consumer-id
    spring.kafka.auto.offset.reset = earliest

    spring.kafka.producer.auto.register.schemas = true
    spring.kafka.properties.specific.avro.reader = true
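To see why overriding getById in a mock client is enough, it may help to look at how a Confluent-serialized message is framed: one magic byte (0), then a 4-byte big-endian schema ID, then the Avro binary payload. The deserializer reads that ID and asks the registry client for the matching schema. The following is only a sketch using the JDK; the class ConfluentWireFormat and its method are hypothetical names made up for illustration and are not part of any Confluent library.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of any Confluent library.
final class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID embedded in a Confluent-framed Avro message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Message shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header (magic byte 0, schema ID 42) followed by three arbitrary payload bytes
        byte[] message = {0, 0, 0, 0, 42, 2, 104, 105};
        System.out.println("schema id = " + schemaId(message)); // prints "schema id = 42"
    }
}
```

Because the mock above returns the same schema for every ID, whatever ID the test producer wrote into the header resolves to the local schema.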

慕神8447489

If you are reading this example three years later, you may want to make a small change to CustomKafkaAvroDeserializer:

    private static SchemaRegistryClient getMockClient(final Schema schema) {
        return new MockSchemaRegistryClient() {
            @Override
            public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
                    throws IOException, RestClientException {
                return new AvroSchema(schema);
            }
        };
    }

浮云间

If your @KafkaListener is in the test class, you can read the message with a StringDeserializer and then convert it to the desired class manually:

    @Autowired
    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;

    @KafkaListener(topics = "test")
    public void inputData(ConsumerRecord<?, ?> consumerRecord) {
        log.info("received record='{}', payload='{}'", consumerRecord, consumerRecord.value());

        GenericRecord genericRecord = (GenericRecord) myKafkaAvroDeserializer.deserialize(
                "test", consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));

        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
    }

    @Component
    public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
        @Override
        public Object deserialize(String topic, byte[] bytes) {
            this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
            return super.deserialize(topic, bytes);
        }

        // Returns the same local schema for every schema ID
        private static SchemaRegistryClient getMockClient(final Schema schema$) {
            return new MockSchemaRegistryClient() {
                @Override
                public synchronized org.apache.avro.Schema getById(int id) {
                    return schema$;
                }
            };
        }
    }

Remember to add the schema registry and the key/value serializer to application.yml, even though they will not be used:

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: http://localhost:8080
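One thing worth checking with this approach: the call consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8) only gives back the original bytes if the Avro payload happens to be valid UTF-8. The sketch below, which uses only the JDK and a hypothetical class name Utf8RoundTrip, shows that a byte such as 0x80 does not survive the String round trip. If your records contain such bytes, reading them with Kafka's ByteArrayDeserializer instead of StringDeserializer is probably the safer choice.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class for illustration only.
final class Utf8RoundTrip {
    /** Decodes bytes as UTF-8 text and encodes the text back to bytes. */
    static byte[] roundTrip(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // All bytes below 0x80: the round trip is lossless
        byte[] asciiOnly = {0, 0, 0, 0, 1, 2, 104, 105};
        // Contains 0x80, which is not valid UTF-8 on its own
        byte[] binary = {0, 0, 0, 0, 1, (byte) 0x80};

        System.out.println(Arrays.equals(asciiOnly, roundTrip(asciiOnly))); // prints "true"
        System.out.println(Arrays.equals(binary, roundTrip(binary)));       // prints "false"
    }
}
```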

慕田峪4524236

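As the error says, you need to provide a String for the registry in the producer config, not an object.

Since you are using the Mock class, that string can be anything...

However, you need to construct the serializer with the registry instance:

    Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);
    // make config map with ("schema.registry.url", "unused")
    serializer.configure(config, false);

Otherwise, it will try to create a non-mock client. Then put the serializer into the properties:

    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);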
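As a possible alternative to wiring a mock client by hand: as far as I know, newer versions of the Confluent serializers treat a schema.registry.url that starts with mock:// as a request for an in-memory mock registry, shared by every serializer and deserializer configured with the same scope name. The sketch below only builds such a config map with the JDK. The class name MockRegistryConfig and the scope "unit-test" are placeholders I made up, and you should verify that your Confluent version actually supports the mock:// prefix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class MockRegistryConfig {
    /** Builds serializer/deserializer properties that point at a mock:// scope. */
    static Map<String, Object> serdeConfig(String scope) {
        Map<String, Object> config = new HashMap<>();
        // Assumption: the Confluent version in use recognizes the mock:// prefix
        config.put("schema.registry.url", "mock://" + scope);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(serdeConfig("unit-test").get("schema.registry.url")); // prints "mock://unit-test"
    }
}
```

The same map could then be passed to serializer.configure(config, false), or merged into the producer and consumer properties used by the test.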