-
Notifications
You must be signed in to change notification settings - Fork 77
/
云边一体脚本实现.dos
156 lines (134 loc) · 6.92 KB
/
云边一体脚本实现.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//创建warn分布式表
if(existsDatabase("dfs://warnDB")){
dropDatabase("dfs://warnDB")
}
db = database("dfs://warnDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source`type`metric,[TIMESTAMP,SYMBOL,INT,STRING])
db.createPartitionedTable(m, "warn", ["datetime"])
//创建聚合结果分布式表
if(existsDatabase("dfs://waveDB")){
dropDatabase("dfs://waveDB")
}
db = database("dfs://waveDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0, `datetime`source`max_wave`min_wave,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE])
db.createPartitionedTable(m, "wave", ["datetime"])
//创建异常录波表
if(existsDatabase("dfs://alterWaveDB")){
dropDatabase("dfs://alterWaveDB")
}
db = database("dfs://alterWaveDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source`wave_datatime <- (`d+string(0..255)),[TIMESTAMP,SYMBOL,TIMESTAMP] <- take(DOUBLE,256))
db.createPartitionedTable(m, "alterWave", ["datetime"])
//创建降采样后的原始数据存储表
if(existsDatabase("dfs://waveDB5M")){
dropDatabase("dfs://waveDB5M")
}
db = database("dfs://waveDB5M", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source <- (`d+string(0..255)),[TIMESTAMP,SYMBOL] <- take(DOUBLE,256))
db.createPartitionedTable(m, "waveDB5M", ["datetime"])
go;
//创建流表
share streamTable(100:0, `datetime`source <- (`d+string(0..255)),[TIMESTAMP,SYMBOL] <- take(DOUBLE,256)) as wave
share streamTable(100:0, `datetime`source <- (`d+string(0..255)),[TIMESTAMP,SYMBOL] <- take(DOUBLE,256)) as wave5M
share streamTable(100:0, `datetime`source`max_wave`min_wave,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE]) as stwave
share streamTable(100:0, `datetime`source`type`metric,[TIMESTAMP,SYMBOL,INT,STRING]) as warn
share streamTable(100:0, `datetime`source`wave_datatime <- (`d+string(0..255)),[TIMESTAMP,SYMBOL,TIMESTAMP] <- take(DOUBLE,256)) as windowOutput1
go;
//创建时序引擎,每五分钟保留最后一条原始数据,对原数据进行降采样
metricsDownSampling = parseExpr(each(add,'last(',(`d+string(0..255)+")")))
downsampling = createTimeSeriesEngine(name="downsampling", windowSize=5*60*1000, step=5*60*1000, metrics=metricsDownSampling, dummyTable=wave, outputTable=wave5M, timeColumn=`datetime, keyColumn=`source)
subscribeTable(tableName='wave',actionName="down_sampling", offset=0, handler=append!{downsampling}, msgAsTable=true,batchSize=10000,throttle=1)
subscribeTable(tableName='wave5M',actionName='save5mToDFS',offset=0,handler=append!{loadTable("dfs://waveDB5M",'wave5M')},msgAsTable=true,batchSize=10000,throttle=1)
//定义处理函数
def metricCal(engineName,msg){
t = select datetime,source,rowMin(msg[,2:258]) as min,rowMax(msg[,2:258]) as max from msg
getStreamEngine(engineName).append!(t)
}
//定义聚合规则
metrics=<[max(max),min(min)]>
inputSchema = table(100:0, `datetime`source`min`max,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE])
//创建时序引擎,窗口为1s,步长为1s
tsAggr1 = createTimeSeriesEngine(name="tsAggr1", windowSize=1000, step=1000, metrics=metrics, dummyTable=inputSchema, outputTable=stwave, timeColumn=`datetime, keyColumn=`source)
//订阅wave表,进行指标计算
subscribeTable(tableName="wave", actionName="act_tsAggr1", offset=0, handler=metricCal{"tsAggr1"}, msgAsTable=true,batchSize=10000,throttle=1);
//订阅stwave表,将峰值趋势入库
subscribeTable(tableName='stwave',actionName='savePeakToDFS',offset=0,handler=append!{loadTable("dfs://waveDB",'wave')},msgAsTable=true,batchSize=10000,throttle=1)
//创建异常检测引擎,检测规则 max>90,min<-90
tsAggr2 = createAnomalyDetectionEngine(name="tsAggr2", metrics=<[max_wave > 90, min_wave <-90]>, dummyTable=stwave, outputTable=warn, timeColumn=`datetime, keyColumn=`source, windowSize=60*1000, step=60*1000)
//订阅stwave 进行异常检测
subscribeTable(tableName="stwave", actionName="act_tsAggr2", offset=0, handler=append!{tsAggr2}, msgAsTable=true,batchSize=10000,throttle=1);
//订阅warn表,将异常记录入库
subscribeTable(tableName='warn',actionName='saveWarnToDFS',offset=0,handler=append!{loadTable("dfs://warnDB",'warn')},msgAsTable=true,batchSize=10000,throttle=1)
//创建windowjoin引擎,订阅warn和wave,输出待存数据
cbefore = short(1)*(-1000)
cafter = short(1)*1000
metricswj = parseExpr(("wave.datetime" <- (`d+string(0..255))))
wjengine = createWindowJoinEngine(name="waveStreamWindow",leftTable=warn,rightTable=wave,outputTable=windowOutput1,window=cbefore:cafter,
metrics=metricswj,matchingColumn=`source,timeColumn=`datetime)
subscribeTable(tableName=`warn,actionName="joinLeft",offset=0,handler=appendForJoin{wjengine,true},msgAsTable=true,batchSize=10000,throttle=1)
subscribeTable(tableName=`wave,actionName="joinRight",offset=0,handler=appendForJoin{wjengine,false},msgAsTable=true,batchSize=10000,throttle=1)
//订阅 windowOutput1 ,异常波形入库
subscribeTable(tableName='windowOutput1',actionName='saveAlterWaveToDFS',offset=0,handler=append!{loadTable("dfs://alterWaveDB",'alterWave')},msgAsTable=true,batchSize=10000,throttle=1)
//模拟4通道数据
def simulateData(start_time,freq,channelNum,mutable t){
num=channelNum
ts=start_time
do{
time1 = take(ts,num)
tmp = table(num:num,`datetime`source <-(`d+string(0..255)),[TIMESTAMP,SYMBOL] <- take(DOUBLE,256))
tmp[`datetime] = time1
tmp[`source] = symbol(lpad(string(1..4),4,"0"))
if(rand(1000.0,1)>998.99){
if(rand(2,1)==0){
randList = -1000..-900 <- 0..900
}else{
randList = -900..0 <- 900..1000
}
for(i in 0..255){
tmp[`d+string(i)] = rand(randList,num) \ 10
}
}else{
for(i in 0..255){
tmp[`d+string(i)] = rand(-900..900,num) \ 10
}
}
t.append!(tmp)
ts += 1000/freq
}while(ts < (start_time+1000*60*15+1000))
}
start_time = 2024.04.10T00:00:00.000
freq = 50
channelNum = 4
submitJob("simulateData","simulateData",simulateData{start_time,freq,channelNum,wave})
getStreamingStat()
//查看计算结果
pt = loadTable('dfs://waveDB5M','wave5M')
select * from pt
pt = loadTable('dfs://waveDB','wave')
select * from pt
pt = loadTable('dfs://warnDB','warn')
select * from pt
pt = loadTable('dfs://alterWaveDB','alterWave')
select max(wave_datatime),min(wave_datatime) from pt group by datetime,source
// 取消全部订阅
// sub = getStreamingStat().pubTables
// for(row in sub){
// tableName = row.tableName
// if(substr(row.actions,0,1)=='[') actions = split(substr(row.actions, 1, strlen(row.actions)-2), ",")
// else actions = row.actions
// for(action in actions){
// unsubscribeTable(tableName=tableName, actionName=action)
// }
// }
// go;
// for(i in getStreamEngineStat().values()){
// for(name in i['name']){
// dropStreamEngine(name)
// }
// }
// go;
// undef(`stwave,SHARED)
// undef(`warn,SHARED)
// undef(`wave,SHARED)
// undef(`wave5M,SHARED)
// undef(`windowOutput1,SHARED)