This repository has been archived by the owner on Feb 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimeprint.py
executable file
·68 lines (61 loc) · 2.07 KB
/
timeprint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3
import confluent_kafka
import flow_messages_enriched_pb2 as api # this needs to be in the local path
import helpers as h # this needs to be in the local path
import sys
import ipaddress
from collections import defaultdict
from ssl import get_default_verify_paths
import time
# Kafka credentials are read from ./authdata, one value per line, in this
# order: consumer group id, SASL username, SASL password.
# Surrounding whitespace (including the trailing newline) is stripped.
with open("./authdata", "r") as auth_file:
    auth_entries = [entry.strip() for entry in auth_file]
group = auth_entries[0]
username = auth_entries[1]
password = auth_entries[2]
# Settings for the Kafka consumer: SASL/PLAIN authentication over TLS against
# the brokers listed in "bootstrap.servers", using the group id and
# credentials loaded above.
consumer_config = {
    "bootstrap.servers": "kafka01.bwnf.belwue.de:9093,kafka02.bwnf.belwue.de:9093,kafka03.bwnf.belwue.de:9093,kafka04.bwnf.belwue.de:9093,kafka05.bwnf.belwue.de:9093",
    "group.id": group,
    "security.protocol": "sasl_ssl",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": username,
    "sasl.password": password,
}

# get_default_verify_paths().cafile is None when OpenSSL's default CA bundle
# file does not exist on this system. Only pass an explicit CA location when
# there is one; otherwise leave the option unset so the client library uses
# its own default CA lookup instead of receiving a None value.
ca_file = get_default_verify_paths().cafile
if ca_file is not None:
    consumer_config["ssl.ca.location"] = ca_file

consumer = confluent_kafka.Consumer(consumer_config)
def go_to_oldest(consumer, partitions):
    """Point every partition at OFFSET_BEGINNING and assign them.

    Takes the same ``(consumer, partitions)`` arguments as ``go_to_newest``,
    so it can be used as an ``on_assign`` callback in its place.

    Args:
        consumer: The Kafka consumer that receives the assignment.
        partitions: Partition objects whose ``offset`` attribute is
            overwritten in place before they are handed to
            ``consumer.assign``.
    """
    start_marker = confluent_kafka.OFFSET_BEGINNING
    for topic_partition in partitions:
        topic_partition.offset = start_marker
    consumer.assign(partitions)
def go_to_newest(consumer, partitions):
    """Point every partition at OFFSET_END and assign them.

    Passed as the ``on_assign`` callback to ``consumer.subscribe`` so that
    consumption starts at the end of each partition.

    Args:
        consumer: The Kafka consumer that receives the assignment.
        partitions: Partition objects whose ``offset`` attribute is
            overwritten in place before they are handed to
            ``consumer.assign``.
    """
    end_marker = confluent_kafka.OFFSET_END
    for topic_partition in partitions:
        topic_partition.offset = end_marker
    consumer.assign(partitions)
# Subscribe to the enriched flow topic; go_to_newest moves each assigned
# partition to its end so only newly arriving messages are measured.
consumer.subscribe(['flow-messages-enriched'], on_assign=go_to_newest)
try:
    # `lag` accumulates (now - TimeFlowEnd) per message; every 30000 messages
    # it is collapsed to its average, which is printed and then carried
    # forward as a single sample (count = 1) for the next window.
    # NOTE(review): assumes TimeFlowEnd is a Unix timestamp in seconds, since
    # it is subtracted from int(time.time()) — confirm against the schema.
    lag = 0.0
    count = 0
    while True:
        # Poll with a bounded timeout so control returns to Python regularly
        # (and Ctrl-C is handled promptly) even when no messages arrive.
        raw = consumer.poll(1.0)
        if raw is None:
            # Timeout expired without a message.
            continue
        if raw.error():
            # Skip the message, but report the error instead of hiding it.
            sys.stderr.write("\nKafka error: {}\n".format(raw.error()))
            continue

        # Decode the payload once and reuse it for the banner and the lag.
        flowmsg = api.FlowMessage()
        flowmsg.ParseFromString(raw.value())

        if count == 0:
            # One-time dump of the first message's timestamps.
            print("Received:", flowmsg.TimeReceived)
            print("FlowStart:", flowmsg.TimeFlowStart)
            print("FlowEnd:", flowmsg.TimeFlowEnd)

        count += 1
        lag += int(time.time()) - flowmsg.TimeFlowEnd

        if count >= 30000:
            lag = lag / count
            count = 1
            # Carriage return: overwrite the previous value on the same line.
            sys.stdout.write('\x0d')
            sys.stdout.write("{:.2f}".format(lag))
            sys.stdout.flush()
except KeyboardInterrupt:
    # End the in-place status line cleanly on Ctrl-C.
    print()
finally:
    # Always close the consumer, including when an unexpected exception
    # (e.g. a protobuf decode failure) ends the loop.
    consumer.close()