在电商领域通常会有这样一种需要,如果客户下单了,但是在10分钟内不付款,应该需要通知客服,再由客服寻问客户为什么还没有付款,从而提高付款效率,我们可以采用Flink - CEP的超时机制来处理。
执行流程
Flink-CEP监控
Usage
导入依赖
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: "1.6.2"compile group: 'org.apache.flink', name: 'flink-cep-scala_2.11', version: "1.6.2"
使用Iterator模拟用户下单行为
case class OrderEvent( userId: String, `type`: String )class DataSource extends Iterator[OrderEvent] with Serializable { val atomicInteger = new AtomicInteger(0) val orderEventList = List( OrderEvent("1", "create"), OrderEvent("2", "create"), OrderEvent("2", "pay") ) override def hasNext: Boolean = { TimeUnit.SECONDS.sleep(1) true } override def next(): OrderEvent = { orderEventList(atomicInteger.getAndIncrement() % 3) } }
创建定单流
val orderEventStream = env.fromCollection(new DataSource())
创建定单匹配流程
以下表示如果在1秒钟内创建定单并付款则完成购物操作
val orderPayPattern = Pattern.begin[OrderEvent]("begin") .where(_.`type`.equals("create")) .next("next") .where(_.`type`.equals("pay")) .within(Time.seconds(1))
创建侧输出流
val orderTiemoutOutput = OutputTag[OrderEvent]("orderTimeout")
把定单流应用到匹配流程中
val patternStream = CEP.pattern(orderEventStream.keyBy("userId"), orderPayPattern)
将正常定单流与侧超时流分开
val complexResult = patternStream.select(orderTiemoutOutput) { (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => { val createOrder = pattern.get("begin") OrderEvent("timeout", createOrder.get.iterator.next().userId) } } { pattern: Map[String, Iterable[OrderEvent]] => { val payOrder = pattern.get("next") OrderEvent("success", payOrder.get.iterator.next().userId) } }
将正常定单流与超时定单流打印输出
val timeoutResult = complexResult.getSideOutput(orderTiemoutOutput) complexResult.print() timeoutResult.print() env.execute
也可以自行添加Sink将消息发送到消息队列等
timeoutResult.addSink(new SinkFunction[OrderEvent] { override def invoke(value: OrderEvent, context: SinkFunction.Context[_]): Unit = { //do something or send message } })
完整项目
有Java
与Scala
两个版本
作者:dounine
链接:https://www.jianshu.com/p/4aef81468b6d