注意:如果使用的是阿里云 RocketMQ,需要配置以下信息:
# 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
# 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 内网接入点
canal.mq.producerGroup = GID_**group(在后台创建)
canal.mq.namespace = rocketmq实例id
canal.mq.topic = (在后台创建)

2. 配置application.yml文件
rocketmq:
  name-server: 内网接入点

编写RocketMQ 消费者

/**
 * RocketMQ consumer for change events delivered as JSON objects.
 *
 * <p>For every message it prints the value of the {@code table} field and the
 * {@code type} field, then prints the {@code data} array when the type is
 * {@code UPDATE}, {@code INSERT} or {@code DELETE}. Any other type produces no
 * further output.
 *
 * <p>The listener annotation attributes (topic, consumer group, access key,
 * secret key) are left blank here and must be filled in by the reader.
 */
@Service
@RocketMQMessageListener(topic = "", consumerGroup = "", accessKey = "", secretKey = "")
public class RocketCanalListener implements RocketMQListener<JSONObject> {

    /**
     * Handles one incoming message.
     *
     * @param jsonObject the message payload; its {@code type}, {@code data} and
     *                   {@code table} fields are read
     */
    public void onMessage(JSONObject jsonObject) {
        // Read the fields in the same order as before: type, data, then table.
        final String operation = jsonObject.getString("type");
        final JSONArray rows = jsonObject.getJSONArray("data");
        final String table = jsonObject.getString("table");

        System.out.println("表名为:" + table + ",sql类型为:" + operation);

        // The write branches are kept separate so each can grow its own handling.
        if ("UPDATE".equals(operation) || "INSERT".equals(operation)) {
            System.out.println(rows);
        } else if ("DELETE".equals(operation)) {
            System.out.println(rows);
        }
    }
}