猿问

多个消费者如何在spring boot Kafka中收听多个主题?

当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接收 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试从 topic1 访问事件消息时,我无法访问,但我可以访问用户主题消息。


前任:


@Configuration

@EnableKafka

public class KafkaConsumerConfiguration {      

@Autowired

private Environment environment;


@Bean

public ConsumerFactory<String,User> consumerFactory() {

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));

    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));

    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);


    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),

            new JsonDeserializer<>(User.class));


}

@Bean

public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    return factory;

}


@Bean

public ConsumerFactory<String , Event> consumerFactoryEvent(){

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));

    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));

    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);


    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),

            new JsonDeserializer<>(Event.class));

}



我的需要是首先侦听事件主题并对消息进行一些按摩,然后将其发送到用户主题,我有另一种方法可以侦听用户主题并对该消息执行某些操作。我尝试将不同的选项传递给@KafkaListener 如


@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")

但它不起作用..我不确定出了什么问题..任何建议都有帮助!


拉丁的传说
浏览 381回答 3
3回答

料青山看我应如是

如果你没有在 bean 中指定名称,那么方法名称将是 bean 名称,添加带有 groupid 的 bean 名称 @KafkaListener@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")或者指定名称@Bean并将该名称添加到@kafkaListener@Bean(name="kafkaListenerContainerFactoryEvent")public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryEvent());return factory;}
随时随地看视频慕课网APP

相关分类

Java
我要回答