* Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
package com.obj.session
/**
 * Composite sort key for a category.
 *
 * Keys are ordered by `clickCount` first, then `orderCount`, then `payCount`. Each field is
 * compared numerically in ascending order. Because the class extends [[Ordered]] (and therefore
 * `Comparable`), an implicit `Ordering[CategorySortKey]` is available, so instances can be used
 * with ordering-based APIs such as `sorted`.
 *
 * @param clickCount click count for the category
 * @param orderCount order count for the category
 * @param payCount   payment count for the category
 */
case class CategorySortKey(clickCount: Long, orderCount: Long, payCount: Long)
  extends Ordered[CategorySortKey] {

  /** Result of comparing `this` with operand `that`.
   *
   * Returns `x` where:
   *  - `x < 0` when `this < that`
   *  - `x == 0` when `this == that`
   *  - `x > 0` when `this > that`
   *
   * Fields are compared with `java.lang.Long.compare` instead of `(a - b).toInt`. Truncating a
   * `Long` difference to `Int` wraps for differences outside the `Int` range. That can report the
   * wrong sign (e.g. a difference of 2^31) or zero (e.g. 2^32), which would silently fall through
   * to the next field.
   */
  override def compare(that: CategorySortKey): Int = {
    val byClick = java.lang.Long.compare(clickCount, that.clickCount)
    if (byClick != 0) byClick
    else {
      val byOrder = java.lang.Long.compare(orderCount, that.orderCount)
      if (byOrder != 0) byOrder
      else java.lang.Long.compare(payCount, that.payCount)
    }
  }
}
创建 Top10Cate 启动对象（Scala `object`，包含 `main` 入口方法）
package com.obj.session
import com.dou.model.UserVisitAction
import com.dou.util.StringUtils
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer
object Top10Cate {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.enableHiveSupport() //开启内置hive
val startTimeParam = "2020-08-20 19:34:20"
val endTimeParam = "2020-08-20 19:40:20"
import spark.implicits._
val userVisitRDD = spark
.sql("select * from user_visit_action where action_time > '" +
startTimeParam + "' and action_time < '" + endTimeParam+"'")
//获得类别ID flatMap:返回多个值
val categoryId = userVisitRDD.flatMap(
userInfo => {
val result = ArrayBuffer[(Long, Long)]() //为了方便后期聚合 设置成两个值
if (userInfo.click_category_id != null) {
result += ((userInfo.click_category_id, userInfo.click_category_id))
if (userInfo.order_category_ids != null) {
for (order_category <- userInfo.order_category_ids) {
result += ((order_category, order_category))
if (userInfo.pay_category_ids != null) {
for (pay_category <- userInfo.pay_category_ids) {
result += ((pay_category, pay_category))
val distinctCategoryId = categoryId.distinct()
val clickCategoryIds = userVisitRDD.filter(item => item.click_category_id!=null)
.map(item => (item.click_category_id,1L))
val clickIdCount = clickCategoryIds.reduceByKey(_ + _)
val orderCount = userVisitRDD.filter(item => item.order_category_ids!=null)
//因为订单是数组 使用flatMap循环处理
item => {
.map(line => (line.toLong,1L))
val payCount = userVisitRDD.filter(item => item.pay_category_ids!=null)
item => {
.map(line => (line.toLong,1L))
val chickJoinRDD = distinctCategoryId.leftOuterJoin(clickIdCount)
case ((cId,(cateId,optionValue))) => {
val checkCount = if(optionValue.isDefined) optionValue.get else 0L
val value = "checkCount="+checkCount
val orderJoinRDD = chickJoinRDD.leftOuterJoin(orderCount)
case (cId,(oValue,optionValue)) => {
val orderCount = if(optionValue.isDefined) optionValue.get else 0L
val value = oValue + "|orderCount=" + orderCount
//(1,"checkCount = 10 | orderCount=13")
val payJoinRDD = orderJoinRDD.leftOuterJoin(payCount)
case (cId,(pValue,payValue)) => {
val payCount = if(payValue.isDefined) payValue.get else 0L
val value = pValue + "|payCount=" + payCount
val sortCount = payJoinRDD.map{
case(cid,line) => {
val checkCount = StringUtils.getFieldFromConcatString(line,"\\|","checkCount").toLong
val orderCount = StringUtils.getFieldFromConcatString(line,"\\|","orderCount").toLong
val payCount = StringUtils.getFieldFromConcatString(line,"\\|","payCount").toLong
val sortCategoryRDD = sortCount.sortByKey(false)
.mode(SaveMode.Append)//SaveMode:帮助创建 如果没有则创建