我有类似的json,
{
"name":"someone",
"job":"doctor",
"etc":"etc"
}
在每个 json 中,“工作”都有不同的值,比如医生、飞行员、司机、守望者等。我想根据“工作”值分离每个 json,并将其存储在不同的位置,如,/home/doctor等。/home/pilot/home/driver
我已经尝试过 SplitStream 函数来执行此操作,但我必须指定这些值以匹配条件。
public class MyFlinkJob {
private static JsonParser jsonParser = new JsonParser();
private static String key_1 = "doctor";
private static String key_2 = "driver";
private static String key_3 = "pilot";
private static String key_default = "default";
public static void main(String args[]) throws Exception {
Properties prop = new Properties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafka);
props.setProperty("group.id", "myjob");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
DataStream<String> record = env.addSource(myConsumer).rebalance()
SplitStream<String> split = record.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String val) {
JsonObject json = (JsonObject)jsonParser.parse(val);
String jsonValue = CommonFields.getFieldValue(json, "job");
List<String> output = new ArrayList<String>();
if (key_1.equalsIgnoreCase(jsonValue)) {
} output.add("doctor");
} else if (key_2.equalsIgnoreCase(jsonValue)) {
output.add("driver");
} else if (key_3.equalsIgnoreCase(jsonValue)) {
output.add("pilot");
} else {
output.add("default");
}
return output;
}});
}
假设如果任何其他值出现在“job”中,比如工程师或其他东西,并且我没有在类中指定,那么它会转到默认文件夹有没有办法根据“job”的值自动拆分这些 json 事件而不指定它和创建一个包含值名称的路径,例如 /home/enginerr?
守候你守候我
相关分类