idmaping代码实现

tech2022-07-30  144

object IdMappingGen { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val spark = SparkSession.builder().appName("IDMAPPING映射字典生成").master("local[*]").getOrCreate() import spark.implicits._ import org.apache.spark.sql.functions._ // 加载T日的日志 val schema = new StructType() .add("deviceid",DataTypes.StringType) .add("account",DataTypes.StringType) .add("timestamp",DataTypes.LongType) val t_log = spark.read.option("header",true).schema(schema).csv("dw_etl\\data\\idmp_testdata\\T_day_log") t_log.show(100,false) // 1. 对当天日志中的访问记录进行账号评分 // 1.1 过滤掉完全没有账号登录的设备记录 val ifBlank = (s:String)=>{StringUtils.isBlank(s)} spark.udf.register("isBlank",ifBlank) // 有登录记录的设备 val haveAccount = t_log.where("!isBlank(account)") t_log.cache() haveAccount.show(100,false) // 没有账号登录记录的设备 val noAccount = t_log.groupBy("deviceid").agg(max("account") as "account",min("timestamp") as "ts").where("account is null") // 1.2 对同一个设备上的同一个账号,取时间最早的第一条 val firstRecord = haveAccount.groupBy("deviceid","account").agg(min("timestamp") as "ts") firstRecord.show(100,false) // 1.3 对每个设备上的每个登录账号评分(越早分越高) val window = Window.partitionBy("deviceid").orderBy("ts") val scored = firstRecord.select('deviceid,'account,'ts,row_number() over(window) as "rn") .selectExpr("deviceid","account","ts","100-(rn-1)*10 as score") scored.show(100,false) // 1.4 将账号评分结果 union 无账号的数据 val t_result = scored.union(noAccount.selectExpr("deviceid","account","ts","null")) t_result.show(100,false) // 2. 加载T-1日的映射字典 // 2.1 读取数据 val preIdmp = spark.read.json("dw_etl\\data\\idmp_testdata\\T-1dict_out") preIdmp.printSchema() preIdmp.show(100,false) preIdmp.createTempView("preidmp") /** * +--------+-----+-----------------------------+ * |deviceid|guid |uid_list | * +--------+-----+-----------------------------+ * |did01 |u01 |[[u01, 100, 1]] | * |did02 |u02 |[[u02, 100, 3], [u03, 90, 8]]| * |did03 |did03|[] | * +--------+-----+-----------------------------+ */ // 2.2 格式扁平化 val preUidScore = spark.sql( """ | |select |deviceid, |uid_score.account as account, |uid_score.timestamp as ts, |uid_score.score as score | |from preidmp lateral view explode(uid_list) tmp as uid_score | |""".stripMargin) preUidScore.show(100,false) // 2.3 将历史idmp数据中,账号绑定列表为空的数据单独拎出来 val preNoUid = preIdmp.where("size(uid_list)=0").selectExpr("deviceid","null","null","null") // 3.合并T和T-1 // 3.1 分数聚合 val wholeDevices = t_result.union(preUidScore).union(preNoUid) wholeDevices.show(100,false) wholeDevices.createTempView("whole") val todayUidScoreResult = spark.sql( """ |select |deviceid, |account, |ts, |score |from |( |select | |deviceid, |account, |ts, |score, |row_number() over(partition by deviceid order by account desc) as rn | |from |( |select | |deviceid, |account, |min(ts) as ts, |sum(score) as score | |from whole |group by deviceid,account |) o1 |) o2 |where !(rn>1 and account is null) | |""".stripMargin) .show(100,false) /** * +--------+-------+---+-----+ * |deviceid|account|ts |score| * +--------+-------+---+-----+ * |did04 |null |16 |null | * |did03 |u04 |12 |100 | * |did02 |u03 |8 |190 | * |did02 |u02 |3 |190 | * |did01 |u01 |1 |200 | * +--------+-------+---+-----+ */ // 3.2 将同一个设备的账号评分整理到一起,选出guid,并转成json格式 // TODO // 保存结果 spark.close() }

 

最新回复(0)