1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
// Word count using an explicit GroupReduceFunction.
// Because of `groupBy(0)`, every record passed to one `reduce` call carries the
// same word, so a per-group Map is redundant: a single running sum is enough,
// and exactly one (word, total) pair is emitted per group.
// NOTE(review): a plain GroupReduceFunction (without GroupCombineFunction) is
// not combinable, so Flink cannot pre-aggregate before the shuffle; the
// pairwise `.reduce(...)` form can — confirm which behavior is intended.
val counts = text
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(0)
  .reduceGroup(new GroupReduceFunction[(String, Int), (String, Int)] {
    override def reduce(
        iterable: lang.Iterable[(String, Int)],
        collector: Collector[(String, Int)]): Unit = {
      val records = iterable.iterator()
      // An empty group emits nothing, same as the previous Map-based version.
      if (records.hasNext) {
        val (word, firstCount) = records.next()
        var total = firstCount
        while (records.hasNext) total += records.next()._2
        collector.collect((word, total))
      }
    }
  })
// Word count with a pairwise reduce: within each word group, two records are
// folded into one by keeping the (shared) word and adding their counts.
val counts = text
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(0)
  .reduce { (left, right) =>
    val (word, leftCount) = left
    (word, leftCount + right._2)
  }
// A word paired with a count; used as the element type of the input stream.
final case class WordCount(word: String, count: Int)

// Small in-memory input stream of pre-counted words.
val input = env.fromElements(
  WordCount("hello", 1),
  WordCount("world", 2))

// Key by the `word` field using a selector function. Unlike the string field
// expression `keyBy("word")`, which is resolved only at runtime and is
// deprecated in the DataStream API in favor of KeySelector-based keyBy, the
// lambda is checked by the compiler.
input.keyBy(_.word)
|