一.简介
GraphX中的核心聚合操作为aggregateMessages。该运算符将用户定义的sendMsg函数应用于图形中的每个边三元组,然后使用该mergeMsg函数在其目标顶点处聚合这些消息。
class Graph[VD
, ED
] {
def aggregateMessages
[Msg
: ClassTag
](
sendMsg
: EdgeContext
[VD
, ED
, Msg
] => Unit
,
mergeMsg
: (Msg
, Msg
) => Msg
,
tripletFields
: TripletFields
= TripletFields
.All
)
: VertexRDD
[Msg
]
}
用户定义的sendMsg函数采用EdgeContext,将公开源和目标属性以及边属性和函数(sendToSrc和sendToDst),以将消息发送到源和目标节点。sendMsg可以认为是 map-reduce中的map函数。用户定义的mergeMsg函数接受两条发往同一顶点的消息,并产生一条消息。可以认为是map-reduce中的reduce函数。Graph的 aggregateMessages操作返回一个VertexRDD[Msg] ,包含发往每个顶点的聚合消息(类型的Msg)。未收到消息的顶点不包含在返回的VertexRDD中。
另外,aggregateMessages采用一个可选参数 tripletsFields,该参数指示访问哪些数据EdgeContext (即,源顶点属性,而不是目标顶点属性)。Graph的可能选项在tripletsFields中定义,TripletFields默认值为TripletFields.All,指示用户定义的sendMsg函数可以访问任何顶点。该tripletFields参数可用于限制GraphX仅访问部分顶点, EdgeContext允许GraphX选择优化的联接策略。例如,如果我们正在计算每个用户的关注者的平均年龄,则仅需要源字段,因此我们可以TripletFields.Src用来表明我们仅需要源字段。
二.代码实战
package spark2
.graphx
import org
.apache
.log4j
.{Level
, Logger
}
import org
.apache
.spark
.graphx
._
import org
.apache
.spark
.rdd
.RDD
import org
.apache
.spark
.sql
.SparkSession
object AggregateMessagesExample
{
Logger
.getLogger("org").setLevel(Level
.WARN
)
def
main(args
: Array
[String
]): Unit
= {
val spark
= SparkSession
.builder
.appName(s
"${this.getClass.getSimpleName}")
.master("local[2]")
.getOrCreate()
val sc
= spark
.sparkContext
val vertexLines
: RDD
[String
] = sc
.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\vertices.txt")
val vertices
: RDD
[(VertexId
, Double
)] = vertexLines
.map(line
=> {
val cols
= line
.split(",")
(cols(0).toLong
, cols(1).toDouble
)
})
val edgeLines
: RDD
[String
] = sc
.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\edges.txt")
val edges
:RDD
[Edge
[Int
]] = edgeLines
.map(line
=> {
val cols
= line
.split(",")
Edge(cols(0).toInt
, cols(1).toInt
, cols(2).toInt
)
})
val graph
:Graph
[Double
, Int
] = Graph(vertices
, edges
)
graph
.triplets
.collect
.foreach(println
)
val olderFollowers
: VertexRDD
[(Int
, Double
)] = graph
.aggregateMessages
[(Int
, Double
)](
triplet
=> {
triplet
.sendToDst((1, triplet
.srcAttr
))
},
(a
, b
) => (a
._1
+ b
._1
, a
._2
+ b
._2
)
)
olderFollowers
.collect
.foreach(println
)
val avgAgeOfOlderFollowers
: VertexRDD
[Double
] =
olderFollowers
.mapValues( (id
, value
) =>
value match
{case (count
, totalAge
) => totalAge
/ count
})
avgAgeOfOlderFollowers
.collect
.foreach(println
)
spark
.stop()
}
}
备注:详细注释参考代码中的备注!
三.执行结果
节点和边的属性信息: 相同源顶点的计算和属性累计值: 属性均值: