大数据分析平台-项目3

tech2024-12-17  21

项目功能流程

​ 需求:获得用户访问初始时间和结束时间 通多计算步长来计算用户所占时间比 加入条件性别 年龄时长进行筛选

​ 当用户访问后会有session存在数据库或者存在日志中

​ 转化所有数据>计算>保存

用户操作,放在一起 转化格式 session对应所有数据 value(Array) 统计每一个用户的Array 再遍历处理 计算 步长 时长 转换为范围 便于统计 >计算百分比 >永久性存储数据库 (存储多种格式 比如数据库 redis Hive hBase) >Echarts展示数据

创建工具类Constant

object Constant { val TIME_1S_3S ="1s_3s" val TIME_4S_6S ="4s_6s" val TIME_7S_9S ="7s_9s" val TIME_10S_30S ="10s_30s" val TIME_30S_60S ="30s_60s" val TIME_1M_10M ="1m_10m" val TIME_10M_30M ="10m_30m" val TIME_30M = "30m+" val STEP_1_3 ="1_3" val STEP_4_6 ="4_6" val STEP_7_9 ="7_9" val STEP_10_30 ="10_30" val STEP_30_60 ="30_60" val STEP_60 ="60+" val SESSION_COUNT = "session_count" }

创建UserSeesion Object启动类

package com.obj.session import com.dou.model.{SessionAggrStat, UserInfo, UserVisitAction} import com.dou.util.{DateUtils, NumberUtils} import org.apache.spark.sql.{SaveMode, SparkSession} import java.util.{Date, UUID} import com.alibaba.fastjson.JSON object UserSession { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("session") .enableHiveSupport() //开启内置hive .getOrCreate() //生产环境下,参数需要从外部传入 //设定条件,计算数据,最后得到结果,然后使用es展示在后台管理系统 //和当前项目结合使用 /* 页面埋点记录数据 发送后台记录日志 spark解析日志,计算,存入数据库 后台展示数据 * */ //可以根据用户性别 年龄 职业。。计算session val startTimeParam = "2020-08-20 19:34:20" val endTimeParam = "2020-08-20 19:40:20" //定义json类型参数 val json = "{starTimeParam:'2020-08-20 19:34:20'," + "endTimeParam:'2020-08-20 19:40:20'," + "age:'1-10'," + "city:'null'," + "sex:'null'}" val param = JSON.parseObject(json) import spark.implicits._//隐式转换 //hive中读取这张表 val userVisitRDD = spark.sql("select * from user_visit_action where action_time > '" + startTimeParam + "' and action_time < '" + endTimeParam+"'") .as[UserVisitAction].rdd//转换模板类或许数据 会变成一个对象 用的话直接对象. val sessionUserVisit = userVisitRDD //再次转换 userVisit.session_id:作为key 进行聚合 .map{case(userVisit) => (userVisit.user_id,userVisit)} //加上条件筛选 //读取user_info用户相关信息表数据 val userInfoDF = spark .sql("select * from user_info") .as[UserInfo].rdd .map(item => (item.user_id,item)) //用户表结合在一起 val sessionUserRDD = sessionUserVisit.join(userInfoDF).map { case (uId, (userVisit, userInfo)) => { //聚合过的 (userVisit.session_id, (userInfo.city, userInfo.sex, userInfo.age, userVisit.action_time)) } } import spark.implicits val sessionFilterUserRDD = sessionUserRDD.filter { case (sessionid,user) => { val sex = user._2 val age = user._3 val city = user._1 var istrue = true val paramAge = param.getString("age") if(!paramAge.equals("null")){ val ages = paramAge.split("-") //如果有条件去判断 改值 if (age < ages(0).toInt || age > ages(1).toInt){ istrue = false } } if(!city.equals("null")){ if(!city.equals(city)){ istrue = false } } if(!sex.equals("null")){ if(!sex.equals(sex)){ istrue = false } } istrue; //如果为false删除 } } val session2UserVisit = sessionFilterUserRDD.groupByKey() val sessionVisitRDD = session2UserVisit.map{ case (sessionId,userInfos) => { //步长 var step = 0 //起始时间 var startTime:Date = null var endTime:Date = null //userInfo:city,sex,age,action_time userInfos.foreach( userInfo => { //目的:希望获得用户开始访问时间 //actionTime:活动时间 字符串转换时间类型 val actionTime = DateUtils.parseTime(userInfo._4) //为起始时间赋值 if(startTime==null){ startTime = actionTime } if(endTime==null){ endTime = actionTime } //获得最后的值 before:对比时间 测试此日期是否早于指定日期 if(actionTime.before(startTime)){ //如果actionTime在startTime之前 就重新赋值 startTime = actionTime } //after:之后 //直到用户访问的时间是最开始的时间 和最后的时间 if(actionTime.after(endTime)){ endTime = actionTime } step += 1 } ) //经过遍历得到长度 getTime:获得时间 val visitLeng = (endTime.getTime - startTime.getTime) / 1000 (sessionId,visitLeng,step) } } //计算每个级别所占比例 统计后的数据 //计算长度 最后计算数量 var sessionAccumulator = new SessionAccumulator spark.sparkContext.register(sessionAccumulator) def getTimeLength(visitLeng:Long)={ if(visitLeng >= 1&& visitLeng <= 3){ //使用累加器计算数量 //1~3秒时间段 sessionAccumulator.add(Constant.TIME_1S_3S) }else if(visitLeng >= 4&& visitLeng <= 6){ sessionAccumulator.add(Constant.TIME_4S_6S) }else if(visitLeng >= 7&& visitLeng <= 9){ sessionAccumulator.add(Constant.TIME_7S_9S) }else if(visitLeng >= 10&& visitLeng <= 30){ sessionAccumulator.add(Constant.TIME_10S_30S) }else if(visitLeng > 30&& visitLeng <= 60){ sessionAccumulator.add(Constant.TIME_30S_60S) }else if(visitLeng > 60&& visitLeng <= 600){ sessionAccumulator.add(Constant.TIME_1M_10M) }else if(visitLeng > 600&& visitLeng <= 1800){ sessionAccumulator.add(Constant.TIME_10M_30M) }else if(visitLeng > 1800){ sessionAccumulator.add(Constant.TIME_30M) } } //范围计算 def getStepLength(visitLength:Long)={ if(visitLength >= 1&& visitLength <= 3){ //使用累加器计算数量 //1~3秒时间段 sessionAccumulator.add(Constant.STEP_1_3) }else if(visitLength >= 4&& visitLength <= 6){ sessionAccumulator.add(Constant.STEP_4_6) }else if(visitLength >= 7&& visitLength <= 9){ sessionAccumulator.add(Constant.STEP_7_9) }else if(visitLength >= 10&& visitLength <= 30){ sessionAccumulator.add(Constant.STEP_10_30) }else if(visitLength > 30&& visitLength <= 60){ sessionAccumulator.add(Constant.STEP_30_60) }else if(visitLength > 60){ sessionAccumulator.add(Constant.STEP_60) } } //懒加载 val result = sessionVisitRDD.map{ case(sessionId,visitLeng,step)=>{ //主要调用累加器 //得到一共多少次处理 session的数量 也可直接.count都可以 sessionAccumulator.add(Constant.SESSION_COUNT) getTimeLength(visitLeng) getStepLength(step) (sessionId) } }.count() //计算每一步所占比 //首先需要得到总共数量 val value = sessionAccumulator.value val session_count:Double = value(Constant.SESSION_COUNT).toDouble //计算时长 getOrElse:用于获得key对应的value 如果没有返回为0 val time_length_1s_3s = value.getOrElse(Constant.TIME_1S_3S,0) val time_length_4s_6s = value.getOrElse(Constant.TIME_4S_6S,0) val time_length_7s_9s = value.getOrElse(Constant.TIME_7S_9S,0) val time_length_10s_30s = value.getOrElse(Constant.TIME_10S_30S,0) val time_length_30s_60s = value.getOrElse(Constant.TIME_30S_60S,0) val time_length_1m_10m = value.getOrElse(Constant.TIME_1M_10M,0) val time_length_10m_30m = value.getOrElse(Constant.TIME_10M_30M,0) val time_length_30m = value.getOrElse(Constant.TIME_30M,0) //获得步长 val step_length_1_3 = value.getOrElse(Constant.STEP_1_3,0) val step_length_4_6 = value.getOrElse(Constant.STEP_4_6,0) val step_length_7_9 = value.getOrElse(Constant.STEP_7_9,0) val step_length_10_30 = value.getOrElse(Constant.STEP_10_30,0) val step_length_30_60 = value.getOrElse(Constant.STEP_30_60,0) val step_length_60 = value.getOrElse(Constant.STEP_60,0) //计算所占比例 val time_length_1s_3s_ratio = NumberUtils.formatDouble(time_length_1s_3s/session_count,2) val time_length_4s_6s_ratio = NumberUtils.formatDouble(time_length_4s_6s/session_count,2) val time_length_7s_9s_ratio = NumberUtils.formatDouble(time_length_7s_9s/session_count,2) val time_length_10s_30s_ratio = NumberUtils.formatDouble(time_length_10s_30s/session_count,2) val time_length_30s_60s_ratio = NumberUtils.formatDouble(time_length_30s_60s/session_count,2) val time_length_1m_10m_ratio = NumberUtils.formatDouble(time_length_1m_10m/session_count,2) val time_length_10m_30m_ratio = NumberUtils.formatDouble(time_length_10m_30m/session_count,2) val time_length_30m_ratio = NumberUtils.formatDouble(time_length_30m/session_count,2) //计算步长所占比 val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3/session_count,2) val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6/session_count,2) val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9/session_count,2) val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30/session_count,2) val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60/session_count,2) val step_length_60_ratio = NumberUtils.formatDouble(step_length_60/session_count,2) //执行保存操作 //声明id作为主键存储 生成随机数作为ID val taskId = UUID.randomUUID().toString val sessionStart = SessionAggrStat(taskId,session_count,time_length_1s_3s_ratio,time_length_4s_6s_ratio, time_length_7s_9s_ratio,time_length_10s_30s_ratio,time_length_30s_60s_ratio, time_length_1m_10m_ratio,time_length_10m_30m_ratio,time_length_30m_ratio, step_length_1_3_ratio,step_length_4_6_ratio,step_length_7_9_ratio,step_length_10_30_ratio, step_length_30_60_ratio,step_length_60_ratio) //存储数据库 转换RDD模式 makeRDD:把数组转换为RDD格式 val sessionStartRDD = spark.sparkContext.makeRDD(Array(sessionStart)) //转换dataform格式 sessionStartRDD.toDF().write.format("jdbc") .option("url","jdbc:mysql://127.0.0.1:3306/ssm") .option("driver","com.mysql.jdbc.Driver") .option("user","root") .option("password","root") .option("dbtable","session_star")//保存到user1数据表 .mode(SaveMode.Append)//SaveMode:帮助创建 如果没有则创建 .save() // result.count() // //sessionVisitRDD.take(10).foreach(println(_)) // val value = sessionAccumulator.value // println(value(Constant.TIME_30M)) } }
最新回复(0)