我们在我们的项目中使用 Spring Kafka 2.1.4.RELEASE 版本,我们有以下配置:
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Configuration
class ProducerConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ASerializer.class);
return props;
}
@Bean
public ProducerFactory<String, A> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, A> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
class ConsumerConfig {
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Bean
public Map<String, Object> firstConsumerConfig() {
Map<String, Object> props = getCommonConsumerConfig();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ADeserializer.class);
return props;
}
所以我们在启动这个应用程序时注意到它并不是一直连接到这两个主题。有时它仅连接到第二个主题或仅连接到第一个主题,并且可能连接到第一个和第二个主题(这是正确的)。那么你能帮助理解这里配置错误吗?
繁花如伊
互换的青春
相关分类