DataX from Beginner to Expert 02 -- Use Case (MySQL2HIVE)


Table of Contents

I. Prerequisites
II. Usage Steps
  1. Create the Hive target partitioned table
  2. Write the JSON file for the sync job
  3. Run
  4. Null value handling
  5. Integrating into a platform
Summary


I. Prerequisites

- The Hive table must be created in advance (the examples below use ORC).
- The MySQL source table should follow conventions and must have a primary key, so that DataX can split the extraction via splitPk (a hypothetical example follows).
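For reference, a hypothetical MySQL source table that lines up with the Hive table used below might look like the following. The table name t_tan and the column types are my own assumptions, not taken from the original article; the key point is the primary key on `id`, which DataX uses as splitPk:

CREATE TABLE `t_tan` (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `reward_pool_id` BIGINT COMMENT 'reward rule, id of the t_reward_pool table',
  `pay_amount` DECIMAL(16,2) COMMENT 'order amount',
  `reward_desc` VARCHAR(255) COMMENT 'reward reason description',
  `create_time` DATETIME COMMENT 'creation time',
  `update_time` DATETIME COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;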

II. Usage Steps

1. Create the Hive target partitioned table

-- 1. Create the table
CREATE TABLE `ods.ods_tan_df`(
  `id` bigint COMMENT 'primary key',
  `reward_pool_id` bigint COMMENT 'reward rule, id of the t_reward_pool table',
  `pay_amount` string COMMENT 'order amount',
  `reward_desc` string COMMENT 'reward reason description',
  `create_time` string COMMENT 'creation time',
  `update_time` string COMMENT 'update time')
COMMENT '1'
PARTITIONED BY (`pt` string COMMENT 'null')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION 'hdfs://nameservice1/user/hive/warehouse/ods.db/ods_tan_df'
TBLPROPERTIES ('transient_lastDdlTime'='1598700294');

-- 2. Create the partition
hive> alter table ods.ods_tan_df add partition (pt = '2020-09-02');
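Before running the job it is worth a quick sanity check that the partition exists; this step is not in the original article, just a habit:

hive> show partitions ods.ods_tan_df;

The partition added above (pt=2020-09-02) should appear in the output.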

2. Write the JSON file for the sync job

The code is as follows (example):

mysql2hive.json

{ "job":{ "content":[ { "reader":{ "parameter":{ "password":"xxxxxxxxxxxxxxx", "column":[ "`id`", "`reward_pool_id`", "`pay_amount`", "`reward_desc`", "`create_time`", "`update_time`" ], "connection":[ { "jdbcUrl":[ "jdbc:mysql://xxxx:3306/xxxx?tinyInt1isBit=false" ], "table":[ "xxxxxx" ] } ], "where":"", "splitPk":"id", "username":"xxxxx" }, "name":"mysqlreader" }, "transformer":[], "writer":{ "parameter":{ "path":"/user/hive/warehouse/ods.db/ods_xxx_df/pt=2020-08-28", "fileName":"ods_tan_df", "haveKerberos":true, "kerberosKeytabFilePath":"/xxx/zm_app_prd.keytab", "compress":"SNAPPY", "column":[ { "name":"id", "type":"bigint" }, { "name":"reward_pool_id", "type":"bigint" }, { "name":"pay_amount", "type":"string" }, { "name":"reward_desc", "type":"string" }, { "name":"create_time", "type":"string" }, { "name":"update_time", "type":"string" } ], "defaultFS":"hdfs://nameservice1", "writeMode":"append", "fieldDelimiter":"\u0001", "fileType":"orc", "kerberosPrincipal":"zm_app_prd@FAYSON.COM" }, "name":"hdfswriter" } } ], "setting":{ "errorLimit":{ "record":0 }, "speed":{ "channel":1 } } } }

If any of the parameters are unclear, it is worth spending time with the official documentation; how to integrate Kerberos authentication is covered in the next article.
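Note that the path above hard-codes the partition value (pt=2020-08-28). If the same job file is to be reused day after day, DataX's variable substitution can help: reference ${variable} in the JSON and pass the value on the command line via -p. A minimal sketch, assuming a variable named pt:

In mysql2hive.json (writer parameter):

    "path": "/user/hive/warehouse/ods.db/ods_xxx_df/pt=${pt}"

At run time:

    python /xx/program/datax/bin/datax.py -p "-Dpt=2020-08-28" mysql2hive.json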

3. Run

Run the job as follows (example):

python /xx/program/datax/bin/datax.py mysql2hive.json

Seeing output like the following means the run has completed:

Job start time            : 2020-08-31 18:05:49
Job end time              : 2020-08-31 18:23:36
Total elapsed time        : 1067s
Average throughput        : 9.05MB/s
Record write speed        : 74424rec/s
Total records read        : 78889453
Total read/write failures : 0
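When this command is wrapped in a script for scheduling (see section 5), the exit code of datax.py is the simplest success signal; a minimal sketch, assuming the same paths as above:

python /xx/program/datax/bin/datax.py mysql2hive.json
if [ $? -ne 0 ]; then
    echo "DataX job failed" >&2
    exit 1
fi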

4. Null value handling

When a NULL in MySQL is extracted into Hive, it arrives as the literal string "NULL". A fix is given below.

Location of the source modification: datax/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java

/**
 * Write an ORC file
 * @param lineReceiver
 * @param config
 * @param fileName
 * @param taskPluginCollector
 */
public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                              TaskPluginCollector taskPluginCollector) {
    List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
    String compress = config.getString(Key.COMPRESS, null);
    // Read the nullFormat setting (HDFS conventionally represents null as \N; it can be set in the
    // writer config, and defaults to null when absent)
    String nullFormat = config.getString(Key.NULL_FORMAT, null);
    List<String> columnNames = getColumnNames(columns);
    List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
    StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
            .getStandardStructObjectInspector(columnNames, columnTypeInspectors);

    OrcSerde orcSerde = new OrcSerde();
    FileOutputFormat outFormat = new OrcOutputFormat();
    if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
        Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
        if (null != codecClass) {
            outFormat.setOutputCompressorClass(conf, codecClass);
        }
    }
    try {
        RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
        Record record = null;
        while ((record = lineReceiver.getFromReader()) != null) {
            // Pass the nullFormat setting into transportOneRecord
            MutablePair<List<Object>, Boolean> transportResult =
                    transportOneRecord(record, columns, taskPluginCollector, nullFormat);
            if (!transportResult.getRight()) {
                writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
            }
        }
        writer.close(Reporter.NULL);
    } catch (Exception e) {
        String message = String.format("IO exception while writing file [%s]; please check that your network is working properly!", fileName);
        LOG.error(message);
        Path path = new Path(fileName);
        deleteDir(path.getParent());
        throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
    }
}

// ...

public static MutablePair<List<Object>, Boolean> transportOneRecord(
        Record record, List<Configuration> columnsConfiguration,
        TaskPluginCollector taskPluginCollector, String nullFormat) {
    MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
    transportResult.setRight(false);
    List<Object> recordList = Lists.newArrayList();
    int recordLength = record.getColumnNumber();
    if (0 != recordLength) {
        Column column;
        for (int i = 0; i < recordLength; i++) {
            column = record.getColumn(i);
            // todo as method
            if (null != column.getRawData()) {
                String rowData = column.getRawData().toString();
                SupportHiveDataType columnType = SupportHiveDataType.valueOf(
                        columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
                // Convert the value according to the writer-side column type configuration
                try {
                    switch (columnType) {
                        case TINYINT:
                            recordList.add(Byte.valueOf(rowData));
                            break;
                        case SMALLINT:
                            recordList.add(Short.valueOf(rowData));
                            break;
                        case INT:
                            recordList.add(Integer.valueOf(rowData));
                            break;
                        case BIGINT:
                            recordList.add(column.asLong());
                            break;
                        case FLOAT:
                            recordList.add(Float.valueOf(rowData));
                            break;
                        case DOUBLE:
                            recordList.add(column.asDouble());
                            break;
                        case STRING:
                        case VARCHAR:
                        case CHAR:
                            recordList.add(column.asString());
                            break;
                        case BOOLEAN:
                            recordList.add(column.asBoolean());
                            break;
                        case DATE:
                            recordList.add(new java.sql.Date(column.asDate().getTime()));
                            break;
                        case TIMESTAMP:
                            recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
                            break;
                        default:
                            throw DataXException.asDataXException(
                                    HdfsWriterErrorCode.ILLEGAL_VALUE,
                                    String.format(
                                            "The column configuration in your job file is invalid: DataX does not support writing this field type. Field name: [%s], field type: [%s]. Please change the column type in the table or exclude this column from the sync.",
                                            columnsConfiguration.get(i).getString(Key.NAME),
                                            columnsConfiguration.get(i).getString(Key.TYPE)));
                    }
                } catch (Exception e) {
                    // warn: treated as a dirty record here
                    String message = String.format(
                            "Type conversion error: the target field is of type [%s], the actual value is [%s].",
                            columnsConfiguration.get(i).getString(Key.TYPE),
                            column.getRawData().toString());
                    taskPluginCollector.collectDirtyRecord(record, message);
                    transportResult.setRight(true);
                    break;
                }
            } else {
                // Write the configured nullFormat value for null columns
                recordList.add(nullFormat);
            }
        }
    }
    transportResult.setLeft(recordList);
    return transportResult;
}
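With this patch in place, the desired null representation can be set in the writer's parameter block. A minimal sketch, assuming the Key.NULL_FORMAT constant resolves to a "nullFormat" entry (that key name is an assumption; it is not shown in the excerpt above), with the other hdfswriter parameters omitted:

"writer": {
    "name": "hdfswriter",
    "parameter": {
        "fileType": "orc",
        "nullFormat": "\\N"
    }
}

Whether "\\N", an empty string, or something else is the right value depends on how downstream jobs read the data; leaving it unset keeps the patched code's default of writing a real null.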

I ran into a few pitfalls here; there are also solutions floating around online, and with enough reading you will find one that works.

5. Integrating into a platform

There are currently a few common ways to schedule DataX jobs:

- Crontab scheduled jobs
- The DataX-Web solution (see the DATAX-WEB project)
- A self-built DataX scheduling platform (this is what we use)

Since it is a platform, job runs need to be idempotent, and it should support both incremental and full extraction, retrying of failed jobs, and so on; a minimal wrapper sketch follows.
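As an illustration of what "idempotent" can mean for a daily full extraction, a wrapper script might clear and re-register the target partition before each run, so a rerun overwrites instead of appending. A minimal sketch using the ods_tan_df table from the DDL above (this is not the platform's actual code; adjust the table, path, and date handling to your environment, and parameterize the path in mysql2hive.json as shown in section 2 so it points at the same partition):

#!/bin/bash
set -e
PT="2020-08-28"
TARGET="/user/hive/warehouse/ods.db/ods_tan_df/pt=${PT}"

# Remove data left over from a previous (possibly failed) run, then recreate the directory
hdfs dfs -rm -r -f "${TARGET}"
hdfs dfs -mkdir -p "${TARGET}"

# Make sure the Hive partition is registered
hive -e "alter table ods.ods_tan_df add if not exists partition (pt='${PT}');"

# Run the DataX job; because of `set -e` a failure stops the script,
# and the scheduler can simply retry the whole thing
python /xx/program/datax/bin/datax.py mysql2hive.json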

Summary

In this article we walked through DataX's most common use case, extracting data from MySQL into Hive, and attached a fix for MySQL NULL values arriving in Hive as the string "NULL". From a platform-integration perspective, DataX can be wrapped as a component to support production job operation; if you have better ideas, feel free to share them.
