自定义 AllWindowFunction 类成员

我有一个自定义AllWindowFunction类,它有一个负责数据库插入的类成员。与数据库的连接是持久的,并在构建过程中打开。


问题是,AllWindowFunction创建/打开连接的实例与应用事件调用的实例不同。解决此问题的方法是静态成员,但我想知道这是否是唯一的解决方法?


示例代码:


public class CustomWindowFunction implements AllWindowFunction<String, String, TimeWindow> {


    private static Connection database;


    CustomWindowFunction() {

        database = new Connection();

    }


    @Override

    public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {

        // process data

        database.save(data);

        out.collect(data.toString());

    }

}

我找不到有关此机制的任何信息,我只知道构造函数中的对象 ID 与调用的对象 ID 不同apply。


慕尼黑8549860
浏览 277回答 2
2回答

函数式编程

这是因为每个函数都必须序列化才能分布在集群的节点上。您可以尝试使用RichAllWindowFunction,即所谓的“Rich”版本,您可以在其中使用open()方法,该方法将在每个并行运算符开始时调用。在这种方法中,您可以创建连接public class CustomWindowFunction implements RichAllWindowFunction<String, String, TimeWindow> {&nbsp; &nbsp; private Connection database;&nbsp; &nbsp; @Override&nbsp; &nbsp; public void open(Configuration parameters) {&nbsp; &nbsp; &nbsp; &nbsp; database = new Connection();&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {&nbsp; &nbsp; &nbsp; &nbsp; // process data&nbsp; &nbsp; &nbsp; &nbsp; database.save(data);&nbsp; &nbsp; &nbsp; &nbsp; out.collect(data.toString());&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java