-
Notifications
You must be signed in to change notification settings - Fork 0
/
echkpnt.criu
executable file
·168 lines (139 loc) · 6.06 KB
/
echkpnt.criu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/path/to/venv/bin/python
# This script is intended to be run as normal users, through LSF with their
# normal permissions. The use of capabilities to grant extra permissions only
# applies to the criu executable and is dropped after, as explained in
# confluence:
# https://ssg-confluence.internal.sanger.ac.uk/pages/viewpage.action?pageId=153044333
# Copyright International Business Machines Corp, 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import psutil
import logging
import logging.handlers
import subprocess
import json
import traceback
import criucommon as cmn
def fail_cleanup(log):
    """Mark the checkpoint as failed in the job's checkpoint state file.

    Loads ``.checkpoint-state.json`` from the directory named by the
    ``LSB_CHKPNT_DIR`` environment variable, sets ``should_restart`` to
    ``False`` and ``chkpnt_state`` to ``0x4``, then writes the file back.
    The operation is best-effort: a missing directory, a missing file or a
    corrupted file is reported and the function returns without raising.

    Args:
        log: logger used for diagnostics. May be ``None`` (the script's entry
            point passes ``None`` when logging could not be configured); a
            standard-library logger is substituted so the failure state is
            still recorded instead of being lost to an ``AttributeError``.
    """
    if log is None:
        log = logging.getLogger(__name__)

    chkpnt_dir = os.environ.get('LSB_CHKPNT_DIR', "")
    if chkpnt_dir == "":
        log.error("could not get chkpnt dir to write failure state.")
        return

    state_path = f'{chkpnt_dir}/.checkpoint-state.json'
    try:
        with open(state_path, 'r') as f:
            log.debug("loaded checkpoint state")
            data = json.load(f)
        data['should_restart'] = False
        # 0x4 is the value main() in this file records as FAILED.
        data['chkpnt_state'] = 0x4
        # The read handle is closed before the file is truncated for writing.
        with open(state_path, 'w') as f:
            json.dump(data, f)
    except FileNotFoundError:
        log.debug("could not load checkpoint state to dump it")
    except json.JSONDecodeError:
        log.error("corrupted JSON file, cannot record state")
    except Exception as e:
        # Last-resort reporting: this runs while another failure is already
        # being handled, so nothing is allowed to propagate from here.
        print(f"Unhandled exception during writing chkpnt err! trace:\n"
              f"{''.join(traceback.format_exception(e))}")
def run_it(logger, cmd, loglevel=logging.DEBUG):
    """Run a command to completion, log its output and return its exit code.

    Args:
        logger: object with a ``log(level, msg)`` method (e.g. a
            ``logging.Logger``) that receives the captured output.
        cmd: the command as an argument list, passed to ``subprocess`` as-is
            (no shell is involved).
        loglevel: level at which captured stdout and stderr are logged.

    Returns:
        The exit status of the command.
    """
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # The pipes yield bytes; decode them so that empty streams can really be
    # detected and the log shows text rather than a b'...' repr.
    for raw in (proc.stdout, proc.stderr):
        text = raw.decode(errors="replace") if raw else ""
        if text.strip():
            logger.log(loglevel, text.rstrip())
    return proc.returncode
def main(argv, log):
    """Checkpoint the current LSF job with ``criu dump``.

    Steps:
      1. Ask ``bjobs`` for the PIDs of the job named by ``LSB_JOBID``.
      2. Pick the process to dump (the first child when the first listed
         process is LSF's ``res``).
      3. Run ``criu dump`` into the directory named by ``LSB_CHKPNT_DIR``.
      4. If ``.checkpoint-state.json`` exists there and is in the STARTED
         state, record SUCCESS or FAILED according to criu's exit code.

    Args:
        argv: command-line arguments without the program name. Only ``-k`` is
            inspected: when it is absent, ``--leave-running`` is passed to
            criu.
        log: logger used for all diagnostics.

    Returns:
        ``None`` when the PID reported by LSF cannot be parsed yet; otherwise
        the function does not return.

    Raises:
        SystemExit: with status 1 when the ``bjobs`` output cannot be
            obtained or parsed, otherwise with criu's exit code.
        KeyError: when ``LSB_JOBID`` or ``LSB_CHKPNT_DIR`` is not set.
    """
    log.debug("start checkpointing ......")
    log.debug(argv)
    log.debug("getting PIDs for the job")
    # An argument list keeps the job id away from any shell interpretation.
    try:
        out = subprocess.run(
            ["bjobs", "-o", "pids", "-json", os.environ['LSB_JOBID']],
            capture_output=True)
    except OSError:
        log.error("could not run bjobs. checkpointing failed.")
        sys.exit(1)
    try:
        pids_str = json.loads(out.stdout.decode())['RECORDS'][0]['PIDS']
    except (KeyError, IndexError, TypeError, json.JSONDecodeError):
        # IndexError/TypeError cover an empty RECORDS list or an unexpected
        # JSON shape, which are just as malformed as a missing key.
        log.error("incorrectly formed JSON. checkpointing failed.")
        sys.exit(1)
    pids = pids_str.split(',')

    # get process id for running job to checkpoint
    try:
        p = psutil.Process(int(pids[0]))
    except ValueError:
        log.warning("Could not read pids from LSF. This can sometimes occur near "
                    "the start of the job. Aborting for now, retry later.")
        return

    # we need to find the pid of the process which is actually running the job,
    # LSF sometimes counts its own 'res' process as part of the jobpids listed
    # in the environment var, so we need to inspect the parent process of the
    # processes in the jobpids, and if its 'res' then we actually need to
    # checkpoint the child of res. if it is not res, then we can just
    # checkpoint that process.
    jobpid = str(p.pid)
    log.debug('p: ' + str(p.pid))
    # NOTE(review): this is a substring match, so any process whose name
    # merely contains "res" is treated as LSF's res - confirm that is intended.
    if 'res' in p.name():
        jobpid = str(p.children()[0].pid)
    log.debug('job process id: ' + jobpid)

    # make checkpoint command line
    cmd = ["/usr/local/bin/criu", "dump", "-D", os.environ['LSB_CHKPNT_DIR'],
           "-t", jobpid, "--shell-job", "--unprivileged"]
    if '-k' not in argv:
        cmd.append('--leave-running')
    log.debug(cmd)

    # checkpoint
    ret = run_it(log, cmd)
    log.info(f"criu command exited with code = {ret}, checkpointing attempt complete.")

    # if we are doing a checkpoint for an overrunning job, we need to log some
    # extra data
    state_path = f'{os.environ["LSB_CHKPNT_DIR"]}/.checkpoint-state.json'
    try:
        log.debug("attempt to write checkpoint state if required")
        with open(state_path, 'r') as f:
            log.debug("file exists")
            data = json.load(f)
            log.debug("json read in")
        if data['chkpnt_state'] == 0x2:  # value for STARTED from hijack.py
            if ret == 0:
                data['chkpnt_state'] = 0x8  # value for SUCCESS from hijack.py
            else:
                data['chkpnt_state'] = 0x4  # value for FAILED from hijack.py
            log.debug("state set")
            with open(state_path, 'w') as f:
                json.dump(data, f)
        else:
            log.debug("checkpoint state != started, not overrunning job")
        log.debug("json dumped")
    except FileNotFoundError:
        log.debug("not an overrunning job - not dumping overrun state")
    except json.JSONDecodeError:
        log.error("corrupted JSON file, cannot record state")
    except Exception as e:
        # State bookkeeping must never mask criu's own exit status.
        print(f"Unhandled exception! trace:\n{''.join(traceback.format_exception(e))}")
    sys.exit(ret)
if __name__ == "__main__":
    # Script entry point: set up file logging in the checkpoint directory, run
    # the checkpoint, and on any unexpected exception write the stack trace to
    # an error file and mark the checkpoint as failed.
    log = None
    try:
        # send logs to a log file in the checkpoint directory of INFO or above
        # (no leading "/" is prepended, so the path matches every other use of
        # LSB_CHKPNT_DIR in this script)
        log = cmn.set_log(
            f"{os.environ['LSB_CHKPNT_DIR']}/lsf-job-{os.environ['LSB_JOBID']}.cr.log",
            logging.INFO)
        # send logs to syslog of level ERROR or above
        # set_log(None, logging.ERROR)
        main(sys.argv[1:], log)
    except Exception as e:
        report = ('ERROR IN CHECKPOINTING. stack trace: \n'
                  f'{"".join(traceback.format_exception(e))}')
        # The failure being handled may itself be a missing environment
        # variable or an unwritable directory, so the error report must not
        # raise again and prevent the failure state from being recorded.
        chkpnt_dir = os.environ.get("LSB_CHKPNT_DIR", "")
        jobid = os.environ.get("LSB_JOBID", "unknown")
        try:
            if chkpnt_dir == "":
                raise OSError("LSB_CHKPNT_DIR is not set")
            with open(f'{chkpnt_dir}/lsf-job-ERR-{jobid}.log', 'w') as f:
                f.write(report)
        except OSError:
            print(report, file=sys.stderr)
        # fail_cleanup needs a usable logger even when set_log itself failed.
        # NOTE(review): the script still exits with status 0 on this path -
        # confirm that is what LSF should see after a failed checkpoint.
        fail_cleanup(log if log is not None else logging.getLogger(__name__))