Skip to content

Commit

Permalink
Merge pull request #200 from HSF/flin
Browse files Browse the repository at this point in the history
Reformat code with autopep8 and black
  • Loading branch information
mightqxc authored Oct 5, 2023
2 parents 7855cda + 3257f4b commit a1e1617
Show file tree
Hide file tree
Showing 238 changed files with 11,658 additions and 12,135 deletions.
28 changes: 28 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
repos:
- repo: https://github.com/pre-commit/mirrors-autopep8
rev: v2.0.4
hooks:
- id: autopep8
# args: ["--global-config", "package/pyproject.toml"]
- repo: https://github.com/psf/black
rev: 23.9.1
hooks:
- id: black
# args: ["--config", "package/pyproject.toml"]
# exclude python 2 code which cannot be dealt with black
exclude: |
(?x)(
^pandaharvester/harvestermonitor/arc_monitor.py|
^pandaharvester/harvestermisc/arc_utils.py|
^pandaharvester/harvesterpayload/simple_wrapper_mpi.py|
^pandaharvester/harvestersubmitter/apfgrid_submitter.py|
^pandaharvester/harvestertest/dumpTable.py|
^pandaharvester/harvestertest/getQueuedata.py|
^pandaharvester/harvestermessenger/arc_messenger.py|
^pandaharvester/harvestersubmitter/arc_submitter.py|
^pandaharvester/harvestertest/stageOutTest_globus.py|
^pandaharvester/harvestertest/stageInTest_go_bulk_preparator.py|
^pandaharvester/harvesterpayload/ATLAS_simple_wrapper_mpi.py|
^pandaharvester/harvestercloud/google_startup_script.py|
^&
)
98 changes: 47 additions & 51 deletions examples/k8s/k8s_atlas_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,67 +8,66 @@

from kubernetes import client, config, watch

config.load_kube_config(config_file=os.environ.get('KUBECONFIG'))
config.load_kube_config(config_file=os.environ.get("KUBECONFIG"))
corev1 = client.CoreV1Api()
scheduler_name = 'atlas_scheduler'
scheduler_name = "atlas_scheduler"


def node_allocatable_map(node_status_allocatable):
cpu_str = node_status_allocatable['cpu']
memory_str = node_status_allocatable['memory']
mCpu = int(cpu_str)*1000
_m = re.match('^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$', memory_str)
cpu_str = node_status_allocatable["cpu"]
memory_str = node_status_allocatable["memory"]
mCpu = int(cpu_str) * 1000
_m = re.match("^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$", memory_str)
if _m is None:
print('No memory allocatable in node')
print("No memory allocatable in node")
memoryKB = 0
elif 'M' in _m.group(2):
memoryKB = int(float(_m.group(1))*2**10)
elif 'G' in _m.group(2):
memoryKB = int(float(_m.group(1))*2**20)
elif "M" in _m.group(2):
memoryKB = int(float(_m.group(1)) * 2**10)
elif "G" in _m.group(2):
memoryKB = int(float(_m.group(1)) * 2**20)
else:
memoryKB = int(float(_m.group(1)))
return {'mCpu': mCpu, 'memoryKB': memoryKB}
return {"mCpu": mCpu, "memoryKB": memoryKB}


def get_mcpu(containers):
mcpu_req = 0
for c in containers:
if hasattr(c.resources, 'requests'):
mcpu_req_str = c.resources.requests['cpu']
elif hasattr(c.resources, 'limits'):
mcpu_req_str = c.resources.limits['cpu']
if hasattr(c.resources, "requests"):
mcpu_req_str = c.resources.requests["cpu"]
elif hasattr(c.resources, "limits"):
mcpu_req_str = c.resources.limits["cpu"]
else:
mcpu_req_str = ''
_m = re.match('^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$', mcpu_req_str)
mcpu_req_str = ""
_m = re.match("^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$", mcpu_req_str)
if _m is None:
print('No cpu reources requests or limits specified')
print("No cpu reources requests or limits specified")
mcpu_req += 999
elif _m.group(2) == '':
mcpu_req += int(float(_m.group(1))*1000)
elif _m.group(2) == 'm':
elif _m.group(2) == "":
mcpu_req += int(float(_m.group(1)) * 1000)
elif _m.group(2) == "m":
mcpu_req += int(float(_m.group(1)))
else:
print('Invalid cpu reources requests or limits specified')
print("Invalid cpu reources requests or limits specified")
mcpu_req += 999
return mcpu_req


def get_allocated_resources(namespace='default'):
def get_allocated_resources(namespace="default"):
node_allocated_resources_map = {}
ret = corev1.list_namespaced_pod(namespace=namespace,
field_selector='status.phase!=Succeeded,status.phase!=Failed')
ret = corev1.list_namespaced_pod(namespace=namespace, field_selector="status.phase!=Succeeded,status.phase!=Failed")
for i in ret.items:
# pod_info = {}
# pod_info['name'] = i.metadata.name
# pod_info['status'] = i.status.phase
# pod_info['status_reason'] = i.status.conditions[0].reason if i.status.conditions else None
# pod_info['status_message'] = i.status.conditions[0].message if i.status.conditions else None
nodeName = getattr(i.spec, 'node_name', None)
nodeName = getattr(i.spec, "node_name", None)
if nodeName is None:
continue
node_allocated_resources_map.setdefault(nodeName, {})
node_allocated_resources_map[nodeName].setdefault('mCpu', 0)
node_allocated_resources_map[nodeName]['mCpu'] += get_mcpu(i.spec.containers)
node_allocated_resources_map[nodeName].setdefault("mCpu", 0)
node_allocated_resources_map[nodeName]["mCpu"] += get_mcpu(i.spec.containers)
return node_allocated_resources_map


Expand All @@ -78,21 +77,20 @@ def nodes_available():
for node in corev1.list_node().items:
node_name = node.metadata.name
for status in node.status.conditions:
if status.status == 'True' and status.type == 'Ready':
if status.status == "True" and status.type == "Ready":
node_allocatable_dict = node_allocatable_map(node.status.allocatable)
mcpu_available = node_allocatable_dict['mCpu'] \
- allocated_resources_map.get(node_name, {'mCpu': 0})['mCpu']
ready_nodes.append({'name': node_name, 'mCpu': mcpu_available})
ready_nodes = sorted(ready_nodes, key=(lambda x: x['mCpu']))
mcpu_available = node_allocatable_dict["mCpu"] - allocated_resources_map.get(node_name, {"mCpu": 0})["mCpu"]
ready_nodes.append({"name": node_name, "mCpu": mcpu_available})
ready_nodes = sorted(ready_nodes, key=(lambda x: x["mCpu"]))
return ready_nodes


def scheduler(name, node, namespace='default'):
def scheduler(name, node, namespace="default"):
target = client.V1ObjectReference()
target.kind = 'Node'
target.apiVersion = 'corev1'
target.kind = "Node"
target.apiVersion = "corev1"
target.name = node
print('target', target)
print("target", target)
meta = client.V1ObjectMeta()
meta.name = name
body = client.V1Binding(metadata=meta, target=target)
Expand All @@ -102,28 +100,26 @@ def scheduler(name, node, namespace='default'):
def main():
w = watch.Watch()
while True:
for event in w.stream(corev1.list_namespaced_pod, 'default', timeout_seconds=30):
pod = event['object']
if pod.status.phase == 'Pending' and not pod.spec.node_name \
and pod.spec.scheduler_name == scheduler_name:
for event in w.stream(corev1.list_namespaced_pod, "default", timeout_seconds=30):
pod = event["object"]
if pod.status.phase == "Pending" and not pod.spec.node_name and pod.spec.scheduler_name == scheduler_name:
for node_info in nodes_available():
pod_mcpu_req = get_mcpu(pod.spec.containers)
node_mcpu_free = node_info['mCpu']
to_bind = (pod_mcpu_req <= node_mcpu_free)
print('Node {0} has {1} mcpu ; pod requests {2} mcpu ; to_bind: {3}'.format(
node_info['name'], node_mcpu_free, pod_mcpu_req, to_bind))
node_mcpu_free = node_info["mCpu"]
to_bind = pod_mcpu_req <= node_mcpu_free
print("Node {0} has {1} mcpu ; pod requests {2} mcpu ; to_bind: {3}".format(node_info["name"], node_mcpu_free, pod_mcpu_req, to_bind))
if to_bind:
try:
print('Scheduling ' + pod.metadata.name)
res = scheduler(pod.metadata.name, node_info['name'])
print("Scheduling " + pod.metadata.name)
res = scheduler(pod.metadata.name, node_info["name"])
except ValueError as e:
print('ValueError (maybe harmless):', e)
print("ValueError (maybe harmless):", e)
except client.rest.ApiException as e:
print(json.loads(e.body)['message'])
print(json.loads(e.body)["message"])
finally:
break
time.sleep(2**-4)


if __name__ == '__main__':
if __name__ == "__main__":
main()
19 changes: 12 additions & 7 deletions git_hooks/pre-commit
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#!/usr/bin/env bash

DATE=`date -u '+%d-%m-%Y %H:%M:%S'`
NAME=`git config --global user.name`
BRANCH=`git rev-parse --abbrev-ref HEAD`
FILE=pandaharvester/commit_timestamp.py
echo timestamp = \"$DATE on $BRANCH \(by $NAME\)\" > $FILE
git add $FILE
exit 0
basedir="$(dirname $0)/pre-commit.d"

for hook in $(ls -1 $basedir); do
bash $basedir/$hook
RESULT=$?
if [ $RESULT != 0 ]; then
echo "$hook returned non-zero: $RESULT, abort commit"
exit $RESULT
fi
done

exit 0
17 changes: 17 additions & 0 deletions git_hooks/pre-commit.d/a10-run_pre-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

# start templated
INSTALL_PYTHON=/usr/bin/python3
ARGS=(hook-impl --config=.pre-commit-config.yaml --hook-type=pre-commit)
# end templated

HERE="$(cd "$(dirname "$0")" && pwd)"
ARGS+=(--hook-dir "$HERE" -- "$@")

if [ -x "$INSTALL_PYTHON" ]; then
exec "$INSTALL_PYTHON" -mpre_commit "${ARGS[@]}"
elif command -v pre-commit > /dev/null; then
exec pre-commit "${ARGS[@]}"
else
echo '`pre-commit` not found. Skipped...' 1>&2
fi
10 changes: 10 additions & 0 deletions git_hooks/pre-commit.d/a99-commit_timestamp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash

DATE=`date -u '+%d-%m-%Y %H:%M:%S'`
NAME=`git config --global user.name`
BRANCH=`git rev-parse --abbrev-ref HEAD`
FILE=pandaharvester/commit_timestamp.py
echo timestamp = \"$DATE on $BRANCH \(by $NAME\)\" > $FILE
git add $FILE

exit 0
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "18-07-2023 14:53:35 on flin (by mightqxc)"
timestamp = "29-09-2023 13:31:07 on flin (by mightqxc)"
5 changes: 2 additions & 3 deletions pandaharvester/harvesterbody/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

# base class for agents
class AgentBase(threading.Thread):

# constructor
def __init__(self, single_mode):
threading.Thread.__init__(self)
Expand All @@ -30,11 +29,11 @@ def terminated(self, wait_interval, randomize=True):
# get process identifier
def get_pid(self):
thread_id = self.ident if self.ident else 0
return '{0}_{1}-{2}'.format(self.hostname, self.os_pid, format(thread_id, 'x'))
return "{0}_{1}-{2}".format(self.hostname, self.os_pid, format(thread_id, "x"))

# make logger
def make_logger(self, base_log, token=None, method_name=None, send_dialog=True):
if send_dialog and hasattr(self, 'dbInterface'):
if send_dialog and hasattr(self, "dbInterface"):
hook = self.dbInterface
else:
hook = None
Expand Down
Loading

0 comments on commit a1e1617

Please sign in to comment.