目录
概述
web管理系统中可以对业务数据执行新增和删除,现在需要当业务数据发生新增或删除操作后,尽可能实时的反应到WPF客户端上面。
web管理系统用VUE编写,后端服务为SpringBoot,WPF客户端基于.Netframework4.8编写。
整体架构
sequenceDiagram title: 交互时序图 web前台->>+web后端服务:新增数据 Note over web前台,web后端服务:caremaId,labelInfo,...... web后端服务->>+WebSocketServer:创建websocker消息 Note over web后端服务,WebSocketServer:Must:cameraId=clientId WPF客户端1-->>+WebSocketServer:创建监听 Note over WPF客户端1,WebSocketServer:clientId WPF客户端2-->>+WebSocketServer:创建监听 Note over WPF客户端2,WebSocketServer:clientId WebSocketServer->>WPF客户端1:分发websocker消息 Note over WebSocketServer,WPF客户端1:依据:cameraId=clientId WebSocketServer->>WPF客户端2:分发websocker消息 Note over WebSocketServer,WPF客户端2:依据:cameraId=clientId设计
流程设计
- 用户在浏览器界面执行新增业务数据操作,调用后端新增接口
- WPF客户端在启动的时候初始化websocket客户端,并创建对server的监听
- 后端新增接口先将数据落库,而后调用websocket服务端产生消息,消息在产生后立马被发送到了正在监听中的websocket-client
- websocket-server和websocket-client是一对多的关系,如何保证业务数据被正确的分发?监听的时候给server端传递一个全局唯一的clientId,业务数据在产生的时候关联到一个BizId上面,只要保证clientId=BizId就可以了。
- 删除流程和新增类似
程序设计
WebSocketServer
概述
WebSocketServer端采用SpringBoot框架实现,通过在springboot-web项目中集成 org.springframework.boot:spring-boot-starter-websocket
实现websocket的能力。
新增pom
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
复制代码 新增配置类
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.config.annotation.EnableWebSocket;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
- @Configuration
- @EnableWebSocket
- public class WebSocketConfig {
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
- }
复制代码 创建websocket端点
- import com.alibaba.fastjson.JSON;
- import org.springframework.stereotype.Component;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.concurrent.ConcurrentHashMap;
- @ServerEndpoint("/ws/label/{clientId}")
- @Component
- public class LabelWebSocket {
- /**
- * session list
- */
- private static ConcurrentHashMap<String, Session> sessionList = new ConcurrentHashMap<>();
- /**
- * 当前 clientId
- */
- private String currentClientId = "";
- @OnOpen
- public void open(Session session, @PathParam("clientId") String clientId) throws IOException {
- if (sessionList.containsKey(clientId)) {
- sessionList.remove(clientId);
- }
- sessionList.put(clientId, session);
- currentClientId = clientId;
- this.sendMsg(session, "connectok");
- }
- @OnClose
- public void close(Session session) throws IOException {
- sessionList.remove(currentClientId);
- System.out.println("连接关闭,session=" + JSON.toJSONString(session.getId()));
- }
- @OnMessage
- public void receiveMsg(Session session, String msg) throws IOException {
- this.sendMsg(session, "接收到的消息为:" + msg);
- // throw new RuntimeException("主动抛异常");
- }
- @OnError
- public void error(Session session, Throwable e) throws IOException {
- System.out.println("连接异常,session=" + JSON.toJSONString(session.getId()) + ";currentClientId=" + currentClientId);
- this.sendMsg(session, "发生异常,e=" + e.getMessage());
- e.printStackTrace();
- }
- /**
- * @param clientId
- * @param msg
- */
- public boolean sendMsg(String clientId, String msg) throws IOException {
- if (sessionList.containsKey(clientId)) {
- Session session = sessionList.get(clientId);
- this.sendMsg(session, msg);
- return true;
- } else {
- return false;
- }
- }
- private void sendMsg(Session session, String msg) throws IOException {
- session.getBasicRemote().sendText(msg);
- }
- }
复制代码 WebSocketClient
概述
WebSocketClient端集成在WPF应用客户端中,通过前期调研,选中 WebSocketSharp 作为websocketclient工具,WebSocketSharp 是托管在Github的开源项目,MITLicense,目前4.9K的star。
安装WebSocketSharp
- //nuget
- Install-Package WebSocketSharp -Pre
复制代码 初始化client
- WebSocket ws = new WebSocket("ws://127.0.0.1:8083/ws/xx/clientId");
复制代码 创建连接
- private void InitWebSocket()
- {
- ws.OnOpen += (sender, e) =>
- {
- Console.WriteLine("onOpen");
- };
- //允许ping
- ws.EmitOnPing = true;
- //接收到xiaoxi
- ws.OnMessage += (sender, e) =>
- {
- ReceiveMessage(sender, e);
- };
- ws.Connect();
- //发送消息
- //ws.Send("BALUS")
- ;
- }
- private void ReceiveMessage(object sender, MessageEventArgs e)
- {
- if (e.IsText)
- {
- // Do something with e.Data.like jsonstring
- Console.WriteLine(e.Data);
- return;
- }
- if (e.IsBinary)
- {
- // Do something with e.RawData. like byte[]
- return;
- }
- if (e.IsPing)
- {
- // Do something to notify that a ping has been received.
- return;
- }
- }
复制代码 跨线程更新UI
由于 WebSocketSharp 会创建线程来处理 ReceiveMessage ,而WPF中子线程是无法更新UI的,所以需要引入 Dispatcher 来实现跨线程更新UI。
获取当前线程名字
- //当前线程
- string name = Thread.CurrentThread.ManagedThreadId.ToString();
复制代码 示例代码
- private void ReceiveMessage(object sender, MessageEventArgs e)
- {
- if (e.IsText)
- {
- // Do something with e.Data.like jsonstring
- Console.WriteLine(e.Data);
- //当前线程
- string name = Thread.CurrentThread.ManagedThreadId.ToString();
- App.Current.Dispatcher.Invoke((Action)(() =>
- {
- Image lab = new Image();
- lab.Uid = "123456";
- lab.Name = "labName";
- lab.Width = 50; lab.Height = 50;
- string url = "http://xxx:xxx/img/louyu.png";
- BitmapImage bitmapImage = HttpUtil.getImage(url);
- lab.Source = bitmapImage;
- lab.AddHandler(MouseLeftButtonDownEvent, new MouseButtonEventHandler(LabelClick));
- Canvas.SetTop(lab, 800);
- Canvas.SetLeft(lab, 600);
- this.cav.Children.Add(lab);
- }));
- return;
- }
- }
复制代码 重连机制
当websocket-server重启,此时client将丢失和server的连接,如果不引入重连机制,那么在手动重启client前,业务数据将无法实时更新,这在实际使用中肯定是不被允许的,所以需要引入重连机制。当捕捉到连接关闭之后,尝试重新连接。
重试次数5次,重试间隔5s,示例代码如下:- //连接关闭
- wsLabel.OnClose += (sender, e) =>
- {
- OnWsLabelClose(sender, e);
- };
- ......
- int retry = 0;
- /// <summary>
- /// 连接关闭
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnWsLabelClose(object sender, CloseEventArgs e)
- {
- Log.LogInfo.Info("OnWsLabelClose,retry=" + retry);
- if (retry < 5)
- {
- retry++;
- Thread.Sleep(5000);
- wsLabel.Connect();
- }
- else
- {
- wsLabel.Log.Error("The reconnecting has failed.");
- Log.LogInfo.Info("重新连接失败,超过最大重试次数");
- }
- }
-
复制代码 接口设计
新增接口
概述
目前WebSocketServer和web后端服务是在同一个SpringBoot的工程中,所以只要将WebSocketServer托管到SpringContainer中,web后端服务可以通过 DI 的方式直接访问 WebSocketEndPoint。
如果考虑程序的低耦合,可以在WebSocketServer和web后端服务之间架设一个MQ。
核心代码
- @Autowired
- private LabelWebSocket ws;
- @GetMapping("/create")
- public boolean createLabel() throws IOException {
- String cameraId = "cml";
- //todo
- boolean result = ws.sendMsg(cameraId, "新增标签");
- return result;
- }
复制代码 风险
分布式风险
当前在 WebSocketServer 中,已经连接的client信息是记录在当前进程的cache中,如果服务做横向扩容,cache信息无法在多实例进程中传递,将导致无法正确的处理业务数据,并可能会发生意想不到的异常和bug,此问题在并发越高的情况下造成的影响越大
资源风险
web后端服务为基于java语言的springboot程序,这种类型程序的特点是内存消耗特别严重。WebSocketServer服务在本项目中仅用作消息中间件,连通web后端服务和WPF客户端。
首先WebSocketServer没有太多的计算能力的消耗,内存消耗会随着连接客户端数量的增长而增长,网络将是最大的开销,一方面需要转发来自web后端服务的业务数据,并和WPF客户端保持长连接;另一方面WebSocketServer和WPF客户端的交互可能会走公网,而其和web后端服务必然是在局域网环境。
综上,将web后端服务和WebSocketServer分开部署对于硬件资源成本和利用率来说是最好的选择。
高可用风险
未引入重试机制,当某一个环节失败之后,将导致异常情况发生。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |