From 4ba9693db9e5779821bd003b1caf39a0b679d7a0 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 4 Sep 2024 17:30:41 +0200 Subject: [PATCH] Refactor/SK-936 | Clean up certs and remove /add_combiner logic (#653) --- .github/workflows/code-checks.yaml | 2 + MANIFEST.in | 2 + config/settings-client.yaml.local.template | 3 + config/settings-combiner.yaml.local.template | 31 +++++ config/settings-combiner.yaml.template | 19 +++ .../settings-controller.yaml.local.template | 24 ++++ docker-compose.yaml | 2 + examples/mnist-pytorch/client/python_env.yaml | 4 +- fedn/cli/combiner_cmd.py | 7 +- fedn/cli/run_cmd.py | 31 +++-- fedn/common/certificate/certificate.py | 40 +++--- fedn/common/config.py | 2 + fedn/network/api/interface.py | 115 +++++++----------- fedn/network/api/network.py | 19 +-- fedn/network/api/server.py | 6 +- fedn/network/clients/client.py | 34 ++---- fedn/network/clients/connect.py | 11 +- fedn/network/combiner/combiner.py | 104 ++++++++-------- fedn/network/combiner/interfaces.py | 7 -- fedn/network/combiner/roundhandler.py | 5 +- fedn/network/combiner/shared.py | 13 ++ fedn/network/grpc/server.py | 17 ++- .../storage/statestore/mongostatestore.py | 7 +- 23 files changed, 281 insertions(+), 224 deletions(-) create mode 100644 MANIFEST.in create mode 100644 config/settings-client.yaml.local.template create mode 100644 config/settings-combiner.yaml.local.template create mode 100644 config/settings-controller.yaml.local.template create mode 100644 fedn/network/combiner/shared.py diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index 8c48a3015..7098b0732 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -26,6 +26,8 @@ jobs: --exclude-dir='flower-client' --exclude='tests.py' --exclude='controller_cmd.py' + --exclude='combiner_cmd.py' + --exclude='run_cmd.py' --exclude='README.rst' '^[ \t]+(import|from) ' -I . diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..b1504311c --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.rst +include fedn/common/settings-controller.yaml.template \ No newline at end of file diff --git a/config/settings-client.yaml.local.template b/config/settings-client.yaml.local.template new file mode 100644 index 000000000..e48e779af --- /dev/null +++ b/config/settings-client.yaml.local.template @@ -0,0 +1,3 @@ +network_id: fedn-network +discover_host: localhost +discover_port: 8092 diff --git a/config/settings-combiner.yaml.local.template b/config/settings-combiner.yaml.local.template new file mode 100644 index 000000000..b49917389 --- /dev/null +++ b/config/settings-combiner.yaml.local.template @@ -0,0 +1,31 @@ +network_id: fedn-network + +name: combiner +host: localhost +address: localhost +port: 12080 +max_clients: 30 + +cert_path: tmp/server.crt +key_path: tmp/server.key + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + + diff --git a/config/settings-combiner.yaml.template b/config/settings-combiner.yaml.template index 8cef6643a..11911cc6f 100644 --- a/config/settings-combiner.yaml.template +++ b/config/settings-combiner.yaml.template @@ -7,4 +7,23 @@ host: combiner port: 12080 max_clients: 30 +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: mongo + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: minio + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + diff --git a/config/settings-controller.yaml.local.template b/config/settings-controller.yaml.local.template new file mode 100644 index 000000000..a5266a38b --- /dev/null +++ b/config/settings-controller.yaml.local.template @@ -0,0 +1,24 @@ +network_id: fedn-network +controller: + host: localhost + port: 8092 + debug: True + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False diff --git a/docker-compose.yaml b/docker-compose.yaml index c3620e79d..26291748f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -87,6 +87,8 @@ services: environment: - PYTHONUNBUFFERED=0 - GET_HOSTS_FROM=dns + - STATESTORE_CONFIG=/app/config/settings-combiner.yaml + - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml build: context: . args: diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index afdea926f..f43d2353d 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -4,6 +4,6 @@ build_dependencies: - setuptools - wheel dependencies: - - torch==2.3.1 - - torchvision==0.18.1 + - torch + - torchvision - fedn diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 02a797448..3e7753e80 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -3,8 +3,6 @@ import click import requests -from fedn.network.combiner.combiner import Combiner - from .main import main from .shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response @@ -12,8 +10,7 @@ @main.group("combiner") @click.pass_context def combiner_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass @@ -60,6 +57,8 @@ def start_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, se click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 082aaa7bb..0342bb39d 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -4,15 +4,15 @@ import click import yaml + +from fedn.cli.client_cmd import validate_client_config +from fedn.cli.main import main +from fedn.cli.shared import apply_config from fedn.common.exceptions import InvalidClientConfig from fedn.common.log_config import logger from fedn.network.clients.client import Client -from fedn.network.combiner.combiner import Combiner from fedn.utils.dispatcher import Dispatcher, _read_yaml_file -from fedn.cli.client_cmd import validate_client_config -from fedn.cli.main import main -from fedn.cli.shared import apply_config def get_statestore_config_from_file(init): """:param init: @@ -35,6 +35,7 @@ def check_helper_config_file(config): exit(-1) return helper + def check_yaml_exists(path): """Check if fedn.yaml exists in the given path.""" yaml_file = os.path.join(path, "fedn.yaml") @@ -43,21 +44,26 @@ def check_yaml_exists(path): click.echo(f"Could not find fedn.yaml in {path}") exit(-1) return yaml_file + + def delete_virtual_environment(dispatcher): if dispatcher.python_env_path: logger.info(f"Removing virtualenv {dispatcher.python_env_path}") shutil.rmtree(dispatcher.python_env_path) else: logger.warning("No virtualenv found to remove.") + + @main.group("run") @click.pass_context def run_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass + + @run_cmd.command("validate") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model" ) +@click.option("-i", "--input", required=True, help="Path to input model") @click.option("-o", "--output", required=True, help="Path to write the output JSON containing validation metrics") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context @@ -82,9 +88,11 @@ def validate_cmd(ctx, path, input, output, keep_venv): dispatcher.run_cmd("validate {} {}".format(input, output)) if not keep_venv: delete_virtual_environment(dispatcher) + + @run_cmd.command("train") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model parameters" ) +@click.option("-i", "--input", required=True, help="Path to input model parameters") @click.option("-o", "--output", required=True, help="Path to write the updated model parameters ") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context @@ -109,6 +117,8 @@ def train_cmd(ctx, path, input, output, keep_venv): dispatcher.run_cmd("train {} {}".format(input, output)) if not keep_venv: delete_virtual_environment(dispatcher) + + @run_cmd.command("startup") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @@ -134,6 +144,7 @@ def startup_cmd(ctx, path, keep_venv): if not keep_venv: delete_virtual_environment(dispatcher) + @run_cmd.command("build") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @@ -173,7 +184,7 @@ def build_cmd(ctx, path, keep_venv): @click.option("-s", "--secure", required=False, default=False) @click.option("-pc", "--preshared-cert", required=False, default=False) @click.option("-v", "--verify", is_flag=True, help="Verify SSL/TLS for REST service") -@click.option("-c", "--preferred-combiner", required=False,type=str, default="",help="url to the combiner or name of the preferred combiner") +@click.option("-c", "--preferred-combiner", required=False, type=str, default="", help="url to the combiner or name of the preferred combiner") @click.option("-va", "--validator", required=False, default=True) @click.option("-tr", "--trainer", required=False, default=True) @click.option("-in", "--init", required=False, default=None, help="Set to a filename to (re)init client from file state.") @@ -310,5 +321,7 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() diff --git a/fedn/common/certificate/certificate.py b/fedn/common/certificate/certificate.py index 3cb09016c..547175a20 100644 --- a/fedn/common/certificate/certificate.py +++ b/fedn/common/certificate/certificate.py @@ -9,24 +9,27 @@ class Certificate: - """Utility to generate unsigned certificates. - - """ + """Utility to generate unsigned certificates.""" CERT_NAME = "cert.pem" KEY_NAME = "key.pem" BITS = 2048 - def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", create_dirs=True): - try: - os.makedirs(cwd) - except OSError: - logger.info("Directory exists, will store all cert and keys here.") + def __init__(self, name=None, key_path="", cert_path="", create_dirs=False): + if create_dirs: + try: + cwd = os.getcwd() + os.makedirs(cwd) + except OSError: + logger.info("Directory exists, will store all cert and keys here.") + else: + logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) + + self.key_path = os.path.join(cwd, "key.pem") + self.cert_path = os.path.join(cwd, "cert.pem") else: - logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) - - self.key_path = os.path.join(cwd, key_name) - self.cert_path = os.path.join(cwd, cert_name) + self.key_path = key_path + self.cert_path = cert_path if name: self.name = name @@ -36,9 +39,7 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre def gen_keypair( self, ): - """Generate keypair. - - """ + """Generate keypair.""" key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 2048) cert = crypto.X509() @@ -73,8 +74,7 @@ def set_keypair_raw(self, certificate, privatekey): certfile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)) def get_keypair_raw(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() with open(self.cert_path, "rb") as certfile: @@ -82,16 +82,14 @@ def get_keypair_raw(self): return copy.deepcopy(cert_buf), copy.deepcopy(key_buf) def get_key(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_buf) return key def get_cert(self): - """:return: - """ + """:return:""" with open(self.cert_path, "rb") as certfile: cert_buf = certfile.read() cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf) diff --git a/fedn/common/config.py b/fedn/common/config.py index 23d873ff7..517e57d94 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -15,6 +15,8 @@ FEDN_AUTH_REFRESH_TOKEN = os.environ.get("FEDN_AUTH_REFRESH_TOKEN", False) FEDN_CUSTOM_URL_PREFIX = os.environ.get("FEDN_CUSTOM_URL_PREFIX", "") + +FEDN_ALLOW_LOCAL_PACKAGE = os.environ.get("FEDN_ALLOW_LOCAL_PACKAGE", False) FEDN_PACKAGE_EXTRACT_DIR = os.environ.get("FEDN_PACKAGE_EXTRACT_DIR", "package") diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 9936e0bc0..3ffaadecf 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -1,5 +1,4 @@ -import base64 -import copy +import os import threading import uuid from io import BytesIO @@ -8,9 +7,9 @@ from werkzeug.security import safe_join from werkzeug.utils import secure_filename -from fedn.common.config import get_controller_config, get_network_config +from fedn.common.config import FEDN_ALLOW_LOCAL_PACKAGE, get_controller_config, get_network_config from fedn.common.log_config import logger -from fedn.network.combiner.interfaces import CombinerInterface, CombinerUnavailableError +from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString from fedn.utils.checksum import sha @@ -231,7 +230,7 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") - file_path = safe_join("/app/client/package/", storage_file_name) + file_path = safe_join(os.getcwd(), storage_file_name) file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) @@ -247,7 +246,8 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript ), 400, ) - + # Delete the file after it has been saved + os.remove(file_path) return jsonify({"success": True, "message": "Compute package set."}) def _get_compute_package_name(self): @@ -371,19 +371,21 @@ def download_compute_package(self, name): mutex = threading.Lock() mutex.acquire() # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: try: data = self.control.get_compute_package(name) # TODO: make configurable, perhaps in config.py or package.py - file_path = safe_join("/app/client/package/", name) + file_path = safe_join(os.getcwd(), name) with open(file_path, "wb") as fh: fh.write(data) # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: raise finally: + # Delete the file after it has been saved + os.remove(file_path) mutex.release() def _create_checksum(self, name=None): @@ -398,7 +400,7 @@ def _create_checksum(self, name=None): name, message = self._get_compute_package_name() if name is None: return False, message, "" - file_path = safe_join("/app/client/package/", name) # TODO: make configurable, perhaps in config.py or package.py + file_path = safe_join(os.getcwd(), name) # TODO: make configurable, perhaps in config.py or package.py try: sum = str(sha(file_path)) except FileNotFoundError: @@ -502,58 +504,15 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por :return: Config of the combiner as a json response. :rtype: :class:`flask.Response` """ - # TODO: Any more required check for config? Formerly based on status: "retry" - if not self.control.idle(): - return jsonify( - { - "success": False, - "status": "retry", - "message": "Conroller is not in idle state, try again later. ", - } - ) - # Check if combiner already exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - if secure_grpc == "True": - certificate, key = self.certificate_manager.get_or_create(address).get_keypair_raw() - _ = base64.b64encode(certificate) - _ = base64.b64encode(key) - - else: - certificate = None - key = None - - combiner_interface = CombinerInterface( - parent=self._to_dict(), - name=combiner_id, - address=address, - fqdn=fqdn, - port=port, - certificate=copy.deepcopy(certificate), - key=copy.deepcopy(key), - ip=remote_addr, - ) - - self.control.network.add_combiner(combiner_interface) - - # Check combiner now exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - return jsonify({"success": False, "message": "Combiner not added."}) - payload = { - "success": True, - "message": "Combiner added successfully.", - "status": "added", - "storage": self.statestore.get_storage_backend(), - "statestore": self.statestore.get_config(), - "certificate": combiner.get_certificate(), - "key": combiner.get_key(), + "success": False, + "message": "Adding combiner via REST API is obsolete. Include statestore and object store config in combiner config.", + "status": "abort", } return jsonify(payload) - def add_client(self, client_id, preferred_combiner, remote_addr, name): + def add_client(self, client_id, preferred_combiner, remote_addr, name, package): """Add a client to the network. :param client_id: The client id to add. @@ -563,19 +522,37 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): :return: A json response with combiner assignment config. :rtype: :class:`flask.Response` """ - # Check if package has been set - package_object = self.statestore.get_compute_package() - if package_object is None: + local_package = FEDN_ALLOW_LOCAL_PACKAGE + if local_package: + local_package = True + + if package == "remote": + package_object = self.statestore.get_compute_package() + if package_object is None: + return ( + jsonify( + { + "success": False, + "status": "retry", + "message": "No compute package found. Set package in controller.", + } + ), + 203, + ) + helper_type = self.control.statestore.get_helper() + elif package == "local" and local_package is False: + print("Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.") return ( jsonify( { "success": False, - "status": "retry", - "message": "No compute package found. Set package in controller.", + "message": "Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.", } ), - 203, + 400, ) + elif package == "local" and local_package is True: + helper_type = "" # Assign client to combiner if preferred_combiner: @@ -609,22 +586,14 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): # Add client to network self.control.network.add_client(client_config) - # Setup response containing information about the combiner for assinging the client - if combiner.certificate: - cert_b64 = base64.b64encode(combiner.certificate) - cert = str(cert_b64).split("'")[1] - else: - cert = None - payload = { "status": "assigned", "host": combiner.address, "fqdn": combiner.fqdn, - "package": "remote", # TODO: Make this configurable + "package": package, "ip": combiner.ip, "port": combiner.port, - "certificate": cert, - "helper_type": self.control.statestore.get_helper(), + "helper_type": helper_type, } return jsonify(payload) diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index 5e2f2ef91..542761f49 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -1,4 +1,4 @@ -import base64 +import os from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerInterface @@ -47,14 +47,19 @@ def get_combiners(self): data = self.statestore.get_combiners() combiners = [] for c in data["result"]: - if c["certificate"]: - cert = base64.b64decode(c["certificate"]) - key = base64.b64decode(c["key"]) + name = c["name"].upper() + # General certificate handling, same for all combiners. + if os.environ.get("FEDN_GRPC_CERT_PATH"): + with open(os.environ.get("FEDN_GRPC_CERT_PATH"), "rb") as f: + cert = f.read() + # Specific certificate handling for each combiner. + elif os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): + cert_path = os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}") + with open(cert_path, "rb") as f: + cert = f.read() else: cert = None - key = None - - combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"])) + combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, ip=c["ip"])) return combiners diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index ac87ab7b1..185877577 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -592,9 +592,11 @@ def add_client(): remote_addr = request.remote_addr try: response = api.add_client(**json_data, remote_addr=remote_addr) - except TypeError: + except TypeError as e: + print(e) return jsonify({"success": False, "message": "Invalid data provided"}), 400 - except Exception: + except Exception as e: + print(e) return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index e594d7b6d..8508291ff 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -1,4 +1,3 @@ -import base64 import io import json import os @@ -11,7 +10,6 @@ import uuid from datetime import datetime from io import BytesIO -from shutil import copytree import grpc import requests @@ -28,7 +26,6 @@ from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator -from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper CHUNK_SIZE = 1024 * 1024 @@ -198,13 +195,7 @@ def connect(self, combiner_config): port = 443 logger.info(f"Initiating connection to combiner host at: {host}:{port}") - if combiner_config["certificate"]: - logger.info("Utilizing CA certificate for GRPC channel authentication.") - secure = True - cert = base64.b64decode(combiner_config["certificate"]) # .decode('utf-8') - credentials = grpc.ssl_channel_credentials(root_certificates=cert) - channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) - elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): + if os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], "rb") as f: @@ -236,8 +227,6 @@ def connect(self, combiner_config): logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", host, port)) - logger.info("Using {} compute package.".format(combiner_config["package"])) - self._connected = True def disconnect(self): @@ -259,7 +248,11 @@ def _initialize_helper(self, combiner_config): :return: """ if "helper_type" in combiner_config.keys(): - self.helper = get_helper(combiner_config["helper_type"]) + if not combiner_config["helper_type"]: + # Default to numpyhelper + self.helper = get_helper("numpyhelper") + else: + self.helper = get_helper(combiner_config["helper_type"]) def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads. @@ -292,9 +285,8 @@ def _initialize_dispatcher(self, config): :type config: dict :return: """ + pr = PackageRuntime(self.run_path) if config["remote_compute_context"]: - pr = PackageRuntime(self.run_path) - retval = None tries = 10 @@ -333,18 +325,8 @@ def _initialize_dispatcher(self, config): logger.error(f"Caught exception: {type(e).__name__}") else: - # TODO: Deprecate - dispatch_config = { - "entry_points": { - "predict": {"command": "python3 predict.py"}, - "train": {"command": "python3 train.py"}, - "validate": {"command": "python3 validate.py"}, - } - } from_path = os.path.join(os.getcwd(), "client") - - copytree(from_path, self.run_path) - self.dispatcher = Dispatcher(dispatch_config, self.run_path) + self.dispatcher = pr.dispatcher(from_path) # Get or create python environment activate_cmd = self.dispatcher._get_or_create_python_env() if activate_cmd: diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index 59aaead35..bd7262936 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -74,7 +74,7 @@ def assign(self): """ try: retval = None - payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner} + payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner, "package": self.package} retval = requests.post( self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client", json=payload, @@ -110,17 +110,10 @@ def assign(self): if "message" in retval.json(): reason = retval.json()["message"] else: - reason = "Reducer was not ready. Try again later." + reason = "Controller was not ready. Try again later." return Status.TryAgain, reason - reducer_package = retval.json()["package"] - if reducer_package != self.package: - reason = "Unmatched config of compute package between client and reducer.\n" + "Reducer uses {} package and client uses {}.".format( - reducer_package, self.package - ) - return Status.UnMatchedConfig, reason - return Status.Assigned, retval.json() return Status.Unassigned, None diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 2c59991f9..d336932c5 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -1,24 +1,21 @@ -import base64 import json import queue import re import signal -import sys import threading import time import uuid from datetime import datetime, timedelta from enum import Enum +from typing import TypedDict import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc +from fedn.common.certificate.certificate import Certificate from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream -from fedn.network.combiner.connect import ConnectorCombiner, Status -from fedn.network.combiner.modelservice import ModelService from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler -from fedn.network.grpc.server import Server -from fedn.network.storage.s3.repository import Repository -from fedn.network.storage.statestore.mongostatestore import MongoStateStore +from fedn.network.combiner.shared import repository, statestore +from fedn.network.grpc.server import Server, ServerConfig VALID_NAME_REGEX = "^[a-zA-Z0-9_-]*$" @@ -50,6 +47,28 @@ def role_to_proto_role(role): return fedn.OTHER +class CombinerConfig(TypedDict): + """Configuration for the combiner.""" + + discover_host: str + discover_port: int + token: str + host: str + port: int + ip: str + parent: str + fqdn: str + name: str + secure: bool + verify: bool + cert_path: str + key_path: str + max_clients: int + network_id: str + logfile: str + verbosity: str + + class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer): """Combiner gRPC server. @@ -74,52 +93,21 @@ def __init__(self, config): self.role = Role.COMBINER self.max_clients = config["max_clients"] - # Connector to announce combiner to discover service (reducer) - announce_client = ConnectorCombiner( - host=config["discover_host"], - port=config["discover_port"], - myhost=config["host"], - fqdn=config["fqdn"], - myport=config["port"], - token=config["token"], - name=config["name"], - secure=config["secure"], - verify=config["verify"], - ) - - while True: - # Announce combiner to discover service - status, response = announce_client.announce() - if status == Status.TryAgain: - logger.info(response) - time.sleep(5) - elif status == Status.Assigned: - announce_config = response - logger.info("COMBINER {0}: Announced successfully".format(self.id)) - break - elif status == Status.UnAuthorized: - logger.info(response) - logger.info("Status.UnAuthorized") - sys.exit("Exiting: Unauthorized") - elif status == Status.UnMatchedConfig: - logger.info(response) - logger.info("Status.UnMatchedConfig") - sys.exit("Exiting: Missing config") - - cert = announce_config["certificate"] - key = announce_config["key"] - - if announce_config["certificate"]: - cert = base64.b64decode(announce_config["certificate"]) # .decode('utf-8') - key = base64.b64decode(announce_config["key"]) # .decode('utf-8') - - # Set up gRPC server configuration - grpc_config = {"port": config["port"], "secure": config["secure"], "certificate": cert, "key": key} - # Set up model repository - self.repository = Repository(announce_config["storage"]["storage_config"]) - - self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"]) + self.repository = repository + + self.statestore = statestore + + # Add combiner to statestore + interface_config = { + "port": config["port"], + "fqdn": config["fqdn"], + "name": config["name"], + "address": config["host"], + "parent": "localhost", + "ip": "", + } + self.statestore.set_combiner(interface_config) # Fetch all clients previously connected to the combiner # If a client and a combiner goes down at the same time, @@ -132,13 +120,19 @@ def __init__(self, config): except KeyError: self.statestore.set_client({"name": client["name"], "status": "offline"}) - self.modelservice = ModelService() + # Set up gRPC server configuration + if config["secure"]: + cert = Certificate(key_path=config["key_path"], cert_path=config["cert_path"]) + certificate, key = cert.get_keypair_raw() + grpc_server_config = ServerConfig(port=config["port"], secure=True, key=key, certificate=certificate) + else: + grpc_server_config = ServerConfig(port=config["port"], secure=False) # Create gRPC server - self.server = Server(self, self.modelservice, grpc_config) + self.server = Server(self, grpc_server_config) # Set up round controller - self.round_handler = RoundHandler(self.repository, self, self.modelservice) + self.round_handler = RoundHandler(self) # Start thread for round controller threading.Thread(target=self.round_handler.run, daemon=True).start() diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index 935b75442..20da29d23 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -125,13 +125,6 @@ def to_dict(self): "key": None, "config": self.config, } - - if self.certificate: - cert_b64 = base64.b64encode(self.certificate) - key_b64 = base64.b64encode(self.key) - data["certificate"] = str(cert_b64).split("'")[1] - data["key"] = str(key_b64).split("'")[1] - return data def to_json(self): diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index 54cfd189c..1f0025303 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -9,6 +9,7 @@ from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator from fedn.network.combiner.modelservice import load_model_from_BytesIO, serialize_model_to_BytesIO +from fedn.network.combiner.shared import modelservice, repository from fedn.utils.helpers.helpers import get_helper from fedn.utils.parameters import Parameters @@ -84,10 +85,10 @@ class RoundHandler: :type modelservice: class: `fedn.network.combiner.modelservice.ModelService` """ - def __init__(self, storage, server, modelservice): + def __init__(self, server): """Initialize the RoundHandler.""" self.round_configs = queue.Queue() - self.storage = storage + self.storage = repository self.server = server self.modelservice = modelservice diff --git a/fedn/network/combiner/shared.py b/fedn/network/combiner/shared.py new file mode 100644 index 000000000..5e5ee114c --- /dev/null +++ b/fedn/network/combiner/shared.py @@ -0,0 +1,13 @@ +from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config +from fedn.network.combiner.modelservice import ModelService +from fedn.network.storage.s3.repository import Repository +from fedn.network.storage.statestore.mongostatestore import MongoStateStore + +statestore_config = get_statestore_config() +modelstorage_config = get_modelstorage_config() +network_id = get_network_config() + +statestore = MongoStateStore(network_id, statestore_config["mongo_config"]) +repository = Repository(modelstorage_config["storage_config"]) + +modelservice = ModelService() diff --git a/fedn/network/grpc/server.py b/fedn/network/grpc/server.py index a23691505..edd2fd6d5 100644 --- a/fedn/network/grpc/server.py +++ b/fedn/network/grpc/server.py @@ -1,17 +1,28 @@ from concurrent import futures +from typing import TypedDict import grpc from grpc_health.v1 import health, health_pb2_grpc import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream +from fedn.network.combiner.shared import modelservice from fedn.network.grpc.auth import JWTInterceptor +class ServerConfig(TypedDict): + port: int + secure: bool + key: str + certificate: str + logfile: str + verbosity: str + + class Server: """Class for configuring and launching the gRPC server.""" - def __init__(self, servicer, modelservicer, config): + def __init__(self, servicer, config: ServerConfig): set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) @@ -25,8 +36,8 @@ def __init__(self, servicer, modelservicer, config): rpc.add_ConnectorServicer_to_server(servicer, self.server) if isinstance(servicer, rpc.ReducerServicer): rpc.add_ReducerServicer_to_server(servicer, self.server) - if isinstance(modelservicer, rpc.ModelServiceServicer): - rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) + if isinstance(modelservice, rpc.ModelServiceServicer): + rpc.add_ModelServiceServicer_to_server(modelservice, self.server) if isinstance(servicer, rpc.CombinerServicer): rpc.add_ControlServicer_to_server(servicer, self.server) diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 7ef22a795..3ef204b5c 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -6,7 +6,6 @@ from google.protobuf.json_format import MessageToDict from fedn.common.log_config import logger -from fedn.network.combiner.roundhandler import RoundConfig from fedn.network.state import ReducerStateToString, StringToReducerState @@ -878,7 +877,7 @@ def create_round(self, round_data): # TODO: Add check if round_id already exists self.rounds.insert_one(round_data) - def set_session_config(self, id: str, config: RoundConfig) -> None: + def set_session_config(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -889,7 +888,7 @@ def set_session_config(self, id: str, config: RoundConfig) -> None: self.sessions.update_one({"session_id": str(id)}, {"$push": {"session_config": config}}, True) # Added to accomodate new session config structure - def set_session_config_v2(self, id: str, config: RoundConfig) -> None: + def set_session_config_v2(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -916,7 +915,7 @@ def set_round_combiner_data(self, data): """ self.rounds.update_one({"round_id": str(data["round_id"])}, {"$push": {"combiners": data}}, True) - def set_round_config(self, round_id, round_config: RoundConfig): + def set_round_config(self, round_id, round_config): """Set round configuration. :param round_id: The round unique identifier