Update Deployment Logic:
- Kubernetes: Ensure GlobalWorker is deployed even in single worker deployments.
- Flower: Configure the server to run on the GlobalWorker.
Kostas Filippopolitis committed Jul 16, 2024
1 parent cab4379 commit f0f10f1
Showing 18 changed files with 207 additions and 154 deletions.
100 changes: 31 additions & 69 deletions .github/workflows/prod_env_tests.yml
@@ -203,82 +203,44 @@ jobs:
       - name: Wait for pods to get healthy
         run: timeout 300 bash -c 'while true; do if kubectl get pods --no-headers | awk '\''{if ($2 != "1/1" && $2 != "2/2" && $2 != "3/3" && $2 != "4/4") exit 1;}'\''; then echo "All pods are ready!"; break; else kubectl get pods -o wide; sleep 20; fi done'

-      - name: Initialize MONETDB from mipdb container
+      - name: Load data models into localworkers and globalworker
         run: |
-          for i in 0 1; do
-            POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name")
-            kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb init'
-          done
-          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-          kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb init'
-      - name: Load dementia data model into localworkers and globalworker
-        run: |
-          for i in 0 1; do
-            POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name")
-            kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json'
-          done
-          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-          kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json'
-      - name: Load dementia dataset csvs into localworkers and globalworker
-        run: |
-          for suffix in {0..9}; do
-            if [ $suffix -eq 0 ] || [ $suffix -eq 3 ] || [ $suffix -eq 6 ]; then
-              POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name")
-            elif [ $suffix -eq 1 ] || [ $suffix -eq 4 ] || [ $suffix -eq 7 ]; then
-              POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
-            else
-              POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-            fi
-            for dataset in edsd ppmi desd-synthdata; do
-              kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/dementia_v_0_1/${dataset}${suffix}.csv -d dementia -v 0.1"
+          LOCALWORKER1=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name")
+          LOCALWORKER2=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
+          GLOBALWORKER=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r ".items[0].metadata.name")
+          for POD in $LOCALWORKER1 $LOCALWORKER2 $GLOBALWORKER; do
+            kubectl exec $POD -c db-importer -- sh -c 'mipdb init'
+            for model in dementia_v_0_1 tbi_v_0_1 longitudinal_dementia_v_0_1; do
+              kubectl exec $POD -c db-importer -- sh -c "mipdb add-data-model /opt/data/${model}/CDEsMetadata.json"
             done
           done
-      - name: Load tbi data model into localworkers and globalworker
+      - name: Load dataset csvs into localworkers
         run: |
-          for i in 0 1; do
-            POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name")
-            kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json'
-          done
-          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-          kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json'
-      - name: Load tbi dataset csvs into localworkers and globalworker
-        run: |
-          for suffix in {0..9}; do
-            if [ $suffix -eq 0 ] || [ $suffix -eq 3 ] || [ $suffix -eq 6 ]; then
-              POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name")
-            elif [ $suffix -eq 1 ] || [ $suffix -eq 4 ] || [ $suffix -eq 7 ]; then
-              POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
-            else
-              POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-            fi
-            kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi${suffix}.csv -d tbi -v 0.1"
-          done
-      - name: Load longitudinal dementia data model into localworkers and globalworker
-        run: |
-          for i in 0 1; do
-            POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name")
-            kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json'
-          done
-          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-          kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json'
-      - name: Load longitudinal dementia datasets into localworkers and globalworker
-        run: |
-          for suffix in {0..2}; do
-            if [ $suffix -eq 0 ]; then
-              POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name")
-            elif [ $suffix -eq 1 ]; then
-              POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
-            else
-              POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
-            fi
-            kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia${suffix}.csv -d longitudinal_dementia -v 0.1"
-          done
+          LOCALWORKER1=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name")
+          LOCALWORKER2=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
+          GLOBALWORKER=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r ".items[0].metadata.name")
+          for model in dementia_v_0_1 tbi_v_0_1 longitudinal_dementia_v_0_1; do
+            for filepath in $(kubectl exec $GLOBALWORKER -c db-importer -- ls /opt/data/${model}); do
+              filepath=/opt/data/${model}/${filepath}
+              if [[ $filepath == *test.csv ]]; then
+                echo "Loading file: $filepath at $GLOBALWORKER"
+                kubectl exec $GLOBALWORKER -c db-importer -- mipdb add-dataset $filepath -d ${model%_v_*} -v 0.1
+              elif [[ $filepath == *.csv ]]; then
+                filename=$(basename $filepath)
+                suffix=$(echo $filename | grep -o '[0-9]*' | tail -1)
+                if (( suffix % 2 == 0 )); then
+                  POD_NAME=$LOCALWORKER2
+                else
+                  POD_NAME=$LOCALWORKER1
+                fi
+                echo "Loading file: $filepath at $POD_NAME"
+                kubectl exec $POD_NAME -c db-importer -- mipdb add-dataset $filepath -d ${model%_v_*} -v 0.1
+              fi
+            done
+          done
       - name: Controller logs
         run: kubectl logs -l app=exareme2-controller --tail -1
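A note on the rewritten dataset step: placement is now derived from the filename itself. Holdout files ending in `test.csv` go to the global worker, and every other CSV goes to a local worker chosen by the parity of its trailing numeric suffix. A minimal Python sketch of that rule (the function name and worker labels are illustrative, not part of the workflow):

```python
import re

def placement(filename: str) -> str:
    """Mirror the placement rule in the workflow step above."""
    if filename.endswith("test.csv"):
        return "globalworker"  # holdout datasets live on the global worker
    # Last run of digits, like the bash `grep -o '[0-9]*' | tail -1`
    suffix = int(re.findall(r"[0-9]+", filename)[-1])
    return "localworker2" if suffix % 2 == 0 else "localworker1"

assert placement("edsd_test.csv") == "globalworker"
assert placement("edsd0.csv") == "localworker2"  # even suffix
assert placement("ppmi3.csv") == "localworker1"  # odd suffix
```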
16 changes: 5 additions & 11 deletions exareme2/controller/services/flower/controller.py
@@ -70,18 +70,12 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto)
             for worker in workers_info
         ]

-        server_task_handler, server_ip, server_id = (
-            task_handlers[0],
-            workers_info[0].ip,
-            workers_info[0].id,
+        global_worker = self.worker_landscape_aggregator.get_global_worker()
+        server_task_handler = self._create_worker_tasks_handler(
+            request_id, global_worker
         )
-        if len(task_handlers) > 1:
-            global_worker = self.worker_landscape_aggregator.get_global_worker()
-            server_task_handler = self._create_worker_tasks_handler(
-                request_id, global_worker
-            )
-            server_ip = global_worker.ip
-            server_id = global_worker.id
+        server_ip = global_worker.ip
+        server_id = global_worker.id
         # Garbage Collect
         server_task_handler.garbage_collect()
         for handler in task_handlers:
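The effect of this change is easiest to see side by side: previously, in a single-worker deployment the lone (local) worker also hosted the Flower server; now the server is unconditionally the global worker, which the Kubernetes template change elsewhere in this commit guarantees is always deployed. A toy sketch of the two behaviors (hypothetical helper names, not the project's API):

```python
def old_pick_flower_server(global_worker, workers):
    # Pre-commit: the first worker served unless several participated.
    return global_worker if len(workers) > 1 else workers[0]

def new_pick_flower_server(global_worker, workers):
    # Post-commit: the Flower server is always the global worker.
    return global_worker

assert old_pick_flower_server("globalworker", ["localworker1"]) == "localworker1"
assert new_pick_flower_server("globalworker", ["localworker1"]) == "globalworker"
```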
15 changes: 13 additions & 2 deletions kubernetes/templates/exareme2-globalnode.yaml
@@ -1,4 +1,3 @@
-{{ if gt .Values.localnodes 1.0}} # Single node deployment
 apiVersion: apps/v1
 kind: Deployment
 metadata:
@@ -26,6 +25,9 @@ spec:
         - name: csv-data
           hostPath:
             path: {{ .Values.db.csvs_location }}
+        - name: credentials
+          hostPath:
+            path: {{ .Values.db.credentials_location }}
       containers:
         - name: monetdb
           image: {{ .Values.exareme2_images.repository }}/exareme2_db:{{ .Values.exareme2_images.version }}
@@ -49,6 +51,10 @@ spec:
           volumeMounts:
             - mountPath: /home/monetdb
               name: db-data
+            - mountPath: /opt/data
+              name: csv-data
+            - mountPath: /opt/credentials
+              name: credentials
           startupProbe:
             exec:
               command:
@@ -120,6 +126,8 @@ spec:
         - name: worker
           image: {{ .Values.exareme2_images.repository }}/exareme2_worker:{{ .Values.exareme2_images.version }}
           volumeMounts:
+            - mountPath: /opt/credentials
+              name: credentials
             - mountPath: /opt/data
               name: csv-data
           env:
@@ -159,6 +167,10 @@ spec:
               value: "guest"
             - name: MONETDB_PUBLIC_PASSWORD
               value: "guest"
+            - name: SQLITE_DB_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: spec.nodeName
             - name: SMPC_ENABLED
               value: {{ quote .Values.smpc.enabled }}
             {{ if .Values.smpc.enabled }}
@@ -190,4 +202,3 @@ spec:
                 - exareme2.worker.healthcheck
             periodSeconds: 30
             timeoutSeconds: 10
-{{end}}
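Besides the removed `{{ if gt .Values.localnodes 1.0}} ... {{end}}` guard, which is what makes the global worker unconditional, note the new `SQLITE_DB_NAME` variable: it is filled via the Kubernetes Downward API (`fieldRef: spec.nodeName`), so the worker's SQLite database is named after the node it is scheduled on. A minimal sketch of consuming it, assuming ordinary environment-variable semantics (not the project's actual startup code):

```python
import os

# Kubernetes resolves fieldRef spec.nodeName at pod start and injects it
# as SQLITE_DB_NAME; inside the container it is a plain env var.
sqlite_db_name = os.environ.get("SQLITE_DB_NAME", "globalworker")  # fallback is illustrative
print(f"Using SQLite database: {sqlite_db_name}")
```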
2 changes: 1 addition & 1 deletion mipdb/Dockerfile
@@ -30,7 +30,7 @@ WORKDIR $DATA_PATH
 #######################################################
 # Installing dependencies
 #######################################################
-RUN pip install mipdb==3.0.0 # Must be updated together with pyproject.toml
+RUN pip install mipdb==3.0.2 # Must be updated together with pyproject.toml
 RUN pip install click==8.1.2
 RUN pip install pymonetdb==1.6.3 # Must be updated together with pyproject.toml
24 changes: 13 additions & 11 deletions tasks.py
@@ -503,6 +503,19 @@ def load_datasets(
     :param worker_id_and_ports: A list of tuples containing worker identifiers and ports.
     :param use_sockets: Flag to determine if data will be loaded via sockets.
     """
+    if len(worker_id_and_ports) == 1:
+        worker_id, port = worker_id_and_ports[0]
+        for file in filenames:
+            if file.endswith(".csv") and not file.endswith("test.csv"):
+                csv = os.path.join(dirpath, file)
+                message(
+                    f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...",
+                    Level.HEADER,
+                )
+                cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
+                run(c, cmd)
+        return
+
     # Load the first set of CSVs into the first worker
     first_worker_csvs = sorted(
         [
@@ -581,17 +594,6 @@ def load_test_datasets(
     if not local_worker_id_and_ports:
         raise Exception("Local worker config files cannot be loaded.")

-    # If only one local worker is specified, load the entire folder to that worker
-    if len(local_worker_id_and_ports) == 1:
-        worker_id, port = local_worker_id_and_ports[0]
-        cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
-        message(
-            f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {port}...",
-            Level.HEADER,
-        )
-        run(c, cmd)
-        return
-
     # Process each dataset in the TEST_DATA_FOLDER for local workers
     for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER):
         if "CDEsMetadata.json" not in filenames:
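Taken together, the two hunks move single-worker loading from the coarse `mipdb load-folder` shortcut (removed from `load_test_datasets`) into `load_datasets`, which now adds each CSV individually and skips `*test.csv` holdouts. The filter is small enough to state as a self-contained sketch (the helper name is hypothetical):

```python
import os

def single_worker_csvs(dirpath, filenames):
    """CSVs the single-worker branch of load_datasets would add:
    everything except *test.csv holdout files."""
    return [
        os.path.join(dirpath, f)
        for f in filenames
        if f.endswith(".csv") and not f.endswith("test.csv")
    ]

assert single_worker_csvs(
    "/opt/data/dementia_v_0_1", ["edsd0.csv", "edsd_test.csv", "CDEsMetadata.json"]
) == ["/opt/data/dementia_v_0_1/edsd0.csv"]
```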
@@ -24,10 +24,7 @@ def test_logistic_regression(get_algorithm_result):
     }
     input["type"] = "flower"
     algorithm_result = get_algorithm_result("logistic_regression", input)
-    print(algorithm_result)
-    assert algorithm_result == {"accuracy": 0.63} or algorithm_result == {
-        "accuracy": 0.3819241982507289
-    }
+    assert algorithm_result == {"accuracy": 0.63}


 def test_logistic_regression_with_filters(get_algorithm_result):
@@ -69,7 +66,4 @@ def test_logistic_regression_with_filters(get_algorithm_result):
     }
     input["type"] = "flower"
     algorithm_result = get_algorithm_result("logistic_regression", input)
-    print(algorithm_result)
-    assert algorithm_result == {"accuracy": 0.7884615384615384} or algorithm_result == {
-        "accuracy": 0.22443181818181818
-    }
+    assert algorithm_result == {"accuracy": 0.7884615384615384}
14 changes: 12 additions & 2 deletions tests/algorithm_validation_tests/one_node_deployment_template.toml
@@ -33,11 +33,21 @@ optional=false
 enabled = false

 [[workers]]
-id = "localworker1"
-role="LOCALWORKER"
+id = "globalworker"
+role="GLOBALWORKER"
 rabbitmq_port=5670
 monetdb_port=50000
 local_monetdb_username="executor"
 local_monetdb_password="executor"
 public_monetdb_username="guest"
 public_monetdb_password="guest"
+
+[[workers]]
+id = "localworker1"
+role="LOCALWORKER"
+rabbitmq_port=5671
+monetdb_port=50001
+local_monetdb_username="executor"
+local_monetdb_password="executor"
+public_monetdb_username="guest"
+public_monetdb_password="guest"
@@ -1,2 +1,2 @@
 #!/bin/bash
-export MONETDB_LOCAL_PASSWORD="global"
+export MONETDB_LOCAL_PASSWORD="master"
@@ -21,6 +21,7 @@ def test_logisticregression_algorithm():
"ppmi7",
"ppmi8",
"ppmi9",
"ppmi_test",
],
"filters": None,
},
5 changes: 5 additions & 0 deletions tests/prod_env_tests/test_get_datasets.py
@@ -20,6 +20,7 @@ def expected_datasets_per_data_model():
"edsd7",
"edsd8",
"edsd9",
"edsd_test",
"ppmi0",
"ppmi1",
"ppmi2",
@@ -30,6 +31,7 @@
"ppmi7",
"ppmi8",
"ppmi9",
"ppmi_test",
"desd-synthdata0",
"desd-synthdata1",
"desd-synthdata2",
@@ -40,6 +42,7 @@
"desd-synthdata7",
"desd-synthdata8",
"desd-synthdata9",
"desd-synthdata_test",
},
"tbi:0.1": {
"dummy_tbi0",
@@ -52,11 +55,13 @@
"dummy_tbi7",
"dummy_tbi8",
"dummy_tbi9",
"dummy_tbi_test",
},
"longitudinal_dementia:0.1": {
"longitudinal_dementia0",
"longitudinal_dementia1",
"longitudinal_dementia2",
"longitudinal_dementia_test",
},
}
