1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.test.flink.stream.window
import java.util.Properties
import com.test.flink.stream.hive.JsonDeserializationSchema import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 import org.apache.kafka.clients.consumer.ConsumerConfig
import scala.collection.convert.WrapAsJava._ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector
/**
 * Flink streaming example that demonstrates side outputs.
 *
 * Reads comma-separated text records from the Kafka topic "test", turns each
 * one into a [[SideOutputExample.Demo]] and splits the stream three ways:
 *
 *  - records whose delay is below 100 seconds go to the main output;
 *  - records whose delay is 100 seconds or more are reported as text on the
 *    "delay-side-output" side output;
 *  - lines that cannot be parsed are forwarded unchanged on the
 *    "malformed-side-output" side output instead of failing the job.
 */
object SideOutputExample {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "cdh04:9092")
    properties.setProperty("group.id", "test")
    properties.setProperty("auto.offset.reset", "latest")
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    // The Scala List is converted to a java.util.List by the WrapAsJava
    // implicits imported at the top of the file.
    val consumer010 = new FlinkKafkaConsumer010[String](
      List("test"),
      new SimpleStringSchema(),
      properties
    ).setStartFromLatest()

    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val delayOutputTag = OutputTag[String]("delay-side-output")
    // Carries raw lines that could not be turned into a Demo.
    val malformedOutputTag = OutputTag[String]("malformed-side-output")

    val ds = senv
      .addSource(consumer010)
      .process(new ProcessFunction[String, Demo] {
        override def processElement(
            value: String,
            ctx: ProcessFunction[String, Demo]#Context,
            out: Collector[Demo]): Unit = {
          parseDemo(value) match {
            case Some(demo) if demo.delayTime < 100 =>
              out.collect(demo)
            case Some(demo) =>
              ctx.output(delayOutputTag, s"数据 ${demo.toString} 迟到了 :" + demo.delayTime + "秒")
            case None =>
              // A bad record must not bring the whole job down; keep it
              // visible on its own side output.
              ctx.output(malformedOutputTag, value)
          }
        }
      })

    ds.print()
    ds.getSideOutput(delayOutputTag).print()
    ds.getSideOutput(malformedOutputTag).print()

    senv.execute("Side Outputs Test")
  }

  /**
   * Parses one input line of the form `id,time[,ignored...]`.
   *
   * Only the first two comma-separated fields are used; any further fields
   * are ignored.
   *
   * @param line raw record text
   * @return the parsed record, or `None` when the line has fewer than two
   *         fields or the second field is not a valid `Long`
   */
  private def parseDemo(line: String): Option[Demo] = {
    val fields = line.split(",")
    if (fields.length < 2) {
      None
    } else {
      scala.util.Try(fields(1).toLong).toOption.map(time => Demo(fields(0), time))
    }
  }

  /**
   * A single input record.
   *
   * @param id   first field of the input line
   * @param time second field of the input line; it is subtracted from the
   *             current wall-clock time in seconds, so it is treated as an
   *             epoch timestamp in seconds
   */
  case class Demo(id: String, time: Long) {
    /** Seconds between `time` and the moment this instance was constructed. */
    val delayTime: Long = System.currentTimeMillis() / 1000 - time
  }
}
|