-
Notifications
You must be signed in to change notification settings - Fork 0
/
assigner.py
85 lines (55 loc) · 2.03 KB
/
assigner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python2
# coding: utf-8
import logging
import time
from pykit import threadutil
from throttle_central import service
logger = logging.getLogger(__name__)
def wait_to_start(slot_number):
to_sleep = (slot_number + 0.5) - time.time()
if to_sleep <= 0:
to_sleep = 0.01
time.sleep(to_sleep)
def wait_until_slot_end(slot_number):
to_sleep = (slot_number + 1) - time.time()
if to_sleep <= 0:
to_sleep = 0.01
time.sleep(to_sleep)
def start_assign(context, assign_threads, slot_number):
for service_name, service_model in service.services.iteritems():
if service_name in assign_threads:
continue
th = threadutil.start_daemon_thread(
service_model['module'].assign, args=(context, slot_number,))
assign_threads[service_name] = th
def check_assign_complete(assign_threads):
for service_name in service.services.keys():
th = assign_threads[service_name]
if th.is_alive():
logger.error('assign thread for: %s, not complete at: %f' % (
service_name, time.time()))
else:
del(assign_threads[service_name])
def _run(context):
assign_threads = {}
while True:
slot_number = int(round(time.time()))
wait_to_start(slot_number)
logger.info('start to assign at time: %f' % time.time())
start_assign(context, assign_threads, slot_number)
wait_until_slot_end(slot_number)
check_assign_complete(assign_threads)
def run(context):
while True:
context['running'] = False
try:
logger.info('try to get lock at time: %f' % time.time())
with context['Lock']():
logger.info('got lock at time: %f' % time.time())
context['running'] = True
_run(context)
context['running'] = False
except Exception as e:
context['running'] = False
logger.exception('failed to run assigner: %s' % repr(e))
time.sleep(3)