猿问

使用 Java 进行 Apache 闪光灯 CEP 模式检测

我想做;

从映射结构中包含的任何数组列表元素的 CEP 开始,并继续使用我已启动的其余数组列表元素。

映射和图案结构:


final Map< Integer,ArrayList<String>> deger = new HashMap<Integer,ArrayList<String>>();

        deger.put(1,new ArrayList<String>(Arrays.asList("h:1","l:1","g:0")));

        deger.put(2,new ArrayList<String>(Arrays.asList("h:1","l:1","g:1")));

        deger.put(3,new ArrayList<String>(Arrays.asList("h:2","l:3","g:1")));

        deger.put(4,new ArrayList<String>(Arrays.asList("h:0","l:2","g:2")));


 for(int i=1;i<deger.size()+1;i++) {

            temp1.add(deger.get(i));

        }


Pattern<String,?> pattern = Pattern.<String>begin("start").where(

                new SimpleCondition<String>() {

//                    @Override

                    public boolean filter(String value) throws Exception {


                        for (ArrayList<String> aa: temp1){

                            for (String dd : aa)

                                if(value.equals(dd)){ 

                                    return true;

                                }

                        }

                        return false;

                    }

                }

        ).followedBy("middle").where(

                new SimpleCondition<String>() {

                    @Override

                    public boolean filter(String value) throws Exception {

                        return value.equals(temp1.get(1));

                    }

                }

        ).followedBy("end").where(

                new SimpleCondition<String>() {

                    @Override

                    public boolean filter(String value) throws Exception {

                        return value.equals(temp1.get(2));

                    }

                }

        );

我的目的是在映射中使用数组列表元素发出警告,但是数组列表元素的顺序并不重要,因为其中的流流。我想继续处理此数组的其余元素,当我从此处的任何数组开始时,我可以返回此数组的信息。例如:


Incoming data = "l:1","h:1","g:0"

my pattern = "h:1","l:1","g:0" 

Start -> l:1 find

Middle -> g:0 or h:1 | h:1 find

End -> g:0 find -> alarm


qq_遁去的一_1
浏览 114回答 2
2回答

慕妹3242003

&nbsp;public static&nbsp; Integer temp1;&nbsp; &nbsp; public static&nbsp; Map<Integer,ArrayList<String>> temp2 = new HashMap<>();&nbsp; &nbsp; &nbsp;final Map< Integer,ArrayList<String>> deger = new HashMap<>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deger.put(1,new ArrayList<>(Arrays.asList("h:1","g:1","s:0")));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deger.put(2,new ArrayList<>(Arrays.asList("h:1","g:1","g:0")));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deger.put(3,new ArrayList<>(Arrays.asList("h:1","c:0","g:0")));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deger.put(4,new ArrayList<>(Arrays.asList("h:1","s:1","g:0")));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Pattern<String,?> pattern = Pattern.<String>begin("start").where(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new SimpleCondition<String>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public boolean filter(String value) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; flag = false;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for(Map.Entry<Integer, ArrayList<String>> entryStart : deger.entrySet()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(entryStart.getValue().contains(value) && !temp2.containsKey(entryStart.getKey())){&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ArrayList<String> newList = new ArrayList<String>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; newList.addAll(entryStart.getValue());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; newList.remove(value);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; temp2.put(entryStart.getKey(),newList);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; flag = true;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return flag;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ).followedBy("middle").where(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new SimpleCondition<String>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public boolean filter(String middle) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; flag = false;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for(Map.Entry<Integer, ArrayList<String>> entryMiddle : temp2.entrySet()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(entryMiddle.getValue().contains(middle) && entryMiddle.getValue().size() == 2){&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ArrayList<String> newListMiddle = new ArrayList<String>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; newListMiddle.addAll(entryMiddle.getValue());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; newListMiddle.remove(middle);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; temp2.put(entryMiddle.getKey(),newListMiddle);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; flag = true;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return flag;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ).followedBy("end").where(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new SimpleCondition<String>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public boolean filter(String end) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; flag = false;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for(Map.Entry<Integer, ArrayList<String>> entryEnd : temp2.entrySet()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(entryEnd.getValue().contains(end) && entryEnd.getValue().size() == 1){&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; flag = true;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; temp1 = entryEnd.getKey();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (flag)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; temp2.remove(temp1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return flag;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; PatternStream<String> patternStream = CEP.pattern(stream_itemset_ham,pattern);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataStream<String> result = patternStream.select(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new PatternSelectFunction<String, String>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public String select(Map<String, List<String>> map) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ArrayList<String> NewList= new ArrayList<>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; NewList.addAll(deger.get(temp1));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String found = "Found";&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for (String list_element : NewList)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; found += " " + list_element ;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return found;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result.print();我从你的问题中了解到,可以提供这种解决方案。

慕少森

所以目前AFAIK Flink不支持开箱即用的无序模式,所以基本上我看到了两种解决这个问题的方法:1)您可以创建要搜索的所有可能的模式,并简单地合并所有生成的数据流。2)正如这篇文章所建议的FlinkCEP:我可以引用一个较早的事件来定义后续匹配吗?您可以尝试使用它来允许您访问已经匹配的先前元素,因此基本上您必须定义与列表中所有可能的元素匹配的模式,然后只需检查最后一个条件,如果所有三个元素都属于同一列表。如果是这样,则找到该模式。IterativeCondition
随时随地看视频慕课网APP

相关分类

Java
我要回答