DataX from Beginner to Expert, Part 4: Custom Transformers

tech · 2022-10-28

Table of Contents

Preface
1. What is a Transformer?
2. Usage Steps
    1. Development workflow
    2. Example
Summary


Preface


1. What is a Transformer?

During data synchronization and transfer, users sometimes need to customize the data in flight, for example by trimming columns or converting column values. This is the "T" of ETL, and DataX implements it through Transformers. DataX ships with full E (Extract), T (Transform), and L (Load) support.
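The idea can be sketched with a toy model of the transform stage: a record (modelled here as a plain list of string fields, not DataX's real Record type) flows through an ordered chain of transformers, each of which returns a modified record. Everything in this sketch is illustrative, not DataX's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class TransformDemo {
    // each "transformer" maps one record (a list of fields) to a new record
    static UnaryOperator<List<String>> upperCase(int idx) {
        return rec -> {
            List<String> out = new ArrayList<>(rec);
            out.set(idx, out.get(idx).toUpperCase());
            return out;
        };
    }

    // dropping a column is the "trimming" case mentioned above
    static UnaryOperator<List<String>> dropColumn(int idx) {
        return rec -> {
            List<String> out = new ArrayList<>(rec);
            out.remove(idx);
            return out;
        };
    }

    // run a record through the whole transformer chain, in order
    static List<String> applyChain(List<String> record, List<UnaryOperator<List<String>>> chain) {
        List<String> out = record;
        for (UnaryOperator<List<String>> t : chain) {
            out = t.apply(out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> record = List.of("1", "alice", "secret");
        // uppercase column 1, then drop column 2
        System.out.println(applyChain(record, List.of(upperCase(1), dropColumn(2)))); // [1, ALICE]
    }
}
```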

For more details, see the official documentation.

2. Usage Steps

1. Development workflow

1. Clone the DataX source from GitHub and locate the transformer folder under the project root.
2. Under com.alibaba.datax.transport.transformer, find the existing transformers; extend Transformer, implement its interface by referring to those classes, and accept whatever parameters your logic needs (they arrive from the job configuration file).
3. Register your transformer class in the TransformerRegistry class under core/src/main/java/com/alibaba/datax/core/transport/transformer.
4. Package and build with Maven for debugging: mvn clean package -DskipTests assembly:assembly
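Steps 2 and 3 can be sketched with a simplified stand-in for the real classes. The Transformer base class and registry below only mirror the shape of DataX's API; names such as TruncateTransformer and dx_truncate are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// toy stand-in for DataX's Transformer base class: it carries a name
// (used in the job JSON) and an evaluate() hook over a record
abstract class Transformer {
    private String name;
    protected void setTransformerName(String n) { this.name = n; }
    public String getTransformerName() { return name; }
    public abstract Object[] evaluate(Object[] record, Object... paras);
}

// step 2: subclass Transformer, pick a name, read your parameters from paras
class TruncateTransformer extends Transformer {
    TruncateTransformer() { setTransformerName("dx_truncate"); }

    @Override
    public Object[] evaluate(Object[] record, Object... paras) {
        int columnIndex = (Integer) paras[0];
        int maxLen = (Integer) paras[1];
        String v = (String) record[columnIndex];
        if (v != null && v.length() > maxLen) {
            record[columnIndex] = v.substring(0, maxLen);
        }
        return record;
    }
}

public class RegistryDemo {
    // step 3: the registry maps a transformer's name to its instance
    static Map<String, Transformer> registry = new HashMap<>();

    static void registTransformer(Transformer t) {
        registry.put(t.getTransformerName(), t);
    }

    public static void main(String[] args) {
        registTransformer(new TruncateTransformer());
        Object[] rec = {"12345", "abc"};
        // look the transformer up by name, as a job JSON entry would
        registry.get("dx_truncate").evaluate(rec, 0, 3);
        System.out.println(rec[0]); // 123
    }
}
```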

2. Example

The code is as follows (example):

The following uses AES encryption as a demonstration (note that AES is a symmetric cipher, not an asymmetric one). Extend Transformer and write your own processing class, e.g. datax/transformer/src/main/java/com/alibaba/datax/transformer/AESTransformer.java:

```java
public class AESTransformer extends Transformer {
    private static final Logger LOG = LoggerFactory.getLogger(AESTransformer.class);

    public static final String ENCRYPT_KEY = "seed key";

    int columnIndex;

    public AESTransformer() {
        setTransformerName("dx_aes");
        LOG.info("Using AES preserve masker");
    }

    @Override
    public Record evaluate(Record record, Object... paras) {
        try {
            if (paras.length < 1) {
                throw new RuntimeException("dx_aes transformer is missing parameters");
            }
            columnIndex = (Integer) paras[0];
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
                    "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
        }

        Column column = record.getColumn(columnIndex);
        try {
            String oriValue = column.asString();
            // a null value needs no transformation
            if (oriValue == null) {
                return record;
            }
            if (column.getType() == Column.Type.STRING) {
                EncryptUtil encryptUtil = EncryptUtil.getInstance();
                String newValue = encryptUtil.AESencode(oriValue, ENCRYPT_KEY);
                record.setColumn(columnIndex, new StringColumn(newValue));
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }
        return record;
    }
}
```

Then register it in TransformerRegistry (excerpt from the source):

```java
/**
 * no comments.
 * Created by liqiang on 16/3/3.
 */
public class TransformerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        /**
         * add native transformer
         * local storage and from server will be delay load.
         */
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new AESTransformer()); // register your own class
    }
    // ...
}
```

Rebuild, redeploy, and reference the transformer in a job JSON. Each dx_aes entry gives the index of the column to encrypt (here columns 9 and 11), writeMode accepts append or overwrite, and the setting block carries default tuning values for now:

```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "xxxx",
            "password": "xxxx",
            "column": [
              "id", "les_id", "grade_id", "edition_id", "subject_id",
              "course_system_first_id", "course_system_second_id",
              "course_system_third_id", "course_system_four_id",
              "custom_points", "deleted", "created_at", "tea_id",
              "stu_id", "les_uid", "updated_at", "pt"
            ],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://xxxx:3306/test?useUnicode=true&characterEncoding=utf8"],
                "table": ["xxx"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id", "type": "int"},
              {"name": "les_id", "type": "int"},
              {"name": "grade_id", "type": "int"},
              {"name": "edition_id", "type": "int"},
              {"name": "subject_id", "type": "int"},
              {"name": "course_system_first_id", "type": "int"},
              {"name": "course_system_second_id", "type": "int"},
              {"name": "course_system_third_id", "type": "int"},
              {"name": "course_system_four_id", "type": "int"},
              {"name": "custom_points", "type": "string"},
              {"name": "deleted", "type": "TINYINT"},
              {"name": "created_at", "type": "string"},
              {"name": "tea_id", "type": "int"},
              {"name": "stu_id", "type": "int"},
              {"name": "les_uid", "type": "string"},
              {"name": "updated_at", "type": "string"}
            ],
            "defaultFS": "hdfs://nameservice1",
            "hadoopConfig": {
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "namenode286,namenode36",
              "dfs.namenode.rpc-address.nameservice1.namenode286": "xxxx:8020",
              "dfs.namenode.rpc-address.nameservice1.namenode36": "xxxx:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "haveKerberos": "true",
            "kerberosKeytabFilePath": "/home/xx/kerberos/xxx.keytab",
            "kerberosPrincipal": "xxx@FAYSON.COM",
            "encoding": "UTF-8",
            "fileType": "orc",
            "fileName": "xxx",
            "path": "/user/hive/warehouse/ods.db/xxxxx/pt=2020-01-20",
            "writeMode": "append",
            "fieldDelimiter": "\u0001"
          }
        },
        "transformer": [
          {"name": "dx_aes", "parameter": {"columnIndex": 9, "paras": [""]}},
          {"name": "dx_aes", "parameter": {"columnIndex": 11, "paras": [""]}}
        ]
      }
    ],
    "setting": {
      "speed": {"channel": "5"},
      "errorLimit": {"record": 0}
    }
  }
}
```
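EncryptUtil is the author's own helper and its source is not shown, so the following is one possible, hypothetical implementation of an AESencode-style pair using the JDK's javax.crypto. The key derivation (SHA-256 hash of the seed string, truncated to a 128-bit AES key) and the ECB/PKCS5 mode are assumptions made to keep the demo deterministic; a real deployment should prefer an authenticated mode such as GCM with a random IV.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Base64;

public class AesDemo {
    // derive a 128-bit AES key from an arbitrary seed string
    private static SecretKeySpec keyFrom(String seedKey) throws Exception {
        byte[] keyBytes = Arrays.copyOf(
                MessageDigest.getInstance("SHA-256").digest(seedKey.getBytes(StandardCharsets.UTF_8)), 16);
        return new SecretKeySpec(keyBytes, "AES");
    }

    // encrypt and return Base64 text, so the result stays a STRING column
    public static String aesEncode(String plain, String seedKey) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); // ECB only for this demo
        cipher.init(Cipher.ENCRYPT_MODE, keyFrom(seedKey));
        return Base64.getEncoder().encodeToString(cipher.doFinal(plain.getBytes(StandardCharsets.UTF_8)));
    }

    public static String aesDecode(String enc, String seedKey) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, keyFrom(seedKey));
        return new String(cipher.doFinal(Base64.getDecoder().decode(enc)), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String enc = aesEncode("13800138000", "demo-seed");
        System.out.println(aesDecode(enc, "demo-seed")); // round-trips to the original value
    }
}
```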

Summary

This article used an example to walk through the "T" (transform) stage of DataX as an ETL tool. Transformers can do a great deal in practice: cleaning dirty data, UDF-style conversions, encrypting and decrypting data, and more. A later installment will lift the "mystery" veil from the source-code perspective; if you have better ideas, feel free to share them with me.
