-
Notifications
You must be signed in to change notification settings - Fork 77
/
singleValueModeWrite.txt
64 lines (55 loc) · 2.34 KB
/
singleValueModeWrite.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/*
 * Create a composite-partitioned database and an empty partitioned table in it.
 *
 * dbName    - path of the database to create
 * tableName - name of the partitioned table to create inside the database
 * ps1       - partition scheme for the first level (VALUE partitioning)
 * ps2       - partition scheme for the second level (RANGE partitioning)
 *
 * The table has three columns (id INT, datetime DATETIME, value FLOAT) and is
 * partitioned on `datetime` for the first level and `id` for the second level.
 */
def createDatabase(dbName,tableName, ps1, ps2){
    // Empty template table: it only carries the column names and types.
    colNames = `id`datetime`value
    colTypes = [INT,DATETIME,FLOAT]
    schema = table(1:0, colNames, colTypes)

    // Two anonymous partition layers combined into one COMPO database.
    valueLayer = database("", VALUE, ps1)
    rangeLayer = database("", RANGE, ps2)
    compoDb = database(dbName, COMPO, [valueLayer, rangeLayer])

    // Partitioning columns are listed in the same order as the layers above.
    dfsTable = createPartitionedTable(compoDb, schema, tableName, `datetime`id)
}
/*
 * Build one day of synthetic readings for a set of ids.
 *
 * day        - the day to generate; converted to a DATETIME to get the first timestamp
 * id         - vector of ids; each id receives freqPerDay consecutive rows
 * freqPerDay - number of rows generated per id
 *
 * Returns a table with size(id) * freqPerDay rows and three columns: the id of
 * each row, a `datetime` column that restarts at the beginning of the day for
 * every id and advances by one unit per row, and a `value` column filled by
 * rand(1.0, ...).
 */
def generate1DayData(day, id, freqPerDay){
    seriesCount = id.size()
    rowCount = seriesCount * freqPerDay

    // Fill the id column in contiguous runs: all rows of id[0], then id[1], ...
    // The variable keeps the name idVec because it is passed to table() unaliased.
    idVec = array(INT, rowCount)
    runStart = 0
    for(k in 0:seriesCount){
        idVec[runStart : (runStart + freqPerDay)] = id[k]
        runStart += freqPerDay
    }

    // One day's worth of timestamps, repeated cyclically to cover every id.
    dayStart = datetime(day)
    stamps = take(dayStart+0..(freqPerDay-1), rowCount)
    readings = rand(1.0, rowCount)
    return table(idVec, stamps as datetime, readings as value)
}
/*
 * Append generated data for the given ids, day by day, to the table
 * "sensors" in the database "dfs://svmDemo".
 *
 * id                - vector of ids to write
 * startDay          - first day to generate
 * days              - number of consecutive days to generate, starting at startDay
 * freqPerDay        - number of rows generated per id per day
 * numIdPerPartition - number of ids written per append! call
 *
 * For every day the ids are written in batches of numIdPerPartition. The last
 * batch is clamped to the end of `id`, so a length that is not a multiple of
 * numIdPerPartition never indexes past the end of the vector, and an empty
 * `id` writes nothing.
 */
def singleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition){
    idCount = size(id)
    // Nothing to write; also keeps the do-while below from running once on empty input.
    if(idCount > 0){
        t = loadTable("dfs://svmDemo","sensors")
        for(d in 0:days){
            index = 0
            do{
                // Exclusive upper bound of this batch, clamped to the vector length.
                batchEnd = index + numIdPerPartition
                if(batchEnd > idCount) batchEnd = idCount
                t.append!(generate1DayData(startDay + d, id[index:batchEnd], freqPerDay))
                index += numIdPerPartition
            }while (index < idCount)
        }
    }
}
/*
 * Write the data for `id` in parallel by splitting the ids into chunks and
 * running singleThreadWriting on every chunk through ploop.
 *
 * id                - vector of ids to write
 * startDay, days, freqPerDay, numIdPerPartition - forwarded unchanged to singleThreadWriting
 * threads           - divisor used to size the chunks
 *
 * The chunk size is rounded up to a whole multiple of numIdPerPartition.
 */
def multipleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition, threads) {
    // Ids per thread expressed in units of numIdPerPartition, rounded up.
    batchesPerThread = ceil((size(id) \ threads) / numIdPerPartition)
    idCountPerThread = batchesPerThread * numIdPerPartition

    // Each chunk becomes the first (unbound) argument of the partial application.
    idChunks = cut(id, idCountPerThread)
    writeChunk = singleThreadWriting{, startDay, days, freqPerDay, numIdPerPartition}
    ploop(writeChunk, idChunks)
}
/*
 * Recreate the database "dfs://svmDemo" with its "sensors" table and submit
 * the write workload as a job.
 *
 * id, startDay, days, freqPerDay, numIdPerPartition - forwarded to the writer
 * ps1, ps2 - partition schemes passed to createDatabase
 * threads  - 1 submits singleThreadWriting; any other value submits
 *            multipleThreadWriting with this thread count
 *
 * An existing database at that path is dropped first.
 */
def mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads){
    dbPath = "dfs://svmDemo"
    // Start from a clean database on every run.
    if(existsDatabase(dbPath)){
        dropDatabase(dbPath)
    }
    createDatabase(dbPath, "sensors", ps1, ps2)

    if(threads == 1){
        serialJob = singleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition}
        submitJob("submit_singleThreadWriting", "write data", serialJob)
    }
    else{
        parallelJob = multipleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition, threads}
        submitJob("submit_multipleThreadWriting", "write data", parallelJob)
    }
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Log in before running any database operation.
login("admin","123456")

// Workload shape: numMachines machines, numMetrics metrics each,
// freqPerDay rows per id per day.
numMachines=100
numMetrics=50
freqPerDay=86400

// Each range partition holds the ids of numMachinesPerPartition machines.
numMachinesPerPartition=2
numIdPerPartition=numMachinesPerPartition*numMetrics

// First-level (value) partition scheme: one entry per day in the range.
ps1=2020.09.01..2020.12.31
// Second-level (range) partition boundaries: 1, 1+numIdPerPartition, ...
ps2=numIdPerPartition*(0..(numMachines/numMachinesPerPartition))+1

// One id per (machine, metric) pair.
id =1..(numMachines*numMetrics)

startDay=2020.09.01
// Number of days of data to write
days = 5
// Number of threads writing in parallel
threads = 20

mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads)