我在下面的beamSql查询中:
PCollectionTuple query0 = PCollectionTuple.of(
new TupleTag<BeamRecord>("temp2"), temp2).and(new TupleTag<BeamRecord>("temp3"), temp3)"));
PCollection<BeamRecord> rec_3 = query0.apply(
BeamSql.queryMulti("SELECT a.*, \r\n" +
"(case \r\n" +
"when a.grp > 5 then 1 \r\n" +
"when b.grp > 5 then 1 \r\n" +
"else 0 end) as flag \r\n" +
"from temp2 a left join \r\n" +
"temp3 b on a.eventid = b.eventid and b.Weekint = c1(a.Weekint)").withUdf("c1", AddS.class));
在上面的查询中,我在表temp2和temp3之间进行左连接,在ON条件下,我使用名称'AddS'调用UDF。在此UDF AddS中,将Weekint用作BigInt。UDF将Weekint用作输入,并将其转换为日期格式,然后向其添加7,然后将值返回为BigInt。以下是UDF:
public static class AddS implements BeamSqlUdf {
private static final long serialVersionUID = 1L;
public static BigInteger eval(BigInteger input) throws ParseException{
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
String strdate = input.toString();
Date date1 = dateFormat.parse(strdate);
Calendar c = Calendar.getInstance();
c.setTime(date1);
c.add(Calendar.DATE, 7);
String f =c.getTime().toString();
BigInteger x = new BigInteger(f);
return (x);
}
}
我无法弄清楚是什么原因导致此错误,可能是UDF创建不正确或未正确调用?或如果有人可以向我解释此错误的原因。
青春有我
相关分类