-
Notifications
You must be signed in to change notification settings - Fork 115
/
Copy pathcreate.sh
executable file
·147 lines (115 loc) · 3.32 KB
/
create.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/bin/bash
# Creates the ClickHouse objects described by the SQL heredoc that follows,
# sending all statements to one clickhouse client session.

# Stop at the first command that exits with a non-zero status.
set -o errexit

# -n makes the client accept several semicolon-separated statements.
# The heredoc delimiter is unquoted, so the shell still expands the SQL text:
# literal backticks inside it have to be backslash-escaped.
clickhouse client -n <<-EOSQL
-- Database that holds the lookup dictionaries.
CREATE DATABASE IF NOT EXISTS dictionaries;

-- Dictionary keyed by proto with a name and a description per entry.
-- It is read from a CSV file with a header row (CSVWithNames) and uses the
-- FLAT layout. LIFETIME is the refresh interval in seconds.
CREATE DICTIONARY IF NOT EXISTS dictionaries.protocols
(
    proto       UInt8,
    name        String,
    description String
)
PRIMARY KEY proto
LAYOUT(FLAT())
SOURCE (FILE(path '/var/lib/clickhouse/user_files/protocols.csv' format 'CSVWithNames'))
LIFETIME(3600);
-- Kafka engine table: consumes Protobuf messages (flow.proto:FlowMessage)
-- from topic flows on broker kafka:9092 as consumer group clickhouse.
-- The materialized view flows_raw_view reads from this table.
CREATE TABLE IF NOT EXISTS flows
(
    -- Treated as nanoseconds since the Unix epoch by flows_raw_view.
    time_received_ns   UInt64,
    time_flow_start_ns UInt64,

    sequence_num       UInt32,
    sampling_rate      UInt64,

    -- 16-byte values, presumably raw IP addresses (confirm against flow.proto).
    sampler_address    FixedString(16),
    src_addr           FixedString(16),
    dst_addr           FixedString(16),

    src_as             UInt32,
    dst_as             UInt32,

    etype              UInt32,
    proto              UInt32,

    src_port           UInt32,
    dst_port           UInt32,

    bytes              UInt64,
    packets            UInt64
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list   = 'kafka:9092',
    kafka_num_consumers = 1,
    kafka_topic_list    = 'flows',
    kafka_group_name    = 'clickhouse',
    kafka_format        = 'Protobuf',
    kafka_schema        = 'flow.proto:FlowMessage';
-- MergeTree storage for individual flow records. It is the target of
-- flows_raw_view and the source of flows_5m_view.
-- One partition per date, rows sorted by receive time.
CREATE TABLE IF NOT EXISTS flows_raw
(
    date               Date,

    -- Timestamps with nanosecond precision.
    time_inserted_ns   DateTime64(9),
    time_received_ns   DateTime64(9),
    time_flow_start_ns DateTime64(9),

    sequence_num       UInt32,
    sampling_rate      UInt64,

    sampler_address    FixedString(16),
    src_addr           FixedString(16),
    dst_addr           FixedString(16),

    src_as             UInt32,
    dst_as             UInt32,

    etype              UInt32,
    proto              UInt32,

    src_port           UInt32,
    dst_port           UInt32,

    bytes              UInt64,
    packets            UInt64
) ENGINE = MergeTree()
PARTITION BY date
ORDER BY time_received_ns;
-- Copies every row consumed from the Kafka-backed flows table into flows_raw,
-- converting the UInt64 epoch-nanosecond counters into DateTime64(9) values
-- and stamping each row with its insertion time.
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_raw_view TO flows_raw
AS SELECT
    -- With the default setting prefer_column_name_to_alias = 0 the name
    -- time_received_ns resolves here to the DateTime64 alias defined below,
    -- not to the raw UInt64 column. Do not wrap it in another conversion.
    toDate(time_received_ns) AS date,

    -- now64(9) keeps sub-second precision. Plain now() has one-second
    -- resolution, which wastes the DateTime64(9) target column.
    now64(9) AS time_inserted_ns,

    -- Exact integer conversion. Dividing by 1000000000 would go through
    -- Float64, which cannot represent present-day epoch nanosecond values
    -- without rounding.
    fromUnixTimestamp64Nano(toInt64(time_received_ns)) AS time_received_ns,
    fromUnixTimestamp64Nano(toInt64(time_flow_start_ns)) AS time_flow_start_ns,

    sequence_num,
    sampling_rate,
    sampler_address,

    src_addr,
    dst_addr,

    src_as,
    dst_as,

    etype,
    proto,

    src_port,
    dst_port,

    bytes,
    packets
FROM flows;
-- Aggregate table filled by flows_5m_view. SummingMergeTree collapses rows
-- that share the same sorting key by summing their numeric columns on merge.
CREATE TABLE IF NOT EXISTS flows_5m
(
    date     Date,
    timeslot DateTime,

    src_as   UInt32,
    dst_as   UInt32,

    -- Per-etype breakdown, stored by ClickHouse as parallel arrays
    -- (etypeMap.etype, etypeMap.bytes, etypeMap.packets, etypeMap.count).
    etypeMap Nested (
        etype   UInt32,
        bytes   UInt64,
        packets UInt64,
        count   UInt64
    ),

    bytes    UInt64,
    packets  UInt64,
    count    UInt64
) ENGINE = SummingMergeTree()
PARTITION BY date
ORDER BY (date, timeslot, src_as, dst_as, \`etypeMap.etype\`);
-- Rolls rows inserted into flows_raw up into five-minute buckets per
-- (date, src_as, dst_as, etype) and writes the result to flows_5m.
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_5m_view TO flows_5m
AS
    SELECT
        date,
        toStartOfFiveMinute(time_received_ns) AS timeslot,
        src_as,
        dst_as,

        -- Single-element arrays feeding the Nested etypeMap columns.
        -- NOTE(review): bytes, packets and count inside the brackets
        -- presumably resolve to the aggregate aliases declared below
        -- (ClickHouse alias resolution). Verify before reordering or renaming.
        [etype] AS \`etypeMap.etype\`,
        [bytes] AS \`etypeMap.bytes\`,
        [packets] AS \`etypeMap.packets\`,
        [count] AS \`etypeMap.count\`,

        sum(bytes) AS bytes,
        sum(packets) AS packets,
        count() AS count

    FROM flows_raw
    GROUP BY date, timeslot, src_as, dst_as, \`etypeMap.etype\`;
EOSQL