SpringBoot 使用 WebSocket及注意事项

tech2022-09-11  107

**WebSocket **

WebSocket 协议本质上是一个基于 TCP 的协议。为了建立一个 WebSocket 连接,客户端浏览器首先要向服务器发起一个 HTTP 请求,这个请求和通常的 HTTP 请求不同,包含了一些附加头信息,其中附加头信息"Upgrade: WebSocket"表明这是一个申请协议升级的 HTTP 请求,服务器端解析这些附加的头信息然后产生应答信息返回给客户端,客户端和服务器端的 WebSocket 连接就建立起来了,双方就可以通过这个连接通道自由的传递信息,并且这个连接会持续存在直到客户端或者服务器端的某一方主动的关闭连接。

客户端的 HTML 和 JavaScript目前大部分浏览器支持 WebSocket() 接口,你可以在以下浏览器中尝试实例: Chrome, Mozilla, Opera 和 Safari。

这里比较完整地介绍了四种使用方式:这里有四种使用方式,实测均可实现通讯,细节需要自己琢磨

因为使用到springboot的项目,我选择了spring集成的。下面是步骤: 第一步:引入依赖:

compile group: 'org.springframework.boot', name: 'spring-boot-starter-websocket', version: '2.3.3.RELEASE'

第二步:配置websocket参数。可以配置不同会话的地址、握手前的拦截器等。

package com.aa.bb.cc.config; import com.aa.bb.cc.common.interceptor.WebSocketHandshakeInterceptor; import com.aa.bb.cc.communication.websocket.MtWebsocketHandler; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; /** * @decription: WebSocket 配置类 */ @Configuration @EnableWebMvc @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer, WebMvcConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { //setAllowedOrigins 调试先关闭 registry.addHandler(verifyHandler(), "/aa/bb/cc/dd") .setAllowedOrigins("*")//这是不做域名拦截处理 .addInterceptors(new WebSocketHandshakeInterceptor());//这里添加自定义的拦截器 } public WebSocketHandler verifyHandler() { return MtWebsocketHandler.getInstance(); } }

第三步: 可选择配置拦截器:=== 这里非常重要,如果前端携带了子协议,后端必须使用相同的子协议解析,不然会出现一次连接就马上关闭会话的情况!!!!!!==

package com.aa.bb.cc.common.interceptor; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.stereotype.Service; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.List; import java.util.Map; @Service public class WebSocketHandshakeInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { //websocket携带不了信息,只能自定义自协议sec-websocket-protocol。 //你可以通过session传递用户的登录信息,用户校验。attributes 是用户传递给消息处理者的参数 //ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request; // List<String> customInfo = serverRequest.getHeaders().get("sec-websocket- protocol"); // if (customInfo == null || customInfo.size() == 0) { // return false; // } //attributes.put("username", customInfo.get(0)); } return true; } /** * 握手后的操作 */ @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }

第四步:消息处理器。(因为websocket的会话无法序列化,如果是分布式的系统需要使用中间件传递,可使用Redis的订阅模式,那样会简单点。这里不深入介绍)。另外避免多线程下的不安全,保存会话的要使用现场安全的集合 例如:ConcurrentHashMap、CopyOnWriteArraySet等

package com.aa.bb.cc.communication.websocket; import com.aa.bb.cc.common.enums.ResultEnum; import com.aa.bb.cc.common.exception.BusinessException; import com.aa.bb.cc.communication.modbus.common.ThreadPoolFactory; import com.aa.bb.cc.communication.websocket.sessions.WsSessionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PingMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; import java.time.LocalDateTime; import java.util.LinkedList; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.*; /** * @created Date: 2020/9/1 * @decription: 这里继承最基本的 TextWebSocketHandler */ @Component public class MtWebsocketHandler extends TextWebSocketHandler { private static final Logger LOG = LoggerFactory.getLogger(WsSessionManager.class); private static final int MESSAGE_QUEUE_INTERVAL = 50; private static final int MESSAGE_PING_INTERVAL = 10; private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory .getThreadPoolDiscardOldest(5, 100, 30, 1000); private MtWebsocketHandler() { } /** * 静态内部类,懒加载 */ public static MtWebsocketHandler getInstance() { return LazyMtWebsocketHandler.INSTANCE; } /** * 线程安全集合保存会话信息 */ private static final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>(); /** * 附带消息队列的会话 */ private final Map<WebSocketSession, LinkedList<TextMessage>> messageQueueMap = new ConcurrentHashMap<>(); private final ExecutorService websocketSendThreadPool = Executors.newFixedThreadPool(40); /** * socket 建立成功事件 * * @param session * @throws Exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { Object token = session.getAttributes().get("username"); sessions.add(session); messageQueueMap.put(session, new LinkedList<>()); //每一个会话有一个线程处理 websocketSendThreadPool.submit(() -> { LinkedList<TextMessage> textMessages = messageQueueMap.get(session); Integer pingSendInterval = MESSAGE_PING_INTERVAL; while (sessions.contains(session)) { if (textMessages.size() > 0) { TextMessage textMessage = textMessages.poll(); try { if (Objects.nonNull(textMessage)) { session.sendMessage(textMessage); } } catch (IOException e) { e.printStackTrace(); } } else { try { pingSendInterval--; if (pingSendInterval == 0) { try { session.sendMessage(new PingMessage()); } catch (Exception e) { try { session.close(); } catch (IOException ex) { LOG.error("Websocket close 失败"); LOG.error(ex.toString()); } LOG.error("Websocket ping 失败,关闭连接"); } finally { pingSendInterval = MESSAGE_PING_INTERVAL; } } Thread.sleep(MESSAGE_QUEUE_INTERVAL); } catch (InterruptedException e) { LOG.error(e.toString()); } } } }); super.afterConnectionEstablished(session); } /** * 接收消息事件 * * @param session * @param message * @throws Exception */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { System.out.println(message.toString()); } public void sendMessage(String message) { for (WebSocketSession session : sessions) { try { session.sendMessage(new TextMessage(message.getBytes())); } catch (IOException e) { e.printStackTrace(); } } } /** * socket 断开连接时 * * @param session * @param status */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { LOG.warn("websocket close!!"); sessions.remove(session); messageQueueMap.remove(session); super.afterConnectionClosed(session, status); } private static class LazyMtWebsocketHandler { private static final MtWebsocketHandler INSTANCE = new MtWebsocketHandler(); } }

第五步:前端设置。使用vue,elementUI。关闭websocket方法:this.webSocket.close();

methods: { init: function() { if (typeof WebSocket === 'undefined') { alert('您的浏览器不支持消息推送,可能会影响!!'); } else { // 实例化socket let hostname = window.location.hostname; const wsuri = 'ws://' + hostname + ':8089/aa/bb/cc/dd'; this.webSocket = new WebSocket(wsuri); this.webSocket.addEventListener('open', (_) => { //打开socket,可增加事件 }); this.webSocket.addEventListener('close', (_) => { //关闭socket,可增加事件 }); this.webSocket.addEventListener('message', (msg) => { //接收到信息socket,可增加事件 this.$notify({ title: '错误', type: 'error', message: msg.data, duration: 0, }); }); } }, } mounted() { this.init(); }

暂时写到这吧,有时间最补充。欢迎大家指出错误之处哈······

最新回复(0)