一.简介
 
GraphX中的核心聚合操作为aggregateMessages。该运算符将用户定义的sendMsg函数应用于图形中的每个边三元组,然后使用该mergeMsg函数在其目标顶点处聚合这些消息。
 
class Graph[VD
, ED
] {
  def aggregateMessages
[Msg
: ClassTag
](
      sendMsg
: EdgeContext
[VD
, ED
, Msg
] => Unit
,
      mergeMsg
: (Msg
, Msg
) => Msg
,
      tripletFields
: TripletFields 
= TripletFields
.All
)
    : VertexRDD
[Msg
]
}
 
用户定义的sendMsg函数采用EdgeContext,将公开源和目标属性以及边属性和函数(sendToSrc和sendToDst),以将消息发送到源和目标节点。sendMsg可以认为是 map-reduce中的map函数。用户定义的mergeMsg函数接受两条发往同一顶点的消息,并产生一条消息。可以认为是map-reduce中的reduce函数。Graph的 aggregateMessages操作返回一个VertexRDD[Msg] ,包含发往每个顶点的聚合消息(类型的Msg)。未收到消息的顶点不包含在返回的VertexRDD中。
 
另外,aggregateMessages采用一个可选参数 tripletsFields,该参数指示访问哪些数据EdgeContext (即,源顶点属性,而不是目标顶点属性)。Graph的可能选项在tripletsFields中定义,TripletFields默认值为TripletFields.All,指示用户定义的sendMsg函数可以访问任何顶点。该tripletFields参数可用于限制GraphX仅访问部分顶点, EdgeContext允许GraphX选择优化的联接策略。例如,如果我们正在计算每个用户的关注者的平均年龄,则仅需要源字段,因此我们可以TripletFields.Src用来表明我们仅需要源字段。
 
二.代码实战
 
package spark2
.graphx
import org
.apache
.log4j
.{Level
, Logger
}
import org
.apache
.spark
.graphx
._
import org
.apache
.spark
.rdd
.RDD
import org
.apache
.spark
.sql
.SparkSession
object AggregateMessagesExample 
{
  Logger
.getLogger("org").setLevel(Level
.WARN
)
  def 
main(args
: Array
[String
]): Unit 
= {
    val spark 
= SparkSession
      
.builder
      
.appName(s
"${this.getClass.getSimpleName}")
      .master("local[2]")
      .getOrCreate()
    val sc 
= spark
.sparkContext
    
    val vertexLines
: RDD
[String
] = sc
.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\vertices.txt")
    val vertices
: RDD
[(VertexId
, Double
)] = vertexLines
.map(line 
=> {
      val cols 
= line
.split(",")
      (cols(0).toLong
, cols(1).toDouble
)
    })
    
    val edgeLines
: RDD
[String
] = sc
.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\edges.txt")
    val edges
:RDD
[Edge
[Int
]] = edgeLines
.map(line 
=> {
      val cols 
= line
.split(",")
      Edge(cols(0).toInt
, cols(1).toInt
, cols(2).toInt
)
    })
    
    val graph
:Graph
[Double
, Int
] = Graph(vertices
, edges
)
    graph
.triplets
.collect
.foreach(println
)
    
    val olderFollowers
: VertexRDD
[(Int
, Double
)] = graph
.aggregateMessages
[(Int
, Double
)](
      triplet 
=> { 
        
          
          triplet
.sendToDst((1, triplet
.srcAttr
)) 
       
      },
      
      
      (a
, b
) => (a
._1 
+ b
._1
, a
._2 
+ b
._2
) 
    )
    olderFollowers
.collect
.foreach(println
)
    
    val avgAgeOfOlderFollowers
: VertexRDD
[Double
] =
      olderFollowers
.mapValues( (id
, value
) =>
        value match 
{case (count
, totalAge
) => totalAge 
/ count
})
    
    avgAgeOfOlderFollowers
.collect
.foreach(println
)
    spark
.stop()
  }
}
 
备注:详细注释参考代码中的备注!
 
三.执行结果
 
节点和边的属性信息:  相同源顶点的计算和属性累计值:  属性均值: