-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathappendix_4.1.2_streamComputationOfSmallInflowRate_main.dos
109 lines (97 loc) · 4.29 KB
/
appendix_4.1.2_streamComputationOfSmallInflowRate_main.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Returns an empty in-memory table with the layout of the final factor output:
//   SecurityID (SYMBOL), TradeTime (DATETIME), smallBuyOrderAmount,
//   smallSellOrderAmount, totalOrderAmount and factor (all DOUBLE).
def createResultTable(){
    resultColumnNames = `SecurityID`TradeTime`smallBuyOrderAmount`smallSellOrderAmount`totalOrderAmount`factor
    resultColumnTypes = [SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
    // capacity 1, size 0: the table starts with no rows
    return table(1:0, resultColumnNames, resultColumnTypes)
}
// Returns an empty table describing one incoming trade record:
//   SecurityID (SYMBOL), BuyNo (INT), SellNo (INT), TradeTime (DATETIME),
//   TradeAmount (DOUBLE).
def createTradeSchema(){
    tradeColumnNames = `SecurityID`BuyNo`SellNo`TradeTime`TradeAmount
    tradeColumnTypes = [SYMBOL, INT, INT, DATETIME, DOUBLE]
    // capacity 1, size 0: the table starts with no rows
    return table(1:0, tradeColumnNames, tradeColumnTypes)
}
// Returns an empty table with the layout of the first intermediate result:
// the trade columns led by BuyNo, followed by the four buy-side columns
// (BuyCumAmount, PrevBuyCumAmount, BuyOrderFlag, PrevBuyOrderFlag).
def createResult1Schema() {
    stage1ColumnNames = `BuyNo`SecurityID`SellNo`TradeTime`TradeAmount`BuyCumAmount`PrevBuyCumAmount`BuyOrderFlag`PrevBuyOrderFlag
    stage1ColumnTypes = [INT, SYMBOL, INT, DATETIME, DOUBLE, DOUBLE, DOUBLE, INT, INT]
    // capacity 1, size 0: the table starts with no rows
    return table(1:0, stage1ColumnNames, stage1ColumnTypes)
}
// Returns an empty table with the layout of the second intermediate result:
// SellNo first, then the remaining trade columns, the four buy-side columns
// and the four sell-side columns (SellCumAmount, PrevSellCumAmount,
// SellOrderFlag, PrevSellOrderFlag).
def createResult2Schema() {
    stage2ColumnNames = `SellNo`BuyNo`SecurityID`TradeTime`TradeAmount`BuyCumAmount`PrevBuyCumAmount`BuyOrderFlag`PrevBuyOrderFlag`SellCumAmount`PrevSellCumAmount`SellOrderFlag`PrevSellOrderFlag
    stage2ColumnTypes = [INT, INT, SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, INT, INT, DOUBLE, DOUBLE, INT, INT]
    // capacity 1, size 0: the table starts with no rows
    return table(1:0, stage2ColumnNames, stage2ColumnTypes)
}
// Attempts to drop every stream engine named in engineNames.
// Each drop is attempted independently; any error raised by dropStreamEngine
// is caught and discarded so the remaining names are still processed.
def cleanStreamEngines(engineNames){
    for(engineName in engineNames){
        try{
            dropStreamEngine(engineName)
        }
        catch(ex){
            // best-effort cleanup: the error is intentionally ignored
        }
    }
}
// State function: running total of tradeAmount plus a size flag.
// Returns four series, in this order:
//   1. cumulative sum of tradeAmount
//   2. the previous value of that cumulative sum
//   3. flag: 0 while the cumulative sum is below 100000, otherwise 1
//   4. the previous value of the flag
@state
def factorOrderCumAmount(tradeAmount){
    runningAmount = cumsum(tradeAmount)
    lastRunningAmount = prev(runningAmount)
    sizeFlag = iif(runningAmount<100000, 0, 1)
    lastSizeFlag = prev(sizeFlag)
    return runningAmount, lastRunningAmount, sizeFlag, lastSizeFlag
}
// State function: net amount ratio of the first ("small") order group.
// Both sides are aggregated with dynamicGroupCumsum over 2 groups, using the
// current/previous cumulative amounts and the current/previous order flags.
// Only the first series returned for each side is used; the second is unpacked
// because the call returns two series but is otherwise unused.
// Returns, in order: small buy total, small sell total, cumulative sum of
// tradeAmount, and (small buy total - small sell total) \ cumulative tradeAmount.
@state
def factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag){
    sellSmallTotal, sellBigTotal = dynamicGroupCumsum(sellCumAmount, prevSellCumAmount, sellOrderFlag, prevSellOrderFlag, 2)
    buySmallTotal, buyBigTotal = dynamicGroupCumsum(buyCumAmount, prevBuyCumAmount, buyOrderFlag, prevBuyOrderFlag, 2)
    tradedTotal = cumsum(tradeAmount)
    netRatio = (buySmallTotal - sellSmallTotal) \ tradedTotal
    return buySmallTotal, sellSmallTotal, tradedTotal, netRatio
}
// Builds a cascade of three reactive state engines and returns the first one,
// which is the entry point that trade rows are inserted into.
//
//   rse1: keyed by BuyNo,      input = createTradeSchema(),    output -> rse2
//   rse2: keyed by SellNo,     input = createResult1Schema(),  output -> rse3
//   rse3: keyed by SecurityID, input = createResult2Schema(),  output -> result
//
// Parameter:
//   result - table that receives the output of the last engine (rse3).
// Returns:
//   the handle of the first engine (rse1).
def createStreamEngine(result){
    tradeSchema = createTradeSchema()
    result1Schema = createResult1Schema()
    result2Schema = createResult2Schema()
    engineNames = ["rse1", "rse2", "rse3"]
    // Drop any engines already registered under these names (errors are ignored).
    cleanStreamEngines(engineNames)
    // The engines are created last-to-first because each upstream engine takes
    // its downstream engine as outputTable.
    // Column references below use the exact casing declared in result2Schema,
    // consistent with metrics1/metrics2, so they do not rely on
    // case-insensitive column lookup.
    metrics3 = <[TradeTime, factorSmallOrderNetAmountRatio(TradeAmount, SellCumAmount, SellOrderFlag, PrevSellCumAmount, PrevSellOrderFlag, BuyCumAmount, BuyOrderFlag, PrevBuyCumAmount, PrevBuyOrderFlag)]>
    rse3 = createReactiveStateEngine(name=engineNames[2], metrics=metrics3, dummyTable=result2Schema, outputTable=result, keyColumn="SecurityID")
    // Passes the buy-side columns through and appends the four sell-side
    // columns computed per SellNo.
    metrics2 = <[BuyNo, SecurityID, TradeTime, TradeAmount, BuyCumAmount, PrevBuyCumAmount, BuyOrderFlag, PrevBuyOrderFlag, factorOrderCumAmount(TradeAmount)]>
    rse2 = createReactiveStateEngine(name=engineNames[1], metrics=metrics2, dummyTable=result1Schema, outputTable=rse3, keyColumn="SellNo")
    // Passes the trade columns through and appends the four buy-side columns
    // computed per BuyNo.
    metrics1 = <[SecurityID, SellNo, TradeTime, TradeAmount, factorOrderCumAmount(TradeAmount)]>
    return createReactiveStateEngine(name=engineNames[0], metrics=metrics1, dummyTable=tradeSchema, outputTable=rse2, keyColumn="BuyNo")
}
// Demo driver: create the output table and the engine cascade that writes into it.
result = createResultTable()
rse = createStreamEngine(result)
// Feed four sample trades, one row per insert. The value order follows
// createTradeSchema(): SecurityID, BuyNo, SellNo, TradeTime, TradeAmount.
insert into rse values(`000155, 1000, 1001, 2020.01.01T09:30:00, 20000)
insert into rse values(`000155, 1000, 1002, 2020.01.01T09:30:01, 40000)
insert into rse values(`000155, 1000, 1003, 2020.01.01T09:30:02, 60000)
insert into rse values(`000155, 1004, 1003, 2020.01.01T09:30:03, 30000)
// Show the rows the cascade wrote to the output table.
select * from result
// Output listed by the original author for the four inserts above:
/*
SecurityID TradeTime smallBuyOrderAmount smallSellOrderAmount totalOrderAmount factor
---------- ------------------- ------------------- -------------------- ---------------- ------
000155 2020.01.01T09:30:00 20000 20000 20000 0
000155 2020.01.01T09:30:01 60000 60000 60000 0
000155 2020.01.01T09:30:02 0 120000 120000 -1
000155 2020.01.01T09:30:03 30000 150000 150000 -0.8
*/