Gracefully Stopping Spark Streaming

The Spark Streaming job reads data from Kafka, stores its consumer offsets in Redis, and watches a key in Redis to decide whether the program should shut down.

Watching the key in Redis

/**
 * Stop the Streaming application gracefully.
 *
 * @param ssc the StreamingContext to stop
 */
def stopByMarkKey(ssc: StreamingContext): Unit = {
  val intervalMills = 10 * 1000 // check every 10 seconds whether the stop flag exists
  var isStop = false
  while (!isStop) {
    isStop = ssc.awaitTerminationOrTimeout(intervalMills)
    if (!isStop && isExists(STOP_FLAG)) {
      LOG.warn("Stopping the Spark Streaming application in 2 seconds...")
      Thread.sleep(2000)
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}

/**
 * Check whether a key exists in Redis.
 *
 * @param key the key to look up
 * @return true if the key exists
 */
def isExists(key: String): Boolean = {
  val jedis = InternalRedisClient.getPool.getResource
  val flag = jedis.exists(key)
  jedis.close()
  flag
}
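
Both helpers go through InternalRedisClient, a small Jedis connection-pool wrapper that is not listed in this post. A minimal sketch, reconstructed only from the makePool/getPool calls used here and assuming the jedis and commons-pool2 libraries, might look like this:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

// Hypothetical helper, reconstructed from how it is used in this post (makePool / getPool).
object InternalRedisClient extends Serializable {

  @transient private var pool: JedisPool = null

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

      // Release the pool when the JVM exits
      sys.addShutdownHook {
        pool.destroy()
      }
    }
  }

  def getPool: JedisPool = {
    assert(pool != null)
    pool
  }
}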

Main program

package com.dev.stream

import com.dev.scala.util.InternalRedisClient
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.slf4j.LoggerFactory

/**
 * Read from Kafka, keep offsets in Redis, and stop gracefully via a Redis flag key.
 */
object KafkaRedisStreaming {
  private val LOG = LoggerFactory.getLogger("KafkaRedisStreaming")

  private val STOP_FLAG = "TEST_STOP_FLAG"

  def initRedisPool(): Unit = {
    // Redis configurations
    val maxTotal = 20
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "47.98.119.122"
    val redisPort = 6379
    val redisTimeout = 30000
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  }

  /**
   * Read the last committed offsets for a topic from Redis.
   *
   * @param topicName  the Kafka topic
   * @param partitions the total number of partitions of the topic
   * @return a map from TopicPartition to the offset to resume from
   */
  def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
    if (LOG.isInfoEnabled())
      LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)

    // Fetch the offsets saved during the previous run
    val jedis = InternalRedisClient.getPool.getResource
    val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
    for (partition <- 0 until partitions) {
      val topic_partition_key = topicName + "_" + partition
      val lastSavedOffset = jedis.get(topic_partition_key)
      val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
      fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
    }
    jedis.close()

    fromOffsets.toMap
  }

  def main(args: Array[String]): Unit = {
    // Initialize the Redis pool
    initRedisPool()

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // The total number of partitions of the topic is specified here
    val fromOffsets = getLastCommittedOffsets(topicName, 3)

    // Create the direct Kafka DStream, resuming from the saved offsets
    val kafkaTopicDS =
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

    kafkaTopicDS.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Only process and commit when the RDD has data
      if (!rdd.isEmpty()) {
        val jedis = InternalRedisClient.getPool.getResource
        val p = jedis.pipelined()
        p.multi() // open a transaction

        // Process the data: word count, sorted by frequency
        rdd
          .map(_.value)
          .flatMap(_.split(" "))
          .map(x => (x, 1L))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .foreach(println)

        // Update the offsets in Redis
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
          p.set(topic_partition_key, offsetRange.untilOffset + "")
        }

        p.exec() // commit the transaction
        p.sync() // flush the pipeline and read back the responses
        jedis.close()
      }
    })

    ssc.start()

    // Block until the stop flag appears in Redis, then stop gracefully
    stopByMarkKey(ssc)

    ssc.awaitTermination()
  }
}
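
To trigger a graceful shutdown, set the TEST_STOP_FLAG key in Redis from outside the job (for example with redis-cli: SET TEST_STOP_FLAG 1). stopByMarkKey notices the key within roughly one 10-second scan, lets the in-flight batch finish, commits its offsets to Redis, and then stops the StreamingContext. A quick sketch of doing the same from code, reusing the pool shown above:

// Trigger a graceful shutdown from any process that can reach the same Redis instance.
val jedis = InternalRedisClient.getPool.getResource
jedis.set("TEST_STOP_FLAG", "1") // picked up by stopByMarkKey within ~10 seconds
jedis.close()

// Delete the flag before the next start, otherwise the job will stop itself again:
// redis-cli DEL TEST_STOP_FLAG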