-
Notifications
You must be signed in to change notification settings - Fork 77
/
02.calTradeCost_asofJoin.txt
75 lines (64 loc) · 3.4 KB
/
02.calTradeCost_asofJoin.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
calTradeCost_asofJoin.txt
Script that uses the asof join streaming engine to calculate trade cost
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.31
Storage engine: TSDB
Last modification time: 2022.07.07
*/
//log in to the DolphinDB server before any table or engine is created
//NOTE(review): the account name and password are hard-coded in this script; replace them before running outside a test setup
login("admin", "123456")
/*
 * Create the two stream tables used by this script and register each one
 * under a shared name with persistence enabled:
 *   messageStream    - columns msgTime (TIMESTAMP), msgType (SYMBOL), msgBody (BLOB)
 *   prevailingQuotes - columns TradeTime, SecurityID, Price, TradeQty, BidPX1,
 *                      OfferPX1, TradeCost, SnapshotTime
 * Takes no arguments. Both tables are configured with retentionMinutes=1440
 * (24 hours).
 */
def createStreamTableFunc(){
    // messageStream: capacity 5,000,000 rows, initial size 0
    msgTable = streamTable(5000000:0, `msgTime`msgType`msgBody, [TIMESTAMP,SYMBOL, BLOB])
    enableTableShareAndPersistence(table=msgTable, tableName="messageStream", asynWrite=true, compress=true, cacheSize=5000000, retentionMinutes=1440, flushMode=0, preCache=10000)
    // clear the local variable; from here on the table is referred to by its shared name
    msgTable = NULL

    // prevailingQuotes: capacity 1,000,000 rows, initial size 0
    quoteTable = streamTable(1000000:0, `TradeTime`SecurityID`Price`TradeQty`BidPX1`OfferPX1`TradeCost`SnapshotTime, [TIME, SYMBOL, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, TIME])
    enableTableShareAndPersistence(table=quoteTable, tableName="prevailingQuotes", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000)
    quoteTable = NULL
}
//create and share the stream tables messageStream and prevailingQuotes
createStreamTableFunc()
//NOTE(review): per the DolphinDB docs, `go` makes the statements above parse and execute
//before the rest of the script is parsed, so the shared table names created above can be
//referenced by the code that follows - confirm against the server version in use
go
/*
 * Build an in-memory table that has the same column names and column types
 * as an existing database table.
 *   dbName    - database path passed to loadTable, e.g. "dfs://trade"
 *   tableName - name of the table inside that database
 * Returns a table created with capacity 1 and size 0 (no rows).
 */
def createSchemaTable(dbName, tableName){
    // colDefs has one row per column, with its name and type string
    columnDefs = schema(loadTable(dbName, tableName)).colDefs
    return table(1:0, columnDefs.name, columnDefs.typeString)
}
// empty tables carrying the column layout of the snapshot and trade DFS tables;
// they are used as schema templates for the streaming engines
snapshotSchema = createSchemaTable("dfs://snapshot", "snapshot")
tradeSchema = createSchemaTable("dfs://trade", "trade")
// register the asof join streaming engine "tradeJoinSnapshot":
//   left input  = trade records, right input = snapshot records,
//   joined per SecurityID on the Time column (useSystemTime=false), results go to prevailingQuotes.
// metrics, in order: Price, TradeQty, BidPX1, OfferPX1,
//   abs(Price-(BidPX1+OfferPX1)/2)  -> distance between the trade price and the bid/offer midpoint,
//   snapshotSchema.Time             -> the Time column of the right (snapshot) table
joinEngine = createAsofJoinEngine(name="tradeJoinSnapshot",
    leftTable=tradeSchema,
    rightTable=snapshotSchema,
    outputTable=prevailingQuotes,
    metrics=<[Price, TradeQty, BidPX1, OfferPX1, abs(Price-(BidPX1+OfferPX1)/2), snapshotSchema.Time]>,
    matchingColumn=`SecurityID,
    timeColumn=`Time,
    useSystemTime=false,
    delayedTime=1)
/*
 * Handler for trade messages: keep only rows with a positive Price and a
 * Time at or after 09:30:00.000, then insert them into the left input of the
 * "tradeJoinSnapshot" engine.
 *   msg - table of trade records (must contain Price and Time columns)
 */
def appendLeftStream(msg){
    validTrades = select * from msg where Price > 0 and Time >= 09:30:00.000
    // look the engine up by name so the handler does not depend on a global variable
    leftInput = getLeftStream(getStreamEngine(`tradeJoinSnapshot))
    tableInsert(leftInput, validTrades)
}
/*
 * Register the stream filter engine "streamFilter" and subscribe it to the
 * shared stream table messageStream.
 *   tradeSchema    - table giving the column layout of "trade" messages
 *   snapshotSchema - table giving the column layout of "snapshot" messages
 * Messages matching condition "trade" are handled by appendLeftStream;
 * messages matching condition "snapshot" are handed to the right input of
 * the "tradeJoinSnapshot" engine.
 */
def filterAndParseStreamFunc(tradeSchema, snapshotSchema){
    // message type -> schema used to parse that type
    msgSchemas = dict(["trade", "snapshot"], [tradeSchema, snapshotSchema])

    // route for trade messages
    tradeRoute = dict(STRING,ANY)
    tradeRoute["condition"] = "trade"
    tradeRoute["handler"] = appendLeftStream

    // route for snapshot messages
    rightInput = getRightStream(getStreamEngine(`tradeJoinSnapshot))
    snapshotRoute = dict(STRING,ANY)
    snapshotRoute["condition"] = "snapshot"
    snapshotRoute["handler"] = rightInput

    filterEngine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[tradeRoute, snapshotRoute], msgSchema=msgSchemas)
    subscribeTable(tableName="messageStream", actionName="tradeJoinSnapshot", offset=-1, handler=filterEngine, msgAsTable=true, reconnect=true)
}
//create the stream filter engine and start the subscription on messageStream
filterAndParseStreamFunc(tradeSchema, snapshotSchema)
/*
 * Replay the trade and snapshot records dated 2020.12.31 from the DFS tables
 * into the shared stream table messageStream as a background job.
 * Takes no arguments; the last statement submits the job with id "replay".
 */
def replayStockMarketData(){
    // split the 09:15:00.000 - 15:00:00.000 range into 100 buckets used to repartition both data sources by time
    timeBuckets = cutPoints(09:15:00.000..15:00:00.000, 100)
    snapshotSources = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeBuckets)
    tradeSources = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeBuckets)
    // keys are the message types later matched by the stream filter conditions
    replayInputs = dict(["trade", "snapshot"], [tradeSources, snapshotSources])
    // arguments after `replay` are forwarded to it positionally: input dict, output table,
    // date column, time column, then 100000, true, 2 (see the replay docs for the rate/parallelism semantics)
    submitJob("replay", "replay for factor calculation", replay, replayInputs, messageStream, `Date, `Time, 100000, true, 2)
}
//submit the replay job
replayStockMarketData()
//optional helper statements, left commented out; run them manually as needed:
//list recently submitted jobs
//getRecentJobs()
//cancel a job by its id
//cancelJob("your jobId")
//inspect the first 100 rows of the join output
//select * from prevailingQuotes limit 100