我有一个监听 kafka 主题的消费者应用程序,在异常情况下,它会按预期将记录发送到 dlq(死信队列)。
在抛出异常之前,我在消费者应用程序中设置标头,并且它正在设置,但没有发送到死信主题,死信主题只有内置标头,例如“x-exception-message”、“x-”原始分区'等...而不是我设置的那个。
以下是我的消费者应用程序中的一段代码:
modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();
System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100
throw new RuntimeException(modifiedMessage.toString());
注意:我在 spring.cloud.stream.kafka.binder.header=x-ecode 下的 application.yml 中设置 x-code
在上面的代码中,我能够设置标头,并实际验证它已设置但未发送到死信主题。
有效负载已正确发送,我如何将该标头发送到死信主题?我是否需要在 application.yml 中添加任何属性才能使其发送?
米琪卡哇伊
相关分类