日志数据随机生成
项目结构
依赖
<properties>
<scala.version>2.12.8</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>
</dependencies>
DataModel&Utils工具类创建
DataModel
/*
* Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
*/
package com.dou.model
//***************** 输入表 *********************
/**
* 用户访问动作表
*
* @param date 用户点击行为的日期
* @param user_id 用户的ID
* @param session_id Session的ID
* @param page_id 某个页面的ID
* @param action_time 点击行为的时间点
* @param search_keyword 用户搜索的关键词
* @param click_category_id 某一个商品品类的ID
* @param click_product_id 某一个商品的ID
* @param order_category_ids 一次订单中所有品类的ID集合
* @param order_product_ids 一次订单中所有商品的ID集合
* @param pay_category_ids 一次支付中所有品类的ID集合
* @param pay_product_ids 一次支付中所有商品的ID集合
* @param city_id 城市ID
*/
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long
)
/**
* 用户信息表
*
* @param user_id 用户的ID
* @param username 用户的名称
* @param name 用户的名字
* @param age 用户的年龄
* @param professional 用户的职业
* @param city 用户所在的城市
* @param sex 用户的性别
*/
case class UserInfo(user_id: Long,
username: String,
name: String,
age: Int,
professional: String,
city: String,
sex: String
)
/**
* 产品表
*
* @param product_id 商品的ID
* @param product_name 商品的名称
* @param extend_info 商品额外的信息
*/
case class ProductInfo(product_id: Long,
product_name: String,
extend_info: String
)
/**
* 聚合统计表
*
* @param taskid 当前计算批次的ID
* @param session_count 所有Session的总和
* @param visit_length_1s_3s_ratio 1-3sSession访问时长占比
* @param visit_length_4s_6s_ratio 4-6sSession访问时长占比
* @param visit_length_7s_9s_ratio 7-9sSession访问时长占比
* @param visit_length_10s_30s_ratio 10-30sSession访问时长占比
* @param visit_length_30s_60s_ratio 30-60sSession访问时长占比
* @param visit_length_1m_3m_ratio 1-3mSession访问时长占比
* @param visit_length_3m_10m_ratio 3-10mSession访问时长占比
* @param visit_length_10m_30m_ratio 10-30mSession访问时长占比
* @param visit_length_30m_ratio 30mSession访问时长占比
* @param step_length_1_3_ratio 1-3步长占比
* @param step_length_4_6_ratio 4-6步长占比
* @param step_length_7_9_ratio 7-9步长占比
* @param step_length_10_30_ratio 10-30步长占比
* @param step_length_30_60_ratio 30-60步长占比
* @param step_length_60_ratio 大于60步长占比
*/
case class SessionAggrStat(taskid: String,
session_count: Long,
visit_length_1s_3s_ratio: Double,
visit_length_4s_6s_ratio: Double,
visit_length_7s_9s_ratio: Double,
visit_length_10s_30s_ratio: Double,
visit_length_30s_60s_ratio: Double,
visit_length_1m_3m_ratio: Double,
visit_length_3m_10m_ratio: Double,
visit_length_10m_30m_ratio: Double,
visit_length_30m_ratio: Double,
step_length_1_3_ratio: Double,
step_length_4_6_ratio: Double,
step_length_7_9_ratio: Double,
step_length_10_30_ratio: Double,
step_length_30_60_ratio: Double,
step_length_60_ratio: Double
)
Utiles
/*
* Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
*/
package com.dou.util
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.json.JSONObject
import scala.collection.mutable
/**
* 日期时间工具类
* 使用Joda实现,使用Java提供的Date会存在线程安全问题
* 使用Joda实现,使用Java提供的Date会存在线程安全问题
*
*/
object DateUtils {
val TIME_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd")
val DATEKEY_FORMAT = DateTimeFormat.forPattern("yyyyMMdd")
val DATE_TIME_FORMAT = DateTimeFormat.forPattern("yyyyMMddHHmm")
/**
* 判断一个时间是否在另一个时间之前
* @param time1 第一个时间
* @param time2 第二个时间
* @return 判断结果
*/
def before(time1:String, time2:String):Boolean = {
if(TIME_FORMAT.parseDateTime(time1).isBefore(TIME_FORMAT.parseDateTime(time2))) {
return true
}
false
}
/**
* 判断一个时间是否在另一个时间之后
* @param time1 第一个时间
* @param time2 第二个时间
* @return 判断结果
*/
def after(time1:String, time2:String):Boolean = {
if(TIME_FORMAT.parseDateTime(time1).isAfter(TIME_FORMAT.parseDateTime(time2))) {
return true
}
false
}
/**
* 计算时间差值(单位为秒)
* @param time1 时间1
* @param time2 时间2
* @return 差值
*/
def minus(time1:String, time2:String): Int = {
return (TIME_FORMAT.parseDateTime(time1).getMillis - TIME_FORMAT.parseDateTime(time2).getMillis)/1000 toInt
}
/**
* 获取年月日和小时
* @param datetime 时间(yyyy-MM-dd HH:mm:ss)
* @return 结果(yyyy-MM-dd_HH)
*/
def getDateHour(datetime:String):String = {
val date = datetime.split(" ")(0)
val hourMinuteSecond = datetime.split(" ")(1)
val hour = hourMinuteSecond.split(":")(0)
date + "_" + hour
}
/**
* 获取当天日期(yyyy-MM-dd)
* @return 当天日期
*/
def getTodayDate():String = {
DateTime.now().toString(DATE_FORMAT)
}
/**
* 获取昨天的日期(yyyy-MM-dd)
* @return 昨天的日期
*/
def getYesterdayDate():String = {
DateTime.now().minusDays(1).toString(DATE_FORMAT)
}
/**
* 格式化日期(yyyy-MM-dd)
* @param date Date对象
* @return 格式化后的日期
*/
def formatDate(date:Date):String = {
new DateTime(date).toString(DATE_FORMAT)
}
/**
* 格式化时间(yyyy-MM-dd HH:mm:ss)
* @param date Date对象
* @return 格式化后的时间
*/
def formatTime(date:Date):String = {
new DateTime(date).toString(TIME_FORMAT)
}
/**
* 解析时间字符串
* @param time 时间字符串
* @return Date
*/
def parseTime(time:String):Date = {
TIME_FORMAT.parseDateTime(time).toDate
}
def main(args: Array[String]): Unit = {
print(DateUtils.parseTime("2017-10-31 20:27:53"))
}
/**
* 格式化日期key
* @param date
* @return
*/
def formatDateKey(date:Date):String = {
new DateTime(date).toString(DATEKEY_FORMAT)
}
/**
* 格式化日期key
* @return
*/
def parseDateKey(datekey: String ):Date = {
DATEKEY_FORMAT.parseDateTime(datekey).toDate
}
/**
* 格式化时间,保留到分钟级别
* yyyyMMddHHmm
* @param date
* @return
*/
def formatTimeMinute(date: Date):String = {
new DateTime(date).toString(DATE_TIME_FORMAT)
}
}
/**
* 数字格工具类
*
*/
object NumberUtils {
/**
* 格式化小数
* @param scale 四舍五入的位数
* @return 格式化小数
*/
def formatDouble(num:Double, scale:Int):Double = {
val bd = BigDecimal(num)
bd.setScale(scale, BigDecimal.RoundingMode.HALF_UP).doubleValue()
}
}
/**
* 参数工具类
*
*/
object ParamUtils {
/**
* 从JSON对象中提取参数
* @param jsonObject JSON对象
* @return 参数
*/
def getParam(jsonObject:JSONObject, field:String):String = {
jsonObject.getString(field)
/*val jsonArray = jsonObject.getJSONArray(field)
if(jsonArray != null && jsonArray.size() > 0) {
return jsonArray.getString(0)
}
null*/
}
}
/**
* 字符串工具类
*
*/
object StringUtils {
/**
* 判断字符串是否为空
* @param str 字符串
* @return 是否为空
*/
def isEmpty(str:String):Boolean = {
str == null || "".equals(str)
}
/**
* 判断字符串是否不为空
* @param str 字符串
* @return 是否不为空
*/
def isNotEmpty(str:String):Boolean = {
str != null && !"".equals(str)
}
/**
* 截断字符串两侧的逗号
* @param str 字符串
* @return 字符串
*/
def trimComma(str:String):String = {
var result = ""
if(str.startsWith(",")) {
result = str.substring(1)
}
if(str.endsWith(",")) {
result = str.substring(0, str.length() - 1)
}
result
}
/**
* 补全两位数字
* @param str
* @return
*/
def fulfuill(str: String):String = {
if(str.length() == 2) {
str
} else {
"0" + str
}
}
/**
* 从拼接的字符串中提取字段
* @param str 字符串
* @param delimiter 分隔符
* @param field 字段
* @return 字段值
*/
def getFieldFromConcatString(str:String, delimiter:String, field:String):String = {
try {
val fields = str.split(delimiter);
for(concatField <- fields) {
// searchKeywords=|clickCategoryIds=1,2,3
if(concatField.split("=").length == 2) {
val fieldName = concatField.split("=")(0)
val fieldValue = concatField.split("=")(1)
if(fieldName.equals(field)) {
return fieldValue
}
}
}
} catch{
case e:Exception => e.printStackTrace()
}
null
}
/**
* 从拼接的字符串中给字段设置值
* @param str 字符串
* @param delimiter 分隔符
* @param field 字段名
* @param newFieldValue 新的field值
* @return 字段值
*/
def setFieldInConcatString(str:String, delimiter:String, field:String, newFieldValue:String):String = {
val fieldsMap = new mutable.HashMap[String,String]()
for(fileds <- str.split(delimiter)){
var arra = fileds.split("=")
if(arra(0).compareTo(field) == 0)
fieldsMap += (field -> newFieldValue)
else
fieldsMap += (arra(0) -> arra(1))
}
fieldsMap.map(item=> item._1 + "=" + item._2).mkString(delimiter)
}
}
/**
* 校验工具类
*
*/
object ValidUtils {
/**
* 校验数据中的指定字段,是否在指定范围内
* @param data 数据
* @param dataField 数据字段
* @param parameter 参数
* @param startParamField 起始参数字段
* @param endParamField 结束参数字段
* @return 校验结果
*/
def between(data:String, dataField:String, parameter:String, startParamField:String, endParamField:String):Boolean = {
val startParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", startParamField)
val endParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", endParamField)
if(startParamFieldStr == null || endParamFieldStr == null) {
return true
}
val startParamFieldValue = startParamFieldStr.toInt
val endParamFieldValue = endParamFieldStr.toInt
val dataFieldStr = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
if(dataFieldStr != null) {
val dataFieldValue = dataFieldStr.toInt
if(dataFieldValue >= startParamFieldValue && dataFieldValue <= endParamFieldValue) {
return true
} else {
return false
}
}
false
}
/**
* 校验数据中的指定字段,是否有值与参数字段的值相同
* @param data 数据
* @param dataField 数据字段
* @param parameter 参数
* @param paramField 参数字段
* @return 校验结果
*/
def in(data:String, dataField:String, parameter:String, paramField:String):Boolean = {
val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
if(paramFieldValue == null) {
return true
}
val paramFieldValueSplited = paramFieldValue.split(",")
val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
if(dataFieldValue != null && dataFieldValue != "-1") {
val dataFieldValueSplited = dataFieldValue.split(",")
for(singleDataFieldValue <- dataFieldValueSplited) {
for(singleParamFieldValue <- paramFieldValueSplited) {
if(singleDataFieldValue.compareTo(singleParamFieldValue) ==0) {
return true
}
}
}
}
false
}
/**
* 校验数据中的指定字段,是否在指定范围内
* @param data 数据
* @param dataField 数据字段
* @param parameter 参数
* @param paramField 参数字段
* @return 校验结果
*/
def equal(data:String, dataField:String, parameter:String, paramField:String):Boolean = {
val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
if(paramFieldValue == null) {
return true
}
val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
if(dataFieldValue != null) {
if(dataFieldValue.compareTo(paramFieldValue) == 0) {
return true
}
}
false
}
}
MockDataGenerate:主方法入口
package com.dou.data
/*
* Copyright (c) 2018. Atguigu Inc. All Rights Reserved.
*/
import java.util.UUID
import com.dou.util.{DateUtils, StringUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.dou.model.UserInfo
import com.dou.model.UserVisitAction
import com.dou.model.ProductInfo
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
* 模拟的数据
* date:是当前日期
* age: 0 - 59
* professionals: professional[0 - 59]
* cities: 0 - 9
* sex: 0 - 1
* keywords: ("火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉")
* categoryIds: 0 - 99
* ProductId: 0 - 99
*/
object MockDataGenerate {
/**
* 模拟用户行为信息
*
* @return
*/
private def mockUserVisitActionData(): Array[UserVisitAction] = {
val searchKeywords = Array("华为手机", "联想笔记本", "小龙虾", "卫生纸", "吸尘器", "Lamer", "机器学习", "苹果", "洗面奶", "保温杯")
// yyyy-MM-dd
val date = DateUtils.getTodayDate()
// 关注四个行为:搜索、点击、下单、支付
val actions = Array("search", "click", "order", "pay")
val random = new Random()
val rows = ArrayBuffer[UserVisitAction]()
// 一共100个用户(有重复)
for (i <- 0 to 100) {
val userid = random.nextInt(100)
// 每个用户产生10个session
for (j <- 0 to 10) {
// 不可变的,全局的,独一无二的128bit长度的标识符,用于标识一个session,体现一次会话产生的sessionId是独一无二的
val sessionid = UUID.randomUUID().toString().replace("-", "")
// 在yyyy-MM-dd后面添加一个随机的小时时间(0-23)
val baseActionTime = date + " " + random.nextInt(23)
// 每个(userid + sessionid)生成0-100条用户访问数据
for (k <- 0 to random.nextInt(100)) {
val pageid = random.nextInt(10)
// 在yyyy-MM-dd HH后面添加一个随机的分钟时间和秒时间
val actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)))
var searchKeyword: String = null
var clickCategoryId: Long = -1L
var clickProductId: Long = -1L
var orderCategoryIds: String = null
var orderProductIds: String = null
var payCategoryIds: String = null
var payProductIds: String = null
val cityid = random.nextInt(10).toLong
// 随机确定用户在当前session中的行为
val action = actions(random.nextInt(4))
// 根据随机产生的用户行为action决定对应字段的值
action match {
case "search" => searchKeyword = searchKeywords(random.nextInt(10))
case "click" => clickCategoryId = random.nextInt(100).toLong
clickProductId = String.valueOf(random.nextInt(100)).toLong
case "order" => orderCategoryIds = random.nextInt(100).toString
orderProductIds = random.nextInt(100).toString
case "pay" => payCategoryIds = random.nextInt(100).toString
payProductIds = random.nextInt(100).toString
}
rows += UserVisitAction(date, userid, sessionid,
pageid, actionTime, searchKeyword,
clickCategoryId, clickProductId,
orderCategoryIds, orderProductIds,
payCategoryIds, payProductIds, cityid)
}
}
}
rows.toArray
}
/**
* 模拟用户信息表
*
* @return
*/
private def mockUserInfo(): Array[UserInfo] = {
val rows = ArrayBuffer[UserInfo]()
val sexes = Array("male", "female")
val random = new Random()
// 随机产生100个用户的个人信息
for (i <- 0 to 100) {
val userid = i
val username = "user" + i
val name = "name" + i
val age = random.nextInt(60)
val professional = "professional" + random.nextInt(100)
val city = "city" + random.nextInt(100)
val sex = sexes(random.nextInt(2))
rows += UserInfo(userid, username, name, age,
professional, city, sex)
}
rows.toArray
}
/**
* 模拟产品数据表
*
* @return
*/
private def mockProductInfo(): Array[ProductInfo] = {
val rows = ArrayBuffer[ProductInfo]()
val random = new Random()
val productStatus = Array(0, 1)
// 随机产生100个产品信息
for (i <- 0 to 100) {
val productId = i
val productName = "product" + i
val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}"
rows += ProductInfo(productId, productName, extendInfo)
}
rows.toArray
}
/**
* 将DataFrame插入到Hive表中
*
* @param spark SparkSQL客户端
* @param tableName 表名
* @param dataDF DataFrame
*/
private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
spark.sql("DROP TABLE IF EXISTS " + tableName)
dataDF.write.saveAsTable(tableName)
}
val USER_VISIT_ACTION_TABLE = "user_visit_action"
val USER_INFO_TABLE = "user_info"
val PRODUCT_INFO_TABLE = "product_info"
/**
* 主入口方法
*
* @param args 启动参数
*/
//作用:帮助创建表
def main(args: Array[String]): Unit = {
// 创建Spark配置
val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]")
// 创建Spark SQL 客户端
val spark = SparkSession.builder().config(sparkConf)
.enableHiveSupport().getOrCreate()//enableHiveSupport():当前项目中创建hive数据库数据表。开启内置hive
// 模拟数据
val userVisitActionData = this.mockUserVisitActionData()
val userInfoData = this.mockUserInfo()
val productInfoData = this.mockProductInfo()
// 将模拟数据装换为RDD
val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)
// 加载SparkSQL的隐式转换支持
import spark.implicits._
// 将用户访问数据装换为DF保存到Hive表中
val userVisitActionDF = userVisitActionRdd.toDF()
insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)
// 将用户信息数据转换为DF保存到Hive表中
val userInfoDF = userInfoRdd.toDF()
insertHive(spark, USER_INFO_TABLE, userInfoDF)
// 将产品信息数据转换为DF保存到Hive表中
val productInfoDF = productInfoRdd.toDF()
insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)
spark.close
}
}
main方法启动成功后项目结构生成spark-warehouse&metastore_db表示hive数据仓库创建成功