-
Notifications
You must be signed in to change notification settings - Fork 77
/
appendix_4.2_streamComputationOfDoubleEmaFactor_main.dos
67 lines (65 loc) · 2.04 KB
/
appendix_4.2_streamComputationOfDoubleEmaFactor_main.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Session bootstrap for the demo script.
// Log in with the credentials hard-coded below.
// NOTE(review): these look like demo credentials - replace them before running outside a sandbox.
login("admin","123456");
// Clear the server-side caches so the run starts from a clean state.
clearAllCache();
// Remove every object defined in the current session, so the definitions below start fresh.
undef(all);
// NOTE(review): `go` presumably makes the statements above run before the rest of the script is parsed - confirm against the DolphinDB docs.
go;
// Drop the stream engines named in engineNames (best-effort cleanup).
// Parameters:
//   engineNames - collection of stream engine names to drop.
// Returns nothing. An error raised while dropping one engine is ignored
// so that the remaining names are still processed.
def cleanStreamEngines(engineNames){
    for(engineName in engineNames){
        try{
            dropStreamEngine(engineName)
        }
        catch(dropError){
            // Intentionally ignored: failing to drop one engine must not abort the cleanup.
        }
    }
}
// Build the empty narrow-format (one row per security / time / factor) result table.
// Returns an in-memory table with zero rows and the columns
//   SecurityID (SYMBOL), TradeTime (TIMESTAMP), factorName (SYMBOL), val (DOUBLE).
def createTableForDoubleEmaFactor(){
    colNames = `SecurityID`TradeTime`factorName`val
    colTypes = [SYMBOL, TIMESTAMP, SYMBOL, DOUBLE]
    // 1:0 = initial capacity 1, size 0, i.e. an empty table.
    return table(1:0, colNames, colTypes)
}
// Load the snapshot table "snapshot_SH_L2_TSDB" from the database
// "dfs://snapshot_SH_L2_TSDB" and return the handle produced by loadTable.
def loadSnapshotTable(){
    return loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
}
// Build the dummy (schema) table and the replay data sources for the snapshot data.
// Returns two values:
//   1. the result of a "select top 500" query, used by the caller as a schema sample;
//   2. the data sources produced by replayDS for the full query, using TradeTime
//      for both the date-column and time-column arguments.
// The query text refers to `tableHandle`, which is not defined in this function;
// the script assigns it at top level before calling this function.
// NOTE(review): this relies on parseExpr resolving `tableHandle` from the session scope - confirm.
def getDummyAndDs(){
    // Column list, source and date filter shared by both queries.
    selectBody='SecurityID, TradeTime,"doubleEma" as factorName, LastPx from tableHandle where date(TradeTime)<2020.07.01'
    // Sample query limited to 500 rows.
    sampleQuery=parseExpr("select top 500 "+selectBody)
    // Same query without the row limit.
    fullQuery=parseExpr("select "+selectBody)
    // Run the sample query now; its result carries the schema handed to the caller.
    dummyTable=eval(sampleQuery)
    // Data sources covering the full query.
    fullSources=replayDS(fullQuery,"TradeTime", "TradeTime")
    return dummyTable,fullSources
}
// Factor function definitions.
// sumDiff: difference of x and y divided by their sum, i.e. (x-y)\(x+y).
// Parameters:
//   x, y - the two operands.
// Returns the ratio of (x - y) to (x + y); ratio(a, b) is the function form of a\b.
def sumDiff(x, y){
    return ratio(x - y, x + y)
}
// doubleEma: state function computing the "double EMA" factor from a price series.
// Steps:
//   1. take ema of price with arguments 20 and 40;
//   2. combine them with sumDiff and scale the result by 1000;
//   3. return ema(., 10) minus ema(., 20) of that scaled series.
// Parameters:
//   price - the price input (the script passes the LastPx column).
@state
def doubleEma(price){
    fastEma=ema(price, 20)
    slowEma=ema(price, 40)
    scaledSpread=1000 * sumDiff(fastEma, slowEma)
    return ema(scaledSpread,10) - ema(scaledSpread, 20)
}
// Computation script: wires the snapshot data into a reactive state engine.
// Empty table that receives the engine output.
resultTable = createTableForDoubleEmaFactor()
// Keep this variable name: the query text built in getDummyAndDs refers to `tableHandle`.
tableHandle=loadSnapshotTable()
// snapshotDummyTable supplies the input schema; inputDS holds the replay data sources.
snapshotDummyTable,inputDS=getDummyAndDs()
// Create the stream engine
engineName="doubleEmaFactorDemo"
cleanStreamEngines([engineName])//drop any earlier engine with the same name
// Metrics emit TradeTime, factorName and doubleEma(LastPx); state is kept per SecurityID (keyColumn).
demoEngine = createReactiveStateEngine(name=engineName, metrics=<[TradeTime,factorName,doubleEma(LastPx)]>, dummyTable=snapshotDummyTable, outputTable=resultTable, keyColumn="SecurityID")
demoJobName="streamingFactorDemoJob"
// Submit the replay as a job: inputDS is replayed into demoEngine, with TradeTime as both date and time column.
// NOTE(review): the trailing arguments (-1, false, 4) are presumably replayRate, absoluteRate and parallelLevel - confirm against the replay docs.
submitJob(demoJobName,"snapshot因子半年",replay,inputDS,demoEngine, `TradeTime, `TradeTime, -1,false, 4)
// Once the job above has completed, the computed factor values can be queried from the result table.
// NOTE(review): submitJob presumably returns before the job finishes, so this query may see no or partial rows if run immediately - confirm.
select top 50 * from resultTable where val > 0