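Drools is honestly not a pleasant rules engine to use, but it still beats a wall of if-else. Working with it can be frustrating, but when you need a rules engine there isn't much of an alternative.
I searched through many articles and videos online. They were either toy demos of little practical use or superficial summaries that left me more confused than before. Drools gave me plenty of grief too, so once I got it working I decided to write a practical demo, in the hope that it helps anyone who needs it.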
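As I see it, there are three ways to use Drools:
- Web-based Workbench. Pros: a web UI that is easy to operate, and rule files can be configured and modified dynamically without affecting the running project. Cons: the web UI is sluggish (on my machine the page wouldn't even load).
- Rule files in the local `resources` folder. Pros: simple to develop, and most tutorials online take this approach. Cons: of little practical use, because once the project is packaged and deployed to a server the rules can no longer be changed dynamically, which makes it hardly better than if-else.
- Rule files stored in a MySQL database. Pros: rules can be changed dynamically and are easy to manage (recommended). Cons: stream processing may need to read from the database frequently.

This post focuses on batch processing, with the rule files managed in MySQL.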
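For the streaming case, where the drawback listed above is frequent database reads, one option is to cache the rule text and only query it again after a fixed interval. The sketch below is a hypothetical `CachedRuleSource` class (not part of Drools or of this project) that wraps any loader, for example a lambda calling `DRL.getDrl`; the clock is passed in so that the refresh logic can be tested without waiting.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper: caches the rule text and reloads it only after ttlMillis has elapsed.
 */
final class CachedRuleSource {
    private final Supplier<String> loader;  // e.g. () -> DRL.getDrl("1") (assumption)
    private final long ttlMillis;
    private final LongSupplier clock;       // System::currentTimeMillis in a real job
    private String cached;
    private long loadedAt;

    CachedRuleSource(Supplier<String> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached rule text, reloading it when missing or older than ttlMillis. */
    synchronized String get() {
        long now = clock.getAsLong();
        if (cached == null || now - loadedAt >= ttlMillis) {
            cached = loader.get();
            loadedAt = now;
        }
        return cached;
    }
}
```

In a job it could be created as `new CachedRuleSource(() -> DRL.getDrl("1"), 60_000, System::currentTimeMillis)`. This only limits how often the database is queried; rebuilding the `KieBase` when the returned text actually changes is not shown.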
1. Prepare the data:
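```json
{"uid": 1001, "name": "zhangsan", "salary": 2500}
{"uid": 1002, "name": "lisi", "salary": 5000}
{"uid": 1003, "name": "wangwu", "salary": 12000}
{"uid": 1004, "name": "zhaoliu", "salary": 23000}
```

2. Write a Java entity class for the data. (I didn't use a Scala case class here because the rule file calls getter and setter methods, which a case class does not have. Also, a Java bean can be created with only some of its fields set, whereas a case class requires all of them.)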
```java
package com.myteam.myproject;

public class Loans implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private int uid;
    private String name;
    private int salary;
    // Filled in by the rules; null when the record is first loaded
    private String describe;

    public Loans() {
    }

    public Loans(int uid, String name, int salary, String describe) {
        this.uid = uid;
        this.name = name;
        this.salary = salary;
        this.describe = describe;
    }

    public int getUid() { return uid; }
    public void setUid(int uid) { this.uid = uid; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getSalary() { return salary; }
    public void setSalary(int salary) { this.salary = salary; }

    public String getDescribe() { return describe; }
    public void setDescribe(String describe) { this.describe = describe; }
}
```

3. Write the rule file loans.drl based on the entity class:
```drl
package com.myteam.myproject;

import com.myteam.myproject.Loans;

rule "rule_1"
    when
        $loans : Loans(salary < 3000)
    then
        $loans.setDescribe("no loan");
end

rule "rule_2"
    when
        $loans : Loans(salary >= 3000 && salary < 10000)
    then
        $loans.setDescribe("loan 5000");
end

rule "rule_3"
    when
        $loans : Loans(salary >= 10000 && salary < 30000)
    then
        $loans.setDescribe("loan 20000");
end

rule "rule_4"
    when
        // >= so that a salary of exactly 30000 is not left unmatched
        $loans : Loans(salary >= 30000)
    then
        $loans.setDescribe("loan 30000");
end
```

4. Create the rules table in the database:
```sql
CREATE TABLE IF NOT EXISTS `rules` (
    `id`   INT UNSIGNED AUTO_INCREMENT,
    `rule` TEXT,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```

5. Store the rule file in MySQL via JDBC, and read the rules back from the database:
```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// JdbcUtil (getConnection/free) is the author's own JDBC helper and is not shown in this post
public class DRL {

    public static void main(String[] args) throws IOException {
        create();
    }

    /** Reads loans.drl from disk and inserts its content into the rules table. */
    public static void create() throws IOException {
        // Read the whole rule file as UTF-8 text (FileReader would use the platform charset)
        String drl = new String(
                Files.readAllBytes(Paths.get("C:\\Users\\yxf19\\Desktop\\loans.drl")),
                StandardCharsets.UTF_8);
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            // Open a connection
            conn = JdbcUtil.getConnection();
            // Prepare the insert and bind the rule text to the "?" placeholder
            ps = conn.prepareStatement("insert into rules(rule) values(?)");
            ps.setString(1, drl);
            // Execute the statement
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            JdbcUtil.free(null, ps, conn); // release resources
        }
    }

    /** Loads the rule text with the given id from the rules table. */
    public static String getDrl(String id) {
        Connection connection = JdbcUtil.getConnection();
        PreparedStatement statement = null;
        ResultSet set = null;
        StringBuilder sb = new StringBuilder();
        try {
            // Bind the id as a parameter instead of concatenating it into the SQL string
            statement = connection.prepareStatement("SELECT rule FROM rules WHERE id = ?");
            statement.setInt(1, Integer.parseInt(id));
            set = statement.executeQuery();
            while (set.next()) {
                // Drain the TEXT column into the StringBuilder through a 1024-char buffer
                try (Reader reader = set.getCharacterStream(1)) {
                    char[] buff = new char[1024];
                    int len;
                    while ((len = reader.read(buff)) != -1) {
                        sb.append(buff, 0, len);
                    }
                }
            }
        } catch (SQLException | IOException e) {
            e.printStackTrace();
        } finally {
            JdbcUtil.free(set, statement, connection);
        }
        return sb.toString();
    }
}
```

6. The `rules` table now contains the rule text.
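The inner loop of `getDrl` drains the column's `Reader` into a `StringBuilder` through a 1024-character buffer. Pulled out as a standalone helper (the name `ReaderUtil.readAll` is hypothetical, not part of the project), the same loop can be exercised without a database, for example with a `StringReader` longer than one buffer.

```java
import java.io.IOException;
import java.io.Reader;

/** Hypothetical helper: the read loop from getDrl, extracted so it can be tested without MySQL. */
final class ReaderUtil {
    private ReaderUtil() { }

    /** Reads all remaining characters from the reader and returns them as one String. */
    static String readAll(Reader reader) throws IOException {
        StringBuilder sb = new StringBuilder();
        char[] buff = new char[1024];  // same buffer size as in getDrl
        int len;
        // Reader.read returns -1 once the end of the stream is reached
        while ((len = reader.read(buff)) != -1) {
            sb.append(buff, 0, len);
        }
        return sb.toString();
    }
}
```

For a `TEXT` column, `set.getString(1)` would normally return the same content in a single call; the buffered loop mainly matters when the value is consumed as a stream.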
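7. The pom file; development can now start. (I started with Drools 7. It ran fine locally, but on the cluster it threw a NullPointerException during initialization, so I'd stick with 6.)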
```xml
<dependencies>
    <dependency>
        <groupId>org.kie</groupId>
        <artifactId>kie-api</artifactId>
        <version>6.5.0.Final</version>
    </dependency>
    <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-core</artifactId>
        <version>6.5.0.Final</version>
    </dependency>
    <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-compiler</artifactId>
        <version>6.5.0.Final</version>
    </dependency>
    <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-decisiontables</artifactId>
        <version>6.5.0.Final</version>
    </dependency>
    <dependency>
        <groupId>org.drools</groupId>
        <artifactId>drools-templates</artifactId>
        <version>6.5.0.Final</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
```

8. Build the KieBase (from which KieSessions are created):
```java
import java.nio.charset.StandardCharsets;

import org.kie.api.KieBase;
import org.kie.api.KieServices;
import org.kie.api.builder.KieBuilder;
import org.kie.api.builder.KieFileSystem;
import org.kie.api.builder.Message;
import org.kie.api.builder.Results;
import org.kie.api.runtime.KieContainer;

public class DroolsSession {

    /**
     * @param rules the rule text read from the database
     * @return a KieBase rather than a KieSession: a KieSession is used per record, and creating
     *         sessions from an already compiled KieBase is much faster than recompiling the rules
     */
    public static KieBase getKBase(String rules) {
        KieServices kieServices = KieServices.Factory.get();
        KieFileSystem kfs = kieServices.newKieFileSystem();
        // In-memory path; the file does not need to exist on disk
        kfs.write("src/main/resources/rules/rules.drl", rules.getBytes(StandardCharsets.UTF_8));
        KieBuilder kieBuilder = kieServices.newKieBuilder(kfs).buildAll();
        Results results = kieBuilder.getResults();
        if (results.hasMessages(Message.Level.ERROR)) {
            System.out.println(results.getMessages());
            throw new IllegalStateException("### errors ###");
        }
        KieContainer kieContainer =
                kieServices.newKieContainer(kieServices.getRepository().getDefaultReleaseId());
        return kieContainer.getKieBase();
    }
}
```

9. Connect Spark SQL to Drools to process large volumes of data:
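```scala
import com.myteam.myproject.Loans
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

object SparkWithDrools {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkWithDrools")
      .master("local[*]") // remove when submitting to a cluster
      .getOrCreate()

    val ds: Dataset[Loans] = spark.read.json("in\\test.json")
      .selectExpr("cast(uid as int) as uid", "name",
        "cast(salary as int) as salary", "cast(null as string) as describe")
      .as(Encoders.bean(classOf[Loans]))

    // Load the rule text by id (first program argument) and compile it once on the driver
    val rules = DRL.getDrl(args(0))
    val kieBase = DroolsSession.getKBase(rules)

    // Broadcast the KieBase and use the broadcast value inside the closure.
    // On a cluster this may still run into serialization problems with the KieBase.
    val kieBaseBc = spark.sparkContext.broadcast(kieBase)

    val result = ds.rdd.map(loans => {
      val session = kieBaseBc.value.newKieSession()
      try {
        session.insert(loans)
        session.fireAllRules()
      } finally {
        session.dispose()
      }
      // The rules modify the object in place, so it must be returned
      loans
    })

    /* To avoid the serialization issue, build the KieBase once per partition instead:
    val result = ds.rdd.mapPartitions(iterator => {
      val kBase = DroolsSession.getKBase(rules)
      iterator.map(loans => {
        val session = kBase.newKieSession()
        try {
          session.insert(loans)
          session.fireAllRules()
        } finally {
          session.dispose()
        }
        loans
      })
    })
    */

    spark.createDataFrame(result, classOf[Loans]).show()
    spark.close()
  }
}
```

The end.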
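To sanity-check what `show()` prints, it can help to have an independent statement of what the four rules should produce. The sketch below is a hypothetical `LoanRulesOracle` in plain Java (it does not call Drools and is not part of the project). It returns the loan amount implied by each rule, with 0 meaning no loan, and it assumes rule_4 is meant to cover `salary >= 30000`, since with a strict `>` a salary of exactly 30000 would match none of the rules.

```java
/**
 * Hypothetical plain-Java mirror of the salary bands in loans.drl, for checking results by hand.
 * Returns the loan amount implied by the matching rule; 0 means "no loan".
 */
final class LoanRulesOracle {
    private LoanRulesOracle() { }

    static int loanAmount(int salary) {
        if (salary < 3000) {
            return 0;       // rule_1: salary < 3000
        } else if (salary < 10000) {
            return 5000;    // rule_2: 3000 <= salary < 10000
        } else if (salary < 30000) {
            return 20000;   // rule_3: 10000 <= salary < 30000
        } else {
            return 30000;   // rule_4, assumed to mean salary >= 30000
        }
    }
}
```

For the four sample records this gives 0 for zhangsan (2500), 5000 for lisi (5000), and 20000 for both wangwu (12000) and zhaoliu (23000), which is what the `describe` column in the output should reflect.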
