数据分析——Kettle自定义Step插件编写

tech2022-09-19  59

前言:公司业务需要开发Kettle的自定义Step插件,在查找资料的过程中发现网上关于Kettle的资料比较少,有的资料也比较简洁,因此记录一下自己demo插件的详细开发过程。

插件功能:用户输入需要替换的字符和替换后的字符以及需要替换的列号,插件进行自动替换,效果图如下

 

开发环境:

开发工具:IntelliJ IDEA 2020.1

开发环境:JDK1.8、Maven-3.6.3

Kettle版本:8.3.0.0-371

Kettle源代码:点击进入

插件示例代码:下载地址

官方文档:点击进入

注:本文将根据官方提供的示例代码编写Step插件。在打包插件时,如果出现Test报错的情况,则注释掉即可。

(1).下载并编译示例代码

1.下载示例代码

2.下载完成后将Step的示例代码解压出来,并使用idea打开

3.使用idea打开后,maven会去下载pom.xml下面的相关依赖,此时,我这里报错

Could not find artifact pentaho-kettle:kettle-sdk-plugin-parent:pom:8.3.0.0-371 in alimaven (https://maven.aliyun.com/repository/central)

原因是在阿里云的central仓库下没有pentaho-kettle,因此我们在pom.xml配置一下项目的仓库url即可

<build> ... </build> <repositories> <repository> <id>pentaho-public</id> <name>Pentaho Public</name> <url>http://nexus.pentaho.org/content/groups/omni</url> </repository> </repositories>

4.等

这个步骤会持续较长时间,可以尝试自己换一个仓库或者通过不可描述的方法加快下载速度,但是由于需要下载的文件较大,所以还是需要耐心等待。

5.编译并打包示例插件

点击右侧Maven标签,然后选择package进行打包

完成后,在target目录下面会出现jar包和一个压缩文件,其中,jar包为插件,zip包中会包含jar包和一些资源文件(如果用到的话)

(2).添加并使用示例插件

1.将上述zip文件复制,并粘贴到Kettle的plugins目录下面,然后解压文件

2.打开Spoon,新建一个转换

3.在核心对象处搜索demo,如果在转换中看到了Demo Step,则插件添加成功

4.将插件拖入转换中,双击即可看到插件的界面,插件的具体功能请看官网的开发者中心,在这里就先不讲了

(3).开发前了解

建议大家还是先看一遍官方文档,在这里我只挑部分展示。

以下是我们至少要实现的:

PS.官方的示例代码中有英文注释,下文中会删除部分不重要的代码与注释

(4).插件编写——UI (DemoStepDialog.java)

在开始前,我们必须要了解一些方法:

DemoStepDialog.java(插件的对话框):

           open():在用户打开插件Dialog时被Spoon调用,仅当用户关闭对话框时返回,此方法必须在用户确认时返回这个步骤的名字,或者在用户取消时返回null。其中,changed标记必须反映对话框是否更改了步骤配置,用户取消时,标志位不能改变。

           populateDialog():在打开插件Dialog前进行数据填充,填充后显示Dialog,一般在shell.open()之前被open()调用。

           cancel()、ok():在点击"取消"或"确定"按钮后被调用,需要在open()中绑定控件的监听器。

1.需要的控件

我们的插件只需要一个LabelText来输入列号,一个TableView来输入替换前后的文本即可。实例插件中已经有一个wHelloFieldName的LabelText,因此我们只要再加一个TableView即可。(在代码中我将wHelloFieldName改为了changeColLabelText)

2.修改open()方法

将此类中wHelloFieldName全都修改为changeColLabelText后,为类增加一个变量:private TableView changeTableView,然后在open()方法中的changeColLabelText与wOK、wCancel中间加入以下代码来添加表格控件:

final int FieldsCols = 2; final int FieldsRows = 10; Control lastControl = changeColLabelText; ColumnInfo[] colinf; colinf = new ColumnInfo[FieldsCols]; colinf[0] = new ColumnInfo("替换前的字符", ColumnInfo.COLUMN_TYPE_TEXT, new String[]{""}, false); colinf[1] = new ColumnInfo("替换后的字符", ColumnInfo.COLUMN_TYPE_TEXT, new String[]{""}, false); changeTableView = new TableView(transMeta, shell, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, colinf, FieldsRows, lsMod, props); FormData fd = new FormData(); fd.left = new FormAttachment(0, 0); fd.top = new FormAttachment(lastControl, margin); fd.right = new FormAttachment(100, 0); fd.bottom = new FormAttachment(100, -50); changeTableView.setLayoutData(fd); lastControl = changeTableView;

此时,插件就变成了这个样子:

好像有点不对劲,不过没关系,让我们修改一下细节

//需要修改的地方 changeColLabelText = new LabelText(shell, "需要替换的列号", null); props.setLook(changeColLabelText); changeColLabelText.addModifyListener(lsMod); FormData fdValName = new FormData(); fdValName.left = new FormAttachment(0, 0); fdValName.right = new FormAttachment(100, 0); fdValName.top = new FormAttachment(wStepname, margin); changeColLabelText.setLayoutData(fdValName); wOK = new Button(shell, SWT.PUSH); wOK.setText(BaseMessages.getString(PKG, "System.Button.OK")); wCancel = new Button(shell, SWT.PUSH); wCancel.setText(BaseMessages.getString(PKG, "System.Button.Cancel")); //需要修改的地方 setButtonPositions(new Button[]{wOK, wCancel}, margin, lastControl);

此时,插件看起来就正常了,不过大家可以根据自己的需要自行修改

(5).插件编写——数据存储与绑定 (DemoStepDialog.java DemoStepMeta.java)

完成上面一步后,我们会发现,changeTableView中的文字在改变了以后,下次打开插件还是显示空,而changeColLabelText却不会出现此情况,那是因为changeColLabelText已经完成数据的存储与绑定。下面将介绍如何进行数据存储和绑定。

同样的,在开始前,我们必须要了解一些方法:

DemoStepMeta.java(插件元,可以将插件的变量保存在这个类):

           setDefault():每次创建Step时会被Spoon调用,一般在这里设置一些需要初始化的值。

           getter、setter:读、写插件变量的一些方法。

           clone():拷贝Step,必须在里面实现对变量的深拷贝

           getXML():在Step需要将其配置(一般指变量)序列化为XML时被Spoon调用。

           loadXML(...):在Step需要从XML加载其配置时,PDI会调用这个方法。

           saveRep(...):当Step需要将其配置(一般指变量)保存到存储库时被Spoon调用。

           readRep(...):在Step需要从存储库读取其配置时,PDI会调用这个方法。

           getFields(...):在Step对行流所做任何更改时,必须调用此方法。如果想在下一个步骤获取这个步骤新增的字段名称,必须在这里新增字段。

           check(...):在用户选择"Verify Transformation"时,Spoon将调用此方法

1.在插件元中添加变量

删去outputField,增加changeCol和changeStr,同时生成一下Getter和Setter。

//删去outputField //@Injection(name = "OUTPUT_FIELD") //private String outputField; //增加changeCol来存储需要替换的列号 private String changeCol; //增加changeStr来存储替换前后的字符 private Map<String, String> changeStr; public String getChangeCol() { return changeCol; } public void setChangeCol(String changeCol) { this.changeCol = changeCol; } public Map<String, String> getChangeStr() { return changeStr; } public void setChangeStr(Map<String, String> changeStr) { this.changeStr = changeStr; }

2. 设置新建Step时初始化的值

public void setDefault() { changeStr = new HashMap<>(); changeStr.put("$", "dollar"); changeStr.put("#", "sharp"); setChangeStr(changeStr); setChangeCol("1,2,3"); }

3.完成变量在配置中的读写

由于此处涉及到序列化与反序列化,我使用了FastJson,大家自行配置一下pom.xml,同时将fastjson的jar包放入Kettle的lib目录中。

public String getXML() { StringBuilder xml = new StringBuilder(); String obj = JSONObject.toJSONString(changeStr); xml.append(XMLHandler.addTagValue("changeStr", obj)); xml.append(XMLHandler.addTagValue("changeCol", changeCol)); return xml.toString(); } public void loadXML(Node stepnode, List<DatabaseMeta> databases, IMetaStore metaStore) throws KettleXMLException { try { String obj = XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "changeStr")); setChangeStr(JSONObject.parseObject(obj, HashMap.class)); setChangeCol(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "changeCol"))); } catch (Exception e) { throw new KettleXMLException("Demo plugin unable to read step info from XML node", e); } }

4.修改Dialog中的方法

private void populateDialog() { wStepname.selectAll(); Map<String, String> changeStr = meta.getChangeStr(); Iterator iter = changeStr.entrySet().iterator(); int row = 0; while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); Object key = entry.getKey(); Object val = entry.getValue(); changeTableView.setText(key.toString(), 1, row); changeTableView.setText(val.toString(), 2, row++); } changeColLabelText.setText(meta.getChangeCol()); } private void ok() { stepname = wStepname.getText(); int count = changeTableView.nrNonEmpty(); Map<String, String> tableData = new HashMap<>(); for (int i = 0; i < count; i++) { TableItem item = changeTableView.getNonEmpty(i); tableData.put(item.getText(1), item.getText(2)); } meta.setChangeStr(tableData); meta.setChangeCol(changeColLabelText.getText()); dispose(); }

此时,重新创建一个Demo Step插件,它就会有初始值,同时改变以后也可以保存了。

(5).插件编写——业务逻辑实现 (DemoStep.java)

同样的,在开始前,我们必须要了解一些类与方法:

DemoStep.java(插件元,可以将插件的变量保存在这个类):

           init(...):初始化方法,可以建立数据库链接、获取文件句柄等操作,会被PDI调用。

           processRow(...):读取行的业务逻辑,会被PDI调用,当此方法返回false时,完成行读取。

           dispose():析构函数,用来释放资源,会被PDI调用​。​​​​​​

1.添加变量

private List<String> changeColList; private Map<String, String> changeStr;

2.初始化

由于输入的时String类型的列号,以","分割,因此我们在初始化时需要处理一下,并保存到changeColList中

public boolean init(StepMetaInterface smi, StepDataInterface sdi) { DemoStepMeta meta = (DemoStepMeta) smi; DemoStepData data = (DemoStepData) sdi; if (!super.init(meta, data)) { return false; } String changeColNamesStr = meta.getChangeCol(); changeColList = new ArrayList<>(); while (StringUtils.isNotBlank(changeColNamesStr)) { int i; String str; if ((i = changeColNamesStr.indexOf(",")) > 0) { str = changeColNamesStr.substring(0, i).trim(); if (StringUtils.isNotBlank(str)) { changeColList.add(str); } changeColNamesStr = changeColNamesStr.substring(i + 1); } else { str = changeColNamesStr.trim(); if (StringUtils.isNotBlank(str)) { changeColList.add(str); } break; } } changeStr = meta.getChangeStr(); return true; }

之后只需要在processRow(...)中实现具体的业务逻辑即可

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { DemoStepMeta meta = (DemoStepMeta) smi; DemoStepData data = (DemoStepData) sdi; //从输入流中读取一行 Object[] r = getRow(); //若读不到下一行,则读写完成,调用setOutputDone(),return false if (r == null) { setOutputDone(); return false; } if (first) { first = false; //如果是第一行则保存数据行元信息到data类中,后续使用 data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone(); } //字符替换的业务逻辑 for (int i = 0; i < r.length && r[i] != null; i++) { String str = data.outputRowMeta.getString(r, i); if (changeColList.indexOf((i + 1) + "") >= 0) { Iterator iter = changeStr.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); Object before = entry.getKey(); Object after = entry.getValue(); str = str.replace(String.valueOf(before), String.valueOf(after)); } //很重要,必须用getBytes() r[i] = str.getBytes(); } } //将行放入输出行流 putRow(data.outputRowMeta, r); //如有需要,可以进行日志记录 if (checkFeedback(getLinesRead())) { logBasic(BaseMessages.getString(PKG, "DemoStep.Linenr", getLinesRead())); } //返回true则表示还应继续使用processRow()读取下一行 return true; }

在这里有个大坑,输出String类型必须要用getBytes()方法,之后我应该会写一篇博客介绍我遇到的这个坑。

至此,我们的插件编写完成。

(6).打包测试

把我们写好的插件打包,设置一下我们的字段,跑一下,就能出我们想要的结果

 

当然,我们也可以在调用populateDialog()之后添加如下代码来让TableView变得稍微好看那么一点点

changeTableView.removeEmptyRows(); changeTableView.setRowNums(); changeTableView.optWidth(true);

效果如下

至此,简单的Kettle自定义Step插件编写完成。

由于本人也是自己看源代码、文档和注释摸索的,同时英语也不是很好,因此有错误的话也拜托大家多多包涵,谢谢。

代码下载地址:https://download.csdn.net/download/xhy1999/12805227

GitHub地址:https://github.com/xhy1999/kettle-sdk-step-plugin-demo

最新回复(0)