由于目前项目通信服务端采用的Netty实现的TCPServer,为了保证数据的实时推送以及避免频繁查询数据库,所以采用websocket在接收到消息后立即推送到html页面 代码已经上传至GitHub 链接: nettyAndWebsocket
Netty启动类及相关配置
@Slf4j @Component @Configuration public class NettyServer { private Integer port = 8773; final EventLoopGroup bossGroup = new NioEventLoopGroup(); final EventLoopGroup workerGroup = new NioEventLoopGroup(); public void run() throws Exception { //创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 //默认实际 cpu核数 * 2 try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 //设置两个线程组 bootstrap.group(bossGroup, workerGroup) //使用NioSocketChannel 作为服务器的通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数(也可以说是并发数) .option(ChannelOption.SO_BACKLOG, 2048) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //.handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup //.childOption(ChannelOption.TCP_NODELAY,true)//socketchannel的设置,关闭延迟发送 .childHandler(new NettyInitializer()); log.info(".....服务器 is ready....."); //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(port).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { log.info("监听端口[{}]成功!", port); } else { log.error("监听端口[{}]失败!", port); } } }); //对关闭通道进行监听 cf.channel().closeFuture().sync(); } catch (Exception e) { log.error(" netty服务启动异常 " + e.getMessage()); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }用于处理各种Netty连接、端口、接入等
@Slf4j @Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static Lock lock_1 = new ReentrantLock(); private static Lock lock_2 = new ReentrantLock(); private static Lock lock_3 = new ReentrantLock(); private static Lock lock_4 = new ReentrantLock(); /** * 管理一个全局map,保存连接进服务端的通道数量 */ private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); @Override //数据读取完毕 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 //ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); } /** * 处理异常, 一般是需要关闭通道 * * @param ctx * @param cause * * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); log.info("服务端异常关闭" + ctx.channel()); } /** * @param ctx * * @DESCRIPTION: 有客户端连接服务器会触发此函数 * @return: void */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { lock_1.lock(); try { //获取连接通道唯一标识 ChannelId channelId = ctx.channel().id(); //如果map中不包含此连接,就保存连接 if (CHANNEL_MAP.containsKey(channelId)) { log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size()); } else { //保存连接 CHANNEL_MAP.put(channelId, ctx); log.info("客户端【" + channelId + "】连接netty服务器"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } finally { lock_1.unlock(); } } /** * @param ctx * * @DESCRIPTION: 有客户端终止连接服务器会触发此函数 * @return: void */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { lock_2.lock(); try { ChannelId channelId = ctx.channel().id(); //包含此客户端才去删除 if (CHANNEL_MAP.containsKey(channelId)) { //删除连接 CHANNEL_MAP.remove(channelId); System.out.println(); log.info("客户端【" + channelId + "】退出netty服务器"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } finally { lock_2.unlock(); } } /** * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 * 2. Object msg: 就是客户端发送的数据 默认Object * <p> * 读取数据实际(这里我们可以读取客户端发送的消息) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { lock_3.lock(); try { log.info("服务器读取线程 " + Thread.currentThread().getName() + " channle = " + ctx.channel()); Channel channel = ctx.channel(); //将 msg 转成一个 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; //得到此时客户端的数据长度 int bytes_length = buf.readableBytes(); //组件新的字节数组 byte[] buffer = new byte[bytes_length]; buf.readBytes(buffer); final String allData = NettyByteAndStringUtils.byteToHex(buffer); log.info("进入服务端数据:" + allData); ctx.executor().execute(new NettySendThread(ctx, allData)); } finally { lock_3.unlock(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { lock_4.lock(); try { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client: " + socketString + " READER_IDLE 读超时"); ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client: " + socketString + " WRITER_IDLE 写超时"); ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client: " + socketString + " ALL_IDLE 总超时"); ctx.disconnect(); } } } finally { lock_4.unlock(); } } }用于处理接收的数据,比如处理黏包、拆包、数据过滤等,以及数据解析后续操作
@Slf4j public class NettyDataSvervice { private static final ReentrantLock Lock = new ReentrantLock(); /** * 将传送过来的数据进行解析,包括异或运算 (第一次服务器端给客户端发) * * @param ReceiveData * * @return */ public static String sendData(String ReceiveData) { final ReentrantLock putLock = Lock; log.info("接收数据" + ReceiveData); putLock.lock(); try { //此处需要读数据进行校验以及分包黏包处理,本文主要提供思路所以省略 /* * 处理分包黏包、拆分、解析等 */ //进入数据解析 parseData(ReceiveData); try { //数据帧WebSocket推送 WebSocketServer.BroadCastInfo(ReceiveData); } catch (IOException e) { e.printStackTrace(); } return ReceiveData; } finally { putLock.unlock(); } } /** * 数据解析入库处理 */ public static void parseData(String ReceiveData) { //此处省略真实入库操作 System.out.println("执行入库操作"); } }开启WebSocket支持
@Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }websocket 入口以及方法调用
@ServerEndpoint(value = "/ws/asset") @Component public class WebSocketServer { @PostConstruct public void init() { System.out.println("websocket 加载"); } private static Logger log = LoggerFactory.getLogger(WebSocketServer.class); private static final AtomicInteger OnlineCount = new AtomicInteger(0); // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。 private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>(); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) { SessionSet.add(session); // 在线数加1 int cnt = OnlineCount.incrementAndGet(); log.info(String.valueOf(session.getRequestURI())); log.info("有连接加入,当前连接数为:{},sessionId={}", cnt, session.getId()); SendMessage(session, "连接成功"); } /** * 连接关闭调用的方法 */ @OnClose public void onClose(Session session) { SessionSet.remove(session); int cnt = OnlineCount.decrementAndGet(); log.info("有连接关闭,当前连接数为:{}", cnt); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("来自客户端的消息:{}", message); SendMessage(session, "收到消息,消息内容:" + message); } /** * 出现错误 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId()); error.printStackTrace(); } /** * 发送消息,实践表明,每次浏览器刷新,session会发生变化。 * * @param session * @param message */ public static void SendMessage(Session session, String message) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("发送消息出错:{}", e.getMessage()); e.printStackTrace(); } } /** * 群发消息 * * @param message * * @throws IOException */ public static void BroadCastInfo(String message) throws IOException { for (Session session : SessionSet) { if (session.isOpen()) { SendMessage(session, message); } } } /** * 指定Session发送消息 * * @param sessionId * @param message * * @throws IOException */ public static void SendMessage(String message, String sessionId) throws IOException { Session session = null; for (Session s : SessionSet) { if (s.getId().equals(sessionId)) { session = s; break; } } if (session != null) { SendMessage(session, message); } else { log.warn("没有找到你指定ID的会话:{}", sessionId); } } }浏览器地址栏输入 http://localhost:8080/webSocket 如下图所示
使用串口调试助手模拟客户端连接Netty服务端并发送数据 这时我们同步打开网页查看数据是否通过websocket推送成功到前台 至此我们就完成了Netty接收数据并处理,Websocket推送数据到前台。 全部代码都已上传到GitHub仓库
本人使用的数据发送是hex16进制,具体数据格式以自身业务需求位置,比如Json格式等。
