reactor等响应式可以轻松的传递流数据,想起了spring的webflux也支持以rsocket,它可以通过websocket或是tcp的方式进行数据传输,正好有java版本和js版本,且spring也支持rsocket,下面尝试是哟给你rsocket模拟天气数据的传输,前端接到数据通过折线图展示。
1. 后端部分
1.1 model实现
创建一个weather类用于传输包装数据包含时间、风速、风向、温度数据
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Weather {
Long id
;
LocalDateTime date
;
Long direction
;
Long speed
;
Long temperature
;
}
1.2 服务实现
使用random获取一些数据用于测试rsocket的controller使用的是@Controller路由是一段字符串@MessageMapping("weather")
@Controller
@Slf4j
public class RSocketController {
private final Faker f
= new Faker(Locale
.CHINA
);
@MessageMapping("weather")
public Flux
<Weather> stream(final String request
) {
return Flux
.interval(Duration
.ofSeconds(1))
.filter(it
-> it
%10!=3)
.map(index
-> Weather
.builder()
.id(index
)
.date(LocalDateTime
.now().minusDays(1000-index
))
.direction(f
.random().nextLong(360))
.speed(f
.random().nextLong(150))
.temperature(f
.random().nextLong(15)+20)
.build()
)
.log();
}
}
1.3 rsocket配置
需要配置传输方式websocket或是tcp配置port,不要跟web相同
rsocket
:
server
:
transport
: websocket
port
: 8081
注入ServerRSocketFactoryProcessor
@Configuration
public class RSocketConfig {
@Bean
ServerRSocketFactoryProcessor
serverRSocketFactoryProcessor() {
return RSocketFactory
.ServerRSocketFactory
::resume
;
}
}
2. 前端部分
2.1 实现client类
实现rsocket的js部分完成rsocket的连接类resumeToken要设置一下,两个连接使用同一个resumeToken,前一个会被顶掉,这里设置这个参数也是方便登录鉴权metadata中会存放route的东西,就是spring中的 @MessageMapping("weather"),这里用这个设置${String.fromCharCode(route.length)}${route}
import {IdentitySerializer
, JsonSerializer
, RSocketClient
, RSocketResumableTransport
} from "rsocket-core";
import {message
} from "antd";
import RSocketWebSocketClient
from "rsocket-websocket-client";
import {v4
} from 'uuid';
import {Weather
} from "./Weather";
export class Client {
constructor(address
) {
this.client
= new RSocketClient({
serializers
: {
data
: JsonSerializer
,
metadata
: IdentitySerializer
},
setup
: {
keepAlive
: 10000,
lifetime
: 20000,
dataMimeType
: 'application/json',
metadataMimeType
: 'message/x.rsocket.routing.v0',
},
transport
: new RSocketResumableTransport(
() => new RSocketWebSocketClient({url
: address
}),
{
bufferSize
: 200,
resumeToken
: v4('ffzs'),
})
});
}
connect() {
return new Promise((resolve
, reject
) => {
this.client
.connect().subscribe({
onComplete
: s
=> {
this.socket
= s
;
this.socket
.connectionStatus().subscribe(status
=> {
message
.info(`链接状态: ${JSON.stringify(status)}`)
});
resolve(this.socket
);
},
onError
: error
=> {
reject(error
);
},
onSubscribe
: cancel
=> {
this.cancel
= cancel
}
});
});
}
requestResponse(message
, route
) {
return new Promise((resolve
, reject
) => {
this.socket
.requestResponse({
data
: message
,
metadata
: String
.fromCharCode(route
.length
) + route
}).subscribe({
onComplete
: msg
=> {
resolve(new Weather().toObject(msg
.data
))
},
onError
: error
=> {
reject(error
)
}
});
});
}
fireAndForget(message
, route
) {
return this.socket
.fireAndForget({
data
: message
,
metadata
: `${String.fromCharCode(route.length)}${route}`
});
}
requestStream(message
, route
) {
return this.socket
.requestStream({
data
: message
,
metadata
: `${String.fromCharCode(route.length)}${route}`
});
}
requestChannel(flow
, route
) {
return this.socket
.requestChannel(flow
.map(msg
=> {
return {
data
: msg
,
metadata
: `${String.fromCharCode(route.length)}${route}`
};
}));
}
disconnect() {
console
.log('rsocketclientsocket', this.socket
);
console
.log('rsocketclient', this.client
);
this.client
.close();
}
}
2.2 连接实现
通过按钮激活连接
handleConnect() {
if (!this.state
.connected
) {
this.client
= new Client(this.state
.address
);
this.client
.connect().then(() => {
this.setState({connected
: true});
});
} else {
this.client
.disconnect();
this.setState({connected
: false});
}
}
2.3 访问实现
这里通过requestCount实现背压onNext中获取每回返回数据
handleRequestStream() {
if (!this.state
.inStream
) {
const requestCount
= 5;
let processedCount
= 0;
const msg
= new Weather('client', 'request');
this.client
.requestStream(msg
, 'weather').subscribe({
onSubscribe
: sub
=> {
this.streamSub
= sub
;
this.streamSub
.request(requestCount
);
this.setState({inStream
: true});
},
onError
: error
=> {
message
.error(`出现错误 ${error}`)
},
onNext
: msg
=> {
this.addData(msg
.data
);
processedCount
+= 1;
if (processedCount
>= requestCount
) {
this.streamSub
.request(requestCount
);
processedCount
= 0;
}
},
onComplete
: () => {
},
});
} else {
this.streamSub
.cancel();
this.setState({inStream
: false});
}
}
2.4 作图
使用plotly简单绘图,可视化数据,添加依赖:
npm install react-plotly.js plotly.js
接入数据x轴时间,y轴温度设置y轴由0开始
<Plot
data
={[
{
x
: this.state
.date
,
y
: this.state
.temperature
,
type
: 'scatter',
mode
: 'lines',
marker
: {color
: 'rgba(197,84,118,0.71)'},
},
]}
layout
={{width
: 1300,
height
: 640,
title
: '温度',
plot_bgcolor
:"rgba(65,135,113,0.35)",
yaxis
: {
rangemode
:"tozero",
}
}}
/>
2.5 设置button
服务连接前无法连接数据流连接数据流之后无法直接关闭服务
3. 结果展示
4.源码
github前端(antd pro)后端(spring webflux)
gitee前端(antd pro)后端(spring webflux)