object IdMappingGen {
/**
 * Builds the T-day ID-mapping dictionary by scoring the accounts seen on each device and merging
 * those scores with the T-1 dictionary.
 *
 * Steps:
 *  1. Score every (device, account) pair in the T-day log: the earlier an account first logged in
 *     on a device, the higher its score (100, 90, 80, ...). Devices with no non-blank account are
 *     kept with a null account and null score.
 *  2. Load the T-1 dictionary (JSON) and flatten its `uid_list` into (deviceid, account, ts, score).
 *  3. Union T and T-1, sum scores per (device, account), and drop a device's null-account row when
 *     that device also has a real account.
 *
 * @param args optional overrides: `args(0)` = T-day log CSV directory, `args(1)` = T-1 dictionary
 *             JSON directory. When absent, the original test-data paths are used.
 */
def main(args: Array[String]): Unit = {
  Logger.getLogger("org").setLevel(Level.WARN)
  val spark = SparkSession.builder().appName("IDMAPPING映射字典生成").master("local[*]").getOrCreate()
  import spark.implicits._
  import org.apache.spark.sql.functions._

  // Input locations; overridable from the command line, defaults are the original test data paths.
  val tLogPath = args.headOption.getOrElse("dw_etl\\data\\idmp_testdata\\T_day_log")
  val preDictPath = args.drop(1).headOption.getOrElse("dw_etl\\data\\idmp_testdata\\T-1dict_out")

  try {
    // Load the T-day log.
    val schema = new StructType()
      .add("deviceid", DataTypes.StringType)
      .add("account", DataTypes.StringType)
      .add("timestamp", DataTypes.LongType)
    val t_log = spark.read.option("header", true).schema(schema).csv(tLogPath)
    // t_log feeds both the with-account and without-account branches; cache it before deriving them.
    t_log.cache()
    t_log.show(100, false)

    // 1. Score the accounts seen in the T-day log.
    // 1.1 Separate records that carry a (non-blank) account from devices that never had one.
    val ifBlank = (s: String) => { StringUtils.isBlank(s) }
    spark.udf.register("isBlank", ifBlank)
    // Records that carry a login account.
    val haveAccount = t_log.where("!isBlank(account)")
    haveAccount.show(100, false)
    // Devices with no login account at all. The same blank test as haveAccount is used, so a device
    // whose only accounts are blank/whitespace is treated as account-less instead of being dropped.
    val noAccount = t_log
      .groupBy("deviceid")
      .agg(
        max(when(expr("!isBlank(account)"), col("account"))) as "account",
        min("timestamp") as "ts")
      .where("account is null")

    // 1.2 For each (device, account) pair keep only the earliest login time.
    val firstRecord = haveAccount.groupBy("deviceid", "account").agg(min("timestamp") as "ts")
    firstRecord.show(100, false)

    // 1.3 Score each account on a device: earliest first login gets 100, each later one 10 less.
    // NOTE(review): the score becomes <= 0 from the 11th account on a device onward — confirm intended.
    val window = Window.partitionBy("deviceid").orderBy("ts")
    val scored = firstRecord
      .select($"deviceid", $"account", $"ts", row_number().over(window) as "rn")
      .selectExpr("deviceid", "account", "ts", "100-(rn-1)*10 as score")
    scored.show(100, false)

    // 1.4 Append the account-less devices with a null score. union is positional, so the column
    //     order (deviceid, account, ts, score) must match.
    val t_result = scored.union(noAccount.selectExpr("deviceid", "account", "ts", "null"))
    t_result.show(100, false)

    // 2. Load the T-1 mapping dictionary.
    // 2.1 Read it. Sample content:
    /*
     * +--------+-----+-----------------------------+
     * |deviceid|guid |uid_list                     |
     * +--------+-----+-----------------------------+
     * |did01   |u01  |[[u01, 100, 1]]              |
     * |did02   |u02  |[[u02, 100, 3], [u03, 90, 8]]|
     * |did03   |did03|[]                           |
     * +--------+-----+-----------------------------+
     */
    val preIdmp = spark.read.json(preDictPath)
    preIdmp.printSchema()
    preIdmp.show(100, false)
    preIdmp.createOrReplaceTempView("preidmp")

    // 2.2 Flatten uid_list into one (deviceid, account, ts, score) row per bound account.
    val preUidScore = spark.sql(
      """
        |
        |select
        |deviceid,
        |uid_score.account as account,
        |uid_score.timestamp as ts,
        |uid_score.score as score
        |
        |from preidmp lateral view explode(uid_list) tmp as uid_score
        |
        |""".stripMargin)
    preUidScore.show(100, false)

    // 2.3 Devices whose T-1 account list is empty are carried over with null account/ts/score.
    val preNoUid = preIdmp.where("size(uid_list)=0").selectExpr("deviceid", "null", "null", "null")

    // 3. Merge T and T-1.
    // 3.1 Sum scores per (device, account) and keep the earliest first-login time. A device's
    //     null-account row is dropped whenever the device also has a real account: ordering by
    //     account DESC NULLS LAST gives that null row rn > 1.
    val wholeDevices = t_result.union(preUidScore).union(preNoUid)
    wholeDevices.show(100, false)
    wholeDevices.createOrReplaceTempView("whole")
    val todayUidScoreResult = spark.sql(
      """
        |select
        |deviceid,
        |account,
        |ts,
        |score
        |from
        |(
        |select
        |
        |deviceid,
        |account,
        |ts,
        |score,
        |row_number() over(partition by deviceid order by account desc nulls last) as rn
        |
        |from
        |(
        |select
        |
        |deviceid,
        |account,
        |min(ts) as ts,
        |sum(score) as score
        |
        |from whole
        |group by deviceid,account
        |) o1
        |) o2
        |where !(rn>1 and account is null)
        |
        |""".stripMargin)
    todayUidScoreResult.show(100, false)
    /*
     * Sample output:
     * +--------+-------+---+-----+
     * |deviceid|account|ts |score|
     * +--------+-------+---+-----+
     * |did04   |null   |16 |null |
     * |did03   |u04    |12 |100  |
     * |did02   |u03    |8  |190  |
     * |did02   |u02    |3  |190  |
     * |did01   |u01    |1  |200  |
     * +--------+-------+---+-----+
     */

    // 3.2 TODO: collect each device's account scores together, choose the guid, and convert to JSON.
    // TODO: save the result.
  } finally {
    // Release the session even if a step above fails.
    spark.close()
  }
}