使用spring kafka模板时如何指定类类型?

我当前的设置具有以下配置:


@Bean

public ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory(

        ConsumerFactory<String, String> consumerFactory) {


    ConcurrentKafkaListenerContainerFactory<String, String> factory = new

            ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory);

    factory.setMessageConverter(stringJsonMessageConverter());


    return factory;

}

其中 stringJsonMessageConverter 有


@Bean

public StringJsonMessageConverter stringJsonMessageConverter() {

    return new StringJsonMessageConverter(objectMapper());

}

使用我的对象映射器


@Bean

public ObjectMapper objectMapper() {

    return new ObjectMapper()

            .registerModule(new JavaTimeModule())

            .registerModule(myCustomJacksonModules())

            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)

            .configure(ACCEPT_SINGLE_VALUE_AS_ARRAY, true)

            .configure(WRITE_DATES_AS_TIMESTAMPS, false);

}

使用此配置,我可以发布为:


...

headers = new MessageHeaders(singletonMap(TOPIC, topic));

Foo foo = ....

Message<?> message = new GenericMessage<>(foo, headers);

kafkaTemplate.send(message);

并消费为:


@KafkaListener(topics = "myTopic",

        groupId = "g1",

        containerFactory = "myKafkaListenerContainerFactory")

public void onMessageReceived(Foo foo) {

    ... works with foo here

}

如果 Foo 是具体类,这在本例中工作正常。但是,如果 Foo 是抽象的,并且作为消息发送的是它的子类


F1 extends Foo; F2 extends Foo ...


... // publisher


headers = new MessageHeaders(singletonMap(TOPIC, topic));

F1 f1 = ....

Message<?> message = new GenericMessage<>(f1, headers);

kafkaTemplate.send(message);


... // Listener


@KafkaListener(topics = "myTopic",

        groupId = "g1",

        containerFactory = "myKafkaListenerContainerFactory")

public void onMessageReceived(Foo foo) {

    ... wont work

}

在我的消费者中将 Foo 声明为类型会失败,但这会起作用:


@KafkaListener(topics = "myTopic",

        groupId = "g1",

        containerFactory = "myKafkaListenerContainerFactory")

public void onMessageReceived(Map<String,Object> fooAsMap) {

    ... this works too

}

有没有办法在我发送消息时指定目标类型?


素胚勾勒不出你
浏览 202回答 1
1回答

有只小跳蛙

配置StringJsonMessageConverter带有自定义DefaultJackson2JavaTypeMapper的地方有setTypePrecedence(TypePrecedence.TYPE_ID)。还要根据发送类型 id 标头设置类型映射以映射到所需的类(仅当生产者的类与消费者的类不同时才需要)。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java