-
Notifications
You must be signed in to change notification settings - Fork 77
/
appendix_8.4_case4_streamTick.dos
202 lines (181 loc) · 6.58 KB
/
appendix_8.4_case4_streamTick.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
//确认登录实例,清理缓存,清理之前本会话其他脚本的变量
//定义各函数:
//清理流函数
def cleanStreamEngines(engineNames){
for(name in engineNames){
try{
dropStreamEngine(name)
}
catch(ex){}
}
}
//建库函数
def tickCreateDb(dfsPath){
if(existsDatabase(dfsPath)){
return
}
dbTime = database('', VALUE, 2020.01.01..2021.01.05);
dbSymbol = database('', HASH, [SYMBOL, 20])
dbHandle = database(dfsPath, COMPO, [dbTime, dbSymbol],engine='TSDB')
return dbHandle
}
//建表函数
def tickCreateTable(dfsPath,tableName){
//表已存在
if (existsTable(dfsPath,tableName)){
/*
dbTick=database(dfsPath)
dbTick.dropTable(tableName)
*/
return
}
schemaTableTick = table(
array(SYMBOL, 0) as SecurityID,
array(DATE, 0) as tradingDate,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradePrice,
array(DOUBLE, 0) as TradeQty,
array(DOUBLE, 0) as TradeAmount,
array(INT, 0) as BuyNo,
array(INT, 0) as SellNo)
dbTick=database(dfsPath)
tick=createPartitionedTable(dbHandle=dbTick, table=schemaTableTick, tableName=tableName, partitionColumns=`TradeTime`SecurityID, sortColumns=`SecurityID`TradeTime, keepDuplicates=ALL)
}
//按目标表schema创建空内存表函数
def createRamTableAsTargetTable(targetTable){
targetSchema=targetTable.schema();
sch =select name, typeString as type from targetSchema.colDefs
colName=sch.name
colType=sch.type
tickStreamTemp = table(10000:0, colName, colType)
return tickStreamTemp
}
//模拟单日单股票的数据
def formOneStockOneDay(SecurityID,tradingDate,yesterdayClose){
openMoment=09:30:00.000
closeMoment=15:00:00.000
oneDayTickCount=(closeMoment-openMoment)/1000//每天每股tick个数,每秒1个
//第二列,时刻,按范围造
timePartVec=(0..(oneDayTickCount-1))*1000+openMoment
tradetime=concatDateTime(tradingDate,timePartVec)
//第三列,成交价格,按范围造
rands=rand(2.0,oneDayTickCount)-1
coef=rands*0.1
targetCoef=1+coef
priceSeries=yesterdayClose*targetCoef
//第四列,成交手数,按p=0.5二项分布的右半边造
volumeRand=randBinomial(30,0.5,oneDayTickCount)-15//要正数的右半边
volumePool=volumeRand[volumeRand>0]//只取正数的
volumes=rand(volumePool,oneDayTickCount)//按手数池子取随机数
stockVolumes=volumes*100//手数*100=股数
//plot(volumes,tradetime,"testing")
sellSideOrderRefCount=oneDayTickCount/4 //卖单号张数
buySideOrderRefCount=sellSideOrderRefCount //买单号张数
allOrderPool=rand(1..oneDayTickCount,oneDayTickCount) //卖单号池子
modes=allOrderPool%2
oddNumbers=allOrderPool[bool(modes)]//奇数作为买单号池子
evenNumbers=allOrderPool[bool(modes==0)]//偶数作为买单号池子
buyOrderPool=oddNumbers
sellOrderPool=evenNumbers
randBuyNo=rand(buyOrderPool,oneDayTickCount)
randSellNo=rand(sellOrderPool,oneDayTickCount)
secVec=array(SYMBOL,oneDayTickCount)
secVec[:]=SecurityID
tradingDateVec=array(DATE,oneDayTickCount)
tradingDateVec[:]=tradingDate
onedayTable=table(
secVec as SecurityID,
tradingDateVec as tradingDate,
tradetime as TradeTime,
priceSeries as TradePrice,
stockVolumes as TradeQty,
priceSeries*stockVolumes as TradeAmount,
randBuyNo as BuyNo,
randSellNo as SellNo)
return onedayTable
}
//模拟单股票数据
def makeFakeTickPerStock(SecurityID){
tickPath="dfs://tick_SH_L2_TSDB"
fakeTickTableName="tick_SH_L2_TSDB"
fakeTsdbTable=loadTable(tickPath,fakeTickTableName)
//每股票全局
emptyTableInRam=createRamTableAsTargetTable(fakeTsdbTable)
startPrice=rand(150.0,1)[0]//模拟单股票全年起点价
dateRange=2020.01.01..2020.01.20 //指定模拟数据的交易日范围
//每天
for(tradingDate in dateRange){
//第二列,交易日,传入
todayTable=formOneStockOneDay(SecurityID,tradingDate,startPrice)
append!(emptyTableInRam,todayTable)
startPrice=last(todayTable.TradePrice)//更新收盘价给明天使用
}
append!(fakeTsdbTable,emptyTableInRam)
}
//模拟tick数据函数
def simulateTickData(){
//传入股票代码
codeNum=string(1..4000)
testFill=lpad(codeNum,6,"0")
for(SecurityID in testFill){
jobName="fill_tick_"+SecurityID
actionName=SecurityID
makeFakeTickPerStock(SecurityID)
submitJob(jobName,actionName,makeFakeTickPerStock,SecurityID)
print(jobName)
sleep(300)
}
}
//函数定义因子计算逻辑
@state
def buyTradeRatio(BuyNo,SellNo,TradeQty){
resultSeries=cumsum(iif(BuyNo>SellNo,TradeQty,0))\cumsum(TradeQty)
return resultSeries
}
//按数据路径定义表和流计算引擎
def createAllStreams(globalTickPath,globalTickTableName){
buyTradeRatioResultTable = table(
array(SYMBOL, 0) as SecurityID,
array(DATE, 0) as tradingDate,
array(TIMESTAMP, 0) as TradeTime,
array(DOUBLE, 0) as factor
)
tickTableHandle=loadTable(globalTickPath,globalTickTableName)
tempTable=select top 50 * from tickTableHandle//tick流数据源结构参照临时小表
//定义流数据源
inputDS = replayDS(<select * from tickTableHandle where tradingDate<2020.03.01>, `TradeTime, `TradeTime)//做两个月因子
engineName="reactiveDemo"
//创建流引擎
demoEngine = createReactiveStateEngine(name=engineName, metrics=<[TradeTime,buyTradeRatio(BuyNo,SellNo,TradeQty)]>, dummyTable=tempTable, outputTable=buyTradeRatioResultTable, keyColumn=["SecurityID","tradingDate"],keepOrder=true)
//定义job名
demoJobName="streamingbuyTradeRatioDemoJob"
//2.3启动因子的流计算
//提交job供执行
submitJob(demoJobName,"streamComputationOfBuyTradeRatio",replay,inputDS,demoEngine, `TradeTime, `TradeTime, -1,false, 4)
return buyTradeRatioResultTable
}
login("admin","123456");
clearAllCache();
go;
//程序参数
globalTickPath="dfs://tick_SH_L2_TSDB"
globalTickTableName="tick_SH_L2_TSDB"
//1.数据源
//1.1. 建库建表
//创建tick基础数据库
tickCreateDb(globalTickPath)
//创建tick表
tickCreateTable(globalTickPath,globalTickTableName)
//1.2造tick仿真数据
/*
用户如果自己有其他的数据源,也可以按照函数tickCreateTable所示的schemaTableTick表结构,写入tick数据表。跳过本段代码
A股tick数据,在压缩前约为3GB,按hash分区20,则每分区150MB上下。
本例举例计算的因子为buyTradeRatio,计算日内的主动买单成交量占所有成交的比例,只写入内存表。用户如果要保存计算结果,可以创建表结构如buyTradeRatioResultTable例子所示的分区表。
*/
simulateTickData()
//2.1 清理计算流引擎
cleanStreamEngines(["reactiveDemo"])
//2.2定义因子流,返回结果表
buyTradeRatioResultTable=createAllStreams(globalTickPath,globalTickTableName)
//2.3查看结果表(2.2的job启动后)
select top 50 * from buyTradeRatioResultTable