猿问

初始化MapState的内容

我实现了一个RichFunction具有以下结构的 Flink:


public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {


    private MapState<String, MyState> myState;              


    @Override

    public void open(Configuration conf)throws Exception{

        myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));

    }


    @Override

    public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {

        MyState state = myState.get(value.ID());


        // Do things

    }


    @Override

    public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {

        state.put(value.ID(), value.state());   // Update the mapState with value from broadcast

    }


    // retrieve all the state values and put them in the MapState

    private void initialState() throws Exception{

       Map<String, MyState> initialValues = ...;

       this.cameras.putAll(initialValues);

    }

}

该mapState变量存储通过BroadcastedStream. 更新是在processBroadcastElement()函数中完成的。


在作业开始时,我想mapState使用该initialState()函数来初始化。


问题是我无法在函数中使用它open()(请参阅此处原因)


在这种情况下初始化的正确方法是什么mapState?(在所有使用 RichFunctions 的情况下)


qq_遁去的一_1
浏览 87回答 2
2回答

子衿沉夜

您想要实现 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction当您这样做时,您将实现两种方法:@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {&nbsp; &nbsp; // called when it's time to save state&nbsp; &nbsp; myState.clear();&nbsp; &nbsp; &nbsp; &nbsp; // Update myState with current application state&nbsp;}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {&nbsp; &nbsp; // called when things start up, possibly recovering from an error&nbsp; &nbsp; descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));&nbsp; &nbsp; myState = context.getKeyedStateStore().getMapState(descriptor);&nbsp; &nbsp; if (context.isRestored()) {&nbsp; &nbsp; &nbsp; &nbsp; // restore application state from myState&nbsp;&nbsp;&nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp;}您可以在initializeState() 方法而不是open() 中初始化myState 变量。

梵蒂冈之花

我不相信你实际上可以在initializeState()中初始化广播状态。修改广播状态的唯一方法是通过在 processBroadcastElement 方法中获得的读/写上下文。但是你可以做的是在initializeState中使用context.isRestored()来确定KeyedBroadcastProcessFunction是否是第一次初始化,并设置一个瞬态局部变量来记录此信息。然后,第一次调用 processBroadcastElement 方法时,您可以使用此信息来决定在广播状态中存储什么。但您必须在广播流上发送一些内容才能启动此操作。
随时随地看视频慕课网APP

相关分类

Java
我要回答