猿问

Spring 启动 kafka 消息传递。如何简化处理程序的 dto 映射?

我已经使用 Kafka 配置了我的 Spring Boot 项目。我可以接收和发布任何基于字符串的消息。


字符串消息不是处理的最佳方式。具有将消息从字符串默认转换为对象的功能会更有用。


实现这个功能我需要将几乎所有的 Kafka 配置从yml到java(使用属性)。...生产者示例


@Bean

public Map<String, Object> producerConfigs() {

    Map<String, Object> props = new HashMap<>();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AccountSerializer.class);

    return props;

}


@Bean

public ProducerFactory<String, Account> producerFactory() {

    return new DefaultKafkaProducerFactory<>(producerConfigs());

}


@Bean

public KafkaTemplate<String, Account> kafkaTemplate() {

    return new KafkaTemplate<>(producerFactory());

}

该代码有效,但我接受了简化。在最好的情况下,我想优雅地配置yml,可能是一些 java 更改。但是以直接方式进行操作时,我将获得额外的每 3 个 bean 来配置每个kafkaTemplate和listenerFactory.


它是否可能简化未来的配置(我需要更多额外的Serializer“解串器”)?如何?


GCT1015
浏览 197回答 2
2回答

临摹微笑

似乎我没有任何机会为相同的侦听器配置不同SERIALIZER| DESERIALIZERs。但是 id 并不意味着我的问题没有解决方案。我对所有对象都使用了继承,并提供了一个抽象AbstractEvent。AbstractEvent一般没用,但它在我的解决方案中使用,如指定的输入点SERIALIZER| DESERIALIZER. 为了获取上下文中哪个对象的信息,我使用了自定义标题。org.apache.kafka.common.serialization.Deserializer没有标题参数,但我已经实现了我的DESERIALIZER基于ExtendedDeserializer. 这种方式让我可以访问标题via public T deserialize(String topic, Headers headers, byte[] data)我的解串器示例:@Slf4jpublic class AbstractEventDeserializer<T extends AbstractEvent> implements ExtendedDeserializer<T> {&nbsp; &nbsp; private Map<String, Class<T>> mappers = new HashMap<>();&nbsp; &nbsp; // default behavior&nbsp; &nbsp; @Override&nbsp; &nbsp; public T deserialize(String arg0, byte[] devBytes) {&nbsp; &nbsp; &nbsp; &nbsp; ObjectMapper mapper = new ObjectMapper();&nbsp; &nbsp; &nbsp; &nbsp; T bar = null;&nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; bar = (T) mapper.readValue(devBytes, Bar.class);&nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; e.printStackTrace();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return bar;&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void close() {&nbsp; &nbsp; &nbsp; &nbsp; // TODO Auto-generated method stub&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public T deserialize(String topic, Headers headers, byte[] data) {&nbsp; &nbsp; &nbsp; &nbsp; log.info("handling...");&nbsp; &nbsp; &nbsp; &nbsp; headers.forEach(header -> log.info("&nbsp; &nbsp;{}: {}", header.key(), getHeaderValueAsString(header)));&nbsp; &nbsp; &nbsp; &nbsp; Optional<String> classTypeFromHeader = getClassTypeFromHeader(headers);&nbsp; &nbsp; &nbsp; &nbsp; if (classTypeFromHeader.isPresent()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return parseFromJson(data, mappers.get(classTypeFromHeader.get()));&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return deserialize(topic, data);&nbsp; &nbsp; }&nbsp; &nbsp; private Optional<String> getClassTypeFromHeader(Headers headers) {&nbsp; &nbsp; &nbsp; &nbsp; return StreamSupport.stream(headers.headers("X-CLASS-TYPE").spliterator(), false)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(Header::value)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(String::new)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .findFirst();&nbsp; &nbsp; }&nbsp; &nbsp; private String getHeaderValueAsString(Header header) {&nbsp; &nbsp; &nbsp; &nbsp; return Optional.ofNullable(header.value())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(String::new)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .orElse(null);&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void configure(Map<String, ?> arg0, boolean arg1) {&nbsp; &nbsp; &nbsp; &nbsp; log.info("configuring deserialiser");&nbsp; &nbsp; &nbsp; &nbsp; if (arg0.containsKey("mappers")) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; this.mappers = (Map<String, Class<T>>) arg0.get("mappers");&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; arg0.keySet().forEach(key -> log.info("&nbsp; &nbsp;{}:{}", key, arg0.get(key)));&nbsp; &nbsp; }}如果您想尝试工作解决方案,请查看实验示例。

倚天杖

Spring 云服务可以为消费者提供更好的配置、并发、反序列化和更少的样板代码。&nbsp; &nbsp;<dependency>&nbsp; &nbsp; &nbsp; &nbsp; <groupId>org.springframework.cloud</groupId>&nbsp; &nbsp; &nbsp; &nbsp; <artifactId>spring-cloud-starter-stream-kafka</artifactId>&nbsp; &nbsp; </dependency>水槽样品@SpringBootApplication@EnableBinding(Sink.class)public class LoggingConsumerApplication {public static void main(String[] args) {&nbsp; &nbsp; SpringApplication.run(LoggingConsumerApplication.class, args);}@StreamListener(Sink.INPUT)public void handle(Person person) {&nbsp; &nbsp; System.out.println("Received: " + person);}public static class Person {&nbsp; &nbsp; private String name;&nbsp; &nbsp; public String getName() {&nbsp; &nbsp; &nbsp; &nbsp; return name;&nbsp; &nbsp; }&nbsp; &nbsp; public void setName(String name) {&nbsp; &nbsp; &nbsp; &nbsp; this.name = name;&nbsp; &nbsp; }&nbsp; &nbsp; public String toString() {&nbsp; &nbsp; &nbsp; &nbsp; return this.name;&nbsp; &nbsp; }}}示例配置:spring:&nbsp; cloud:&nbsp; &nbsp; stream:&nbsp; &nbsp; &nbsp; bindings:&nbsp; &nbsp; &nbsp; &nbsp; input:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; destination: <your topic>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; group: <your consumer group>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headerMode: raw&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; partitioned: true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; concurrency: 10&nbsp; &nbsp; &nbsp; kafka:&nbsp; &nbsp; &nbsp; &nbsp; binder:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; brokers: <Comma seperated list of kafka brokers>此处提供更多信息https://cloud.spring.io/spring-cloud-stream/
随时随地看视频慕课网APP

相关分类

Java
我要回答