JpaRepository 未在自定义 RichSinkFunction 中自动装配

我创建了一个自定义 Flink RichSinkFunction 并尝试JpaRepository在这个自定义类中自动装配 a但我不断地得到一个NullPointerException. 如果我在构造函数中自动装配它,我可以看到已找到 JpaRepo - 但是当调用 invoke 方法时,我收到一个NullPointerException.


public interface MessageRepo extends JpaRepository<Message, Long> {

}



@Component

public class MessageSink extends RichSinkFunction<Message> {


    private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"


    @Autowired

    public MessageSink(MessageRepo messageRepo){

        this.messageRepo = messageRepo;

        messageRepo.save(new Message()); //no issues when i do this

    }


    @Override

    public void invoke(Message message, Context context) {

         // the message is not null

         messageRepo.save(message); // NPE

    }

有没有人遇到过这个问题?看起来MessageSinkinvoke 方法是在单独的线程中调用的,这就是为什么messageRepo总是null? 除了当我有自己的自定义接收器时,我的代码的其他部分能够使用 MessageRepo。




慕标琳琳
浏览 267回答 2
2回答

慕桂英4014372

我认为这里的问题是 flink 在分发给它的工作人员之前需要序列化自定义接收器函数。通过标记 MessageRepo 传输,意味着当工作节点反序列化此函数时该字段将为空。通常,您会在 open 函数中初始化传输依赖项,该函数将在对象反序列化后调用。

慕无忌1623718

我不太清楚原因,但我认为在注入 bean 时,spring boot 会优先考虑您的服务类。当我尝试为我的实体类编写侦听器时,我遇到了类似的问题。我就是这样解决的。创建一个实现 ApplicationContextAware 接口并覆盖 setApplicationContext 方法的组件类。在您的类中有一个名为 getBean 的静态方法,它将在您的第一个请求时自动装配。示例代码 ---@Componentpublic class SpringBeansUtil implements ApplicationContextAware {private static ApplicationContext context;&nbsp; &nbsp; @SuppressWarnings("static-access")&nbsp; &nbsp; @Override&nbsp; &nbsp; public void setApplicationContext(ApplicationContext applicationContext)&nbsp;&nbsp; &nbsp; throws BeansException {&nbsp; &nbsp; this.context = applicationContext;}&nbsp; &nbsp; public static <T> T getBean(Class<T> beanClass) {&nbsp; &nbsp; &nbsp; &nbsp; return context.getBean(beanClass);&nbsp; &nbsp; }}然后在你的代码中获取 bean ------->> ClassName referenceName = (ClassName)SpringBeansUtil.getBean(ClassName.class);
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java