Flink和SparkStreaming的区别

一直往前走,但不妨看看曾经与现在不同技术点的闪光点与不足

最开始公司使用的是spark streaming的方式来做实时数仓,根据公司对其他方面的需求对时间上的要求更加苛刻,所以开始研究flink,并在不久之后对spark streaming进行替代;对这两个技术点进行总结区分

编程模型对比

Spark Streaming

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
与kafka的结合主要是两种模型:
1.基于receiver dataStream[kafka高级API的形式]
简单理解是kafka把消息全部封装好,提供给spark去调用,本来kafka的消息分布在不同的partition上面,相当于做了一步数据合并,在发送给spark,效率相对慢一些
接收到的数据存储在executor,会出现数据漏处理或者多处理状况
2.基于direct dataStream[kafka低级API模式]
每次到topic的每个partition依据偏移量进行获取数据,拉取数据以后进行处理,可以实现高可用;效率更快一些,同时偏移量需要自己维护
eg:
val Array(brokers, topics) = args// 创建一个批处理时间是2s的context
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 使用broker和topic创建DirectStream
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print() // 启动流
ssc.start()
ssc.awaitTermination()

一般代码涉及到的内容有:
1.设置批处理时间
2.创建数据流
3.编写transform
4.编写action
5.启动执行

Flink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
flink与kafka是事件驱动,flink内部对poll出来的数据进行了整理,然后逐条emit,形成了时间触发的机制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<KafkaEvent> input = env
.addSource(new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-topic"), new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
.setParallelism(1).rebalance()
.keyBy("word")
.map(new RollingAdditionMapper()).setParallelism(0);
input.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("output-topic"), new KafkaEventSchema(),
parameterTool.getProperties()));
env.execute("Kafka 0.10 Example");
flink的一般代码内容:
1.注册数据source
2.编写运行逻辑
3.注册数据sink
4.调用env.execute相比于spark streaming少了设置批处理时间,还有一个显著的区别是flink的所有算子都是lazy,调用env.execute会构建jobGraph;client端负责jobGraph生成并提交它到集群运行;而spark streaming的操作算子分action和transform,其中仅有transform是lazy形式,而且DAG生成,stage划分,任务调度是在driver端进行的

任务调度原理

spark 任务调度

1
2
3
4
5
Spark Streaming 任务如上文提到的是基于微处理的,实际上每个批次都是一个spark core的任务;对于编码完成的spark core任务在生成到最终执行结束主要包括以下几个部分:
1.构建DAG图
2.划分stage
3.生成taskset
4.调度task

flink 任务调度

1
2
对于flink的流任务客户端首先会生成streamGraph,接着生成JobGraph,然后将jobGraph提交给JobManager,由它完成JobGraph到ExecutionGraph的转变,最后由jobManager调度执行
flink的拓扑生成提交执行之后,除非故障,否则拓扑部件执行位置不变,并行度由每一个算子并行度决定;而spark streaming是每个批次都会根据数据本地性和资源情况进行调度,无固定的执行拓扑结构;flink是数据在拓扑结构里流动执行,而spark streaming则是对数据缓存批次并行处理

容错机制及处理语义

spark streaming保证仅一次处理

1
2
3
4
5
6
7
8
9
对于spark streaming任务,可以设置checkpoint,然后假如发生故障并启动,我们可以从上次ck之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义
与kafka结合的direct stream可以手动维护offset到zookeeper,要保证数据恰一次处理语义,结果输出和offset提交必须在一个事务内完成
1.repartition(1),spark Streaming输出的action变成仅一个partition,这样可以利用事务去做
Dstream.foreachRDD(rdd => { rdd.repartition(1).foreachPartition(partition =>{ //开启事务
partition.foreach(each => { //提交数据
}) //提交事务
})})
2.将结果和offset一起提交
结果数据包括offset;这样提交结果和提交offset就是一个操作完成,不会数据丢失,也不会重复处理;故障恢复的时候可以利用上次提交结果带的offset

flink与kafka0.11保证仅一次处理

1
2
3
若要sink支持仅一次语义,必须以事务的方式写数据到kafka,这样当提交事务时两次ck间的所有写入操作作为一个事务被提交;这确保了出现故障或崩溃时这些写入操作能够被回滚
一旦ck开始,flink的jobManager向输入流中写入一个ck barrier,将流中所有消息分割成属于本次ck的消息以及属于下次ck的,barrier也会在操作算子间流转,对于每个operator来说,该barrier会触发operator状态后端为该operator状态打快照;data source保存了kafka的offset,之后把ck barrier传递到后续的operator
当barrier在所有算子中传递一遍,并触发的快照写入完成,预提交阶段完成;所有的触发状态快照都被视为ck的一部分,也可以说ck是整个应用程序的状态快照,包括预提交外部状态,出现故障可以从ck恢复;下一步通知所有的操作算子ck成功;该阶段jobmanager会为每个operator发起ck已完成的回调逻辑

Back pressure

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少

spark streaming的背压

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
跟kafka结合是存在背压机制,目标是根据当前job的处理情况来调节后续批次的获取kafka消息的条数;为了达到这个目的,spark streaming在原有的架构上加入了一个RateController,利用的算法是PID,需要的反馈数据是任务处理的结束时间,调度时间,处理时间,消息条数,这些数据是通过sparkListener体系获得,然后通过PIDRateEsimator的compute计算得到一个速率,进而可以计算得到一个offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大offset

大概算法实现方式:
def compute(time: Long, // in milliseconds
numElements: Long,
processingDelay: Long, // in milliseconds
schedulingDelay: Long // in milliseconds
): Option[Double] = {
logTrace(s"\ntime = $time, # records = $numElements, " +
s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {
if (time > latestTime && numElements > 0 && processingDelay > 0) {
val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
logTrace(s""" | latestRate = $latestRate, error = $error
| latestError = $latestError, historicalError = $historicalError
| delaySinceUpdate = $delaySinceUpdate, dError = $dError
""".stripMargin)
latestTime = time
if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
logTrace("First run, rate estimation skipped")
None
} else {
latestRate = newRate
latestError = error
logTrace(s"New rate = $newRate")
Some(newRate)
}
} else {
logTrace("Rate estimation skipped")
None
}
}
}

flink背压

1
2
3
4
5
flink背压是jobmanager针对每一个task每50ms触发100次Thread.getStackTrace()调用,求出阻塞的占比
阻塞占比在web上划分了三个等级
1.ok:0<=Ratio<=0.10 表示状态良好
2.low:0.10<Ratio<=0.5 表示有待观察
3.HIGH: 0.5 <Ratio<=1 表示要处理了