一直往前走,但不妨看看曾经与现在不同技术点的闪光点与不足
最开始公司使用的是spark streaming的方式来做实时数仓,根据公司对其他方面的需求对时间上的要求更加苛刻,所以开始研究flink,并在不久之后对spark streaming进行替代;对这两个技术点进行总结区分
编程模型对比 Spark Streaming
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 与kafka的结合主要是两种模型: 1.基于receiver dataStream[kafka高级API的形式] 简单理解是kafka把消息全部封装好,提供给spark去调用,本来kafka的消息分布在不同的partition上面,相当于做了一步数据合并,在发送给spark,效率相对慢一些 接收到的数据存储在executor,会出现数据漏处理或者多处理状况 2.基于direct dataStream[kafka低级API模式] 每次到topic的每个partition依据偏移量进行获取数据,拉取数据以后进行处理,可以实现高可用;效率更快一些,同时偏移量需要自己维护 eg: val Array(brokers, topics) = args// 创建一个批处理时间是2s的context val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) // 使用broker和topic创建DirectStream val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Get the lines, split them into words, count the words and print val lines = messages.map(_.value) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() // 启动流 ssc.start() ssc.awaitTermination() 一般代码涉及到的内容有: 1.设置批处理时间 2.创建数据流 3.编写transform 4.编写action 5.启动执行
Flink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 flink与kafka是事件驱动,flink内部对poll出来的数据进行了整理,然后逐条emit,形成了时间触发的机制 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create a checkpoint every 5 seconds env.enableCheckpointing(5000); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<KafkaEvent> input = env .addSource(new FlinkKafkaConsumer010<>( parameterTool.getRequired("input-topic"), new KafkaEventSchema(), parameterTool.getProperties()) .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())) .setParallelism(1).rebalance() .keyBy("word") .map(new RollingAdditionMapper()).setParallelism(0); input.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("output-topic"), new KafkaEventSchema(), parameterTool.getProperties())); env.execute("Kafka 0.10 Example"); flink的一般代码内容: 1.注册数据source 2.编写运行逻辑 3.注册数据sink 4.调用env.execute相比于spark streaming少了设置批处理时间,还有一个显著的区别是flink的所有算子都是lazy,调用env.execute会构建jobGraph;client端负责jobGraph生成并提交它到集群运行;而spark streaming的操作算子分action和transform,其中仅有transform是lazy形式,而且DAG生成,stage划分,任务调度是在driver端进行的
任务调度原理 spark 任务调度
1 2 3 4 5 Spark Streaming 任务如上文提到的是基于微处理的,实际上每个批次都是一个spark core的任务;对于编码完成的spark core任务在生成到最终执行结束主要包括以下几个部分: 1.构建DAG图 2.划分stage 3.生成taskset 4.调度task
flink 任务调度
1 2 对于flink的流任务客户端首先会生成streamGraph,接着生成JobGraph,然后将jobGraph提交给JobManager,由它完成JobGraph到ExecutionGraph的转变,最后由jobManager调度执行 flink的拓扑生成提交执行之后,除非故障,否则拓扑部件执行位置不变,并行度由每一个算子并行度决定;而spark streaming是每个批次都会根据数据本地性和资源情况进行调度,无固定的执行拓扑结构;flink是数据在拓扑结构里流动执行,而spark streaming则是对数据缓存批次并行处理
容错机制及处理语义 spark streaming保证仅一次处理
1 2 3 4 5 6 7 8 9 对于spark streaming任务,可以设置checkpoint,然后假如发生故障并启动,我们可以从上次ck之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义 与kafka结合的direct stream可以手动维护offset到zookeeper,要保证数据恰一次处理语义,结果输出和offset提交必须在一个事务内完成 1.repartition(1),spark Streaming输出的action变成仅一个partition,这样可以利用事务去做 Dstream.foreachRDD(rdd => { rdd.repartition(1).foreachPartition(partition =>{ //开启事务 partition.foreach(each => { //提交数据 }) //提交事务 })}) 2.将结果和offset一起提交 结果数据包括offset;这样提交结果和提交offset就是一个操作完成,不会数据丢失,也不会重复处理;故障恢复的时候可以利用上次提交结果带的offset
flink与kafka0.11保证仅一次处理
1 2 3 若要sink支持仅一次语义,必须以事务的方式写数据到kafka,这样当提交事务时两次ck间的所有写入操作作为一个事务被提交;这确保了出现故障或崩溃时这些写入操作能够被回滚 一旦ck开始,flink的jobManager向输入流中写入一个ck barrier,将流中所有消息分割成属于本次ck的消息以及属于下次ck的,barrier也会在操作算子间流转,对于每个operator来说,该barrier会触发operator状态后端为该operator状态打快照;data source保存了kafka的offset,之后把ck barrier传递到后续的operator 当barrier在所有算子中传递一遍,并触发的快照写入完成,预提交阶段完成;所有的触发状态快照都被视为ck的一部分,也可以说ck是整个应用程序的状态快照,包括预提交外部状态,出现故障可以从ck恢复;下一步通知所有的操作算子ck成功;该阶段jobmanager会为每个operator发起ck已完成的回调逻辑
Back pressure 消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少
spark streaming的背压
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 跟kafka结合是存在背压机制,目标是根据当前job的处理情况来调节后续批次的获取kafka消息的条数;为了达到这个目的,spark streaming在原有的架构上加入了一个RateController,利用的算法是PID,需要的反馈数据是任务处理的结束时间,调度时间,处理时间,消息条数,这些数据是通过sparkListener体系获得,然后通过PIDRateEsimator的compute计算得到一个速率,进而可以计算得到一个offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大offset 大概算法实现方式: def compute(time: Long, // in milliseconds numElements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long // in milliseconds ): Option[Double] = { logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000 val processingRate = numElements.toDouble / processingDelay * 1000 val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate) logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin) latestTime = time if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None } else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } } else { logTrace("Rate estimation skipped") None } } }
flink背压
1 2 3 4 5 flink背压是jobmanager针对每一个task每50ms触发100次Thread.getStackTrace()调用,求出阻塞的占比 阻塞占比在web上划分了三个等级 1.ok:0<=Ratio<=0.10 表示状态良好 2.low:0.10<Ratio<=0.5 表示有待观察 3.HIGH: 0.5 <Ratio<=1 表示要处理了