-
Notifications
You must be signed in to change notification settings - Fork 3
/
harvest.py
66 lines (44 loc) · 1.11 KB
/
harvest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import json
import threading
import time
from database import init_database, save_ip
from rmq_queue import RmqChannel, RmqQueueName
from statuses import *
# Optional startup delay (left disabled) that would give the other services
# time to initialize before this script connects.
# time.sleep(30)

# Running tallies updated by recv(): messages handled overall, and the split
# between STATUS_HAS_HTTP results ("http") and every other status ("hiks").
total = hiks = http = 0

# Channel shared by the whole module; the FROM_WORKER queue is declared up
# front so that it can be consumed from later on.
chan = RmqChannel()
chan.declare_queue(RmqQueueName.FROM_WORKER)
def recv(ch, method, properties, body):
    """
    Handle one message delivered from the FROM_WORKER queue.

    The body is decoded and parsed as JSON, its ``ip`` and ``status`` fields
    are passed to ``save_ip``, the module-level counters are updated, a
    progress line is printed on every fifth message, and finally the message
    is acknowledged.

    :param ch: channel object; only its ``basic_ack`` method is used
    :param method: delivery info; only its ``delivery_tag`` is read
    :param properties: message properties (unused)
    :param body: raw message payload; must support ``.decode()`` and contain
        a JSON object with ``ip`` and ``status`` keys
    :return: None
    """
    global total, http, hiks

    payload = json.loads(body.decode())
    ip = payload['ip']
    status = payload['status']
    save_ip(ip, status)

    # Anything that is not STATUS_HAS_HTTP is counted in the "hiks" tally.
    if status == STATUS_HAS_HTTP:
        http += 1
    else:
        hiks += 1
    total += 1

    # Report progress once every five handled messages.
    if not total % 5:
        print("total saved: %d, http: %d, hikcams: %d" % (total, http, hiks))

    # Acknowledge only after the result has been saved and counted.
    ch.basic_ack(delivery_tag=method.delivery_tag)
def recv_ips() -> None:
    """
    Register ``recv`` as the handler for the FROM_WORKER queue.
    """
    source_queue = RmqQueueName.FROM_WORKER
    chan.receive(source_queue, recv)
# Startup sequence, executed as plain top-level statements.
# start consumer: run chan.start on its own thread
# NOTE(review): this thread is started before init_database() has run and
# before recv_ips() has registered the recv callback, and chan is then used
# from the main thread as well -- confirm RmqChannel.start tolerates having no
# handler registered yet and that the channel is safe to share across threads.
threading.Thread(target=chan.start).start()
# Prepare the database before any result is saved
# (presumably creates the storage that save_ip writes to -- verify in `database`).
init_database()
# Hook recv up to the FROM_WORKER queue.
recv_ips()