猿问

在 Spring Boot 中控制启用/禁用 Kafka 消费者

我在 Spring Boot 中配置了几个 Kafka 消费者。这就是 kafka.properties 的样子(这里只列出一个消费者的配置):


kafka.topics=

bootstrap.servers=

group.id=

enable.auto.commit=

auto.commit.interval.ms=

session.timeout.ms=

schema.registry.url=

auto.offset.reset=

kafka.enabled=

这是配置:


@Configuration

@PropertySource({"classpath:kafka.properties"})

public class KafkaConsumerConfig {


    @Autowired

    private Environment env;


    @Bean

    public ConsumerFactory<String, String> pindropConsumerFactory() {

        Map<String, Object> dataRiverProps = new HashMap<>();


        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));

        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));

        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));

        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));

        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));


        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));


        return new DefaultKafkaConsumerFactory<>(dataRiverProps);

    }


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(pindropConsumerFactory());

        return factory;

    }

}


有没有办法让我使用道具“kafka.enabled”,这样我就可以控制这个消费者的创建或者消息检索?非常感谢!


慕田峪9158850
浏览 1070回答 2
2回答

手掌心

您可以通过在消费者中使用属性autoStartup (true/false) 来做到这一点,如下所示 -@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",&nbsp; &nbsp; &nbsp; &nbsp; containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")public void consume(String message) {&nbsp; &nbsp; //System.out.println("Consumed message: " + message);}

开心每一天1111

要禁用 Kafka 配置,您可以,例如:用 KafkaConsumerConfig 注释@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)删除类@Component并将KafkaConsumer其定义为 @Bean in&nbsp;KafkaConsumerConfig。要控制 KafkaConsumer 中的消息检索:只需在 KafkaConsumer 中获取属性值@Value("kafka.enabled") private Boolean enabled;然后在用 . 注释的方法中使用简单的 if&nbsp;@KafkaListener。
随时随地看视频慕课网APP

相关分类

Java
我要回答