1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import java.util
import org.apache.flink.cep.PatternSelectFunction import org.apache.flink.cep.scala.CEP import org.apache.flink.cep.scala.pattern.Pattern import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.scala._
//登录样例类 case class LoginEvent(userId:Long,ip:String,eventTpye:String,eventTime:Long)
//输出报警信息样例类 case class Warning(userId: Long,firstFailTime:Long,lastFailTime:Long,warningMSG:String)
object LoginFailWithCEP { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1)
//自定义测试数据 val loginStream = env.fromCollection(List( LoginEvent(1,"192.168.0.1","fail",1558430842), LoginEvent(1,"192.168.0.2","success",1558430843), LoginEvent(3,"192.168.0.3","fail",1558430844), LoginEvent(3,"192.168.0.3","fail",1558430847), LoginEvent(3,"192.168.0.3","fail",1558430848), LoginEvent(4,"192.168.0.5","fail",1558430880), LoginEvent(2,"192.168.0.10","success",1558430950) )).assignAscendingTimestamps(_.eventTime*1000)
//定义pattern.对事件流进行模式匹配 val loginFailPattern = Pattern.begin[LoginEvent]("begin") .where(_.eventTpye.equals("fail")) .next("next") .where(_.eventTpye.equals("fail")) // .within(Time.seconds(2))
//在输入流的基础上应用pattern,得到匹配的pattern stream val patternStream = CEP.pattern(loginStream.keyBy(_.userId),loginFailPattern)
val loginFailDataStream = patternStream.select(new MySelectFunction())
//将得到的警告信息流输出sink loginFailDataStream.print("warning")
env.execute("Login Fail Detect with CEP") } }
class MySelectFunction() extends PatternSelectFunction[LoginEvent,Warning]{ override def select(patternEvents: util.Map[String, util.List[LoginEvent]]): Warning = { val firstFailEvent = patternEvents.getOrDefault("begin",null).iterator().next() val secondEvent = patternEvents.getOrDefault("next",null).iterator().next() Warning(firstFailEvent.userId,firstFailEvent.eventTime,secondEvent.eventTime,"login fail warning") } }
|