大数据分析平台-项目4

tech2024-12-15  22

Top 10 热门品类统计:统计用户对各品类的点击、下单、支付次数,并依次按这三项降序排序

创建CategorySortKey排序工具类

/*
 * Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
 */
package com.obj.session

/**
 * Composite sort key for a category: ordered by click count first, then by
 * order count, then by pay count.
 *
 * @param clickCount number of clicks
 * @param orderCount number of orders
 * @param payCount   number of payments
 */
case class CategorySortKey(clickCount: Long, orderCount: Long, payCount: Long)
  extends Ordered[CategorySortKey] {

  /**
   * Compares `this` with `that` field by field (click, order, pay).
   *
   * The counts are compared with `java.lang.Long.compare` rather than by
   * subtracting and narrowing to `Int`: narrowing a `Long` difference can
   * truncate to 0 or flip its sign when the difference does not fit in an
   * `Int`, which would produce a wrong ordering.
   *
   * @param that the key to compare against
   * @return a negative value when `this < that`, 0 when both keys are equal,
   *         a positive value when `this > that`
   */
  override def compare(that: CategorySortKey): Int = {
    val byClick = java.lang.Long.compare(clickCount, that.clickCount)
    if (byClick != 0) byClick
    else {
      val byOrder = java.lang.Long.compare(orderCount, that.orderCount)
      if (byOrder != 0) byOrder
      else java.lang.Long.compare(payCount, that.payCount)
    }
  }
}

创建Top10Cate Object启动类

package com.obj.session

import com.dou.model.UserVisitAction
import com.dou.util.StringUtils
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that ranks categories by user activity: for every category seen
 * in the selected time window it counts clicks, orders and payments, sorts
 * the categories by (click, order, pay) in descending order, prints the first
 * ten and appends the ranking to the `session_order` table over JDBC.
 */
object Top10Cate {

  /**
   * Parses a comma-separated list of category ids.
   *
   * Blank entries (e.g. from an empty string or a trailing comma) are skipped
   * instead of failing with a NumberFormatException.
   *
   * @param ids comma-separated ids, must not be null
   * @return the parsed ids in their original order
   */
  private def parseCategoryIds(ids: String): Array[Long] =
    ids.split(",").map(_.trim).filter(_.nonEmpty).map(_.toLong)

  /**
   * Runs the job with a hard-coded time window.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("session")
      .enableHiveSupport() // use the built-in Hive support
      .getOrCreate()

    try {
      val startTimeParam = "2020-08-20 19:34:20"
      val endTimeParam = "2020-08-20 19:40:20"

      import spark.implicits._

      // Both bounds are compile-time constants, so interpolating them into
      // the query is safe here; do not build the query this way from user input.
      val userVisitRDD = spark
        .sql(s"select * from user_visit_action where action_time > '$startTimeParam' and action_time < '$endTimeParam'")
        .as[UserVisitAction].rdd

      // Every category id touched by a click, an order or a payment, emitted as
      // (id, id) so the set can be used as the left side of the joins below.
      val categoryId = userVisitRDD.flatMap { userInfo =>
        val result = ArrayBuffer[(Long, Long)]()
        // NOTE(review): if click_category_id is a primitive Long this check is
        // always true — confirm how UserVisitAction encodes "no click".
        if (userInfo.click_category_id != null) {
          result += ((userInfo.click_category_id, userInfo.click_category_id))
        }
        // The id lists are comma-separated strings (they are split the same way
        // when counted below), so they must be parsed rather than iterated
        // character by character.
        if (userInfo.order_category_ids != null) {
          for (orderCategory <- parseCategoryIds(userInfo.order_category_ids)) {
            result += ((orderCategory, orderCategory))
          }
        }
        if (userInfo.pay_category_ids != null) {
          for (payCategory <- parseCategoryIds(userInfo.pay_category_ids)) {
            result += ((payCategory, payCategory))
          }
        }
        result
      }

      // De-duplicate the category ids.
      val distinctCategoryId = categoryId.distinct()

      // Click count per category.
      val clickIdCount = userVisitRDD
        .filter(item => item.click_category_id != null)
        .map(item => (item.click_category_id, 1L))
        .reduceByKey(_ + _)

      // Order count per category; one action can reference several categories.
      val orderCount = userVisitRDD
        .filter(item => item.order_category_ids != null)
        .flatMap(item => parseCategoryIds(item.order_category_ids).map(id => (id, 1L)))
        .reduceByKey(_ + _)

      // Pay count per category. The reduceByKey is required: without it the
      // join below yields one row per pay event instead of one per category.
      val payCount = userVisitRDD
        .filter(item => item.pay_category_ids != null)
        .flatMap(item => parseCategoryIds(item.pay_category_ids).map(id => (id, 1L)))
        .reduceByKey(_ + _)

      // Left-join the three counters onto the distinct ids; a category with no
      // events of a given kind gets a count of 0.
      val clickJoinRDD = distinctCategoryId.leftOuterJoin(clickIdCount).map {
        case (cId, (_, clicks)) => (cId, clicks.getOrElse(0L))
      }
      val orderJoinRDD = clickJoinRDD.leftOuterJoin(orderCount).map {
        case (cId, (clicks, orders)) => (cId, (clicks, orders.getOrElse(0L)))
      }
      val sortCount = orderJoinRDD.leftOuterJoin(payCount).map {
        case (cId, ((clicks, orders), pays)) =>
          (CategorySortKey(clicks, orders, pays.getOrElse(0L)), cId)
      }

      // Descending by click, then order, then pay count.
      val sortCategoryRDD = sortCount.sortByKey(ascending = false)

      sortCategoryRDD.take(10).foreach(println(_))

      // Flatten the key into scalar columns before writing: the JDBC writer has
      // no mapping for a nested struct column.
      // NOTE(review): the column names are chosen here — confirm they match the
      // session_order table if it already exists.
      sortCategoryRDD
        .map { case (key, cId) => (cId, key.clickCount, key.orderCount, key.payCount) }
        .toDF("category_id", "click_count", "order_count", "pay_count")
        .write.format("jdbc")
        .option("url", "jdbc:mysql://127.0.0.1:3306/ssm")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "root")
        .option("password", "root")
        .option("dbtable", "session_order") // target table
        .mode(SaveMode.Append) // append rows; the table is created if it does not exist
        .save()
    } finally {
      // Release the local Spark context even when the job fails.
      spark.stop()
    }
  }
}
最新回复(0)