Skip to content

Commit

Permalink
添加redis存储结构
Browse files Browse the repository at this point in the history
  • Loading branch information
SpikeFJ committed Oct 17, 2023
1 parent fe48b1e commit 519a424
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 100 deletions.
17 changes: 14 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,27 @@ iot通讯网关,支持tcp、udp等多种通讯方式接入,管理通讯连
## 二. 上行数据处理
上行数据,需要考虑上行数据的分发目的地,有可能是kafka,redis或其他

### 2.1. 普通上行数据
### 2.1 在线列表
存储设备所在机器及刷新时间,用于后续下行发送时候,定位设备。

1. 检测到上线、下线事件后,主动存储到redis,设置永不过期,缺点:服务端宕机后,仍旧会显示设备在线
2. 检测到上线、下线事件后,主动存储到redis,设置20s过期,缺点。需要实时(接收到数据后)/定时更新redis。

建议采用第一种,
1. 服务端宕机仍显示在线,可以在页面上标注服务器状态,在线的设备则相当于保留了最后一次快照,可能有利于后续定位问题
2. 可以设置一个比较长的过时时间。

### 2.2 服务端信息统计
### 2.3. 普通上行数据分发
该数据独立自解析,即可以针对该报文直接解析出是哪个对象的数据,存储分发即可

### 2.2 命令响应数据
### 2.4 命令响应数据分发
该数据是由于之前命令下发之后,设备上报的响应数据,
需要找到之前下发的历史记录,并判断是否超时,之后将处理好的数据传递给下发方。

需要注意的是:可能一次请求,多次响应

### 2.3 需要响应的数据
### 2.5 需要响应的数据
部分数据可能需要返回相应的报文给设备,这里也会出现几种情况
1. 所有响应都是公用结构,只有些许差异
如每次接收数据后,提供一个ACK,差异就是流水号要和请求一致
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/jfeng/gateway/comm/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ public class Constant {
public static final String SYSTEM_PREFIX = "iot:";
public static final String MODULE_MACHINE = SYSTEM_PREFIX + "machine:";
public static final String ONLINE_MAPPING = SYSTEM_PREFIX + "online:";
public static final String SESSION_CURRENT = SYSTEM_PREFIX + "current:";
public static final String SESSION_HISTORY = SYSTEM_PREFIX + "history:";

//<-----------------单个会话信息----------------------->
public static final String SESSION_REMOTE_ADDRESS = "remote_address";
public static final String SESSION_DEVICE_ID = "device_id";
public static final String SESSION_REMOTE_ADDRESS = "ra";
public static final String SESSION_LOCAL_ADDRESS = "la";
public static final String SESSION_DEVICE_ID = "did";
public static final String SESSION_BID = "bid";
public static final String SESSION_CREATE_TIME = "create_time";
public static final String SESSION_RECEIVE_PACKETS = "receive_packets";
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/jfeng/gateway/config/GateWayConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class GateWayConfig {

@Data
class Up {
List<DispatchSetting> eventDispatch;
Setting dataDispatch;
List<DispatchSetting> rawDataDispatch;
Setting preparedDataDispatch;
}

@Data
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/jfeng/gateway/server/PeriodServerWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.jfeng.gateway.server;

/**
* 服务器周期性定时任务
*/
public interface PeriodServerWorker {
/**
* 处理Server信息
*
* @param tcpServer 服务端
*/
void run(TcpServer tcpServer);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.jfeng.gateway.up.dispatch;
package com.jfeng.gateway.server;

import com.jfeng.gateway.comm.Constant;
import com.jfeng.gateway.server.TcpServer;
import com.jfeng.gateway.util.DateTimeUtils2;
import com.jfeng.gateway.util.RedisUtils;
import lombok.Getter;
Expand All @@ -19,14 +18,15 @@
@Setter
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.redis")
@ConditionalOnProperty(name = "spring.redis.enable",havingValue = "true")
@Primary
public class RedisServerInfoDispatcher implements ServerInfoDispatcher {
public class PeriodServerWorkerInRedis implements PeriodServerWorker {
@Resource
private RedisUtils redisUtils;

@Override
public void dispatch(TcpServer tcpserver) {
public void run(TcpServer tcpserver) {
//1. 服务端自身信息
Map<String, String> serverInfos = new HashMap<>();
serverInfos.put(Constant.ONLINE, String.valueOf(tcpserver.getOnLines().size()));
serverInfos.put(Constant.CONNECTED, String.valueOf(tcpserver.getConnected().size()));
Expand All @@ -43,5 +43,15 @@ public void dispatch(TcpServer tcpserver) {
serverInfos.put(Constant.SERVER_LAST_REFRESH_TIME, DateTimeUtils2.outNow());

redisUtils.putAll(Constant.SYSTEM_PREFIX + tcpserver.getLocalAddress() + ":summary", serverInfos);

//TODO 2. 当前哪些设备在线,用于时间段查询
Map<String, String> onlineInfo = new HashMap<>();
String timeNow = DateTimeUtils2.outNow();

for (String deviceId : tcpserver.getOnLines().keySet()) {
onlineInfo.put(deviceId, deviceId);
}
redisUtils.putAll(Constant.SYSTEM_PREFIX + tcpserver.getLocalAddress() + ":online:" + timeNow, onlineInfo);

}
}
15 changes: 7 additions & 8 deletions src/main/java/com/jfeng/gateway/server/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.jfeng.gateway.session.SessionStatus;
import com.jfeng.gateway.session.TcpSession;
import com.jfeng.gateway.up.dispatch.DataDispatcher;
import com.jfeng.gateway.up.dispatch.ServerInfoDispatcher;
import com.jfeng.gateway.util.DateTimeUtils;
import com.jfeng.gateway.util.Utils;
import io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -56,7 +55,7 @@ public class TcpServer extends ProxySessionListener implements Server {
DataDispatcher dataDispatcher;

@Autowired(required = false)
ServerInfoDispatcher serverInfoDispatcher;
PeriodServerWorker periodServerWorker;

@Autowired(required = false)
DownInfoSaveStrategy downInfoSave;
Expand All @@ -68,8 +67,8 @@ public class TcpServer extends ProxySessionListener implements Server {
private volatile boolean isRunning = true;
private int loginTimeout;//登陆超时
private int heartTimeout;//心跳超时
private int timeoutCheckInterval = 1000;//检测周期
private int checkPeriod = 1000;//检测周期
private int timeoutCheckInterval = 1_000;//检测周期
private int periodDelay = 30_000;//周期性任务延迟间隔

private boolean allowMultiSocketPerDevice = false;//是否需要单设备多连接,如双卡设备
private Map<String, TcpSession> connected = new ConcurrentHashMap<>();//已连接集合
Expand Down Expand Up @@ -162,23 +161,23 @@ public void init(Map<String, String> parameters) {
log.warn("下发发送", e);
} finally {
try {
Thread.sleep(checkPeriod);
Thread.sleep(periodDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
if (serverInfoDispatcher != null) {
if (periodServerWorker != null) {
Executors.newSingleThreadExecutor(new ThreadFactoryImpl("机器流量统计")).submit(() -> {
while (isRunning && !Thread.currentThread().isInterrupted()) {
try {
serverInfoDispatcher.dispatch(this);
periodServerWorker.run(this);
} catch (Exception e) {
log.warn("机器流量统计", e);
} finally {
try {
Thread.sleep(getCheckPeriod());
Thread.sleep(getPeriodDelay());
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/jfeng/gateway/session/TcpSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public void close(String closeReason) {


public SessionLifeCycle createConnectLifeCycle() {
return createConnectLifeCycle(true);
}

public SessionLifeCycle createConnectLifeCycle(boolean withRecords) {
SessionLifeCycle lifeCycle = new SessionLifeCycle();
lifeCycle.setChannelId(this.channelId);
lifeCycle.setRemoteAddress(this.remoteAddress);
Expand All @@ -152,11 +156,13 @@ public SessionLifeCycle createConnectLifeCycle() {
lifeCycle.setCloseTime(ZonedDateTime.now().toInstant().toEpochMilli());

SessionRecord last = this.histroyRecordFIFO.getData().getLast();
lifeCycle.setCloseReason((last != null && last.getDataType() == 3)?last.getData():"未知");
lifeCycle.setCloseReason((last != null && last.getDataType() == 3) ? last.getData() : "未知");

lifeCycle.setDeviceId(this.deviceId);
lifeCycle.setBusinessId(this.bId);
lifeCycle.setSessionRecords(this.histroyRecordFIFO);
if (withRecords) {
lifeCycle.setSessionRecords(this.histroyRecordFIFO);
}
return lifeCycle;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Map;

/**
* kafka会话监听器。接收到事件后写入kafka对应的主题中
* kafka会话监听器。
* <p>
* 接收到事件后写入kafka对应的主题中
*/
@Component
@Setter
Expand All @@ -40,7 +42,7 @@ public class KafkaSessionListener implements SessionListener {
@PostConstruct
public void init() {
this.topics = new HashMap<>();
this.topics.put("default","default");
this.topics.put("default", "default");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import javax.annotation.Resource;

/**
* 会话记录存储
* 会话内存监听器
* <p>
* 维护会话历史及明细
*/
@Component
@Setter
@Slf4j
@ConditionalOnProperty(name = "gateway.store.type",havingValue = "memory")
public class SessionRecordsListener implements SessionListener {
@ConditionalOnProperty(name = "gateway.store.type", havingValue = "memory")
public class SessionInMemoryListener implements SessionListener {

@Value("${gateway.store.maxRecordsForSingleSession:1000}")
private int maxHistory;
Expand Down
Loading

0 comments on commit 519a424

Please sign in to comment.