Exception when Spark imports into the snappy database in snappydata mode (internal table)


After running the Spark import job, the job reported success, but querying the table in DBeaver returned: Syntax error or analysis exception: Union can only be performed on tables with the compatible column types.

Query execution failed. Cause: SQL error [20000] [42000]: (SQLState=42000 Severity=20000) (Server=rf-sd0003.rayfay.io/192.168.106.96[2527] Thread=ThriftProcessor-3982) Syntax error or analysis exception: Union can only be performed on tables with the compatible column types. TimestampType <> DecimalType(38,0) at the first column of the second table;;

'Union
:- Project [id#13361, name#13362, creat_time#13363, upt#13364, money#13365, _upt#13366, _SID#13379, timestamptomill(_UPT#13366) AS __T_UPT_T#13407L]
:  +- Project [id#13361, name#13362, creat_time#13363, upt#13364, money#13365, _upt#13366, 0 AS _SID#13379]
:     +- SubqueryAlias app_db_tset_t_talbe_test_lq
:        +- Relation[id#13361,name#13362,creat_time#13363,upt#13364,money#13365,_upt#13366] RowFormatRelation[app_db_tset.app_db_tset_t_talbe_test_lq]
+- Filter (1 = 1)
   +- Project [_upt#13387, money#13388, upt#13389, creat_time#13390, name#13391, id#13392, _SID#13399, timestamptomill(_UPT#13387) AS __T_UPT_T#13416L]
      +- Project [_upt#13387, money#13388, upt#13389, creat_time#13390, name#13391, id#13392, 1 AS _SID#13399]
         +- Relation[_upt#13387,money#13388,upt#13389,creat_time#13390,name#13391,id#13392] parquet

The pipeline XML submitted for the job:

<?xml version="1.0" encoding="UTF-8"?>
<daas-model xmlns="http://daas-model.rayfay.cn/pipline/job/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://daas-model.rayfay.cn/pipline/job/ http://daas-model.rayfay.cn/pipline/job/daas-model.xsd">
  <domain viural="true">
    <sources>
      <source view="source_DF" provider="csv">
        <options>
          <option key="path">file:///apps/daas/data/F_T_TALBE_TEST_LQ/7.csv</option>
          <option key="header">true</option>
          <option key="inferSchema">false</option>
          <option key="charset">UTF-8</option>
          <option key="delimiter">,</option>
          <option key="lastDelimiter">false</option>
        </options>
      </source>
    </sources>
    <process>
      <dataframe name="t_transform_DF" show="true">
        select CAST(CURRENT_TIMESTAMP as TIMESTAMP) _UPT,CAST(MONEY as DECIMAL(10,2)) MONEY,CAST(CURRENT_TIMESTAMP as TIMESTAMP) UPT,CAST(CURRENT_TIMESTAMP as TIMESTAMP) CREAT_TIME,CAST(NAME as VARCHAR(100)) NAME,CAST(ID as DECIMAL(38,0)) ID from source_DF
      </dataframe>
    </process>
    <sinks>
      <sink provider="merge" dataframe="t_transform_DF">
        <options>
          <option key="table">APP_DB_TSET.F_T_TALBE_TEST_LQ</option>
          <option key="import_mode">snappydata</option>
        </options>
      </sink>
    </sinks>
  </domain>
</daas-model>
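For context, this error comes from Spark's Dataset.union (the stack trace further down shows SnMergeSink calling Dataset.union), and union pairs columns strictly by position, never by name. Below is a minimal standalone sketch of the same failure mode with made-up columns; it is not the pipeline's actual code, just an illustration:

import org.apache.spark.sql.SparkSession

object UnionOrderDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("union-order-demo").getOrCreate()
    import spark.implicits._

    // Same two columns, but in opposite order in the two DataFrames.
    val a = Seq((new java.sql.Timestamp(0L), BigDecimal(1))).toDF("_upt", "id")
    val b = Seq((BigDecimal(2), new java.sql.Timestamp(0L))).toDF("id", "_upt")

    // union() pairs columns by position, so this fails analysis
    // (a timestamp column lines up against a decimal column),
    // just like the "compatible column types" error above.
    // a.union(b).show()

    // Pairing by name instead (unionByName, available since Spark 2.3) succeeds.
    a.unionByName(b).show()

    spark.stop()
  }
}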

Troubleshooting process:

My first guess was that the columns did not line up with the table definition, and that the resulting column mix-up was what triggered the error.

Wrong approach

I adjusted the CSV file header and the SQL to the following order:

[ID#13439,CREAT_TIME#13440,UPT#13441,MONEY#13442,NAME#13443]

 select CAST(ID as DECIMAL(38,0)) ID,CAST(NAME as VARCHAR(100)) NAME,CAST(CURRENT_TIMESTAMP as TIMESTAMP) CREAT_TIME,CAST(CURRENT_TIMESTAMP as TIMESTAMP) UPT,CAST(MONEY as DECIMAL(10,2)) MONEY,CAST(CURRENT_TIMESTAMP as TIMESTAMP) _UPT from source_DF

Running the Spark job then failed with the following error:

{ "duration": "1.076 secs", "classPath": "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml", "startTime": "2020-09-03T15:09:25.914+08:00", "context": "a8bdc2b4-org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml", "result": { "message": "org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. DecimalType(38,0) <> TimestampType at the first column of the second table;;\n'Union\n:- Relation[_upt#13562,money#13563,upt#13564,creat_time#13565,name#13566,id#13567] parquet\n+- SubqueryAlias t_transform_df, `t_transform_df`\n +- Project [cast(id#13439 as decimal(38,0)) AS id#13449, cast(name#13443 as string) AS name#13450, cast(current_timestamp() as timestamp) AS creat_time#13451, cast(current_timestamp() as timestamp) AS upt#13452, cast(money#13442 as decimal(10,2)) AS money#13453, cast(current_timestamp() as timestamp) AS _upt#13454]\n +- SubqueryAlias source_df, `source_df`\n +- Relation[ID#13439,CREAT_TIME#13440,UPT#13441,MONEY#13442,NAME#13443] csv\n", "errorClass": "java.lang.RuntimeException", "stack": ["org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)", "org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16$$anonfun$apply$17.apply(CheckAnalysis.scala:338)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16$$anonfun$apply$17.apply(CheckAnalysis.scala:335)", "scala.collection.Iterator$class.foreach(Iterator.scala:893)", "scala.collection.AbstractIterator.foreach(Iterator.scala:1336)", "scala.collection.IterableLike$class.foreach(IterableLike.scala:72)", "scala.collection.AbstractIterable.foreach(Iterable.scala:54)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16.apply(CheckAnalysis.scala:335)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16.apply(CheckAnalysis.scala:324)", "scala.collection.immutable.List.foreach(List.scala:381)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:324)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)", "org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)", "org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)", "org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)", "org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)", "org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:2860)", "org.apache.spark.sql.Dataset.union(Dataset.scala:1635)", "org.apache.spark.sql.ray.merge.SnMergeSink.sink(SnMergeSink.scala:60)", "org.apache.spark.sql.ray.PiplineSink$class.sink(PiplineSink.scala:11)", "org.apache.spark.sql.ray.merge.SnMergeSink.sink(SnMergeSink.scala:12)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$$anonfun$runSnappyJob$6.apply(PiplineStagesSubmitAsXml.scala:108)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$$anonfun$runSnappyJob$6.apply(PiplineStagesSubmitAsXml.scala:100)", "scala.collection.Iterator$class.foreach(Iterator.scala:893)", "scala.collection.AbstractIterator.foreach(Iterator.scala:1336)", 
"scala.collection.IterableLike$class.foreach(IterableLike.scala:72)", "scala.collection.AbstractIterable.foreach(Iterable.scala:54)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$.runSnappyJob(PiplineStagesSubmitAsXml.scala:100)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$.runSnappyJob(PiplineStagesSubmitAsXml.scala:19)", "org.apache.spark.sql.SnappySQLJob$class.runJob(SnappySessionFactory.scala:128)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$.runJob(PiplineStagesSubmitAsXml.scala:19)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:351)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)", "java.lang.Thread.run(Thread.java:748)"] }, "status": "ERROR", "jobId": "b9f1e969-b14b-444b-8610-571ff46eca73" }

The job still reported a column mismatch, just from the other side this time: the parquet relation leads with _upt (TimestampType) while the other branch of the union leads with id (DecimalType(38,0)). So reordering the file header and the SQL was clearly not the right fix.

Correct approach

With import_mode snappydata the data is loaded into the in-memory (internal) table, whereas an hdfs import goes through the parquet files that were associated with the table when it was created. The parquet directory still contained old files, whose column order was [_upt#13387,money#13388,upt#13389,creat_time#13390,name#13391,id#13392], while the table had been created with the column order [id#13361, name#13362, creat_time#13363, upt#13364, money#13365, _upt#13366, 0 AS _SID#13379]. The two column orders do not line up, hence the union error. The parquet path involved is the one registered for the table in the job metadata table:

 INSERT INTO RAYDM.T_RAYDM_TABLE_JOB_META VALUES('APP_DB_TSET.T_TALBE_TEST_IMPORT_LQ','hdfs://192.168.106.31:8020/data/snappydata/test/APP_DB_TSET/F_T_TALBE_TEST_IMPORT_LQ/merged/TOTAL_MERGED',0 );
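One way to confirm this kind of stale-file mismatch is to compare the schema of the registered parquet directory against the schema of the target table. Below is a rough diagnostic sketch (spark-shell style), reusing the path and table names quoted above; the session setup and exact names are assumptions to adjust for the real environment:

import org.apache.spark.sql.{SnappySession, SparkSession}

val spark = SparkSession.builder().appName("schema-check").getOrCreate()
val snappy = new SnappySession(spark.sparkContext)

// Schema of the old parquet files registered in RAYDM.T_RAYDM_TABLE_JOB_META
val parquetDF = snappy.read.parquet(
  "hdfs://192.168.106.31:8020/data/snappydata/test/APP_DB_TSET/F_T_TALBE_TEST_IMPORT_LQ/merged/TOTAL_MERGED")

// Schema of the SnappyData table the merge sink writes to
val tableDF = snappy.table("APP_DB_TSET.F_T_TALBE_TEST_LQ")

// If the two column orders differ (here: _upt, money, upt, ... vs id, name, creat_time, ...),
// the positional union done by the merge sink fails exactly as shown above.
println(parquetDF.schema.fieldNames.mkString(", "))
println(tableDF.schema.fieldNames.mkString(", "))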

Solution:

Drop the old table, recreate it, and register a new (empty) parquet path for it. That fixed the problem.
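The post does not show the original CREATE TABLE statement, so the following is only a hypothetical outline of the fix, run against the same SnappySession (snappy) as in the sketch above; the DDL body and the new parquet path are placeholders, not the real definitions:

// Rough outline only: the real table DDL (including any system columns such
// as _SID) and the new parquet path are not shown in the post.
snappy.sql("DROP TABLE IF EXISTS APP_DB_TSET.F_T_TALBE_TEST_LQ")

snappy.sql(
  """CREATE TABLE APP_DB_TSET.F_T_TALBE_TEST_LQ (
    |  ID DECIMAL(38,0), NAME VARCHAR(100), CREAT_TIME TIMESTAMP,
    |  UPT TIMESTAMP, MONEY DECIMAL(10,2), "_UPT" TIMESTAMP
    |) USING row""".stripMargin)

// Point the job metadata at a fresh, empty parquet directory
// (the _NEW suffix is made up for illustration).
snappy.sql(
  """INSERT INTO RAYDM.T_RAYDM_TABLE_JOB_META VALUES(
    |  'APP_DB_TSET.T_TALBE_TEST_IMPORT_LQ',
    |  'hdfs://192.168.106.31:8020/data/snappydata/test/APP_DB_TSET/F_T_TALBE_TEST_IMPORT_LQ_NEW/merged/TOTAL_MERGED',
    |  0)""".stripMargin)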

Note: the column order in the transform SQL must match the column order of the table definition; the column order in the import file does not have to match.
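The file order does not matter because the CSV source is read with header=true, so the transform SQL refers to its columns by name (and with inferSchema=false every column arrives as a string, which is why each one is cast explicitly); what has to line up is the select list against the table definition, since the merge sink unions by position. A small sketch of the same transform in plain Spark (spark-shell style), under those assumptions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-transform-sketch").getOrCreate()

// Columns are resolved by name thanks to header=true, so their physical
// order in the file is irrelevant; inferSchema=false means everything is a
// string until the explicit CASTs below.
val source = spark.read
  .option("header", "true")
  .option("inferSchema", "false")
  .option("delimiter", ",")
  .csv("file:///apps/daas/data/F_T_TALBE_TEST_LQ/7.csv")
source.createOrReplaceTempView("source_DF")

// The select list mirrors the table's column order
// (ID, NAME, CREAT_TIME, UPT, MONEY, _UPT), not the file's.
val transformed = spark.sql(
  """select CAST(ID as DECIMAL(38,0)) ID,
    |       CAST(NAME as VARCHAR(100)) NAME,
    |       CAST(CURRENT_TIMESTAMP as TIMESTAMP) CREAT_TIME,
    |       CAST(CURRENT_TIMESTAMP as TIMESTAMP) UPT,
    |       CAST(MONEY as DECIMAL(10,2)) MONEY,
    |       CAST(CURRENT_TIMESTAMP as TIMESTAMP) _UPT
    |  from source_DF""".stripMargin)
transformed.printSchema()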

 

 
