
How to test whether a method annotated with @KafkaListener is being invoked

I am struggling to write a test that checks whether my Kafka consumer is actually invoked when a message is sent to its designated topic.


My consumer:


@Service
@Slf4j
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class ProcessingConsumer {

  private AppService appService;

  @KafkaListener(
      topics = "${topic}",
      containerFactory = "processingConsumerContainerFactory")
  public void listen(ConsumerRecord<Key, Value> message, Acknowledgment ack) {
    try {
      appService.processMessage(message);
      ack.acknowledge();
    } catch (Throwable t) {
      log.error("error while processing message!", t);
    }
  }
}

My consumer configuration:


@EnableKafka
@Configuration
public class ProcessingCosumerConfig {

  @Value("${spring.kafka.schema-registry-url}")
  private String schemaRegistryUrl;

  private KafkaProperties props;

  public ProcessingCosumerConfig(KafkaProperties kafkaProperties) {
    this.props = kafkaProperties;
  }

  public Map<String, Object> deserializerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

    return props;
  }

  private KafkaAvroDeserializer getKafkaAvroDeserializer(Boolean isKey) {
    KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
    kafkaAvroDeserializer.configure(deserializerConfigs(), isKey);
    return kafkaAvroDeserializer;
  }

  private DefaultKafkaConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        props.buildConsumerProperties(),
        getKafkaAvroDeserializer(true),
        getKafkaAvroDeserializer(false));
  }



慕森卡
1 answer

人到中年有点甜

Inject a mock AppService into the listener and verify that its processMessage() was called.
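
The idea can be sketched without Spring or Kafka on the classpath. Everything below is a hypothetical stand-in, not the code from the question: `AppService` and `Acknowledgment` are reduced to one-method interfaces, the message is a plain `String` instead of a `ConsumerRecord<Key, Value>`, and `RecordingAppService` is a hand-written test double used in place of a mocking library. Because a listener container invokes `listen()` on its own thread, the sketch waits on a `CountDownLatch` with a timeout rather than sleeping.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins so the sketch compiles without Spring or Kafka.
interface AppService {
    void processMessage(String message);
}

interface Acknowledgment {
    void acknowledge();
}

// Same shape as ProcessingConsumer.listen(), minus annotations and Kafka types.
class SimpleProcessingConsumer {
    private final AppService appService;

    SimpleProcessingConsumer(AppService appService) {
        this.appService = appService;
    }

    void listen(String message, Acknowledgment ack) {
        try {
            appService.processMessage(message);
            ack.acknowledge();
        } catch (RuntimeException e) {
            // Like the original listener: report the error and skip the acknowledgment.
            System.err.println("error while processing message! " + e);
        }
    }
}

// Hand-written test double: records every call and releases a latch on the first one.
class RecordingAppService implements AppService {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch called = new CountDownLatch(1);

    @Override
    public void processMessage(String message) {
        received.add(message);
        called.countDown();
    }
}

class ListenerInvocationDemo {
    public static void main(String[] args) throws InterruptedException {
        RecordingAppService appService = new RecordingAppService();
        SimpleProcessingConsumer consumer = new SimpleProcessingConsumer(appService);

        // A plain thread stands in for the listener container's consumer thread.
        Thread containerThread = new Thread(() -> consumer.listen("payload-1", () -> { }));
        containerThread.start();

        // Wait with a timeout, then inspect what the double recorded.
        boolean invoked = appService.called.await(5, TimeUnit.SECONDS);
        containerThread.join();
        System.out.println("invoked=" + invoked + ", received=" + appService.received);
    }
}
```

In a real Spring Boot test the double would typically be a Mockito mock registered as a bean, the message would be sent to a test broker (for example one started with `@EmbeddedKafka` from spring-kafka-test), and `verify(appService, timeout(5000)).processMessage(any())` would play the role of the latch. That setup depends on the project's configuration, which the question only shows in part.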