GraphX图计算之aggregateMessages算子案例详解

tech2024-03-25  55

一.简介

GraphX中的核心聚合操作为aggregateMessages。该运算符将用户定义的sendMsg函数应用于图形中的每个边三元组,然后使用该mergeMsg函数在其目标顶点处聚合这些消息。

class Graph[VD, ED] { def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[Msg] }

用户定义的sendMsg函数采用EdgeContext,将公开源和目标属性以及边属性和函数(sendToSrc和sendToDst),以将消息发送到源和目标节点。sendMsg可以认为是 map-reduce中的map函数。用户定义的mergeMsg函数接受两条发往同一顶点的消息,并产生一条消息。可以认为是map-reduce中的reduce函数。Graph的 aggregateMessages操作返回一个VertexRDD[Msg] ,包含发往每个顶点的聚合消息(类型的Msg)。未收到消息的顶点不包含在返回的VertexRDD中。

另外,aggregateMessages采用一个可选参数 tripletsFields,该参数指示访问哪些数据EdgeContext (即,源顶点属性,而不是目标顶点属性)。Graph的可能选项在tripletsFields中定义,TripletFields默认值为TripletFields.All,指示用户定义的sendMsg函数可以访问任何顶点。该tripletFields参数可用于限制GraphX仅访问部分顶点, EdgeContext允许GraphX选择优化的联接策略。例如,如果我们正在计算每个用户的关注者的平均年龄,则仅需要源字段,因此我们可以TripletFields.Src用来表明我们仅需要源字段。

二.代码实战

package spark2.graphx import org.apache.log4j.{Level, Logger} import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object AggregateMessagesExample { Logger.getLogger("org").setLevel(Level.WARN) def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName(s"${this.getClass.getSimpleName}") .master("local[2]") .getOrCreate() val sc = spark.sparkContext // 加载顶点数据 val vertexLines: RDD[String] = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\vertices.txt") val vertices: RDD[(VertexId, Double)] = vertexLines.map(line => { val cols = line.split(",") (cols(0).toLong, cols(1).toDouble) }) // 加载边数据 val edgeLines: RDD[String] = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\edges.txt") val edges:RDD[Edge[Int]] = edgeLines.map(line => { val cols = line.split(",") Edge(cols(0).toInt, cols(1).toInt, cols(2).toInt) }) // 使用vertices和edges生成图 val graph:Graph[Double, Int] = Graph(vertices, edges) graph.triplets.collect.foreach(println) // 统计相同目标顶点的个数和属性之和 val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)]( triplet => { // 类似旧算子mapReduceTriplets中的map //if (triplet.srcAttr > triplet.dstAttr) { // 项目标顶点发送源顶点信息 triplet.sendToDst((1, triplet.srcAttr)) // 发往目标顶点的源顶点个数,msg // } }, // 计数,求和 // 类似旧算子mapReduceTriplets中的reduce (a, b) => (a._1 + b._1, a._2 + b._2) // 相同目标顶点的源顶点个数之和及属性和 ) olderFollowers.collect.foreach(println) /** * 根据个数和属性之和求属性的平均值 */ val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues( (id, value) => value match {case (count, totalAge) => totalAge / count}) // 打印结果信息 avgAgeOfOlderFollowers.collect.foreach(println) spark.stop() } }

备注:详细注释参考代码中的备注!

三.执行结果

节点和边的属性信息: 相同源顶点的计算和属性累计值: 属性均值:

最新回复(0)