diff --git a/.github/workflows/prod_env_tests.yml b/.github/workflows/prod_env_tests.yml index ba1eb6e41..9309839b8 100644 --- a/.github/workflows/prod_env_tests.yml +++ b/.github/workflows/prod_env_tests.yml @@ -203,82 +203,44 @@ jobs: - name: Wait for pods to get healthy run: timeout 300 bash -c 'while true; do if kubectl get pods --no-headers | awk '\''{if ($2 != "1/1" && $2 != "2/2" && $2 != "3/3" && $2 != "4/4") exit 1;}'\''; then echo "All pods are ready!"; break; else kubectl get pods -o wide; sleep 20; fi done' - - name: Initialize MONETDB from mipdb container + - name: Load data models into localworkers and globalworker run: | - for i in 0 1; do - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name") - kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb init' - done - GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb init' - - - name: Load dementia data model into localworkers and globalworker - run: | - for i in 0 1; do - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name") - kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json' - done - GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json' - - - name: Load dementia dataset csvs into localworkers and globalworker - run: | - for suffix in {0..9}; do - if [ $suffix -eq 0 ] || [ $suffix -eq 3 ] || [ $suffix -eq 6 ]; then - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name") - elif [ $suffix -eq 1 ] || [ $suffix -eq 4 ] || [ $suffix -eq 7 ]; then - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name") - else - POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - fi - for dataset in edsd ppmi desd-synthdata; do - kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/dementia_v_0_1/${dataset}${suffix}.csv -d dementia -v 0.1" + LOCALWORKER1=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name") + LOCALWORKER2=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name") + GLOBALWORKER=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r ".items[0].metadata.name") + for POD in $LOCALWORKER1 $LOCALWORKER2 $GLOBALWORKER; do + kubectl exec $POD -c db-importer -- sh -c 'mipdb init' + for model in dementia_v_0_1 tbi_v_0_1 longitudinal_dementia_v_0_1; do + kubectl exec $POD -c db-importer -- sh -c "mipdb add-data-model /opt/data/${model}/CDEsMetadata.json" done done - - name: Load tbi data model into localworkers and globalworker + - name: Load dataset csvs into localworkers run: | - for i in 0 1; do - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name") - kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json' - done - GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json' - - - name: Load tbi dataset csvs into localworkers and globalworker - run: | - 
for suffix in {0..9}; do - if [ $suffix -eq 0 ] || [ $suffix -eq 3 ] || [ $suffix -eq 6 ]; then - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name") - elif [ $suffix -eq 1 ] || [ $suffix -eq 4 ] || [ $suffix -eq 7 ]; then - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name") - else - POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - fi - kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi${suffix}.csv -d tbi -v 0.1" - done - - - name: Load longitudinal dementia data model into localworkers and globalworker - run: | - for i in 0 1; do - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[$i].metadata.name") - kubectl exec $POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json' + LOCALWORKER1=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name") + LOCALWORKER2=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name") + GLOBALWORKER=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r ".items[0].metadata.name") + + for model in dementia_v_0_1 tbi_v_0_1 longitudinal_dementia_v_0_1; do + for filepath in $(kubectl exec $GLOBALWORKER -c db-importer -- ls /opt/data/${model}); do + filepath=/opt/data/${model}/${filepath} + if [[ $filepath == *test.csv ]]; then + echo "Loading file: $filepath at $GLOBALWORKER" + kubectl exec $GLOBALWORKER -c db-importer -- mipdb add-dataset $filepath -d ${model%_v_*} -v 0.1 + elif [[ $filepath == *.csv ]]; then + filename=$(basename $filepath) + suffix=$(echo $filename | grep -o '[0-9]*' | tail -1) + if (( suffix % 2 == 0 )); then + POD_NAME=$LOCALWORKER2 + else + POD_NAME=$LOCALWORKER1 + fi + echo "Loading file: $filepath at $POD_NAME" + kubectl exec $POD_NAME -c db-importer -- mipdb add-dataset $filepath -d ${model%_v_*} -v 0.1 + fi + done done - GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json' - - name: Load longitudinal dementia datasets into localworkers and globalworker - run: | - for suffix in {0..2}; do - if [ $suffix -eq 0 ]; then - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[0].metadata.name") - elif [ $suffix -eq 1 ]; then - POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name") - else - POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name') - fi - kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia${suffix}.csv -d longitudinal_dementia -v 0.1" - done - name: Controller logs run: kubectl logs -l app=exareme2-controller --tail -1 diff --git a/exareme2/controller/services/flower/controller.py b/exareme2/controller/services/flower/controller.py index 57b2dc366..5cd51c282 100644 --- a/exareme2/controller/services/flower/controller.py +++ b/exareme2/controller/services/flower/controller.py @@ -70,18 +70,12 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto): for worker in workers_info ] - server_task_handler, server_ip, server_id = ( - task_handlers[0], - workers_info[0].ip, - workers_info[0].id, + global_worker = 
self.worker_landscape_aggregator.get_global_worker() + server_task_handler = self._create_worker_tasks_handler( + request_id, global_worker ) - if len(task_handlers) > 1: - global_worker = self.worker_landscape_aggregator.get_global_worker() - server_task_handler = self._create_worker_tasks_handler( - request_id, global_worker - ) - server_ip = global_worker.ip - server_id = global_worker.id + server_ip = global_worker.ip + server_id = global_worker.id # Garbage Collect server_task_handler.garbage_collect() for handler in task_handlers: diff --git a/kubernetes/templates/exareme2-globalnode.yaml b/kubernetes/templates/exareme2-globalnode.yaml index 1fa26aec2..81758743a 100644 --- a/kubernetes/templates/exareme2-globalnode.yaml +++ b/kubernetes/templates/exareme2-globalnode.yaml @@ -1,4 +1,3 @@ -{{ if gt .Values.localnodes 1.0}} # Single node deployment apiVersion: apps/v1 kind: Deployment metadata: @@ -26,6 +25,9 @@ spec: - name: csv-data hostPath: path: {{ .Values.db.csvs_location }} + - name: credentials + hostPath: + path: {{ .Values.db.credentials_location }} containers: - name: monetdb image: {{ .Values.exareme2_images.repository }}/exareme2_db:{{ .Values.exareme2_images.version }} @@ -49,6 +51,10 @@ spec: volumeMounts: - mountPath: /home/monetdb name: db-data + - mountPath: /opt/data + name: csv-data + - mountPath: /opt/credentials + name: credentials startupProbe: exec: command: @@ -120,6 +126,8 @@ spec: - name: worker image: {{ .Values.exareme2_images.repository }}/exareme2_worker:{{ .Values.exareme2_images.version }} volumeMounts: + - mountPath: /opt/credentials + name: credentials - mountPath: /opt/data name: csv-data env: @@ -159,6 +167,10 @@ spec: value: "guest" - name: MONETDB_PUBLIC_PASSWORD value: "guest" + - name: SQLITE_DB_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName - name: SMPC_ENABLED value: {{ quote .Values.smpc.enabled }} {{ if .Values.smpc.enabled }} @@ -190,4 +202,3 @@ spec: - exareme2.worker.healthcheck periodSeconds: 30 timeoutSeconds: 10 -{{end}} diff --git a/mipdb/Dockerfile b/mipdb/Dockerfile index 8b5c66412..876099aad 100644 --- a/mipdb/Dockerfile +++ b/mipdb/Dockerfile @@ -30,7 +30,7 @@ WORKDIR $DATA_PATH ####################################################### # Installing dependencies ####################################################### -RUN pip install mipdb==3.0.0 # Must be updated together with pyproject.toml +RUN pip install mipdb==3.0.2 # Must be updated together with pyproject.toml RUN pip install click==8.1.2 RUN pip install pymonetdb==1.6.3 # Must be updated together with pyproject.toml diff --git a/tasks.py b/tasks.py index c5b92f542..bdf530845 100644 --- a/tasks.py +++ b/tasks.py @@ -503,6 +503,19 @@ def load_datasets( :param worker_id_and_ports: A list of tuples containing worker identifiers and ports. :param use_sockets: Flag to determine if data will be loaded via sockets. 
""" + if len(worker_id_and_ports) == 1: + worker_id, port = worker_id_and_ports[0] + for file in filenames: + if file.endswith(".csv") and not file.endswith("test.csv"): + csv = os.path.join(dirpath, file) + message( + f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...", + Level.HEADER, + ) + cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}" + run(c, cmd) + return + # Load the first set of CSVs into the first worker first_worker_csvs = sorted( [ @@ -581,17 +594,6 @@ def load_test_datasets( if not local_worker_id_and_ports: raise Exception("Local worker config files cannot be loaded.") - # If only one local worker is specified, load the entire folder to that worker - if len(local_worker_id_and_ports) == 1: - worker_id, port = local_worker_id_and_ports[0] - cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}" - message( - f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {port}...", - Level.HEADER, - ) - run(c, cmd) - return - # Process each dataset in the TEST_DATA_FOLDER for local workers for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER): if "CDEsMetadata.json" not in filenames: diff --git a/tests/algorithm_validation_tests/flower/test_logistic_regression.py b/tests/algorithm_validation_tests/flower/test_logistic_regression.py index 74de56b97..de166f7d5 100644 --- a/tests/algorithm_validation_tests/flower/test_logistic_regression.py +++ b/tests/algorithm_validation_tests/flower/test_logistic_regression.py @@ -24,10 +24,7 @@ def test_logistic_regression(get_algorithm_result): } input["type"] = "flower" algorithm_result = get_algorithm_result("logistic_regression", input) - print(algorithm_result) - assert algorithm_result == {"accuracy": 0.63} or algorithm_result == { - "accuracy": 0.3819241982507289 - } + assert algorithm_result == {"accuracy": 0.63} def test_logistic_regression_with_filters(get_algorithm_result): @@ -69,7 +66,4 @@ def test_logistic_regression_with_filters(get_algorithm_result): } input["type"] = "flower" algorithm_result = get_algorithm_result("logistic_regression", input) - print(algorithm_result) - assert algorithm_result == {"accuracy": 0.7884615384615384} or algorithm_result == { - "accuracy": 0.22443181818181818 - } + assert algorithm_result == {"accuracy": 0.7884615384615384} diff --git a/tests/algorithm_validation_tests/one_node_deployment_template.toml b/tests/algorithm_validation_tests/one_node_deployment_template.toml index 8ce14cbdf..b9bc263ef 100644 --- a/tests/algorithm_validation_tests/one_node_deployment_template.toml +++ b/tests/algorithm_validation_tests/one_node_deployment_template.toml @@ -33,11 +33,21 @@ optional=false enabled = false [[workers]] -id = "localworker1" -role="LOCALWORKER" +id = "globalworker" +role="GLOBALWORKER" rabbitmq_port=5670 monetdb_port=50000 local_monetdb_username="executor" local_monetdb_password="executor" public_monetdb_username="guest" public_monetdb_password="guest" + +[[workers]] +id = "localworker1" +role="LOCALWORKER" +rabbitmq_port=5671 +monetdb_port=50001 +local_monetdb_username="executor" +local_monetdb_password="executor" +public_monetdb_username="guest" +public_monetdb_password="guest" diff --git a/tests/prod_env_tests/deployment_configs/kind_configuration/master/monetdb_password.sh 
b/tests/prod_env_tests/deployment_configs/kind_configuration/master/monetdb_password.sh index 2d34996e9..15af13bf9 100755 --- a/tests/prod_env_tests/deployment_configs/kind_configuration/master/monetdb_password.sh +++ b/tests/prod_env_tests/deployment_configs/kind_configuration/master/monetdb_password.sh @@ -1,2 +1,2 @@ #!/bin/bash -export MONETDB_LOCAL_PASSWORD="global" +export MONETDB_LOCAL_PASSWORD="master" diff --git a/tests/prod_env_tests/test_flower_logisticregression_validation.py b/tests/prod_env_tests/test_flower_logisticregression_validation.py index 9bf454df2..10c7cfe7c 100644 --- a/tests/prod_env_tests/test_flower_logisticregression_validation.py +++ b/tests/prod_env_tests/test_flower_logisticregression_validation.py @@ -21,6 +21,7 @@ def test_logisticregression_algorithm(): "ppmi7", "ppmi8", "ppmi9", + "ppmi_test", ], "filters": None, }, diff --git a/tests/prod_env_tests/test_get_datasets.py b/tests/prod_env_tests/test_get_datasets.py index 40df10e9c..36c82f864 100644 --- a/tests/prod_env_tests/test_get_datasets.py +++ b/tests/prod_env_tests/test_get_datasets.py @@ -20,6 +20,7 @@ def expected_datasets_per_data_model(): "edsd7", "edsd8", "edsd9", + "edsd_test", "ppmi0", "ppmi1", "ppmi2", @@ -30,6 +31,7 @@ def expected_datasets_per_data_model(): "ppmi7", "ppmi8", "ppmi9", + "ppmi_test", "desd-synthdata0", "desd-synthdata1", "desd-synthdata2", @@ -40,6 +42,7 @@ def expected_datasets_per_data_model(): "desd-synthdata7", "desd-synthdata8", "desd-synthdata9", + "desd-synthdata_test", }, "tbi:0.1": { "dummy_tbi0", @@ -52,11 +55,13 @@ def expected_datasets_per_data_model(): "dummy_tbi7", "dummy_tbi8", "dummy_tbi9", + "dummy_tbi_test", }, "longitudinal_dementia:0.1": { "longitudinal_dementia0", "longitudinal_dementia1", "longitudinal_dementia2", + "longitudinal_dementia_test", }, } diff --git a/tests/prod_env_tests/test_get_datasets_locations.py b/tests/prod_env_tests/test_get_datasets_locations.py index 4c4cb76df..46a6a6129 100644 --- a/tests/prod_env_tests/test_get_datasets_locations.py +++ b/tests/prod_env_tests/test_get_datasets_locations.py @@ -9,54 +9,59 @@ @pytest.fixture def expected_datasets_locations(): return { - "longitudinal_dementia:0.1": { - "longitudinal_dementia0": "localworker1", - "longitudinal_dementia1": "localworker2", - "longitudinal_dementia2": "localworker3", - }, - "tbi:0.1": { - "dummy_tbi0": "localworker1", - "dummy_tbi1": "localworker2", - "dummy_tbi3": "localworker2", - "dummy_tbi5": "localworker2", - "dummy_tbi7": "localworker2", - "dummy_tbi9": "localworker2", - "dummy_tbi2": "localworker3", - "dummy_tbi4": "localworker3", - "dummy_tbi6": "localworker3", - "dummy_tbi8": "localworker3", - }, "dementia:0.1": { "desd-synthdata0": "localworker1", - "edsd0": "localworker1", - "ppmi0": "localworker1", "desd-synthdata1": "localworker2", + "desd-synthdata2": "localworker1", "desd-synthdata3": "localworker2", + "desd-synthdata4": "localworker1", "desd-synthdata5": "localworker2", + "desd-synthdata6": "localworker1", "desd-synthdata7": "localworker2", + "desd-synthdata8": "localworker1", "desd-synthdata9": "localworker2", - "edsd2": "localworker2", - "edsd4": "localworker2", - "edsd6": "localworker2", - "edsd8": "localworker2", + "desd-synthdata_test": "master", + "edsd0": "localworker1", + "edsd1": "localworker2", + "edsd2": "localworker1", + "edsd3": "localworker2", + "edsd4": "localworker1", + "edsd5": "localworker2", + "edsd6": "localworker1", + "edsd7": "localworker2", + "edsd8": "localworker1", + "edsd9": "localworker2", + "edsd_test": "master", + "ppmi0": 
"localworker1", "ppmi1": "localworker2", + "ppmi2": "localworker1", "ppmi3": "localworker2", + "ppmi4": "localworker1", "ppmi5": "localworker2", + "ppmi6": "localworker1", "ppmi7": "localworker2", + "ppmi8": "localworker1", "ppmi9": "localworker2", - "desd-synthdata2": "localworker3", - "desd-synthdata4": "localworker3", - "desd-synthdata6": "localworker3", - "desd-synthdata8": "localworker3", - "edsd1": "localworker3", - "edsd3": "localworker3", - "edsd5": "localworker3", - "edsd7": "localworker3", - "edsd9": "localworker3", - "ppmi2": "localworker3", - "ppmi4": "localworker3", - "ppmi6": "localworker3", - "ppmi8": "localworker3", + "ppmi_test": "master", + }, + "longitudinal_dementia:0.1": { + "longitudinal_dementia0": "localworker1", + "longitudinal_dementia1": "localworker2", + "longitudinal_dementia2": "localworker1", + "longitudinal_dementia_test": "master", + }, + "tbi:0.1": { + "dummy_tbi0": "localworker2", + "dummy_tbi1": "localworker1", + "dummy_tbi2": "localworker1", + "dummy_tbi3": "localworker1", + "dummy_tbi4": "localworker1", + "dummy_tbi5": "localworker1", + "dummy_tbi6": "localworker1", + "dummy_tbi7": "localworker1", + "dummy_tbi8": "localworker1", + "dummy_tbi9": "localworker1", + "dummy_tbi_test": "master", }, } @@ -64,9 +69,4 @@ def expected_datasets_locations(): def test_get_dataset_location(expected_datasets_locations): request = requests.get(datasets_locations_url) response = json.loads(request.text) - for worker_id in response: - assert worker_id in expected_datasets_locations - for data_model in response[worker_id]: - assert data_model in expected_datasets_locations[worker_id] - for dataset in response[worker_id][data_model]: - assert dataset in response[worker_id][data_model] + assert response == expected_datasets_locations diff --git a/tests/standalone_tests/algorithms/flower/test_processes_garbage_collect.py b/tests/standalone_tests/algorithms/flower/test_processes_garbage_collect.py index 641bb1f31..f9ab9c25a 100644 --- a/tests/standalone_tests/algorithms/flower/test_processes_garbage_collect.py +++ b/tests/standalone_tests/algorithms/flower/test_processes_garbage_collect.py @@ -14,8 +14,10 @@ @pytest.mark.slow def test_processes_garbage_collect( + globalworker_worker_service, localworker1_worker_service, load_data_localworker1, + load_test_data_globalworker, controller_service_with_localworker1, localworker1_celery_app, ): @@ -29,6 +31,7 @@ def test_processes_garbage_collect( algorithm_name="logistic_regression", number_of_clients=1, server_address=f"{COMMON_IP}:8080", + csv_paths="dataset1.csv,dataset2.csv", ) localworker1_celery_app.get_result( async_result=async_result, @@ -46,6 +49,7 @@ def test_processes_garbage_collect( "ppmi1", "ppmi2", "ppmi3", + "ppmi_test", ], "filters": None, }, diff --git a/tests/standalone_tests/conftest.py b/tests/standalone_tests/conftest.py index f1a74d289..9e2812773 100644 --- a/tests/standalone_tests/conftest.py +++ b/tests/standalone_tests/conftest.py @@ -35,7 +35,6 @@ PULL_DOCKER_IMAGES_STR = os.getenv("PULL_DOCKER_IMAGES", "true") PULL_DOCKER_IMAGES = PULL_DOCKER_IMAGES_STR.lower() == "true" - this_mod_path = os.path.dirname(os.path.abspath(__file__)) TEST_ENV_CONFIG_FOLDER = path.join(this_mod_path, "testing_env_configs") TEST_DATA_FOLDER = Path(this_mod_path).parent / "test_data" @@ -56,7 +55,6 @@ HEALTHCHECK_URL = f"http://{COMMON_IP}:4500/healthcheck" - RABBITMQ_GLOBALWORKER_NAME = "rabbitmq_test_globalworker" RABBITMQ_LOCALWORKER1_NAME = "rabbitmq_test_localworker1" RABBITMQ_LOCALWORKER2_NAME = 
"rabbitmq_test_localworker2" @@ -81,7 +79,6 @@ RABBITMQ_SMPC_LOCALWORKER2_PORT = 60006 RABBITMQ_SMPC_LOCALWORKER2_ADDR = f"{COMMON_IP}:{str(RABBITMQ_SMPC_LOCALWORKER2_PORT)}" - DATASET_SUFFIXES_LOCALWORKER1 = [0, 1, 2, 3] DATASET_SUFFIXES_LOCALWORKER2 = [4, 5, 6] DATASET_SUFFIXES_LOCALWORKERTMP = [7, 8, 9] @@ -176,6 +173,8 @@ SMPC_PLAYER3_PORT3 = 14002 SMPC_CLIENT1_PORT = 9005 SMPC_CLIENT2_PORT = 9006 + + ##################################### @@ -380,6 +379,62 @@ def _init_database_monetdb_container(db_port, worker_id): print(f"\nDatabase ({monetdb_configs.ip}:{monetdb_configs.port}) initialized.") +def _load_test_data_monetdb_container(db_port, worker_id): + monetdb_configs = MonetDBConfigurations(db_port) + # Check if the database is already loaded + cmd = f"mipdb list-datasets {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" + res = subprocess.run( + cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + if "There are no datasets" not in str(res.stdout): + print( + f"\nDatabase ({monetdb_configs.ip}:{monetdb_configs.port}) already loaded, continuing." + ) + return + + datasets_per_data_model = {} + # Load the test data folder into the dbs + for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER): + if "CDEsMetadata.json" not in filenames: + continue + cdes_file = os.path.join(dirpath, "CDEsMetadata.json") + with open(cdes_file) as data_model_metadata_file: + data_model_metadata = json.load(data_model_metadata_file) + data_model_code = data_model_metadata["code"] + data_model_version = data_model_metadata["version"] + data_model = f"{data_model_code}:{data_model_version}" + + print( + f"\nLoading data model '{data_model_code}:{data_model_version}' metadata to database ({monetdb_configs.ip}:{monetdb_configs.port})" + ) + cmd = f"mipdb add-data-model {cdes_file} {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" + subprocess.run( + cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + csvs = sorted( + [f"{dirpath}/{file}" for file in filenames if file.endswith("test.csv")] + ) + + for csv in csvs: + cmd = f"mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" + subprocess.run( + cmd, + shell=True, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + print( + f"\nLoading dataset {pathlib.PurePath(csv).name} to database ({monetdb_configs.ip}:{monetdb_configs.port})" + ) + datasets_per_data_model[data_model] = pathlib.PurePath(csv).name + + print(f"\nData loaded to database ({monetdb_configs.ip}:{monetdb_configs.port})") + time.sleep(2) # Needed to avoid db crash while loading + return datasets_per_data_model + + def _load_data_monetdb_container(db_port, dataset_suffixes, worker_id): monetdb_configs = MonetDBConfigurations(db_port) # Check if the database is already loaded @@ -424,15 +479,12 @@ def _load_data_monetdb_container(db_port, dataset_suffixes, worker_id): for csv in csvs: cmd = f"mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" - _env = os.environ.copy() - _env["DATA_PATH"] = str(TEST_DATA_FOLDER) subprocess.run( cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=_env, ) print( f"\nLoading dataset {pathlib.PurePath(csv).name} to database 
({monetdb_configs.ip}:{monetdb_configs.port})"
@@ -456,7 +508,7 @@ def init_data_globalworker(monetdb_globalworker): def load_data_localworker1(monetdb_localworker1): worker_config_file = LOCALWORKER1_CONFIG_FILE worker_id = get_worker_id(worker_config_file) - _init_database_monetdb_container(MONETDB_GLOBALWORKER_PORT, worker_id) + _init_database_monetdb_container(MONETDB_LOCALWORKER1_PORT, worker_id) loaded_datasets_per_data_model = _load_data_monetdb_container( MONETDB_LOCALWORKER1_PORT, DATASET_SUFFIXES_LOCALWORKER1, worker_id )
@@ -474,6 +526,17 @@ def load_data_localworker2(monetdb_localworker2): yield loaded_datasets_per_data_model +@pytest.fixture(scope="session") +def load_test_data_globalworker(monetdb_globalworker): + worker_config_file = GLOBALWORKER_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_GLOBALWORKER_PORT, worker_id) + loaded_datasets_per_data_model = _load_test_data_monetdb_container( + MONETDB_GLOBALWORKER_PORT, worker_id + ) + yield loaded_datasets_per_data_model + + @pytest.fixture(scope="function") def load_data_localworkertmp(monetdb_localworkertmp): worker_config_file = LOCALWORKERTMP_CONFIG_FILE
diff --git a/tests/standalone_tests/controller/services/flower/test_pos_and_kw_args_in_algorithm_flow.py b/tests/standalone_tests/controller/services/flower/test_pos_and_kw_args_in_algorithm_flow.py index 96ea5d36e..0576fee42 100644 --- a/tests/standalone_tests/controller/services/flower/test_pos_and_kw_args_in_algorithm_flow.py +++ b/tests/standalone_tests/controller/services/flower/test_pos_and_kw_args_in_algorithm_flow.py
@@ -8,8 +8,10 @@ @pytest.mark.slow def test_pos_and_kw_args_in_algorithm_flow( + globalworker_worker_service, localworker1_worker_service, load_data_localworker1, + load_test_data_globalworker, controller_service_with_localworker1, ): algorithm_name = "logistic_regression"
@@ -23,6 +25,7 @@ def test_pos_and_kw_args_in_algorithm_flow( "ppmi1", "ppmi2", "ppmi3", + "ppmi_test", ], "filters": None, },
diff --git a/tests/standalone_tests/worker/worker_info/test_get_data_model_cdes.py b/tests/standalone_tests/worker/worker_info/test_get_data_model_cdes.py index a2baccecd..16d90c325 100644 --- a/tests/standalone_tests/worker/worker_info/test_get_data_model_cdes.py +++ b/tests/standalone_tests/worker/worker_info/test_get_data_model_cdes.py
@@ -33,6 +33,7 @@ def get_test_cases_get_data_model_cdes(): "ppmi7": "PPMI_7", "ppmi8": "PPMI_8", "ppmi9": "PPMI_9", + "ppmi_test": "PPMI_TEST", "edsd0": "EDSD_0", "edsd1": "EDSD_1", "edsd2": "EDSD_2",
@@ -43,6 +44,7 @@ def get_test_cases_get_data_model_cdes(): "edsd7": "EDSD_7", "edsd8": "EDSD_8", "edsd9": "EDSD_9", + "edsd_test": "EDSD_TEST", "desd-synthdata0": "DESD-synthdata_0", "desd-synthdata1": "DESD-synthdata_1", "desd-synthdata2": "DESD-synthdata_2",
@@ -53,6 +55,7 @@ def get_test_cases_get_data_model_cdes(): "desd-synthdata7": "DESD-synthdata_7", "desd-synthdata8": "DESD-synthdata_8", "desd-synthdata9": "DESD-synthdata_9", + "desd-synthdata_test": "DESD-synthdata_TEST", }, ), "alzheimerbroadcategory": CommonDataElement(
@@ -98,6 +101,7 @@ def get_test_cases_get_data_model_cdes(): "dummy_tbi7": "Dummy TBI_7", "dummy_tbi8": "Dummy TBI_8", "dummy_tbi9": "Dummy TBI_9", + "dummy_tbi_test": "Dummy TBI test", }, ), "gender_type": CommonDataElement(
diff --git a/tests/test_data/globalworker.db b/tests/test_data/globalworker.db deleted file mode 100644 index 98651cbe4..000000000 Binary files a/tests/test_data/globalworker.db and /dev/null differ diff --git
a/tests/test_data/localworker1.db b/tests/test_data/localworker1.db deleted file mode 100644 index e12f6409d..000000000 Binary files a/tests/test_data/localworker1.db and /dev/null differ diff --git a/tests/test_data/localworker2.db b/tests/test_data/localworker2.db deleted file mode 100644 index 7e5feb6c1..000000000 Binary files a/tests/test_data/localworker2.db and /dev/null differ
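
Note on the new "Load dataset csvs into localworkers" workflow step: it derives both the data model code and the numeric dataset suffix from each CSV path, then routes "*test.csv" files to the global worker and the remaining CSVs to the two local workers by suffix parity. A minimal sketch of that shell logic, using a hypothetical example path (the values below are illustrative, not part of the patch):

#!/bin/bash
# Illustrative sketch only: how the workflow step derives its routing inputs.
model=dementia_v_0_1
filepath=/opt/data/${model}/edsd3.csv                     # hypothetical CSV path
filename=$(basename "$filepath")                          # edsd3.csv
suffix=$(echo "$filename" | grep -o '[0-9]*' | tail -1)   # trailing number -> 3
data_model=${model%_v_*}                                  # strip "_v_<version>" -> dementia
if [[ $filepath == *test.csv ]]; then
  echo "global worker: mipdb add-dataset $filepath -d $data_model -v 0.1"
elif (( suffix % 2 == 0 )); then
  echo "second local worker: mipdb add-dataset $filepath -d $data_model -v 0.1"
else
  echo "first local worker: mipdb add-dataset $filepath -d $data_model -v 0.1"
fi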