猿问

如何在 Kafka-Spring 中捕获反序列化错误?

我正在启动一个使用 kafka 消息的应用程序。


为了捕捉反序列化异常,我遵循了关于反序列化错误处理的Spring 文档。我试过 failedDeserializationFunction 方法。


这是我的消费者配置类


@Bean

    public Map<String, Object> consumerConfigs() {

        Map<String, Object> consumerProps = new HashMap<>();

        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);

        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        

        /*  Error Handling */

        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);


        return consumerProps;

    }


    @Bean

    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),

                new JsonDeserializer<>(NTCMessageBody.class));

    }

    

    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());


        return factory;

    }

这是 BiFunction 提供者


public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {


    @Override

    public NTCMessageBody apply(byte[] t, Headers u) {

        return new NTCBadMessageBody(t);

    }


}


当我仅发送一条有关该主题的损坏消息时,我收到了此错误(循环中):


org.apache.kafka.common.errors.SerializationException:反序列化键/值时出错


我知道 ErrorHandlingDeserializer2 应该委托 NTCBadMessageBody 类型并继续消费。我还看到(在调试模式下)它从未进入 NTCBadMessageBody 类的构造函数中。


白猪掌柜的
浏览 133回答 4
4回答

qq_遁去的一_1

错误处理反序列化器当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer 将返回 DeserializationException,其中包含原因和原始字节。使用记录级 MessageListener 时,如果键或值包含 DeserializationException,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。使用 BatchMessageListener 时,失败的记录与批处理中的剩余记录一起传递给应用程序,因此应用程序侦听器有责任检查特定记录中的键或值是否是 DeserializationException。因此,根据您使用的代码,record-level MessageListener只需添加ErrorHandler到Container处理异常例如,如果您的错误处理程序实现了此接口,您可以相应地调整偏移量。例如,要重置偏移量以重播失败的消息,您可以执行以下操作;但是请注意,这些都是简单的实现,您可能需要更多地检查错误处理程序。@Beanpublic ConsumerAwareListenerErrorHandler listen3ErrorHandler() {return (m, e, c) -> {&nbsp; &nbsp; this.listen3Exception = e;&nbsp; &nbsp; MessageHeaders headers = m.getHeaders();&nbsp; &nbsp; c.seek(new org.apache.kafka.common.TopicPartition(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers.get(KafkaHeaders.OFFSET, Long.class));&nbsp; &nbsp; return null;&nbsp; &nbsp;};}或者您可以像本示例中那样进行自定义实现@Beanpublic ConcurrentKafkaListenerContainerFactory<String, GenericRecord>kafkaListenerContainerFactory()&nbsp; {&nbsp; &nbsp; ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; = new ConcurrentKafkaListenerContainerFactory<>();&nbsp; &nbsp; factory.setConsumerFactory(consumerFactory());&nbsp; &nbsp; factory.getContainerProperties().setErrorHandler(new ErrorHandler() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String topics = s.split("-")[0];&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int offset = Integer.valueOf(s.split("offset ")[1]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TopicPartition topicPartition = new TopicPartition(topics, partition);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //log.info("Skipping " + topic + "-" + partition + " offset " + offset);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.seek(topicPartition, offset + 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("OKKKKK");&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String topics = s.split("-")[0];&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int offset = Integer.valueOf(s.split("offset ")[1]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TopicPartition topicPartition = new TopicPartition(topics, partition);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //log.info("Skipping " + topic + "-" + partition + " offset " + offset);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.seek(topicPartition, offset + 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("OKKKKK");&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });&nbsp; &nbsp; return factory;}

青春有我

使用 ErrorHandlingDeserializer。当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer 将返回 DeserializationException,其中包含原因和原始字节。使用记录级 MessageListener 时,如果键或值包含 DeserializationException,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。使用 BatchMessageListener 时,失败的记录与批处理中的剩余记录一起传递给应用程序,您可以使用 DefaultKafkaConsumerFactory 构造函数,该构造函数采用键和值 Deserializer 对象并连接到配置有适当委托的适当 ErrorHandlingDeserializer。或者,您可以使用 ErrorHandlingDeserializer 使用的使用者配置属性来实例化委托。属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS;属性值可以是类或类名package com.mypackage.app.config;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;import com.mypacakage.app.model.kafka.message.KafkaEvent;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ListenerExecutionFailedException;import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;import org.springframework.kafka.support.serializer.JsonDeserializer;import org.springframework.retry.policy.SimpleRetryPolicy;import org.springframework.retry.support.RetryTemplate;import lombok.extern.slf4j.Slf4j;@EnableKafka@Configuration@Slf4jpublic class KafkaConsumerConfig {&nbsp; &nbsp; @Value("${kafka.bootstrap-servers}")&nbsp; &nbsp; private String servers;&nbsp; &nbsp; @Value("${listener.group-id}")&nbsp; &nbsp; private String groupId;&nbsp; &nbsp; @Bean&nbsp; &nbsp; public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();&nbsp; &nbsp; &nbsp; &nbsp; factory.setConsumerFactory(consumerFactory());&nbsp; &nbsp; &nbsp; &nbsp; factory.setRetryTemplate(retryTemplate());&nbsp; &nbsp; &nbsp; &nbsp; factory.setErrorHandler(((exception, data) -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; /*&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* here you can do you custom handling, I am just logging it same as default&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* Error handler does If you just want to log. you need not configure the error&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* handler here. The default handler does it for you. Generally, you will&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* persist the failed records to DB for tracking the failed records.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.error("Error in process with Exception {} and the record is {}", exception, data);&nbsp; &nbsp; &nbsp; &nbsp; }));&nbsp; &nbsp; &nbsp; &nbsp; return factory;&nbsp; &nbsp; }&nbsp; &nbsp; @Bean&nbsp; &nbsp; public ConsumerFactory<String, KafkaEvent> consumerFactory() {&nbsp; &nbsp; &nbsp; &nbsp; Map<String, Object> config = new HashMap<>();&nbsp; &nbsp; &nbsp; &nbsp; config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);&nbsp; &nbsp; &nbsp; &nbsp; config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);&nbsp; &nbsp; &nbsp; &nbsp; config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);&nbsp; &nbsp; &nbsp; &nbsp; config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);&nbsp; &nbsp; &nbsp; &nbsp; config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);&nbsp; &nbsp; &nbsp; &nbsp; config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());&nbsp; &nbsp; &nbsp; &nbsp; config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "com.mypackage.app.model.kafka.message.KafkaEvent");&nbsp; &nbsp; &nbsp; &nbsp; config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");&nbsp; &nbsp; &nbsp; &nbsp; return new DefaultKafkaConsumerFactory<>(config);&nbsp; &nbsp; }&nbsp; &nbsp; private RetryTemplate retryTemplate() {&nbsp; &nbsp; &nbsp; &nbsp; RetryTemplate retryTemplate = new RetryTemplate();&nbsp; &nbsp; &nbsp; &nbsp; /*&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* here retry policy is used to set the number of attempts to retry and what&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* exceptions you wanted to try and what you don't want to retry.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/&nbsp; &nbsp; &nbsp; &nbsp; retryTemplate.setRetryPolicy(retryPolicy());&nbsp; &nbsp; &nbsp; &nbsp; return retryTemplate;&nbsp; &nbsp; }&nbsp; &nbsp; private SimpleRetryPolicy retryPolicy() {&nbsp; &nbsp; &nbsp; &nbsp; Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();&nbsp; &nbsp; &nbsp; &nbsp; // the boolean value in the map determines whether exception should be retried&nbsp; &nbsp; &nbsp; &nbsp; exceptionMap.put(IllegalArgumentException.class, false);&nbsp; &nbsp; &nbsp; &nbsp; exceptionMap.put(TimeoutException.class, true);&nbsp; &nbsp; &nbsp; &nbsp; exceptionMap.put(ListenerExecutionFailedException.class, true);&nbsp; &nbsp; &nbsp; &nbsp; return new SimpleRetryPolicy(3, exceptionMap, true);&nbsp; &nbsp; }}

慕桂英546537

如果分区名称具有“-”之类的字符,上述答案可能会出现问题。所以,我用正则表达式修改了相同的逻辑。&nbsp; &nbsp; import java.util.List;&nbsp; &nbsp; import java.util.regex.Matcher;&nbsp; &nbsp; import java.util.regex.Pattern;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; import org.apache.kafka.clients.consumer.Consumer;&nbsp; &nbsp; import org.apache.kafka.clients.consumer.ConsumerRecord;&nbsp; &nbsp; import org.apache.kafka.common.TopicPartition;&nbsp; &nbsp; import org.apache.kafka.common.errors.SerializationException;&nbsp; &nbsp; import org.springframework.kafka.listener.ErrorHandler;&nbsp; &nbsp; import org.springframework.kafka.listener.MessageListenerContainer;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; import lombok.extern.slf4j.Slf4j;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; @Slf4j&nbsp; &nbsp; public class KafkaErrHandler implements ErrorHandler {&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; /**&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* Method prevents serialization error freeze&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* @param e&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* @param consumer&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/&nbsp; &nbsp; &nbsp; &nbsp; private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String p = ".*partition (.*) at offset ([0-9]*).*";&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Pattern r = Pattern.compile(p);&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Matcher m = r.matcher(e.getMessage());&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (m.find()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int idx = m.group(1).lastIndexOf("-");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String topics = m.group(1).substring(0, idx);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int partition = Integer.parseInt(m.group(1).substring(idx));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int offset = Integer.parseInt(m.group(2));&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TopicPartition topicPartition = new TopicPartition(topics, partition);&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.seek(topicPartition, (offset + 1));&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.info("Skipped message with offset {} from partition {}", offset, partition);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.error("Error in process with Exception {} and the record is {}", e, record);&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (e instanceof SerializationException)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; seekSerializeException(e, consumer);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; MessageListenerContainer container) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.error("Error in process with Exception {} and the records are {}", e, records);&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (e instanceof SerializationException)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; seekSerializeException(e, consumer);&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void handle(Exception e, ConsumerRecord<?, ?> record) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.error("Error in process with Exception {} and the record is {}", e, record);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; }&nbsp;最后在配置中使用错误处理程序。&nbsp;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {&nbsp; &nbsp; ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();&nbsp; &nbsp; factory.setConsumerFactory(macdStatusConsumerFactory());&nbsp; &nbsp; factory.setRetryTemplate(retryTemplate());&nbsp; &nbsp; factory.setErrorHandler(new KafkaErrHandler());&nbsp; &nbsp; return factory;}但是不推荐解析错误字符串来获取分区、主题和偏移量。如果有人有更好的解决方案,请在此处发布。

Smart猫小萌

在我的工厂中,我添加了 commonErrorHanderfactory.setCommonErrorHandler(new KafkaMessageErrorHandler());并KafkaMessageErrorHandler创建如下class KafkaMessageErrorHandler implements CommonErrorHandler {&nbsp; &nbsp; @Override&nbsp; &nbsp; public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {&nbsp; &nbsp; &nbsp; &nbsp; manageException(thrownException, consumer);&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {&nbsp; &nbsp; &nbsp; &nbsp; manageException(thrownException, consumer);&nbsp; &nbsp; }&nbsp; &nbsp; private void manageException(Exception ex, Consumer<?, ?> consumer) {&nbsp; &nbsp; &nbsp; &nbsp; log.error("Error polling message: " + ex.getMessage());&nbsp; &nbsp; &nbsp; &nbsp; if (ex instanceof RecordDeserializationException) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; RecordDeserializationException rde = (RecordDeserializationException) ex;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.seek(rde.topicPartition(), rde.offset() + 1L);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.commitSync();&nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.error("Exception not handled");&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}
随时随地看视频慕课网APP

相关分类

Java
我要回答