前10热门品类统计：按用户对各品类的点击、下单、支付次数排序
 
创建CategorySortKey排序工具类
 
/*
 * Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
 */
package com.obj.session
/**
 * Composite sort key for a product category: ordered by click count, then order count,
 * then pay count (all ascending; use a descending sort to get the "hottest" first).
 *
 * @param clickCount number of click events for the category
 * @param orderCount number of order events for the category
 * @param payCount   number of pay events for the category
 */
case class CategorySortKey(clickCount: Long, orderCount: Long, payCount: Long)
  extends Ordered[CategorySortKey] {

  /**
   * Lexicographic comparison on (clickCount, orderCount, payCount).
   *
   * Uses `java.lang.Long.compare` rather than subtracting and truncating with `.toInt`:
   * a Long difference outside the Int range would otherwise flip sign or collapse to 0.
   *
   * @return negative if `this < that`, 0 if all three counts are equal, positive otherwise
   */
  override def compare(that: CategorySortKey): Int = {
    val byClicks = java.lang.Long.compare(clickCount, that.clickCount)
    if (byClicks != 0) byClicks
    else {
      val byOrders = java.lang.Long.compare(orderCount, that.orderCount)
      if (byOrders != 0) byOrders
      else java.lang.Long.compare(payCount, that.payCount)
    }
  }
}
 
创建Top10Cate Object启动类
 
package com.obj.session
import com.dou.model.UserVisitAction
import com.dou.util.StringUtils
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer
object Top10Cate {

  /** Action-time window used when no bounds are passed on the command line. */
  private val DefaultStartTime = "2020-08-20 19:34:20"
  private val DefaultEndTime = "2020-08-20 19:40:20"

  /** Number of categories printed and persisted. */
  private val TopN = 10

  /**
   * Top-N hot categories: ranks product categories by click, order and pay counts over the
   * `user_visit_action` rows whose `action_time` lies strictly inside a window, prints the
   * top [[TopN]] and appends them to the MySQL table `session_order`.
   *
   * @param args optional `args(0)` = window start, `args(1)` = window end (both exclusive);
   *             when absent the previous hard-coded window is used.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("session")
      .enableHiveSupport() // needed to read the Hive table user_visit_action
      .getOrCreate()
    try {
      run(spark, args.lift(0).getOrElse(DefaultStartTime), args.lift(1).getOrElse(DefaultEndTime))
    } finally {
      spark.stop()
    }
  }

  /** Computes per-category counts for the window, then prints and persists the top N. */
  private def run(spark: SparkSession, startTime: String, endTime: String): Unit = {
    import spark.implicits._

    // Column predicates instead of string-concatenated SQL, so the bounds cannot alter the query.
    val actions = spark.table("user_visit_action")
      .where($"action_time" > startTime && $"action_time" < endTime)
      .as[UserVisitAction]
      .rdd

    // Emit one (categoryId, (clicks, orders, pays)) record per event and sum per category.
    // Every category seen in any event kind is kept; missing event kinds count as 0.
    val countsByCategory = actions.flatMap { action =>
      // NOTE(review): click_category_id is a Long, so the former `!= null` guard was always
      // true and every action is counted as a click. If the data marks "no click" with a
      // sentinel value (e.g. -1), filter it out here — confirm against the data source.
      val clicks = Iterator.single((action.click_category_id, (1L, 0L, 0L)))
      // The id lists are comma-separated strings; they must be split, not iterated per char.
      val orders = parseIds(action.order_category_ids).iterator.map(id => (id, (0L, 1L, 0L)))
      val pays = parseIds(action.pay_category_ids).iterator.map(id => (id, (0L, 0L, 1L)))
      clicks ++ orders ++ pays
    }.reduceByKey { case ((c1, o1, p1), (c2, o2, p2)) => (c1 + c2, o1 + o2, p1 + p2) }

    // top() returns the N largest (descending) without a full sort; equal sort keys are
    // tie-broken by category id. Materialized once so it is not recomputed for the write.
    val topCategories = countsByCategory
      .map { case (categoryId, (clicks, orders, pays)) =>
        (CategorySortKey(clicks, orders, pays), categoryId)
      }
      .top(TopN)
    topCategories.foreach(println)

    // JDBC cannot store the nested CategorySortKey struct, so flatten it into plain columns.
    // NOTE(review): column names are chosen here — align them with the session_order schema.
    topCategories.toSeq
      .map { case (key, categoryId) => (categoryId, key.clickCount, key.orderCount, key.payCount) }
      .toDF("category_id", "click_count", "order_count", "pay_count")
      .write.format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1:3306/ssm")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "session_order")
      .mode(SaveMode.Append) // append rows; the JDBC source creates the table if it is missing
      .save()
  }

  /** Splits a comma-separated id list; null, blank, or empty tokens yield no ids. */
  private def parseIds(ids: String): Array[Long] =
    Option(ids).fold(Array.empty[Long]) { raw =>
      raw.split(",").map(_.trim).filter(_.nonEmpty).map(_.toLong)
    }
}