-
Notifications
You must be signed in to change notification settings - Fork 77
/
appendix_8.3_case3_snapshot.dos
285 lines (239 loc) · 18.9 KB
/
appendix_8.3_case3_snapshot.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
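/*
* Simulated data. If you already have real snapshot data, you can load it into the database directly.
* The data spans 20 working days and 4,000 stocks.
* There are 94 fields in total; 20 working days of data take roughly 200 GB before compression and are imported day by day.
* The database created here is partitioned by day (VALUE) and by a 20-bucket HASH of the stock code; each partition holds roughly 500 MB before compression (about 10 GB per day split across 20 hash buckets).
* The factors covered by this example are the flow factor and the weighted-skewness factor. Computing them in SQL mode and then writing them to the corresponding single-value or multi-value result table is recommended.
*/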
//数据源创库创表定义
//普通快照数据建库建表
// Create (dropping any existing copy first) the plain level-2 snapshot database and
// its 94-column partitioned table.
//   dbName: DFS database path, e.g. "dfs://snapshot_SH_L2_TSDB"
//   tbName: name of the partitioned table to create inside that database
// The database is COMPO-partitioned: VALUE by day on TradeTime and HASH(20) on SecurityID,
// and uses the TSDB engine.
def createSnapshotDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // The date scheme must cover the dates produced by genSnapshotOrigin (business days
    // counted from 2020.01.01). The previous 2021 range did not, so the rows could be
    // discarded when the server's new-value partition policy is not "add".
    dbTime = database(, VALUE, 2020.01.01..2020.12.31)
    dbSymbol = database(, HASH, [SYMBOL, 20])
    db = database(dbName, COMPO, [dbTime, dbSymbol], , 'TSDB')
    name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`BidOrders0`BidOrders1`BidOrders2`BidOrders3`BidOrders4`BidOrders5`BidOrders6`BidOrders7`BidOrders8`BidOrders9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9`OfferOrders0`OfferOrders1`OfferOrders2`OfferOrders3`OfferOrders4`OfferOrders5`OfferOrders6`OfferOrders7`OfferOrders8`OfferOrders9`NumTrades`IOPV`TotalBidQty`TotalOfferQty`WeightedAvgBidPx`WeightedAvgOfferPx`TotalBidNumber`TotalOfferNumber`BidTradeMaxDuration`OfferTradeMaxDuration`NumBidOrders`NumOfferOrders`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney
    type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE
    // Empty in-memory table that only carries the schema.
    tbTemp = table(1:0, name, type)
    // `db` is already the handle of the freshly created database; no need to re-open it.
    createPartitionedTable(dbHandle=db, table=tbTemp, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns = `SecurityID`TradeTime, keepDuplicates=LAST)
}
//创建存储为arrayVector的库表
// Create (dropping any existing copy first) the snapshot database whose ten-level
// quote fields are stored as array-vector columns.
//   dbName: DFS database path, e.g. "dfs://LEVEL2_Snapshot_ArrayVector"
//   tbName: name of the partitioned table to create inside that database
// The database is COMPO-partitioned: VALUE by day on TradeTime and HASH(20) on SecurityID,
// and uses the TSDB engine.
def createSnapshotArrayVectorDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    db1 = database("", VALUE, 2020.01.01..2020.12.31)
    db2 = database("", HASH, [SYMBOL, 20])
    db = database(dbName, COMPO, [db1,db2], , 'TSDB')
    name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice`BidOrderQty`BidOrders`OfferPrice`OfferOrderQty`OfferOrders`NumTrades`IOPV`TotalBidQty`TotalOfferQty`WeightedAvgBidPx`WeightedAvgOfferPx`TotalBidNumber`TotalOfferNumber`BidTradeMaxDuration`OfferTradeMaxDuration`NumBidOrders`NumOfferOrders`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney
    // Declare the six quote columns as array vectors (DOUBLE[] / INT[]) right away instead of
    // creating them as scalars and then dropping, re-adding and re-ordering them.
    type = [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,SYMBOL,DOUBLE[],INT[],INT[],DOUBLE[],INT[],INT[],INT,DOUBLE,INT,INT,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,DOUBLE,INT,INT,DOUBLE,INT,INT,DOUBLE,INT,INT,DOUBLE]
    tbTemp = table(1:0, name, type)
    // Partition/sort columns are spelled exactly like the schema column (TradeTime).
    db.createPartitionedTable(tbTemp, tbName, `TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns = `SecurityID`TradeTime, keepDuplicates=LAST)
}
//模拟数据定义
// Generate one trading day of random snapshot rows and append them to
// loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB").
//   n: number of business days to add to 2020.01.01 to obtain the trading date
// Symbols are "sz000001".."sz004000". They are processed in 20 groups keyed by
// hashBucket(securityid, 20), one append per group.
def genSnapshotOrigin(n){
    tradeDate=table(temporalAdd(2020.01.01,n, "B") as tradeDate)
    // Two sessions starting at 09:30 and 13:00, each built from the offsets 0..2400*3*1000 ms.
    tradeMin = table((09:30:00.000+0..2400*3*1000) join (13:00:00.000+0..2400*3*1000) as tradeMin)
    tradetime = select concatDateTime(tradeDate,tradeMin) as tradetime from cj(tradeDate,tradeMin)
    securityid ="sz"+lpad(string(000001..004000), 6, `0)
    hash_ids = table(securityid as securityid , hashBucket(securityid,20) as hash_id)
    // The destination handle does not depend on the bucket, so load it once.
    snapshotTb = loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
    for (i in 0..19){
        id = exec securityid from hash_ids where hash_id = i
        // Every symbol of this bucket crossed with every timestamp of the day.
        tmpTable = cj(table(id as securityid),tradetime)
        rowCnt = size(tmpTable)
        // Lines are only broken after commas so that no alias is split across lines.
        resTable = select *,
            rand(100.0,rowCnt) as PreClosePx, rand(100.0,rowCnt) as OpenPx, rand(100.0,rowCnt) as HighPx, rand(100.0,rowCnt) as LowPx, rand(100.0,rowCnt) as LastPx,
            rand(100,rowCnt) as TotalVolumeTrade, rand(100.0,rowCnt) as TotalValueTrade, rand(`a`b`c,rowCnt) as InstrumentStatus,
            rand(100.0,rowCnt) as BidPrice0, rand(100.0,rowCnt) as BidPrice1, rand(100.0,rowCnt) as BidPrice2, rand(100.0,rowCnt) as BidPrice3, rand(100.0,rowCnt) as BidPrice4,
            rand(100.0,rowCnt) as BidPrice5, rand(100.0,rowCnt) as BidPrice6, rand(100.0,rowCnt) as BidPrice7, rand(100.0,rowCnt) as BidPrice8, rand(100.0,rowCnt) as BidPrice9,
            rand(100,rowCnt) as BidOrderQty0, rand(100,rowCnt) as BidOrderQty1, rand(100,rowCnt) as BidOrderQty2, rand(100,rowCnt) as BidOrderQty3, rand(100,rowCnt) as BidOrderQty4,
            rand(100,rowCnt) as BidOrderQty5, rand(100,rowCnt) as BidOrderQty6, rand(100,rowCnt) as BidOrderQty7, rand(100,rowCnt) as BidOrderQty8, rand(100,rowCnt) as BidOrderQty9,
            rand(100,rowCnt) as BidOrders0, rand(100,rowCnt) as BidOrders1, rand(100,rowCnt) as BidOrders2, rand(100,rowCnt) as BidOrders3, rand(100,rowCnt) as BidOrders4,
            rand(100,rowCnt) as BidOrders5, rand(100,rowCnt) as BidOrders6, rand(100,rowCnt) as BidOrders7, rand(100,rowCnt) as BidOrders8, rand(100,rowCnt) as BidOrders9,
            rand(100.0,rowCnt) as OfferPrice0, rand(100.0,rowCnt) as OfferPrice1, rand(100.0,rowCnt) as OfferPrice2, rand(100.0,rowCnt) as OfferPrice3, rand(100.0,rowCnt) as OfferPrice4,
            rand(100.0,rowCnt) as OfferPrice5, rand(100.0,rowCnt) as OfferPrice6, rand(100.0,rowCnt) as OfferPrice7, rand(100.0,rowCnt) as OfferPrice8, rand(100.0,rowCnt) as OfferPrice9,
            rand(100,rowCnt) as OfferOrderQty0, rand(100,rowCnt) as OfferOrderQty1, rand(100,rowCnt) as OfferOrderQty2, rand(100,rowCnt) as OfferOrderQty3, rand(100,rowCnt) as OfferOrderQty4,
            rand(100,rowCnt) as OfferOrderQty5, rand(100,rowCnt) as OfferOrderQty6, rand(100,rowCnt) as OfferOrderQty7, rand(100,rowCnt) as OfferOrderQty8, rand(100,rowCnt) as OfferOrderQty9,
            rand(100,rowCnt) as OfferOrders0, rand(100,rowCnt) as OfferOrders1, rand(100,rowCnt) as OfferOrders2, rand(100,rowCnt) as OfferOrders3, rand(100,rowCnt) as OfferOrders4,
            rand(100,rowCnt) as OfferOrders5, rand(100,rowCnt) as OfferOrders6, rand(100,rowCnt) as OfferOrders7, rand(100,rowCnt) as OfferOrders8, rand(100,rowCnt) as OfferOrders9,
            rand(100,rowCnt) as NumTrades, rand(100.0,rowCnt) as IOPV, rand(100,rowCnt) as TotalBidQty, rand(100,rowCnt) as TotalOfferQty,
            rand(100.0,rowCnt) as WeightedAvgBidPx, rand(100.0,rowCnt) as WeightedAvgOfferPx,
            rand(100,rowCnt) as TotalBidNumber, rand(100,rowCnt) as TotalOfferNumber, rand(100,rowCnt) as BidTradeMaxDuration, rand(100,rowCnt) as OfferTradeMaxDuration,
            rand(100,rowCnt) as NumBidOrders, rand(100,rowCnt) as NumOfferOrders,
            rand(100,rowCnt) as WithdrawBuyNumber, rand(100,rowCnt) as WithdrawBuyAmount, rand(100.0,rowCnt) as WithdrawBuyMoney,
            rand(100,rowCnt) as WithdrawSellNumber, rand(100,rowCnt) as WithdrawSellAmount, rand(100.0,rowCnt) as WithdrawSellMoney,
            rand(100,rowCnt) as ETFBuyNumber, rand(100,rowCnt) as ETFBuyAmount, rand(100.0,rowCnt) as ETFBuyMoney,
            rand(100,rowCnt) as ETFSellNumber, rand(100,rowCnt) as ETFSellAmount, rand(100.0,rowCnt) as ETFSellMoney
        from tmpTable
        snapshotTb.append!(resTable)
    }
}
//因为每天数据大概有10G,按天,股票hash导入,采用submitJob的形式。
// Submit one background job per day that runs genSnapshotOrigin for that day offset.
//   numOfdays: number of days to generate; offsets 0..numOfdays-1 are submitted
// Both the job id and the job description are "genSnapshotOrigin_<offset>".
def writeInSnapshotByDay(numOfdays){
    // 0..(numOfdays-1) becomes the descending sequence 0,-1,... when numOfdays <= 0,
    // which would submit unwanted jobs, so only loop for positive counts.
    if(numOfdays > 0){
        for (n in 0..(numOfdays-1)){
            jobName = "genSnapshotOrigin_"+string(n)
            submitJob(jobName, jobName, genSnapshotOrigin, n)
        }
    }
}
//从普通snapshot表中,创建用arrayVector存储的数据
// Copy one day of plain snapshot rows into the array-vector snapshot table, symbol by symbol.
//   d:    trading date to copy (compared with date(TradeTime))
//   syms: symbols to copy
// For every symbol, the ten per-level columns of each quote field are packed into one
// array-vector column with fixedLengthArrayVector and the rows are appended to
// loadTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap"). Symbols with no rows on `d` are skipped.
def importDataDaily(d, syms){
    snapshot = loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
    // The destination handle is the same for every symbol, so load it once.
    arrayVectorTb = loadTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")
    for(sym in syms){
        tmp = select SecurityID,TradeTime,PreClosePx,OpenPx,HighPx,LowPx,LastPx,TotalVolumeTrade,TotalValueTrade,InstrumentStatus,
            fixedLengthArrayVector(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9) as BidPrice,
            fixedLengthArrayVector(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9) as BidOrderQty,
            fixedLengthArrayVector(BidOrders0,BidOrders1,BidOrders2,BidOrders3,BidOrders4,BidOrders5,BidOrders6,BidOrders7,BidOrders8,BidOrders9) as BidOrders,
            fixedLengthArrayVector(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9) as OfferPrice,
            fixedLengthArrayVector(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9) as OfferOrderQty,
            fixedLengthArrayVector(OfferOrders0,OfferOrders1,OfferOrders2,OfferOrders3,OfferOrders4,OfferOrders5,OfferOrders6,OfferOrders7,OfferOrders8,OfferOrders9) as OfferOrders,
            NumTrades,IOPV,TotalBidQty,TotalOfferQty,WeightedAvgBidPx,WeightedAvgOfferPx,TotalBidNumber,TotalOfferNumber,BidTradeMaxDuration,OfferTradeMaxDuration,NumBidOrders,NumOfferOrders,
            WithdrawBuyNumber,WithdrawBuyAmount,WithdrawBuyMoney,WithdrawSellNumber,WithdrawSellAmount,WithdrawSellMoney,
            ETFBuyNumber,ETFBuyAmount,ETFBuyMoney,ETFSellNumber,ETFSellAmount,ETFSellMoney
        from snapshot where SecurityID=sym, date(TradeTime) = d
        if(tmp.size()>0){
            arrayVectorTb.append!(tmp)
        }
    }
}
// Submit background jobs that convert the plain snapshot table into the array-vector table,
// one job per (hash bucket, trading day).
//   numOfdays: number of trading days to convert; day offsets 0..numOfdays-1 are used
// Symbols "sz000001".."sz004000" are grouped by hashBucket(securityid, 20); each job runs
// importDataDaily for one bucket's symbols on one date.
def writeInSnapshotArrayVectorByDay(numOfdays){
    // 0..(numOfdays-1) would be a descending sequence for numOfdays <= 0, so guard it.
    if(numOfdays > 0){
        // The symbol list and its bucket assignment do not depend on the loop variables.
        securityid ="sz"+lpad(string(000001..004000), 6, `0)
        hash_bucket_table = table(securityid,hashBucket(securityid,20) as bucket)
        for (i in 0..19){
            syms = exec securityid from hash_bucket_table where bucket = i
            for (n in 0..(numOfdays-1)){
                // Use the same date rule as genSnapshotOrigin (business days from 2020.01.01).
                // Calendar-day offsets would hit weekends and miss later generated days.
                d = temporalAdd(2020.01.01, n, "B")
                submitJob("array_bucket"+string(i),"importDataDaily_array"+string(d)+"bucket"+string(i),importDataDaily,d,syms)
            }
        }
    }
}
// 因子定义
//flow有状态因子自定义函数
// Stateful "flow" factor: buy share of the 60-row moving-average volume divided by the
// 60-row moving-average (non-negative) spread.
//   buy_vol, sell_vol:    buy-side and sell-side volume series
//   askPrice1, bidPrice1: ask and bid price series used for the spread
// Returns 0 where the averaged spread is 0. Where the averaged total volume is 0 the
// buy share is taken as 0.5.
@state
def flow(buy_vol, sell_vol, askPrice1, bidPrice1){
    buy_vol_ma = round(mavg(buy_vol, 60), 5)
    sell_vol_ma = round(mavg(sell_vol, 60), 5)
    total_vol_ma = buy_vol_ma + sell_vol_ma
    // The previous guard `abs(...) < 0` could never be true, so the 0.5 fallback was dead
    // code and a zero denominator was left unhandled.
    buy_prop = iif(total_vol_ma == 0, 0.5, buy_vol_ma / total_vol_ma)
    spd_tmp = askPrice1 - bidPrice1
    // A negative (crossed) spread is treated as 0.
    spd = iif(spd_tmp < 0, 0, spd_tmp)
    spd_ma = round(mavg(spd, 60), 5)
    return iif(spd_ma == 0, 0, buy_prop / spd_ma)
}
//多档权重偏度
// Row-wise weighted covariance of x and y.
//   x, y: row-structured inputs (called with an array-vector column elsewhere in this file)
//   w:    weights handed to rowWavg
// Each input is centred on its own row-wise weighted average; the result is the row-wise
// weighted average of the product of the two centred inputs.
def mathWghtCovar(x, y, w){
    xCentered = x - rowWavg(x, w)
    yCentered = y - rowWavg(y, w)
    return rowWavg(xCentered * yCentered, w)
}
// Row-wise skewness of x around its weighted mean.
//   x: row-structured input (called with an array-vector column elsewhere in this file)
//   w: weights; size(w) is also used as the sample size in the adjustment factor
// Returns 0 for rows whose weighted standard deviation is 0.
def mathWghtSkew(x, w){
    levels = size(w)
    centered = x - rowWavg(x, w)
    centeredSq = centered*centered
    variance = mathWghtCovar(x, x, w)
    stdev = sqrt(variance)
    // Sample-size adjustment sqrt(n*(n-1)) / (n-2).
    correction = sqrt((levels - 1) * levels) \ (levels - 2)
    // rowWsum(centeredSq, centered) is the row-wise sum of the cubed deviations.
    rawSkew = rowWsum(centeredSq, centered) \ (variance * stdev) * correction \ levels
    return iif(stdev==0, 0, rawSkew)
}
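/*
Factor database/table layouts:
Single-value (narrow) table loadTable("dfs://Snapshot_FACTOR_VERTICAL","factor_snapshot")
Stores minute-frequency and daily-frequency factors. Uses the TSDB engine, partitioned by month and by factor name (VALUE).
Multi-value wide table loadTable("dfs://Snapshot_FACTOR_WIDE","factor_Snapshot_wide")
The wide table's columns are the time column, the factor name, and one column per security. Uses the TSDB engine, partitioned by month and by factor name (VALUE).
*/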
// Create (dropping any existing copy first) the single-value factor database and table.
//   dbName: DFS database path, e.g. "dfs://Snapshot_FACTOR_VERTICAL"
//   tbName: name of the partitioned table to create
// Table columns: tradetime, securityid, factorname, val.
// The database is COMPO-partitioned by month (VALUE on tradetime) and by factor name
// (VALUE on factorname), and uses the TSDB engine.
def createSnapshotFactorVerticalDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    dbDate = database("", VALUE, 2020.01M..2020.02M)
    // Pre-create partitions for the factor names this script actually writes
    // (`flow and `mathWghtSkew), so the appends do not depend on the server adding
    // unseen VALUE partitions automatically.
    dbSymbol=database("", VALUE, `flow`mathWghtSkew)
    db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol], engine = 'TSDB')
    // Schema-only table for the factor rows.
    mtable=table(1:0, `tradetime`securityid`factorname`val, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE]);
    // The sort column is spelled exactly like the schema column (securityid).
    db.createPartitionedTable(table=mtable, tableName= tbName, partitionColumns=`tradetime`factorname,sortColumns = `securityid`tradetime, keepDuplicates=LAST)
}
// Create (dropping any existing copy first) the multi-value (wide) factor database and table.
//   dbName: DFS database path, e.g. "dfs://Snapshot_FACTOR_WIDE"
//   tbName: name of the partitioned table to create
// Table columns: tradetime, factorname, then one DOUBLE column per symbol
// "sz000001".."sz004000". The database is COMPO-partitioned by month (VALUE on tradetime)
// and by factor name (VALUE on factorname), and uses the TSDB engine.
def createSnapshotFactorWideDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    dbDate = database("", VALUE, 2020.01M..2020.02M)
    // Pre-create partitions for the factor names this script actually writes
    // (`flow and `mathWghtSkew), so the appends do not depend on the server adding
    // unseen VALUE partitions automatically.
    dbSymbol=database("", VALUE, `flow`mathWghtSkew)
    db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol],engine="TSDB")
    baseColNames = `tradetime`factorname join ("sz"+lpad(string(000001..004000), 6, `0))
    baseColType = ([TIMESTAMP,SYMBOL]).append!(take(DOUBLE,4000))
    // Schema-only table: 2 key columns followed by 4000 per-symbol value columns.
    mtable=table(1:0, baseColNames,baseColType);
    db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradetime`factorname,sortColumns=`factorname`tradetime,compressMethods={tradetime:"delta"},keepDuplicates=LAST)
}
//流表相关定义
// Drop the two stream engines used by this script ("WghtSkew" and "snapshotFlow"), if present.
// Best effort: a failure to drop one engine (for example because it does not exist) is
// printed and does not prevent the other engine from being dropped.
def cleanStreamEngine(){
    for (engineName in ["WghtSkew", "snapshotFlow"]){
        // One try/catch per engine: with a single shared try block, an exception from the
        // first drop skipped the second drop entirely.
        try{
            dropStreamEngine(engineName)
        }
        catch(ex){
            print(ex)
        }
    }
}
// Build the reactive state engine "WghtSkew", which evaluates mathWghtSkew on BidPrice
// with ten equal weights, keyed by SecurityID.
//   result: table that receives the engine output
// Returns the engine handle. cleanStreamEngine() is called first, so existing
// "WghtSkew" / "snapshotFlow" engines are dropped.
def createWghtSkewStreamEngine(result){
    cleanStreamEngine()
    skewMetrics = <[TradeTime,mathWghtSkew(BidPrice, 1 1 1 1 1 1 1 1 1 1)]>
    // Schema-only table describing the rows that will be fed to the engine.
    schemaColNames = ["SecurityID","TradeTime","BidPrice"]
    schemaColTypes = [SYMBOL,TIMESTAMP,DOUBLE[]]
    dummy = table(1:0, schemaColNames, schemaColTypes)
    return createReactiveStateEngine(name="WghtSkew", metrics=skewMetrics, dummyTable=dummy, outputTable=result, keyColumn="SecurityID")
}
// Build the reactive state engine "snapshotFlow", which evaluates the flow factor on the
// element at index 1 of each quote array, keyed by SecurityID.
//   result: table that receives the engine output
// Returns the engine handle. cleanStreamEngine() is called first, so existing
// "WghtSkew" / "snapshotFlow" engines are dropped.
def createFlowStreamEngine(result){
    cleanStreamEngine()
    flowMetrics = <[TradeTime,flow(BidOrderQty[1],OfferOrderQty[1], OfferPrice[1], BidPrice[1])]>
    // Schema-only table describing the rows that will be fed to the engine.
    schemaColNames = ["SecurityID","TradeTime","BidOrderQty","OfferOrderQty","OfferPrice","BidPrice"]
    schemaColTypes = [SYMBOL,TIMESTAMP,INT[],INT[],DOUBLE[],DOUBLE[]]
    dummy = table(1:0, schemaColNames, schemaColTypes)
    return createReactiveStateEngine(name="snapshotFlow", metrics=flowMetrics, dummyTable=dummy, outputTable=result, keyColumn="SecurityID")
}
// (Re)create every database/table used by this script: the plain snapshot table, the
// array-vector snapshot table, the single-value factor table and the wide factor table.
// Each create function drops an existing database of the same path first.
def createAllTables(){
    // Source data
    snapshotDb = "dfs://snapshot_SH_L2_TSDB"
    createSnapshotDbTable(snapshotDb, "snapshot_SH_L2_TSDB")
    arrayVectorDb = "dfs://LEVEL2_Snapshot_ArrayVector"
    createSnapshotArrayVectorDbTable(arrayVectorDb, "Snap")
    // Factor results
    verticalDb = "dfs://Snapshot_FACTOR_VERTICAL"
    createSnapshotFactorVerticalDbTable(verticalDb, "factor_snapshot")
    wideDb = "dfs://Snapshot_FACTOR_WIDE"
    createSnapshotFactorWideDbTable(wideDb, "factor_Snapshot_wide")
}
// Block until every batch job whose description starts with descPrefix and that was
// received at or after submittedSince has an end time.
def waitForSubmittedJobs(descPrefix, submittedSince){
    do{
        sleep(1000)
        unfinished = exec count(*) from getRecentJobs() where startsWith(jobDesc, descPrefix), receivedTime >= submittedSince, isNull(endTime)
    }
    while(unfinished > 0)
}
// Generate the simulated data in two phases: first the plain snapshot table, then the
// array-vector snapshot table derived from it.
//   dayCount: number of trading days to generate
// Both phases run as background jobs. This function waits for each phase's jobs to finish
// instead of sleeping for a fixed time, because the second phase reads what the first
// phase wrote and must not start on incomplete data.
def writeSimulatedData(dayCount){
    phaseStart = now()
    writeInSnapshotByDay(dayCount) // write the plain snapshot table
    waitForSubmittedJobs("genSnapshotOrigin_", phaseStart)
    phaseStart = now()
    writeInSnapshotArrayVectorByDay(dayCount) // write the array-vector snapshot table
    waitForSubmittedJobs("importDataDaily_array", phaseStart)
}
//权重偏度因子
// Compute the weighted-skewness factor of BidPrice for 2020.01.02 from the array-vector
// snapshot table, using the weights 10, 9, ..., 1.
// Returns a table with columns TradeTime, SecurityID, factorname (`mathWghtSkew) and val.
def getWeights(){
    snap = loadTable("dfs://LEVEL2_Snapshot_ArrayVector","Snap")
    levelWeights = 10 9 8 7 6 5 4 3 2 1
    // The trailing `map` keyword is kept from the original query.
    skewTb = select TradeTime, SecurityID, `mathWghtSkew as factorname, mathWghtSkew(BidPrice, levelWeights) as val from snap where date(TradeTime) = 2020.01.02 map
    return skewTb
}
//Compute the stateful flow factor in SQL
// Compute the flow factor for 2020.01.02 from the array-vector snapshot table.
// The factor is evaluated per security (context by SecurityID) on the element at index 1
// of each quote array.
// Returns a table with columns TradeTime, SecurityID, factorname (`flow) and val.
def getFlow(){
    snap = loadTable("dfs://LEVEL2_Snapshot_ArrayVector","Snap")
    flowTb = select TradeTime, SecurityID, `flow as factorname, flow(BidOrderQty[1],OfferOrderQty[1], OfferPrice[1], BidPrice[1]) as val from snap where date(TradeTime) = 2020.01.02 context by SecurityID
    return flowTb
}
//宽表多值模型存储写入函数
// Pivot one narrow factor result (TradeTime, SecurityID, val) into one column per security,
// tag it with the factor name and append it to the wide factor table.
def appendFactorToWideTable(factorRes, factorLabel){
    wide = select val from factorRes pivot by TradeTime, SecurityID
    wide[`factorname] = factorLabel
    // Move the two key columns to the front to match the wide table's column order.
    reorderColumns!(wide,`TradeTime`factorname)
    loadTable("dfs://Snapshot_FACTOR_WIDE","factor_Snapshot_wide").append!(wide)
}
// Write both factor results to the wide (multi-value) factor table.
//   resFlow:   narrow result of the flow factor
//   resWeight: narrow result of the weighted-skewness factor
def writeFactorsInWideTables(resFlow,resWeight){
    appendFactorToWideTable(resFlow, `flow)
    appendFactorToWideTable(resWeight, `mathWghtSkew)
}
//流计算arrayVector权重偏度因子
// Replay the 2020.01.02 snapshots before 09:40:00 through the "WghtSkew" reactive state
// engine and return the table holding the engine output (SecurityID, TradeTime, factor).
def performWghtSkewStream(){
    // Output table and engine
    resultSkew = table(10000:0, ["SecurityID","TradeTime","factor"], [SYMBOL,TIMESTAMP,DOUBLE])
    skewEngine = createWghtSkewStreamEngine(resultSkew)
    // Input rows taken from the array-vector snapshot table
    snap = loadTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")
    ticks = select SecurityID, TradeTime, BidPrice from snap where date(TradeTime)=2020.01.02, TradeTime <2020.01.02T09:40:00.000
    skewEngine.append!(ticks)
    return resultSkew
}
//基于快照数据的有状态因子flow的增量流式计算
// Replay the 2020.01.02 snapshots before 09:40:00 through the "snapshotFlow" reactive state
// engine and return the table holding the engine output (SecurityID, TradeTime, factor).
def performFlowStream(){
    // Output table and engine
    resultFlow = table(10000:0, ["SecurityID","TradeTime","factor"], [SYMBOL,TIMESTAMP,DOUBLE])
    flowEngine = createFlowStreamEngine(resultFlow)
    // Input rows taken from the array-vector snapshot table
    snap = loadTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")
    ticks = select SecurityID, TradeTime,BidOrderQty,OfferOrderQty,OfferPrice,BidPrice from snap where date(TradeTime)=2020.01.02, TradeTime <2020.01.02T09:40:00.000
    flowEngine.append!(ticks)
    return resultFlow
}
// NOTE(review): credentials are hard-coded here; replace them for any non-demo environment.
login(`admin,`123456)
// 1. Create the plain snapshot table, the array-vector snapshot table, the single-value
//    factor table and the multi-value (wide) factor table
createAllTables()
// 2. Write 5 days of simulated data
writeSimulatedData(5)
// 3. Compute the factors and store them
// stateful flow factor
resFlow=getFlow()
// weighted-skewness factor
resWeight=getWeights()
// write to the single-value (narrow) table
verticalFactorTb = loadTable("dfs://Snapshot_FACTOR_VERTICAL","factor_snapshot")
verticalFactorTb.append!(resFlow)
verticalFactorTb.append!(resWeight)
// write to the multi-value (wide) table
writeFactorsInWideTables(resFlow,resWeight)
// 4. Stream computation
// 4.1 weighted-skewness factor on array-vector data
resultSkew=performWghtSkewStream()
// inspect the output in resultSkew
select top 5* from resultSkew
// 4.2 incremental stream computation of the stateful flow factor on snapshot data
resultFlow=performFlowStream()
// inspect the output in resultFlow
select top 100* from resultFlow