前10热门商品品类统计：按用户对各品类的点击、下单、支付次数排序
创建CategorySortKey排序工具类
/*
* Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
*/
package com.obj.session
case class CategorySortKey(clickCount: Long, orderCount: Long, payCount: Long)
    extends Ordered[CategorySortKey] {

  /** Compares two keys lexicographically by click count, then order count, then pay count.
    *
    * Each field is compared with `java.lang.Long.compare` rather than by subtracting and
    * truncating with `.toInt`: a `Long` difference such as `1L << 32` truncates to `0`
    * (treating unequal keys as equal), other differences can flip sign, and the
    * subtraction itself can overflow for extreme values.
    *
    * @param that the key to compare against
    * @return a negative value if `this < that`, `0` if all three counts are equal,
    *         a positive value if `this > that`
    */
  override def compare(that: CategorySortKey): Int = {
    val byClick = java.lang.Long.compare(clickCount, that.clickCount)
    if (byClick != 0) byClick
    else {
      val byOrder = java.lang.Long.compare(orderCount, that.orderCount)
      if (byOrder != 0) byOrder
      else java.lang.Long.compare(payCount, that.payCount)
    }
  }
}
创建 Top10Cate 启动对象（Scala object）
package com.obj.session
import com.dou.model.UserVisitAction
import com.dou.util.StringUtils
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer
object Top10Cate {

  /** Number of highest-ranked categories that are printed and persisted. */
  private val TopN = 10

  /** Splits a comma-separated category-id list (e.g. "3,17,42") into numeric ids.
    *
    * Blank tokens are skipped, so "" or "1,,2" does not raise NumberFormatException.
    */
  private def parseCategoryIds(ids: String): Seq[Long] =
    ids.split(",").map(_.trim).filter(_.nonEmpty).map(_.toLong).toSeq

  /** Top-10 hot categories.
    *
    * Reads the user actions within a fixed `action_time` window and counts clicks,
    * orders and payments per category. It ranks the categories by
    * (clicks, orders, payments) in descending order, prints the top [[TopN]], and
    * appends them to the MySQL table `session_order`.
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("session")
      .enableHiveSupport() // enable Spark's Hive support to read user_visit_action
      .getOrCreate()

    try {
      import spark.implicits._

      val startTimeParam = "2020-08-20 19:34:20"
      val endTimeParam = "2020-08-20 19:40:20"
      // Both bounds are hard-coded constants, so building the SQL text is safe here;
      // switch to a parameterized filter if they ever come from user input.
      val query =
        "select * from user_visit_action " +
          s"where action_time > '$startTimeParam' and action_time < '$endTimeParam'"
      val userVisitRDD = spark.sql(query).as[UserVisitAction].rdd

      // Every category id that was clicked, ordered or paid at least once.
      // order_category_ids / pay_category_ids are comma-separated strings: they must be
      // split and parsed, because iterating the String itself would yield its Chars.
      // NOTE(review): the null checks mirror the original code; if click_category_id is a
      // primitive Long the check is always true - confirm against UserVisitAction.
      val allCategoryIds = userVisitRDD
        .flatMap { action =>
          val ids = ArrayBuffer[Long]()
          if (action.click_category_id != null) ids += action.click_category_id
          if (action.order_category_ids != null) ids ++= parseCategoryIds(action.order_category_ids)
          if (action.pay_category_ids != null) ids ++= parseCategoryIds(action.pay_category_ids)
          ids
        }
        .distinct()
        .map(id => (id, ()))

      // Per-category click counts.
      val clickCounts = userVisitRDD
        .filter(_.click_category_id != null)
        .map(action => (action.click_category_id, 1L))
        .reduceByKey(_ + _)

      // Per-category order counts; one action may order several categories.
      val orderCounts = userVisitRDD
        .filter(_.order_category_ids != null)
        .flatMap(action => parseCategoryIds(action.order_category_ids).map(id => (id, 1L)))
        .reduceByKey(_ + _)

      // Per-category payment counts. Aggregating here is required so that the join
      // below yields exactly one row per category.
      val payCounts = userVisitRDD
        .filter(_.pay_category_ids != null)
        .flatMap(action => parseCategoryIds(action.pay_category_ids).map(id => (id, 1L)))
        .reduceByKey(_ + _)

      // Left-join the counts onto the full id set (a missing count means 0), then sort
      // descending by (clicks, orders, payments).
      val sortCategoryRDD = allCategoryIds
        .leftOuterJoin(clickCounts)
        .leftOuterJoin(orderCounts)
        .leftOuterJoin(payCounts)
        .map { case (categoryId, (((_, clicks), orders), pays)) =>
          val key = CategorySortKey(clicks.getOrElse(0L), orders.getOrElse(0L), pays.getOrElse(0L))
          (key, categoryId)
        }
        .sortByKey(ascending = false)

      val topCategories = sortCategoryRDD.take(TopN)
      topCategories.foreach(println)

      // The JDBC sink cannot store a struct column, so flatten the sort key into plain
      // columns before writing.
      // NOTE(review): assumes session_order matches (or will be created with) this schema;
      // the connection credentials are hard-coded and should move to configuration.
      topCategories.toSeq
        .map { case (key, categoryId) =>
          (categoryId, key.clickCount, key.orderCount, key.payCount)
        }
        .toDF("category_id", "click_count", "order_count", "pay_count")
        .write.format("jdbc")
        .option("url", "jdbc:mysql://127.0.0.1:3306/ssm")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "root")
        .option("password", "root")
        .option("dbtable", "session_order") // target table: session_order
        .mode(SaveMode.Append) // append rows; the table is created if it does not exist
        .save()
    } finally {
      spark.stop()
    }
  }
}