大数据分析平台-项目1

tech2024-12-18  24

日志数据随机生成

项目结构

依赖

<properties> <scala.version>2.12.8</scala.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>2.4.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.12</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.spark-project.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.2.1.spark2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.37</version> </dependency> <dependency> <groupId>org.jblas</groupId> <artifactId>jblas</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.spark-project.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.2.1.spark2</version> </dependency> </dependencies>

DataModel&Utils工具类创建

DataModel

/* * Copyright (c) 2018. Atguigu Inc. All Rights Reserved. */ package com.dou.model //***************** 输入表 ********************* /** * 用户访问动作表 * * @param date 用户点击行为的日期 * @param user_id 用户的ID * @param session_id Session的ID * @param page_id 某个页面的ID * @param action_time 点击行为的时间点 * @param search_keyword 用户搜索的关键词 * @param click_category_id 某一个商品品类的ID * @param click_product_id 某一个商品的ID * @param order_category_ids 一次订单中所有品类的ID集合 * @param order_product_ids 一次订单中所有商品的ID集合 * @param pay_category_ids 一次支付中所有品类的ID集合 * @param pay_product_ids 一次支付中所有商品的ID集合 * @param city_id 城市ID */ case class UserVisitAction(date: String, user_id: Long, session_id: String, page_id: Long, action_time: String, search_keyword: String, click_category_id: Long, click_product_id: Long, order_category_ids: String, order_product_ids: String, pay_category_ids: String, pay_product_ids: String, city_id: Long ) /** * 用户信息表 * * @param user_id 用户的ID * @param username 用户的名称 * @param name 用户的名字 * @param age 用户的年龄 * @param professional 用户的职业 * @param city 用户所在的城市 * @param sex 用户的性别 */ case class UserInfo(user_id: Long, username: String, name: String, age: Int, professional: String, city: String, sex: String ) /** * 产品表 * * @param product_id 商品的ID * @param product_name 商品的名称 * @param extend_info 商品额外的信息 */ case class ProductInfo(product_id: Long, product_name: String, extend_info: String ) /** * 聚合统计表 * * @param taskid 当前计算批次的ID * @param session_count 所有Session的总和 * @param visit_length_1s_3s_ratio 1-3sSession访问时长占比 * @param visit_length_4s_6s_ratio 4-6sSession访问时长占比 * @param visit_length_7s_9s_ratio 7-9sSession访问时长占比 * @param visit_length_10s_30s_ratio 10-30sSession访问时长占比 * @param visit_length_30s_60s_ratio 30-60sSession访问时长占比 * @param visit_length_1m_3m_ratio 1-3mSession访问时长占比 * @param visit_length_3m_10m_ratio 3-10mSession访问时长占比 * @param visit_length_10m_30m_ratio 10-30mSession访问时长占比 * @param visit_length_30m_ratio 30mSession访问时长占比 * @param step_length_1_3_ratio 1-3步长占比 * @param step_length_4_6_ratio 4-6步长占比 * @param step_length_7_9_ratio 7-9步长占比 * @param step_length_10_30_ratio 10-30步长占比 * @param step_length_30_60_ratio 30-60步长占比 * @param step_length_60_ratio 大于60步长占比 */ case class SessionAggrStat(taskid: String, session_count: Long, visit_length_1s_3s_ratio: Double, visit_length_4s_6s_ratio: Double, visit_length_7s_9s_ratio: Double, visit_length_10s_30s_ratio: Double, visit_length_30s_60s_ratio: Double, visit_length_1m_3m_ratio: Double, visit_length_3m_10m_ratio: Double, visit_length_10m_30m_ratio: Double, visit_length_30m_ratio: Double, step_length_1_3_ratio: Double, step_length_4_6_ratio: Double, step_length_7_9_ratio: Double, step_length_10_30_ratio: Double, step_length_30_60_ratio: Double, step_length_60_ratio: Double )

Utiles

/* * Copyright (c) 2018. Atguigu Inc. All Rights Reserved. */ package com.dou.util import java.text.SimpleDateFormat import java.util.{Calendar, Date} import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat import org.json.JSONObject import scala.collection.mutable /** * 日期时间工具类 * 使用Joda实现,使用Java提供的Date会存在线程安全问题 * 使用Joda实现,使用Java提供的Date会存在线程安全问题 * */ object DateUtils { val TIME_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") val DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd") val DATEKEY_FORMAT = DateTimeFormat.forPattern("yyyyMMdd") val DATE_TIME_FORMAT = DateTimeFormat.forPattern("yyyyMMddHHmm") /** * 判断一个时间是否在另一个时间之前 * @param time1 第一个时间 * @param time2 第二个时间 * @return 判断结果 */ def before(time1:String, time2:String):Boolean = { if(TIME_FORMAT.parseDateTime(time1).isBefore(TIME_FORMAT.parseDateTime(time2))) { return true } false } /** * 判断一个时间是否在另一个时间之后 * @param time1 第一个时间 * @param time2 第二个时间 * @return 判断结果 */ def after(time1:String, time2:String):Boolean = { if(TIME_FORMAT.parseDateTime(time1).isAfter(TIME_FORMAT.parseDateTime(time2))) { return true } false } /** * 计算时间差值(单位为秒) * @param time1 时间1 * @param time2 时间2 * @return 差值 */ def minus(time1:String, time2:String): Int = { return (TIME_FORMAT.parseDateTime(time1).getMillis - TIME_FORMAT.parseDateTime(time2).getMillis)/1000 toInt } /** * 获取年月日和小时 * @param datetime 时间(yyyy-MM-dd HH:mm:ss) * @return 结果(yyyy-MM-dd_HH) */ def getDateHour(datetime:String):String = { val date = datetime.split(" ")(0) val hourMinuteSecond = datetime.split(" ")(1) val hour = hourMinuteSecond.split(":")(0) date + "_" + hour } /** * 获取当天日期(yyyy-MM-dd) * @return 当天日期 */ def getTodayDate():String = { DateTime.now().toString(DATE_FORMAT) } /** * 获取昨天的日期(yyyy-MM-dd) * @return 昨天的日期 */ def getYesterdayDate():String = { DateTime.now().minusDays(1).toString(DATE_FORMAT) } /** * 格式化日期(yyyy-MM-dd) * @param date Date对象 * @return 格式化后的日期 */ def formatDate(date:Date):String = { new DateTime(date).toString(DATE_FORMAT) } /** * 格式化时间(yyyy-MM-dd HH:mm:ss) * @param date Date对象 * @return 格式化后的时间 */ def formatTime(date:Date):String = { new DateTime(date).toString(TIME_FORMAT) } /** * 解析时间字符串 * @param time 时间字符串 * @return Date */ def parseTime(time:String):Date = { TIME_FORMAT.parseDateTime(time).toDate } def main(args: Array[String]): Unit = { print(DateUtils.parseTime("2017-10-31 20:27:53")) } /** * 格式化日期key * @param date * @return */ def formatDateKey(date:Date):String = { new DateTime(date).toString(DATEKEY_FORMAT) } /** * 格式化日期key * @return */ def parseDateKey(datekey: String ):Date = { DATEKEY_FORMAT.parseDateTime(datekey).toDate } /** * 格式化时间,保留到分钟级别 * yyyyMMddHHmm * @param date * @return */ def formatTimeMinute(date: Date):String = { new DateTime(date).toString(DATE_TIME_FORMAT) } } /** * 数字格工具类 * */ object NumberUtils { /** * 格式化小数 * @param scale 四舍五入的位数 * @return 格式化小数 */ def formatDouble(num:Double, scale:Int):Double = { val bd = BigDecimal(num) bd.setScale(scale, BigDecimal.RoundingMode.HALF_UP).doubleValue() } } /** * 参数工具类 * */ object ParamUtils { /** * 从JSON对象中提取参数 * @param jsonObject JSON对象 * @return 参数 */ def getParam(jsonObject:JSONObject, field:String):String = { jsonObject.getString(field) /*val jsonArray = jsonObject.getJSONArray(field) if(jsonArray != null && jsonArray.size() > 0) { return jsonArray.getString(0) } null*/ } } /** * 字符串工具类 * */ object StringUtils { /** * 判断字符串是否为空 * @param str 字符串 * @return 是否为空 */ def isEmpty(str:String):Boolean = { str == null || "".equals(str) } /** * 判断字符串是否不为空 * @param str 字符串 * @return 是否不为空 */ def isNotEmpty(str:String):Boolean = { str != null && !"".equals(str) } /** * 截断字符串两侧的逗号 * @param str 字符串 * @return 字符串 */ def trimComma(str:String):String = { var result = "" if(str.startsWith(",")) { result = str.substring(1) } if(str.endsWith(",")) { result = str.substring(0, str.length() - 1) } result } /** * 补全两位数字 * @param str * @return */ def fulfuill(str: String):String = { if(str.length() == 2) { str } else { "0" + str } } /** * 从拼接的字符串中提取字段 * @param str 字符串 * @param delimiter 分隔符 * @param field 字段 * @return 字段值 */ def getFieldFromConcatString(str:String, delimiter:String, field:String):String = { try { val fields = str.split(delimiter); for(concatField <- fields) { // searchKeywords=|clickCategoryIds=1,2,3 if(concatField.split("=").length == 2) { val fieldName = concatField.split("=")(0) val fieldValue = concatField.split("=")(1) if(fieldName.equals(field)) { return fieldValue } } } } catch{ case e:Exception => e.printStackTrace() } null } /** * 从拼接的字符串中给字段设置值 * @param str 字符串 * @param delimiter 分隔符 * @param field 字段名 * @param newFieldValue 新的field值 * @return 字段值 */ def setFieldInConcatString(str:String, delimiter:String, field:String, newFieldValue:String):String = { val fieldsMap = new mutable.HashMap[String,String]() for(fileds <- str.split(delimiter)){ var arra = fileds.split("=") if(arra(0).compareTo(field) == 0) fieldsMap += (field -> newFieldValue) else fieldsMap += (arra(0) -> arra(1)) } fieldsMap.map(item=> item._1 + "=" + item._2).mkString(delimiter) } } /** * 校验工具类 * */ object ValidUtils { /** * 校验数据中的指定字段,是否在指定范围内 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param startParamField 起始参数字段 * @param endParamField 结束参数字段 * @return 校验结果 */ def between(data:String, dataField:String, parameter:String, startParamField:String, endParamField:String):Boolean = { val startParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", startParamField) val endParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", endParamField) if(startParamFieldStr == null || endParamFieldStr == null) { return true } val startParamFieldValue = startParamFieldStr.toInt val endParamFieldValue = endParamFieldStr.toInt val dataFieldStr = StringUtils.getFieldFromConcatString(data, "\\|", dataField) if(dataFieldStr != null) { val dataFieldValue = dataFieldStr.toInt if(dataFieldValue >= startParamFieldValue && dataFieldValue <= endParamFieldValue) { return true } else { return false } } false } /** * 校验数据中的指定字段,是否有值与参数字段的值相同 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param paramField 参数字段 * @return 校验结果 */ def in(data:String, dataField:String, parameter:String, paramField:String):Boolean = { val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField) if(paramFieldValue == null) { return true } val paramFieldValueSplited = paramFieldValue.split(",") val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField) if(dataFieldValue != null && dataFieldValue != "-1") { val dataFieldValueSplited = dataFieldValue.split(",") for(singleDataFieldValue <- dataFieldValueSplited) { for(singleParamFieldValue <- paramFieldValueSplited) { if(singleDataFieldValue.compareTo(singleParamFieldValue) ==0) { return true } } } } false } /** * 校验数据中的指定字段,是否在指定范围内 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param paramField 参数字段 * @return 校验结果 */ def equal(data:String, dataField:String, parameter:String, paramField:String):Boolean = { val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField) if(paramFieldValue == null) { return true } val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField) if(dataFieldValue != null) { if(dataFieldValue.compareTo(paramFieldValue) == 0) { return true } } false } }

MockDataGenerate:主方法入口

package com.dou.data /* * Copyright (c) 2018. Atguigu Inc. All Rights Reserved. */ import java.util.UUID import com.dou.util.{DateUtils, StringUtils} import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import com.dou.model.UserInfo import com.dou.model.UserVisitAction import com.dou.model.ProductInfo import scala.collection.mutable.ArrayBuffer import scala.util.Random /** * 模拟的数据 * date:是当前日期 * age: 0 - 59 * professionals: professional[0 - 59] * cities: 0 - 9 * sex: 0 - 1 * keywords: ("火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉") * categoryIds: 0 - 99 * ProductId: 0 - 99 */ object MockDataGenerate { /** * 模拟用户行为信息 * * @return */ private def mockUserVisitActionData(): Array[UserVisitAction] = { val searchKeywords = Array("华为手机", "联想笔记本", "小龙虾", "卫生纸", "吸尘器", "Lamer", "机器学习", "苹果", "洗面奶", "保温杯") // yyyy-MM-dd val date = DateUtils.getTodayDate() // 关注四个行为:搜索、点击、下单、支付 val actions = Array("search", "click", "order", "pay") val random = new Random() val rows = ArrayBuffer[UserVisitAction]() // 一共100个用户(有重复) for (i <- 0 to 100) { val userid = random.nextInt(100) // 每个用户产生10个session for (j <- 0 to 10) { // 不可变的,全局的,独一无二的128bit长度的标识符,用于标识一个session,体现一次会话产生的sessionId是独一无二的 val sessionid = UUID.randomUUID().toString().replace("-", "") // 在yyyy-MM-dd后面添加一个随机的小时时间(0-23) val baseActionTime = date + " " + random.nextInt(23) // 每个(userid + sessionid)生成0-100条用户访问数据 for (k <- 0 to random.nextInt(100)) { val pageid = random.nextInt(10) // 在yyyy-MM-dd HH后面添加一个随机的分钟时间和秒时间 val actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) var searchKeyword: String = null var clickCategoryId: Long = -1L var clickProductId: Long = -1L var orderCategoryIds: String = null var orderProductIds: String = null var payCategoryIds: String = null var payProductIds: String = null val cityid = random.nextInt(10).toLong // 随机确定用户在当前session中的行为 val action = actions(random.nextInt(4)) // 根据随机产生的用户行为action决定对应字段的值 action match { case "search" => searchKeyword = searchKeywords(random.nextInt(10)) case "click" => clickCategoryId = random.nextInt(100).toLong clickProductId = String.valueOf(random.nextInt(100)).toLong case "order" => orderCategoryIds = random.nextInt(100).toString orderProductIds = random.nextInt(100).toString case "pay" => payCategoryIds = random.nextInt(100).toString payProductIds = random.nextInt(100).toString } rows += UserVisitAction(date, userid, sessionid, pageid, actionTime, searchKeyword, clickCategoryId, clickProductId, orderCategoryIds, orderProductIds, payCategoryIds, payProductIds, cityid) } } } rows.toArray } /** * 模拟用户信息表 * * @return */ private def mockUserInfo(): Array[UserInfo] = { val rows = ArrayBuffer[UserInfo]() val sexes = Array("male", "female") val random = new Random() // 随机产生100个用户的个人信息 for (i <- 0 to 100) { val userid = i val username = "user" + i val name = "name" + i val age = random.nextInt(60) val professional = "professional" + random.nextInt(100) val city = "city" + random.nextInt(100) val sex = sexes(random.nextInt(2)) rows += UserInfo(userid, username, name, age, professional, city, sex) } rows.toArray } /** * 模拟产品数据表 * * @return */ private def mockProductInfo(): Array[ProductInfo] = { val rows = ArrayBuffer[ProductInfo]() val random = new Random() val productStatus = Array(0, 1) // 随机产生100个产品信息 for (i <- 0 to 100) { val productId = i val productName = "product" + i val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}" rows += ProductInfo(productId, productName, extendInfo) } rows.toArray } /** * 将DataFrame插入到Hive表中 * * @param spark SparkSQL客户端 * @param tableName 表名 * @param dataDF DataFrame */ private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = { spark.sql("DROP TABLE IF EXISTS " + tableName) dataDF.write.saveAsTable(tableName) } val USER_VISIT_ACTION_TABLE = "user_visit_action" val USER_INFO_TABLE = "user_info" val PRODUCT_INFO_TABLE = "product_info" /** * 主入口方法 * * @param args 启动参数 */ //作用:帮助创建表 def main(args: Array[String]): Unit = { // 创建Spark配置 val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]") // 创建Spark SQL 客户端 val spark = SparkSession.builder().config(sparkConf) .enableHiveSupport().getOrCreate()//enableHiveSupport():当前项目中创建hive数据库数据表。开启内置hive // 模拟数据 val userVisitActionData = this.mockUserVisitActionData() val userInfoData = this.mockUserInfo() val productInfoData = this.mockProductInfo() // 将模拟数据装换为RDD val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData) val userInfoRdd = spark.sparkContext.makeRDD(userInfoData) val productInfoRdd = spark.sparkContext.makeRDD(productInfoData) // 加载SparkSQL的隐式转换支持 import spark.implicits._ // 将用户访问数据装换为DF保存到Hive表中 val userVisitActionDF = userVisitActionRdd.toDF() insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF) // 将用户信息数据转换为DF保存到Hive表中 val userInfoDF = userInfoRdd.toDF() insertHive(spark, USER_INFO_TABLE, userInfoDF) // 将产品信息数据转换为DF保存到Hive表中 val productInfoDF = productInfoRdd.toDF() insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF) spark.close } }

main方法启动成功后项目结构生成spark-warehouse&metastore_db表示hive数据仓库创建成功

最新回复(0)