-
Notifications
You must be signed in to change notification settings - Fork 77
/
appendix_4.3.3_streamComputationOfDoubleEmaFactorPublishingOnZMQ_main.dos
130 lines (126 loc) · 4.09 KB
/
appendix_4.3.3_streamComputationOfDoubleEmaFactorPublishingOnZMQ_main.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
login("admin","123456");
clearAllCache();
undef(all);
go;
def unsubscribeAll(){
streamStat=getStreamingStat()//检查流状态
pubTables=streamStat["pubTables"]
unsubTablesDict=dict(pubTables.tableName,pubTables.actions)
for(eachTableName in unsubTablesDict.keys()){
print(eachTableName)
allActionsString=unsubTablesDict[eachTableName]
allActionsString=strReplace(allActionsString,"]","")
allActionsString=strReplace(allActionsString,"[","")
allActionNames=allActionsString.split(",")
for(eachActionName in allActionNames){
print(eachActionName)
try{
unsubscribeTable(tableName=eachTableName,actionName=eachActionName)
输出日志="共享表:"+eachTableName+",action:"+eachActionName+",已解除订阅"
print(输出日志)
}
catch(ex){
print("解除订阅失败:\r\n",ex)
print(eachTableName)
print(eachActionName)
}
}
}
}
def dropAllStreamEngines(){
engineStatus=getStreamEngineStat()
allCurrrentEngineTypes=engineStatus.keys()
for(eachEngineType in allCurrrentEngineTypes){
engineInfoTable=engineStatus[eachEngineType]
for (eachEngineName in engineInfoTable.name){
try{
dropStreamEngine(eachEngineName)
print("注销引擎",eachEngineName,"成功")
}
catch(ex){
print("注销引擎",eachEngineName,"失败:",ex)
}
}
}
}
def removeAllSharedTable(){
allSharedObjects=exec name from objs(true) where shared==true
for (eachSharedTable in allSharedObjects){
try{
dropStreamTable(eachSharedTable)
//undef(eachSharedTable,SHARED)
日志提示="共享表:"+eachSharedTable+",已析构"
print(日志提示)
}
catch(ex){
日志提示="共享表:"+eachSharedTable+",析构失败,提示:"
print(日志提示,ex)
}
}
}
def clearAllStreamStuff(){
unsubscribeAll()
dropAllStreamEngines()
removeAllSharedTable()
}
//执行流清理函数
unsubscribeAll()//解除订阅
dropAllStreamEngines()//删引擎
removeAllSharedTable()//删流表
go;
//加载zmq消息队列插件
def loadPluginConfig(插件配置文件){
try{
loadPlugin(插件配置文件)
}
catch(ex){
print("loadPluginConfig失败:")
print(ex)
}
}
//按zmq地址定义消息推送表
def zmqPusherTable(zmqSubscriberAddress,schemaTable){
SignalSender=def (x) {
return x
}
pushingSocket = zmq::socket("ZMQ_PUB", SignalSender)
zmq::connect(pushingSocket, zmqSubscriberAddress)
pusher = zmq::createPusher(pushingSocket, schemaTable)
return pusher
}
//因子计算逻辑
def sumDiff(x, y){
return (x-y)\(x+y)
}
@state
def factor1(price){
ema_20=ema(price, 20)
ema_40=ema(price, 40)
sumDiff_1000=1000 * sumDiff(ema_20, ema_40)
finalFactor=ema(sumDiff_1000,10) - ema(sumDiff_1000, 20)
return finalFactor
}
zmqPluginConfigFile="/hdd/hdd0/FactorPractice_1/server/plugins/zmq/PluginZmq.txt"
loadPluginConfig(zmqPluginConfigFile)
//快照表名
dbPath="dfs://snapshot_SH_L2_TSDB"
tableName="snapshot_SH_L2_TSDB"
tableHandle=loadTable(dbPath,tableName)
//快照数据结构
snapshotSchema=table(1:0,["SecurityID","TradeTime","LastPx"], [SYMBOL,TIMESTAMP,DOUBLE])
//输出到消息队列的表结构
resultSchema=table(1:0,["SecurityID","TradeTime","factor"], [SYMBOL,TIMESTAMP,DOUBLE])
//流表resultStream向zmq队列推送,使用时根据不同的zmq地址修改此字符串
zmqSubscriberAddress="tcp://192.168.1.195:55556"
//生成一个逻辑表向上述地址发送zmq包
pusherTable=zmqPusherTable(zmqSubscriberAddress,resultSchema)
//创建流引擎
demoEngine = createReactiveStateEngine(name="reactiveDemo", metrics=<[TradeTime,factor1(LastPx)]>, dummyTable=snapshotSchema, outputTable=pusherTable, keyColumn="SecurityID",keepOrder=true)
//做半年的因子
inputDS = replayDS(<select SecurityID, TradeTime, LastPx from tableHandle where date(TradeTime)<2020.07.01>, `TradeTime, `TradeTime)
//inputDS = replayDS(<select SecurityID, TradeTime, LastPx from tableHandle>, `TradeTime, `TradeTime)//做全年的因子
demoJobName="streamingFactorDemoJob"
//提交job供执行,4并行度
submitJob(demoJobName,"算因子全年",replay,inputDS,demoEngine, `TradeTime, `TradeTime, -1,false, 4)
//直接执行
//replay(inputDS,snapshotStreamInstance, `TradeTime, `TradeTime, -1,false, 4)