package com.dev.stream

import com.dev.scala.util.InternalRedisClient
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object KafkaRedisStreaming {

  private val LOG = LoggerFactory.getLogger("KafkaRedisStreaming")

  // Creating this key in Redis triggers a graceful shutdown (see stopByMarkKey).
  private val STOP_FLAG = "TEST_STOP_FLAG"

  def initRedisPool(): Unit = {
    val maxTotal = 20
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "47.98.119.122"
    val redisPort = 6379
    val redisTimeout = 30000
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  }

  /**
   * Reads the last committed offset of every partition of the topic from Redis.
   * A missing key defaults to offset 0, i.e. the partition is consumed from the beginning.
   */
  def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
    if (LOG.isInfoEnabled) LOG.info("||--Topic:{}, getLastCommittedOffsets from Redis--||", topicName)
    val jedis = InternalRedisClient.getPool.getResource
    try {
      val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
      for (partition <- 0 until partitions) {
        val topicPartitionKey = topicName + "_" + partition
        val lastSavedOffset = jedis.get(topicPartitionKey)
        val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
        fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
      }
      fromOffsets.toMap
    } finally {
      jedis.close()
    }
  }

  def main(args: Array[String]): Unit = {
    initRedisPool()

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Resume from the offsets persisted in Redis (the topic is assumed to have 3 partitions).
    val fromOffsets = getLastCommittedOffsets(topicName, 3)
    val kafkaTopicDS = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    kafkaTopicDS.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {
        // Word count over the batch, printed in descending frequency order.
        rdd
          .map(_.value)
          .flatMap(_.split(" "))
          .map(word => (word, 1L))
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
          .foreach(println)

        // Only after the batch is processed, commit all until-offsets to Redis
        // in one pipelined MULTI/EXEC transaction, so all partitions commit
        // together (at-least-once: a crash before the commit replays the batch).
        val jedis = InternalRedisClient.getPool.getResource
        try {
          val p = jedis.pipelined()
          p.multi()
          offsetRanges.foreach { offsetRange =>
            println(s"partition: ${offsetRange.partition} fromOffset: ${offsetRange.fromOffset} untilOffset: ${offsetRange.untilOffset}")
            val topicPartitionKey = offsetRange.topic + "_" + offsetRange.partition
            p.set(topicPartitionKey, offsetRange.untilOffset.toString)
          }
          p.exec()
          p.sync()
        } finally {
          jedis.close()
        }
      }
    }

    ssc.start()
    stopByMarkKey(ssc)
    ssc.awaitTermination()
  }

  /**
   * Blocks while the job runs, polling Redis for STOP_FLAG; when the key appears,
   * the streaming context is stopped gracefully. The original listing calls this
   * method but omits its body, so the implementation below is a reconstruction
   * of the usual marker-key shutdown pattern.
   */
  private def stopByMarkKey(ssc: StreamingContext): Unit = {
    val checkIntervalMillis = 10 * 1000
    var isStopped = false
    while (!isStopped) {
      isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
      if (!isStopped && isExists(STOP_FLAG)) {
        LOG.warn("Stop flag found in Redis, stopping the StreamingContext gracefully")
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }

  private def isExists(key: String): Boolean = {
    val jedis = InternalRedisClient.getPool.getResource
    try jedis.exists(key) finally jedis.close()
  }
}
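// ---------------------------------------------------------------------------
// InternalRedisClient (separate file: com/dev/scala/util/InternalRedisClient.scala)
// The helper imported above is not part of the original listing; this is a
// hedged sketch inferred from the makePool/getPool call sites, built on a
// plain Jedis connection pool. Treat the pool settings as placeholders.
// ---------------------------------------------------------------------------
package com.dev.scala.util

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object InternalRedisClient extends Serializable {

  @transient private var pool: JedisPool = _

  // Builds the singleton pool once; later calls are no-ops.
  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = synchronized {
    if (pool == null) {
      val poolConfig = new JedisPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(true)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

      // Release the pool's sockets when the JVM exits.
      sys.addShutdownHook(pool.destroy())
    }
  }

  def getPool: JedisPool = {
    require(pool != null, "JedisPool is not initialized; call makePool first")
    pool
  }
}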
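// ---------------------------------------------------------------------------
// build.sbt (hedged sketch): one plausible set of dependencies this listing
// compiles against. The versions are assumptions, not taken from the original;
// match them to the Spark, Kafka, and Redis versions on your cluster.
// ---------------------------------------------------------------------------
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.8",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
  "redis.clients"    %  "jedis"                      % "2.9.0"
)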