记录一次java开发基于mqtt的数据推送

tech2022-09-23  65

前言

接到客户需求,需要把生产工艺数据推送到第三方展示。查阅相关博客后(面向百度编程)了解了大致流程:需要我写一个客户端来发布/订阅消息,所以下面的代码只实现了发布相关的部分。

正文

项目结构分布

下面开始贴代码 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- handy for debugging; can be left out -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- mqtt -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>cn.guoyukun.jdbc</groupId>
    <artifactId>oracle-ojdbc6</artifactId>
    <version>11.2.0.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

/**
 * MQTT connection settings bound from configuration properties under the
 * {@code com.mqtt} prefix and registered as a Spring component.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties(prefix = "com.mqtt")
@Data
public class MqttConfiguration {
    /** Broker address handed to the MQTT client, e.g. {@code tcp://ip:port}. */
    private String host;
    /** Client identifier handed to the MQTT client. */
    private String clientid;
    /** Topic name taken from configuration. */
    private String topic;
    /** User name applied to the connect options. */
    private String username;
    /** Password applied to the connect options. */
    private String password;
    /** Connection timeout applied to the connect options (presumably seconds; confirm against the MQTT client docs). */
    private int timeout;
    /** Keep-alive interval applied to the connect options (presumably seconds; confirm against the MQTT client docs). */
    private int keepalive;
}

对应配置

com:
  mqtt:
    host: tcp://ip:port
    # clientid 可以随意取:只有一个客户端时无需特别注意;有多个客户端时必须保持唯一,否则一个客户端启动会导致另一个断开
    clientid: 自定义客户端ID
    # 主题,一般由协议文档指定
    topic: 主题
    # 用户名,同上(一般由协议文档指定)
    username: 用户名
    # 密码,同上
    password: 密码
    # 连接超时,同上
    timeout: 30
    # 心跳间隔,同上
    keepalive: 60

SpringUtil:用于在非 Spring 管理的类中获取 bean

/**
 * Static accessor for the Spring {@link ApplicationContext}, so that classes
 * not managed by Spring can still look up beans.
 *
 * <p>The context is captured the first time Spring invokes
 * {@link #setApplicationContext(ApplicationContext)}; later calls are ignored.
 * Every lookup method dereferences that stored context, so calling one before
 * the context has been captured fails with a {@link NullPointerException}.
 */
@Component
public class SpringUtil implements ApplicationContextAware {

    /** Context captured by the first {@code setApplicationContext} call. */
    private static ApplicationContext applicationContext;

    /**
     * Stores the given context unless one has already been stored.
     *
     * @param applicationContext the context supplied by Spring
     * @throws BeansException declared by the callback contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtil.applicationContext != null) {
            // Keep the first context that was handed in.
            return;
        }
        SpringUtil.applicationContext = applicationContext;
    }

    /**
     * Returns the stored context.
     *
     * @return the captured context, or {@code null} if none has been stored yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name bean name
     * @return the bean instance
     */
    public static Object getBean(String name) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz required bean type
     * @param <T>   bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(clazz);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name  bean name
     * @param clazz required bean type
     * @param <T>   bean type
     * @return the bean instance
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name, clazz);
    }
}

缓存相关(Redis 工具类)

@Component public class RedisUtil { private RedisTemplate redisTemplate; @Autowired public void setRedisTemplate(RedisTemplate redisTemplate){ RedisSerializer serializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(serializer); redisTemplate.setValueSerializer(serializer); redisTemplate.setHashKeySerializer(serializer); redisTemplate.setHashValueSerializer(serializer); this.redisTemplate = redisTemplate; } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @return */ public boolean hset(String key, String item, Object value){ try { redisTemplate.opsForHash().put(key, item, value); return true; }catch (Exception e){ e.printStackTrace(); return false; } } public boolean hsetAll(String key, Map value){ try { redisTemplate.opsForHash().putAll(key,value); return true; }catch (Exception e){ e.printStackTrace(); return false; } } /** * hget * @param key 键 * @param item 项 * @return */ public Object hget(String key, String item){ return redisTemplate.opsForHash().get(key,item); } /** * 指定有效时间 * @param key 键 * @param time 时间 s * @return */ public boolean expire(String key, long time){ try { if (time > 0){ redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; }catch (Exception e){ e.printStackTrace(); return false; } } }

客户端相关

@Slf4j public class MqttPushClient { private MqttConfiguration mqttConfiguration = (MqttConfiguration) SpringUtil.getBean("mqttConfiguration"); private MqttClient client; private static volatile MqttPushClient mqttPushClient = null; public static MqttPushClient getInstance(){ if(null == mqttPushClient){ synchronized (MqttPushClient.class){ if (null == mqttPushClient){ mqttPushClient = new MqttPushClient(); } } } return mqttPushClient; } private MqttPushClient(){ connect(); } private void connect(){ try { client = new MqttClient(mqttConfiguration.getHost(),mqttConfiguration.getClientid(),new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); // 将订阅持久 options.setUserName(mqttConfiguration.getUsername()); options.setPassword(mqttConfiguration.getPassword().toCharArray()); options.setConnectionTimeout(mqttConfiguration.getTimeout()); options.setKeepAliveInterval(mqttConfiguration.getKeepalive()); try { client.setCallback(new PushCallback()); client.connect(options); }catch (Exception e){ log.error("通信异常",e); } }catch (Exception e){ log.error("创建客户端异常",e); } } public void publish(String topic, String msg){ publish(0,false,topic,msg); } /** * 发布主题 * @param qos 默认qos为0 (只管发不管接)(协议文档会有要求) * @param retained * @param topic 主题 * @param msg 数据 */ public void publish(int qos, boolean retained, String topic, String msg){ // 消息体 MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(msg.getBytes()); // 主题目标 这里用来发布 MqttTopic mqttTopic = client.getTopic(topic); if (null == mqttTopic){ log.error("topic not exist"); } MqttDeliveryToken token; try { token = mqttTopic.publish(message); token.waitForCompletion(); log.info("数据推送服务器[{}]",new String(message.getPayload())); }catch (MqttPersistenceException e){ log.error("发布异常",e); }catch (MqttException e){ log.error("发布异常",e); } } } MqttPushClient.getInstance().publish("pos_message_all","测试");

测试OK 调试用到工具 链接: https://pan.baidu.com/s/1oTcuhCM0oeyjWckcU52FNw 提取码: xak5

参考文章 https://blog.csdn.net/zhangxing52077/article/details/80568244

最新回复(0)