我创建了一个自定义 Flink RichSinkFunction 并尝试JpaRepository在这个自定义类中自动装配 a但我不断地得到一个NullPointerException. 如果我在构造函数中自动装配它,我可以看到已找到 JpaRepo - 但是当调用 invoke 方法时,我收到一个NullPointerException.
public interface MessageRepo extends JpaRepository<Message, Long> {
}
@Component
public class MessageSink extends RichSinkFunction<Message> {
private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"
@Autowired
public MessageSink(MessageRepo messageRepo){
this.messageRepo = messageRepo;
messageRepo.save(new Message()); //no issues when i do this
}
@Override
public void invoke(Message message, Context context) {
// the message is not null
messageRepo.save(message); // NPE
}
有没有人遇到过这个问题?看起来MessageSinkinvoke 方法是在单独的线程中调用的,这就是为什么messageRepo总是null? 除了当我有自己的自定义接收器时,我的代码的其他部分能够使用 MessageRepo。
慕桂英4014372
慕无忌1623718
相关分类