From 3d160c5110f02132127a9bec4a5a2426ddfbca30 Mon Sep 17 00:00:00 2001 From: "[moon]" <[moon@jdk.plus]> Date: Sat, 23 Jul 2022 01:29:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0websocket=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E5=B9=BF=E6=92=AD=E8=A7=A3=E5=86=B3=E6=96=B9=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README-CN.md | 42 +++++++++++++++++++++++++++++++++--------- README.md | 46 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 18 deletions(-) diff --git a/README-CN.md b/README-CN.md index 3cd1056..9d87b09 100644 --- a/README-CN.md +++ b/README-CN.md @@ -6,6 +6,7 @@

+

这是一款支持集群广播的使用netty编写的websocket组件,完美解决websocket和用户单机无法和整个业务集群通信的问题

- [English](README-CN.md) @@ -67,6 +68,9 @@ plus.jdk.websocket.log-level=debug # udp广播监听端口 plus.jdk.websocket.broadcast-monitor-port=10300 + +# udp广播监听端口,若小于等于0,则不监听消息 +plus.jdk.websocket.broadcast-monitor-port=10300 ``` @@ -107,23 +111,40 @@ public class MyWsSession implements IWsSession { import io.netty.channel.Channel; import io.netty.handler.codec.http.FullHttpRequest; import org.springframework.stereotype.Component; +import plus.jdk.broadcast.model.Monitor; import plus.jdk.websocket.common.HttpWsRequest; import plus.jdk.websocket.global.IWSSessionAuthenticatorManager; import plus.jdk.websocket.model.IWsSession; +import plus.jdk.websocket.properties.WebsocketProperties; @Component -public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager { +public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager { + /** + * 握手阶段验证session信息,若验证不通过,直接抛出异常终止握手流程。 + * 若验证成功,则返回自定义的session,并使用redis之类的公用存储服务记录当前用户和哪台机器建立了连接 + */ @Override - public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path) { + public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path, WebsocketProperties properties) { HttpWsRequest httpWsRequest = new HttpWsRequest(req); String uid = httpWsRequest.getQueryValue("uid"); return new MyWsSession(uid, channel); } + /** + * 当连接断开时,销毁session的回调 + */ + @Override + public void onSessionDestroy(IWsSession session, String path, WebsocketProperties properties) { + + } + + /** + * 返回当前用户和哪些机器建立了连接,需要向这些机器发送广播推送消息 + */ @Override - public void onSessionDestroy(IWsSession session) { - IWSSessionAuthenticatorManager.super.onSessionDestroy(session); + public Monitor[] getUserConnectedMachine(String userId, String path, WebsocketProperties properties) { + return new Monitor[]{new Monitor("127.0.0.1", properties.getBroadcastMonitorPort())}; } } ``` @@ -219,15 +240,18 @@ public class MessageController { @RequestMapping(value = "/message/send", method = {RequestMethod.GET}) public Object sendMessage(@RequestParam String uid, @RequestParam String content){ - // 调用sessionGroupManager.getSession()函数获取当前用户在该实例中的所有连接 // 你可以在 IWSSessionAuthenticator 的实现中自行实现自己的session定义,将消息分发给不同的设备 // 或向远端上报当前用户的连接到底在哪些机器上 ConcurrentLinkedDeque> sessions = sessionGroupManager.getSession(uid, "/ws/message"); - for(IWsSession wsSession: sessions) { - wsSession.sendText(content); // 发送文本消息 - wsSession.sendBinary(content.getBytes()); // 发送二进制消息 - } + + // 发送文本消息 + // 如果你按要求实现了IWSSessionAuthenticatorManager接口中的getUserConnectedMachine方法,那么将会向已经和用户建立连接的机器发送广播,推送消息 + + sessionGroupManager.sendText(uid, "/ws/message", content); + + // 发送二进制消息 + sessionGroupManager.sendBinary(uid, "/ws/message", content.getBytes(StandardCharsets.UTF_8)); return "success"; } } diff --git a/README.md b/README.md index 84c1afe..7bdb2b9 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@

+

This is a websocket component written in netty that supports cluster broadcasting, Perfectly solve the problem that websocket and user single machine cannot communicate with the entire business cluster

- [中文文档](README-CN.md) @@ -63,6 +64,9 @@ plus.jdk.websocket.event-executor-group-threads=0 # log level plus.jdk.websocket.log-level=debug + +# udp broadcast listening port, if it is less than or equal to 0, it will not listen for messages +plus.jdk.websocket.broadcast-monitor-port=10300 ``` ## Example of use @@ -103,23 +107,42 @@ The usage example is as follows: import io.netty.channel.Channel; import io.netty.handler.codec.http.FullHttpRequest; import org.springframework.stereotype.Component; +import plus.jdk.broadcast.model.Monitor; import plus.jdk.websocket.common.HttpWsRequest; import plus.jdk.websocket.global.IWSSessionAuthenticatorManager; import plus.jdk.websocket.model.IWsSession; +import plus.jdk.websocket.properties.WebsocketProperties; @Component -public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager { +public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager { + /** + * The session information is verified in the connection handshake stage. + * If the verification fails, an exception is thrown directly to terminate the handshake process. + * If the verification is successful, return a custom session, and use a public storage service such as redis to + * record which machine the current user has established a connection with + */ @Override - public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path) { + public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path, WebsocketProperties properties) { HttpWsRequest httpWsRequest = new HttpWsRequest(req); String uid = httpWsRequest.getQueryValue("uid"); return new MyWsSession(uid, channel); } + /** + * When the connection is disconnected, the callback for destroying the session + */ + @Override + public void onSessionDestroy(IWsSession session, String path, WebsocketProperties properties) { + + } + + /** + * Returns which machines the current user has established a connection with, and needs to send broadcast push messages to these machines + */ @Override - public void onSessionDestroy(IWsSession session) { - IWSSessionAuthenticatorManager.super.onSessionDestroy(session); + public Monitor[] getUserConnectedMachine(String userId, String path, WebsocketProperties properties) { + return new Monitor[]{new Monitor("127.0.0.1", properties.getBroadcastMonitorPort())}; } } ``` @@ -201,6 +224,7 @@ import plus.jdk.websocket.global.SessionGroupManager; import plus.jdk.websocket.model.IWsSession; import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ConcurrentLinkedDeque; @@ -214,16 +238,20 @@ public class MessageController { private SessionGroupManager sessionGroupManager; @RequestMapping(value = "/message/send", method = {RequestMethod.GET}) - public Object sendMessage(@RequestParam String uid, @RequestParam String content){ + public Object sendMessage(@RequestParam String uid, @RequestParam String content) { // Call the sessionGroupManager.getSession() function to get all connections of the current user in this instance. // You can implement your own session definition in the implementation of IWSSessionAuthenticator to distribute messages to different devices // Or report to the remote end which machines the current user is connected to ConcurrentLinkedDeque> sessions = sessionGroupManager.getSession(uid, "/ws/message"); - for(IWsSession wsSession: sessions) { - wsSession.sendText(content); // 发送文本消息 - wsSession.sendBinary(content.getBytes()); // 发送二进制消息 - } + + // send text message, + // If you implement the getUserConnectedMachine method in the IWSSessionAuthenticatorManager interface as required, + // it will send a broadcast and push message to the machine that has established a connection with the user + sessionGroupManager.sendText(uid, "/ws/message", content); + + // send binary message + sessionGroupManager.sendBinary(uid, "/ws/message", content.getBytes(StandardCharsets.UTF_8)); return "success"; } }