1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
/**
 * One-dimensional k-means clustering.
 *
 * The input values are sorted and each one is wrapped in a single-element
 * `Vector`. The `k` initial centers are taken from the sorted data at the
 * evenly spaced positions `i * n / (k + 1)` for `i = 1..k`. Then `iterNum`
 * rounds of "assign every point to its closest center, move every center to
 * the mean of its points" are executed.
 *
 * @param points  the values to cluster
 * @param k       number of cluster centers
 * @param iterNum number of k-means iterations; a value of 0 or less returns the
 *                initial centers unchanged
 * @return the final centers in ascending order, each paired with its position
 *         in that order; empty when `points` is empty or `k` is not positive
 */
def oneDimKmeans(points: Seq[Double], k: Int, iterNum: Int): List[(Double, Int)] = {

  /** Euclidean distance between `v1` and `v2`, taken over the indices of `v1`. */
  def vectorDis(v1: Vector[Double], v2: Vector[Double]): Double = {
    val squaredSum = v1.indices.foldLeft(0d) { (acc, i) =>
      val diff = v1(i) - v2(i)
      acc + diff * diff
    }
    math.sqrt(squaredSum)
  }

  // Vector is immutable, so element-wise arithmetic has to produce a new Vector.

  /** Element-wise sum over the indices of `v1`. */
  def vectorAdd(v1: Vector[Double], v2: Vector[Double]): Vector[Double] =
    Vector.tabulate(v1.length)(i => v1(i) + v2(i))

  /** Divides every component of `v1` by `num`. */
  def vectorDivide(v1: Vector[Double], num: Int): Vector[Double] =
    v1.map(_ / num)

  /**
   * Returns the center closest to `point`.
   * The comparison is strict, so when two centers are equally close the one
   * that appears later in `centers` is returned.
   */
  def closestCenter(centers: Array[Vector[Double]], point: Vector[Double]): Vector[Double] =
    centers.reduceLeft { (a, b) =>
      if (vectorDis(a, point) < vectorDis(b, point)) a else b
    }

  /** Runs `iterNum` k-means iterations starting from `initCenters`. */
  def doKmeans(
      points: Array[Vector[Double]],
      initCenters: Array[Vector[Double]]): Array[Vector[Double]] =
    (0 until iterNum).foldLeft(initCenters) { (centers, _) =>
      // Group the points by their closest center. The result is a
      // Map[Vector[Double], Array[Vector[Double]]]: the key is a cluster
      // center and the value is the set of points assigned to that center.
      val cluster = points.groupBy(closestCenter(centers, _))

      // Look up each center's cluster and move the center to the mean of its
      // points. A center that attracted no points keeps its previous position.
      centers.map { oldCenter =>
        cluster
          .get(oldCenter)
          .map { pointsInCluster =>
            vectorDivide(pointsInCluster.reduceLeft(vectorAdd), pointsInCluster.length)
          }
          .getOrElse(oldCenter)
      }
    }

  if (points.isEmpty || k <= 0) {
    // Nothing to cluster or no centers requested. Without this guard an empty
    // input fails on pointArray(0) and a non-positive k fails on reduceLeft
    // over an empty center array.
    List.empty[(Double, Int)]
  } else {
    val pointArray = points.sorted.map(p => Vector(p)).toArray
    val n = pointArray.length
    // Multiply in Long so that i * n cannot overflow Int for large inputs.
    val initCenters = (1 to k).map(i => pointArray((i.toLong * n / (k + 1)).toInt)).toArray
    val finalCenter = doKmeans(pointArray, initCenters)
    finalCenter.toList.map(center => center(0)).sorted.zipWithIndex
  }
}

sqlContext.udf.register("oneDimKmeans", oneDimKmeans _)
|