数据同步方案canal客户端连接服务器之RocketMQ异步模式

tech2023-11-12  83

准备RocketMQ环境

Windows环境下安装RocketMQ请参https://www.cnblogs.com/linjiqin/p/9553663.html

Canal配置更改

修改 example/instance.properties canal.mq.topic=canal-topic修改 canal.properties canal.serverMode = RocketMQ canal.mq.servers = 127.0.0.1:9876

注意:如果使用的是阿里云RocketMQ 需要配置以下信息

# 配置ak/sk canal.aliyun.accessKey = XXX canal.aliyun.secretKey = XXX # 配置topic canal.mq.accessChannel = cloud canal.mq.servers = 内网接入点 canal.mq.producerGroup = GID_**group(在后台创建) canal.mq.namespace = rocketmq实例id canal.mq.topic=(在后台创建)

SpringBoot项目整合RocketMQ

添加maven依赖 <!-- springBoot集成rocketmq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>

2.配置application.yml文件

rocketmq: name-server: 内网接入点 编写RocketMQ 消费者 @Service @RocketMQMessageListener(topic = "", consumerGroup = "",accessKey = "",secretKey = "" ) public class RocketCanalListener implements RocketMQListener<JSONObject> { public void onMessage(JSONObject jsonObject) { String sqlType = jsonObject.getString("type"); JSONArray data = jsonObject.getJSONArray("data"); System.out.println("表名为:" + jsonObject.getString("table") + ",sql类型为:" + sqlType); if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) { System.out.println(data); return; } if ("DELETE".equals(sqlType)) { System.out.println(data); } } }
最新回复(0)