forked from swar/Swar-Chia-Plot-Manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stateless-manager.py
110 lines (95 loc) · 5.19 KB
/
stateless-manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import logging
import time
from datetime import datetime, timedelta
from plotmanager.library.parse.configuration import get_config_info
from plotmanager.library.utilities.jobs import has_active_jobs_and_work, load_jobs, monitor_jobs_to_start
from plotmanager.library.utilities.log import check_log_progress
from plotmanager.library.utilities.processes import get_running_plots, get_system_drives
# Unpack every manager setting from the parsed configuration. The values are taken positionally
# from whatever get_config_info() returns, so the order of the names here matters.
(
    chia_location,
    log_directory,
    config_jobs,
    manager_check_interval,
    max_concurrent,
    max_for_phase_1,
    minimum_minutes_between_jobs,
    progress_settings,
    notification_settings,
    debug_level,
    view_settings,
    instrumentation_settings,
) = get_config_info()

# Configure the root logger before anything is logged; the configured debug_level is the threshold.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=debug_level,
)

# Echo the effective configuration so a run can be diagnosed from its log alone.
startup_messages = (
    f'Debug Level: {debug_level}',
    f'Chia Location: {chia_location}',
    f'Log Directory: {log_directory}',
    f'Jobs: {config_jobs}',
    f'Manager Check Interval: {manager_check_interval}',
    f'Max Concurrent: {max_concurrent}',
    f'Max for Phase 1: {max_for_phase_1}',
    f'Minimum Minutes between Jobs: {minimum_minutes_between_jobs}',
    f'Progress Settings: {progress_settings}',
    f'Notification Settings: {notification_settings}',
    f'View Settings: {view_settings}',
    f'Instrumentation Settings: {instrumentation_settings}',
)
for message in startup_messages:
    logging.info(message)
# Turn the configured job definitions into job objects.
logging.info('Loading jobs into objects.')
jobs = load_jobs(config_jobs)

# Scheduling state used by the rest of the script:
#   next_log_check - timestamp handed to monitor_jobs_to_start in the main loop
#   next_job_work  - job name -> earliest datetime at which that job may start new work
#   running_work   - pid -> work object for plots that are currently running
next_log_check = datetime.now()
next_job_work = {}
running_work = {}

logging.info('Grabbing system drives.')
system_drives = get_system_drives()
logging.info(f'Found System Drives: {system_drives}')

# Pick up plots that are already running so they are reflected in jobs and running_work.
logging.info('Grabbing running plots.')
jobs, running_work = get_running_plots(
    jobs=jobs,
    running_work=running_work,
    instrumentation_settings=instrumentation_settings,
)
# Seed next_job_work with the earliest moment each job may start new work.
#   - Default: now.
#   - If the job has an initial delay: now + initial_delay_minutes.
#   - If the job already has running work: latest start of that work + stagger_minutes,
#     unless the initial delay pushes the start even later, in which case the delay wins.
for job in jobs:
    next_job_work[job.name] = datetime.now()

    # Latest start time among this job's already-running work; None when it has none.
    max_date = max(
        (running_work[pid].datetime_start for pid in job.running_work),
        default=None,
    )

    initial_delay_date = datetime.now() + timedelta(minutes=job.initial_delay_minutes)
    if job.initial_delay_minutes:
        next_job_work[job.name] = initial_delay_date

    # Nothing running for this job: the value chosen above stands.
    if max_date is None:
        continue

    max_date = max_date + timedelta(minutes=job.stagger_minutes)
    if job.initial_delay_minutes and initial_delay_date > max_date:
        # The initial delay ends after the stagger window, so keep the delay date set above.
        logging.info(f'{job.name} Found. Setting initial delay date to {next_job_work[job.name]} which is '
                     f'{job.initial_delay_minutes} minutes.')
        continue

    next_job_work[job.name] = max_date
    logging.info(f'{job.name} Found. Setting next stagger date to {next_job_work[job.name]}')
# Enforce the global minimum gap between job starts: no job may start earlier than
# (latest start of any running work) + minimum_minutes_between_jobs. Only relevant when the
# setting is enabled and there is running work to measure from.
if minimum_minutes_between_jobs and running_work:
    logging.info(f'Checking to see if stagger needs to be altered due to minimum_minutes_between_jobs. '
                 f'Value: {minimum_minutes_between_jobs}')
    # Collect the start dates once; they are used for both the maximum and the log line.
    start_dates = [work.datetime_start for work in running_work.values()]
    maximum_start_date = max(start_dates)
    minimum_stagger = maximum_start_date + timedelta(minutes=minimum_minutes_between_jobs)
    logging.info(f'All dates: {start_dates}')
    logging.info(f'Calculated Latest Job Start Date: {maximum_start_date}')
    logging.info(f'Calculated Minimum Stagger: {minimum_stagger}')

    # Only values of existing keys are reassigned below, so iterating items() is safe.
    for job_name, current_stagger in next_job_work.items():
        if current_stagger > minimum_stagger:
            logging.info(f'Skipping stagger for {job_name}. Stagger is larger than minimum_minutes_between_jobs. '
                         f'Minimum: {minimum_stagger}, Current: {current_stagger}')
            continue
        next_job_work[job_name] = minimum_stagger
        # Report the value that was replaced; reading next_job_work[job_name] here would just
        # echo minimum_stagger back because it has already been overwritten.
        logging.info(f'Setting a new stagger for {job_name}. minimum_minutes_between_jobs is larger than assigned '
                     f'stagger. Minimum: {minimum_stagger}, Current: {current_stagger}')
# Main manager loop: runs for as long as has_active_jobs_and_work(jobs) reports something to do.
logging.info('Starting loop.')
while has_active_jobs_and_work(jobs):
    # Step 1: let check_log_progress process the logs of the work that is currently tracked.
    logging.info('Checking log progress..')
    check_log_progress(
        jobs=jobs,
        running_work=running_work,
        progress_settings=progress_settings,
        notification_settings=notification_settings,
        view_settings=view_settings,
        instrumentation_settings=instrumentation_settings,
    )
    next_log_check = datetime.now() + timedelta(seconds=manager_check_interval)

    # Step 2: let monitor_jobs_to_start decide whether new work should start, and take back
    # the updated scheduling state it returns.
    logging.info('Monitoring jobs to start.')
    monitor_arguments = {
        'jobs': jobs,
        'running_work': running_work,
        'max_concurrent': max_concurrent,
        'max_for_phase_1': max_for_phase_1,
        'next_job_work': next_job_work,
        'chia_location': chia_location,
        'log_directory': log_directory,
        'next_log_check': next_log_check,
        'minimum_minutes_between_jobs': minimum_minutes_between_jobs,
        'system_drives': system_drives,
    }
    jobs, running_work, next_job_work, next_log_check = monitor_jobs_to_start(**monitor_arguments)

    # Step 3: wait one check interval before the next pass.
    logging.info(f'Sleeping for {manager_check_interval} seconds.')
    time.sleep(manager_check_interval)

logging.info('Manager has exited loop because there are no more active jobs.')