diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index 8c48a3015..7098b0732 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -26,6 +26,8 @@ jobs: --exclude-dir='flower-client' --exclude='tests.py' --exclude='controller_cmd.py' + --exclude='combiner_cmd.py' + --exclude='run_cmd.py' --exclude='README.rst' '^[ \t]+(import|from) ' -I . diff --git a/LICENSE b/LICENSE index a8b7d2c09..ddf746a6b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,5 @@ +Copyright 2021 Scaleout Systems AB. All rights reserved. + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..b1504311c --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.rst +include fedn/common/settings-controller.yaml.template \ No newline at end of file diff --git a/config/settings-client.yaml.local.template b/config/settings-client.yaml.local.template new file mode 100644 index 000000000..e48e779af --- /dev/null +++ b/config/settings-client.yaml.local.template @@ -0,0 +1,3 @@ +network_id: fedn-network +discover_host: localhost +discover_port: 8092 diff --git a/config/settings-combiner.yaml.local.template b/config/settings-combiner.yaml.local.template new file mode 100644 index 000000000..b49917389 --- /dev/null +++ b/config/settings-combiner.yaml.local.template @@ -0,0 +1,31 @@ +network_id: fedn-network + +name: combiner +host: localhost +address: localhost +port: 12080 +max_clients: 30 + +cert_path: tmp/server.crt +key_path: tmp/server.key + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + + diff --git a/config/settings-combiner.yaml.template b/config/settings-combiner.yaml.template index 8cef6643a..11911cc6f 100644 --- a/config/settings-combiner.yaml.template +++ b/config/settings-combiner.yaml.template @@ -7,4 +7,23 @@ host: combiner port: 12080 max_clients: 30 +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: mongo + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: minio + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + diff --git a/config/settings-controller.yaml.local.template b/config/settings-controller.yaml.local.template new file mode 100644 index 000000000..a5266a38b --- /dev/null +++ b/config/settings-controller.yaml.local.template @@ -0,0 +1,24 @@ +network_id: fedn-network +controller: + host: localhost + port: 8092 + debug: True + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False diff --git a/docker-compose.yaml b/docker-compose.yaml index c3620e79d..26291748f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -87,6 +87,8 @@ services: environment: - PYTHONUNBUFFERED=0 - GET_HOSTS_FROM=dns + - STATESTORE_CONFIG=/app/config/settings-combiner.yaml + - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml build: context: . args: diff --git a/docs/_static/css/elements.css b/docs/_static/css/elements.css index 8c3b1fdd2..b4efe08f5 100644 --- a/docs/_static/css/elements.css +++ b/docs/_static/css/elements.css @@ -19,7 +19,6 @@ article ul { .rst-content .section ul li, .rst-content .toctree-wrapper ul li, -.rst-content section ul li, .wy-plain-list-disc li, article ul li { list-style: none; diff --git a/docs/conf.py b/docs/conf.py index c5b4db2a2..1c3cdcbb3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -8,11 +8,10 @@ # Project info project = "FEDn" -copyright = "2021, Scaleout Systems AB" author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.13.0" +release = "0.14.0" # Add any Sphinx extension module names here, as strings extensions = [ diff --git a/docs/projects.rst b/docs/projects.rst index 4ff75549f..ff4c6e735 100644 --- a/docs/projects.rst +++ b/docs/projects.rst @@ -385,6 +385,15 @@ Then, standing inside the 'client folder', you can test *train* and *validate* b python train.py ../seed.npz ../model_update.npz --data_path data/clients/1/mnist.pt python validate.py ../model_update.npz ../validation.json --data_path data/clients/1/mnist.pt +You can also test *train* and *validate* entrypoint using CLI command: + +.. note:: Before running the fedn run train or fedn run validate commands, make sure to download the training and test data. The downloads are usually handled by the "fedn run startup" command in the examples provided by FEDn. + +.. code-block:: bash + + fedn run train --path client --input --output + fedn run validate --path client --input --output + Packaging for training on FEDn =============================== diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index afdea926f..f43d2353d 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -4,6 +4,6 @@ build_dependencies: - setuptools - wheel dependencies: - - torch==2.3.1 - - torchvision==0.18.1 + - torch + - torchvision - fedn diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 02a797448..3e7753e80 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -3,8 +3,6 @@ import click import requests -from fedn.network.combiner.combiner import Combiner - from .main import main from .shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response @@ -12,8 +10,7 @@ @main.group("combiner") @click.pass_context def combiner_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass @@ -60,6 +57,8 @@ def start_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, se click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 082aaa7bb..0342bb39d 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -4,15 +4,15 @@ import click import yaml + +from fedn.cli.client_cmd import validate_client_config +from fedn.cli.main import main +from fedn.cli.shared import apply_config from fedn.common.exceptions import InvalidClientConfig from fedn.common.log_config import logger from fedn.network.clients.client import Client -from fedn.network.combiner.combiner import Combiner from fedn.utils.dispatcher import Dispatcher, _read_yaml_file -from fedn.cli.client_cmd import validate_client_config -from fedn.cli.main import main -from fedn.cli.shared import apply_config def get_statestore_config_from_file(init): """:param init: @@ -35,6 +35,7 @@ def check_helper_config_file(config): exit(-1) return helper + def check_yaml_exists(path): """Check if fedn.yaml exists in the given path.""" yaml_file = os.path.join(path, "fedn.yaml") @@ -43,21 +44,26 @@ def check_yaml_exists(path): click.echo(f"Could not find fedn.yaml in {path}") exit(-1) return yaml_file + + def delete_virtual_environment(dispatcher): if dispatcher.python_env_path: logger.info(f"Removing virtualenv {dispatcher.python_env_path}") shutil.rmtree(dispatcher.python_env_path) else: logger.warning("No virtualenv found to remove.") + + @main.group("run") @click.pass_context def run_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass + + @run_cmd.command("validate") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model" ) +@click.option("-i", "--input", required=True, help="Path to input model") @click.option("-o", "--output", required=True, help="Path to write the output JSON containing validation metrics") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context @@ -82,9 +88,11 @@ def validate_cmd(ctx, path, input, output, keep_venv): dispatcher.run_cmd("validate {} {}".format(input, output)) if not keep_venv: delete_virtual_environment(dispatcher) + + @run_cmd.command("train") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model parameters" ) +@click.option("-i", "--input", required=True, help="Path to input model parameters") @click.option("-o", "--output", required=True, help="Path to write the updated model parameters ") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context @@ -109,6 +117,8 @@ def train_cmd(ctx, path, input, output, keep_venv): dispatcher.run_cmd("train {} {}".format(input, output)) if not keep_venv: delete_virtual_environment(dispatcher) + + @run_cmd.command("startup") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @@ -134,6 +144,7 @@ def startup_cmd(ctx, path, keep_venv): if not keep_venv: delete_virtual_environment(dispatcher) + @run_cmd.command("build") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") @click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @@ -173,7 +184,7 @@ def build_cmd(ctx, path, keep_venv): @click.option("-s", "--secure", required=False, default=False) @click.option("-pc", "--preshared-cert", required=False, default=False) @click.option("-v", "--verify", is_flag=True, help="Verify SSL/TLS for REST service") -@click.option("-c", "--preferred-combiner", required=False,type=str, default="",help="url to the combiner or name of the preferred combiner") +@click.option("-c", "--preferred-combiner", required=False, type=str, default="", help="url to the combiner or name of the preferred combiner") @click.option("-va", "--validator", required=False, default=True) @click.option("-tr", "--trainer", required=False, default=True) @click.option("-in", "--init", required=False, default=None, help="Set to a filename to (re)init client from file state.") @@ -310,5 +321,7 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() diff --git a/fedn/common/certificate/certificate.py b/fedn/common/certificate/certificate.py index 3cb09016c..547175a20 100644 --- a/fedn/common/certificate/certificate.py +++ b/fedn/common/certificate/certificate.py @@ -9,24 +9,27 @@ class Certificate: - """Utility to generate unsigned certificates. - - """ + """Utility to generate unsigned certificates.""" CERT_NAME = "cert.pem" KEY_NAME = "key.pem" BITS = 2048 - def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", create_dirs=True): - try: - os.makedirs(cwd) - except OSError: - logger.info("Directory exists, will store all cert and keys here.") + def __init__(self, name=None, key_path="", cert_path="", create_dirs=False): + if create_dirs: + try: + cwd = os.getcwd() + os.makedirs(cwd) + except OSError: + logger.info("Directory exists, will store all cert and keys here.") + else: + logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) + + self.key_path = os.path.join(cwd, "key.pem") + self.cert_path = os.path.join(cwd, "cert.pem") else: - logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) - - self.key_path = os.path.join(cwd, key_name) - self.cert_path = os.path.join(cwd, cert_name) + self.key_path = key_path + self.cert_path = cert_path if name: self.name = name @@ -36,9 +39,7 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre def gen_keypair( self, ): - """Generate keypair. - - """ + """Generate keypair.""" key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 2048) cert = crypto.X509() @@ -73,8 +74,7 @@ def set_keypair_raw(self, certificate, privatekey): certfile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)) def get_keypair_raw(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() with open(self.cert_path, "rb") as certfile: @@ -82,16 +82,14 @@ def get_keypair_raw(self): return copy.deepcopy(cert_buf), copy.deepcopy(key_buf) def get_key(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_buf) return key def get_cert(self): - """:return: - """ + """:return:""" with open(self.cert_path, "rb") as certfile: cert_buf = certfile.read() cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf) diff --git a/fedn/common/config.py b/fedn/common/config.py index 23d873ff7..517e57d94 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -15,6 +15,8 @@ FEDN_AUTH_REFRESH_TOKEN = os.environ.get("FEDN_AUTH_REFRESH_TOKEN", False) FEDN_CUSTOM_URL_PREFIX = os.environ.get("FEDN_CUSTOM_URL_PREFIX", "") + +FEDN_ALLOW_LOCAL_PACKAGE = os.environ.get("FEDN_ALLOW_LOCAL_PACKAGE", False) FEDN_PACKAGE_EXTRACT_DIR = os.environ.get("FEDN_PACKAGE_EXTRACT_DIR", "package") diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 9936e0bc0..3ffaadecf 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -1,5 +1,4 @@ -import base64 -import copy +import os import threading import uuid from io import BytesIO @@ -8,9 +7,9 @@ from werkzeug.security import safe_join from werkzeug.utils import secure_filename -from fedn.common.config import get_controller_config, get_network_config +from fedn.common.config import FEDN_ALLOW_LOCAL_PACKAGE, get_controller_config, get_network_config from fedn.common.log_config import logger -from fedn.network.combiner.interfaces import CombinerInterface, CombinerUnavailableError +from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString from fedn.utils.checksum import sha @@ -231,7 +230,7 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") - file_path = safe_join("/app/client/package/", storage_file_name) + file_path = safe_join(os.getcwd(), storage_file_name) file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) @@ -247,7 +246,8 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript ), 400, ) - + # Delete the file after it has been saved + os.remove(file_path) return jsonify({"success": True, "message": "Compute package set."}) def _get_compute_package_name(self): @@ -371,19 +371,21 @@ def download_compute_package(self, name): mutex = threading.Lock() mutex.acquire() # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: try: data = self.control.get_compute_package(name) # TODO: make configurable, perhaps in config.py or package.py - file_path = safe_join("/app/client/package/", name) + file_path = safe_join(os.getcwd(), name) with open(file_path, "wb") as fh: fh.write(data) # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: raise finally: + # Delete the file after it has been saved + os.remove(file_path) mutex.release() def _create_checksum(self, name=None): @@ -398,7 +400,7 @@ def _create_checksum(self, name=None): name, message = self._get_compute_package_name() if name is None: return False, message, "" - file_path = safe_join("/app/client/package/", name) # TODO: make configurable, perhaps in config.py or package.py + file_path = safe_join(os.getcwd(), name) # TODO: make configurable, perhaps in config.py or package.py try: sum = str(sha(file_path)) except FileNotFoundError: @@ -502,58 +504,15 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por :return: Config of the combiner as a json response. :rtype: :class:`flask.Response` """ - # TODO: Any more required check for config? Formerly based on status: "retry" - if not self.control.idle(): - return jsonify( - { - "success": False, - "status": "retry", - "message": "Conroller is not in idle state, try again later. ", - } - ) - # Check if combiner already exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - if secure_grpc == "True": - certificate, key = self.certificate_manager.get_or_create(address).get_keypair_raw() - _ = base64.b64encode(certificate) - _ = base64.b64encode(key) - - else: - certificate = None - key = None - - combiner_interface = CombinerInterface( - parent=self._to_dict(), - name=combiner_id, - address=address, - fqdn=fqdn, - port=port, - certificate=copy.deepcopy(certificate), - key=copy.deepcopy(key), - ip=remote_addr, - ) - - self.control.network.add_combiner(combiner_interface) - - # Check combiner now exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - return jsonify({"success": False, "message": "Combiner not added."}) - payload = { - "success": True, - "message": "Combiner added successfully.", - "status": "added", - "storage": self.statestore.get_storage_backend(), - "statestore": self.statestore.get_config(), - "certificate": combiner.get_certificate(), - "key": combiner.get_key(), + "success": False, + "message": "Adding combiner via REST API is obsolete. Include statestore and object store config in combiner config.", + "status": "abort", } return jsonify(payload) - def add_client(self, client_id, preferred_combiner, remote_addr, name): + def add_client(self, client_id, preferred_combiner, remote_addr, name, package): """Add a client to the network. :param client_id: The client id to add. @@ -563,19 +522,37 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): :return: A json response with combiner assignment config. :rtype: :class:`flask.Response` """ - # Check if package has been set - package_object = self.statestore.get_compute_package() - if package_object is None: + local_package = FEDN_ALLOW_LOCAL_PACKAGE + if local_package: + local_package = True + + if package == "remote": + package_object = self.statestore.get_compute_package() + if package_object is None: + return ( + jsonify( + { + "success": False, + "status": "retry", + "message": "No compute package found. Set package in controller.", + } + ), + 203, + ) + helper_type = self.control.statestore.get_helper() + elif package == "local" and local_package is False: + print("Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.") return ( jsonify( { "success": False, - "status": "retry", - "message": "No compute package found. Set package in controller.", + "message": "Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.", } ), - 203, + 400, ) + elif package == "local" and local_package is True: + helper_type = "" # Assign client to combiner if preferred_combiner: @@ -609,22 +586,14 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): # Add client to network self.control.network.add_client(client_config) - # Setup response containing information about the combiner for assinging the client - if combiner.certificate: - cert_b64 = base64.b64encode(combiner.certificate) - cert = str(cert_b64).split("'")[1] - else: - cert = None - payload = { "status": "assigned", "host": combiner.address, "fqdn": combiner.fqdn, - "package": "remote", # TODO: Make this configurable + "package": package, "ip": combiner.ip, "port": combiner.port, - "certificate": cert, - "helper_type": self.control.statestore.get_helper(), + "helper_type": helper_type, } return jsonify(payload) diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index 5e2f2ef91..542761f49 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -1,4 +1,4 @@ -import base64 +import os from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerInterface @@ -47,14 +47,19 @@ def get_combiners(self): data = self.statestore.get_combiners() combiners = [] for c in data["result"]: - if c["certificate"]: - cert = base64.b64decode(c["certificate"]) - key = base64.b64decode(c["key"]) + name = c["name"].upper() + # General certificate handling, same for all combiners. + if os.environ.get("FEDN_GRPC_CERT_PATH"): + with open(os.environ.get("FEDN_GRPC_CERT_PATH"), "rb") as f: + cert = f.read() + # Specific certificate handling for each combiner. + elif os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): + cert_path = os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}") + with open(cert_path, "rb") as f: + cert = f.read() else: cert = None - key = None - - combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"])) + combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, ip=c["ip"])) return combiners diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index ac87ab7b1..185877577 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -592,9 +592,11 @@ def add_client(): remote_addr = request.remote_addr try: response = api.add_client(**json_data, remote_addr=remote_addr) - except TypeError: + except TypeError as e: + print(e) return jsonify({"success": False, "message": "Invalid data provided"}), 400 - except Exception: + except Exception as e: + print(e) return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response diff --git a/fedn/network/api/v1/client_routes.py b/fedn/network/api/v1/client_routes.py index 8fa13febe..fb268905b 100644 --- a/fedn/network/api/v1/client_routes.py +++ b/fedn/network/api/v1/client_routes.py @@ -1,13 +1,11 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb -from fedn.network.storage.statestore.stores.client_store import ClientStore +from fedn.network.api.v1.shared import api_version, client_store, get_post_data_to_kwargs, get_typed_list_headers from fedn.network.storage.statestore.stores.shared import EntityNotFound bp = Blueprint("client", __name__, url_prefix=f"/api/{api_version}/clients") -client_store = ClientStore(mdb, "network.clients") @bp.route("/", methods=["GET"]) @@ -368,3 +366,47 @@ def get_client(id: str): return jsonify({"message": f"Entity with id: {id} not found"}), 404 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + +# delete client +@bp.route("/", methods=["DELETE"]) +@jwt_auth_required(role="admin") +def delete_client(id: str): + """Delete client + Deletes a client based on the provided id. + --- + tags: + - Clients + parameters: + - name: id + in: path + required: true + type: string + description: The id of the client + responses: + 200: + description: The client was deleted + 404: + description: The client was not found + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + result: bool = client_store.delete(id) + + msg = "Client deleted" if result else "Client not deleted" + + return jsonify({"message": msg}), 200 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/combiner_routes.py b/fedn/network/api/v1/combiner_routes.py index 9210a7e30..02617b7bb 100644 --- a/fedn/network/api/v1/combiner_routes.py +++ b/fedn/network/api/v1/combiner_routes.py @@ -1,7 +1,7 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb +from fedn.network.api.v1.shared import api_version, client_store, get_post_data_to_kwargs, get_typed_list_headers, mdb from fedn.network.storage.statestore.stores.combiner_store import CombinerStore from fedn.network.storage.statestore.stores.shared import EntityNotFound @@ -339,3 +339,92 @@ def get_combiner(id: str): return jsonify({"message": f"Entity with id: {id} not found"}), 404 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + +@bp.route("/", methods=["DELETE"]) +@jwt_auth_required(role="admin") +def delete_combiner(id: str): + """Delete combiner + Deletes a combiner based on the provided id. + --- + tags: + - Combiners + parameters: + - name: id + in: path + required: true + type: string + description: The id of the combiner + responses: + 200: + description: The combiner was deleted + 404: + description: The combiner was not found + schema: + type: object + properties: + error: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + result: bool = combiner_store.delete(id) + msg = "Combiner deleted" if result else "Combiner not deleted" + + return jsonify({"message": msg}), 200 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/clients/count", methods=["POST"]) +@jwt_auth_required(role="admin") +def number_of_clients_connected(): + """Number of clients connected + Retrieves the number of clients connected to the combiner. + --- + tags: + - Combiners + parameters: + - name: combiners + in: body + required: true + type: object + description: Object containing the ids of the combiners + schema: + type: object + properties: + combiners: + type: string + responses: + 200: + description: A list of objects containing the number of clients connected to each combiner + schema: + type: Array + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + data = request.get_json() + combiners = data.get("combiners", "") + combiners = combiners.split(",") if combiners else [] + response = client_store.connected_client_count(combiners) + + result = { + "result": response + } + + return jsonify(result), 200 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/shared.py b/fedn/network/api/v1/shared.py index a27a6f637..0fda39c45 100644 --- a/fedn/network/api/v1/shared.py +++ b/fedn/network/api/v1/shared.py @@ -3,13 +3,15 @@ import pymongo from pymongo.database import Database -from fedn.network.api.shared import statestore_config,network_id +from fedn.network.api.shared import network_id, statestore_config +from fedn.network.storage.statestore.stores.client_store import ClientStore api_version = "v1" mc = pymongo.MongoClient(**statestore_config["mongo_config"]) mc.server_info() mdb: Database = mc[network_id] +client_store = ClientStore(mdb, "network.clients") def is_positive_integer(s): return s is not None and s.isdigit() and int(s) > 0 diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index e594d7b6d..8508291ff 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -1,4 +1,3 @@ -import base64 import io import json import os @@ -11,7 +10,6 @@ import uuid from datetime import datetime from io import BytesIO -from shutil import copytree import grpc import requests @@ -28,7 +26,6 @@ from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator -from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper CHUNK_SIZE = 1024 * 1024 @@ -198,13 +195,7 @@ def connect(self, combiner_config): port = 443 logger.info(f"Initiating connection to combiner host at: {host}:{port}") - if combiner_config["certificate"]: - logger.info("Utilizing CA certificate for GRPC channel authentication.") - secure = True - cert = base64.b64decode(combiner_config["certificate"]) # .decode('utf-8') - credentials = grpc.ssl_channel_credentials(root_certificates=cert) - channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) - elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): + if os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], "rb") as f: @@ -236,8 +227,6 @@ def connect(self, combiner_config): logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", host, port)) - logger.info("Using {} compute package.".format(combiner_config["package"])) - self._connected = True def disconnect(self): @@ -259,7 +248,11 @@ def _initialize_helper(self, combiner_config): :return: """ if "helper_type" in combiner_config.keys(): - self.helper = get_helper(combiner_config["helper_type"]) + if not combiner_config["helper_type"]: + # Default to numpyhelper + self.helper = get_helper("numpyhelper") + else: + self.helper = get_helper(combiner_config["helper_type"]) def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads. @@ -292,9 +285,8 @@ def _initialize_dispatcher(self, config): :type config: dict :return: """ + pr = PackageRuntime(self.run_path) if config["remote_compute_context"]: - pr = PackageRuntime(self.run_path) - retval = None tries = 10 @@ -333,18 +325,8 @@ def _initialize_dispatcher(self, config): logger.error(f"Caught exception: {type(e).__name__}") else: - # TODO: Deprecate - dispatch_config = { - "entry_points": { - "predict": {"command": "python3 predict.py"}, - "train": {"command": "python3 train.py"}, - "validate": {"command": "python3 validate.py"}, - } - } from_path = os.path.join(os.getcwd(), "client") - - copytree(from_path, self.run_path) - self.dispatcher = Dispatcher(dispatch_config, self.run_path) + self.dispatcher = pr.dispatcher(from_path) # Get or create python environment activate_cmd = self.dispatcher._get_or_create_python_env() if activate_cmd: diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index 59aaead35..bd7262936 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -74,7 +74,7 @@ def assign(self): """ try: retval = None - payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner} + payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner, "package": self.package} retval = requests.post( self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client", json=payload, @@ -110,17 +110,10 @@ def assign(self): if "message" in retval.json(): reason = retval.json()["message"] else: - reason = "Reducer was not ready. Try again later." + reason = "Controller was not ready. Try again later." return Status.TryAgain, reason - reducer_package = retval.json()["package"] - if reducer_package != self.package: - reason = "Unmatched config of compute package between client and reducer.\n" + "Reducer uses {} package and client uses {}.".format( - reducer_package, self.package - ) - return Status.UnMatchedConfig, reason - return Status.Assigned, retval.json() return Status.Unassigned, None diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 2c59991f9..d336932c5 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -1,24 +1,21 @@ -import base64 import json import queue import re import signal -import sys import threading import time import uuid from datetime import datetime, timedelta from enum import Enum +from typing import TypedDict import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc +from fedn.common.certificate.certificate import Certificate from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream -from fedn.network.combiner.connect import ConnectorCombiner, Status -from fedn.network.combiner.modelservice import ModelService from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler -from fedn.network.grpc.server import Server -from fedn.network.storage.s3.repository import Repository -from fedn.network.storage.statestore.mongostatestore import MongoStateStore +from fedn.network.combiner.shared import repository, statestore +from fedn.network.grpc.server import Server, ServerConfig VALID_NAME_REGEX = "^[a-zA-Z0-9_-]*$" @@ -50,6 +47,28 @@ def role_to_proto_role(role): return fedn.OTHER +class CombinerConfig(TypedDict): + """Configuration for the combiner.""" + + discover_host: str + discover_port: int + token: str + host: str + port: int + ip: str + parent: str + fqdn: str + name: str + secure: bool + verify: bool + cert_path: str + key_path: str + max_clients: int + network_id: str + logfile: str + verbosity: str + + class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer): """Combiner gRPC server. @@ -74,52 +93,21 @@ def __init__(self, config): self.role = Role.COMBINER self.max_clients = config["max_clients"] - # Connector to announce combiner to discover service (reducer) - announce_client = ConnectorCombiner( - host=config["discover_host"], - port=config["discover_port"], - myhost=config["host"], - fqdn=config["fqdn"], - myport=config["port"], - token=config["token"], - name=config["name"], - secure=config["secure"], - verify=config["verify"], - ) - - while True: - # Announce combiner to discover service - status, response = announce_client.announce() - if status == Status.TryAgain: - logger.info(response) - time.sleep(5) - elif status == Status.Assigned: - announce_config = response - logger.info("COMBINER {0}: Announced successfully".format(self.id)) - break - elif status == Status.UnAuthorized: - logger.info(response) - logger.info("Status.UnAuthorized") - sys.exit("Exiting: Unauthorized") - elif status == Status.UnMatchedConfig: - logger.info(response) - logger.info("Status.UnMatchedConfig") - sys.exit("Exiting: Missing config") - - cert = announce_config["certificate"] - key = announce_config["key"] - - if announce_config["certificate"]: - cert = base64.b64decode(announce_config["certificate"]) # .decode('utf-8') - key = base64.b64decode(announce_config["key"]) # .decode('utf-8') - - # Set up gRPC server configuration - grpc_config = {"port": config["port"], "secure": config["secure"], "certificate": cert, "key": key} - # Set up model repository - self.repository = Repository(announce_config["storage"]["storage_config"]) - - self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"]) + self.repository = repository + + self.statestore = statestore + + # Add combiner to statestore + interface_config = { + "port": config["port"], + "fqdn": config["fqdn"], + "name": config["name"], + "address": config["host"], + "parent": "localhost", + "ip": "", + } + self.statestore.set_combiner(interface_config) # Fetch all clients previously connected to the combiner # If a client and a combiner goes down at the same time, @@ -132,13 +120,19 @@ def __init__(self, config): except KeyError: self.statestore.set_client({"name": client["name"], "status": "offline"}) - self.modelservice = ModelService() + # Set up gRPC server configuration + if config["secure"]: + cert = Certificate(key_path=config["key_path"], cert_path=config["cert_path"]) + certificate, key = cert.get_keypair_raw() + grpc_server_config = ServerConfig(port=config["port"], secure=True, key=key, certificate=certificate) + else: + grpc_server_config = ServerConfig(port=config["port"], secure=False) # Create gRPC server - self.server = Server(self, self.modelservice, grpc_config) + self.server = Server(self, grpc_server_config) # Set up round controller - self.round_handler = RoundHandler(self.repository, self, self.modelservice) + self.round_handler = RoundHandler(self) # Start thread for round controller threading.Thread(target=self.round_handler.run, daemon=True).start() diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index 935b75442..20da29d23 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -125,13 +125,6 @@ def to_dict(self): "key": None, "config": self.config, } - - if self.certificate: - cert_b64 = base64.b64encode(self.certificate) - key_b64 = base64.b64encode(self.key) - data["certificate"] = str(cert_b64).split("'")[1] - data["key"] = str(key_b64).split("'")[1] - return data def to_json(self): diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index 54cfd189c..1f0025303 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -9,6 +9,7 @@ from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator from fedn.network.combiner.modelservice import load_model_from_BytesIO, serialize_model_to_BytesIO +from fedn.network.combiner.shared import modelservice, repository from fedn.utils.helpers.helpers import get_helper from fedn.utils.parameters import Parameters @@ -84,10 +85,10 @@ class RoundHandler: :type modelservice: class: `fedn.network.combiner.modelservice.ModelService` """ - def __init__(self, storage, server, modelservice): + def __init__(self, server): """Initialize the RoundHandler.""" self.round_configs = queue.Queue() - self.storage = storage + self.storage = repository self.server = server self.modelservice = modelservice diff --git a/fedn/network/combiner/shared.py b/fedn/network/combiner/shared.py new file mode 100644 index 000000000..5e5ee114c --- /dev/null +++ b/fedn/network/combiner/shared.py @@ -0,0 +1,13 @@ +from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config +from fedn.network.combiner.modelservice import ModelService +from fedn.network.storage.s3.repository import Repository +from fedn.network.storage.statestore.mongostatestore import MongoStateStore + +statestore_config = get_statestore_config() +modelstorage_config = get_modelstorage_config() +network_id = get_network_config() + +statestore = MongoStateStore(network_id, statestore_config["mongo_config"]) +repository = Repository(modelstorage_config["storage_config"]) + +modelservice = ModelService() diff --git a/fedn/network/grpc/server.py b/fedn/network/grpc/server.py index a23691505..edd2fd6d5 100644 --- a/fedn/network/grpc/server.py +++ b/fedn/network/grpc/server.py @@ -1,17 +1,28 @@ from concurrent import futures +from typing import TypedDict import grpc from grpc_health.v1 import health, health_pb2_grpc import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream +from fedn.network.combiner.shared import modelservice from fedn.network.grpc.auth import JWTInterceptor +class ServerConfig(TypedDict): + port: int + secure: bool + key: str + certificate: str + logfile: str + verbosity: str + + class Server: """Class for configuring and launching the gRPC server.""" - def __init__(self, servicer, modelservicer, config): + def __init__(self, servicer, config: ServerConfig): set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) @@ -25,8 +36,8 @@ def __init__(self, servicer, modelservicer, config): rpc.add_ConnectorServicer_to_server(servicer, self.server) if isinstance(servicer, rpc.ReducerServicer): rpc.add_ReducerServicer_to_server(servicer, self.server) - if isinstance(modelservicer, rpc.ModelServiceServicer): - rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) + if isinstance(modelservice, rpc.ModelServiceServicer): + rpc.add_ModelServiceServicer_to_server(modelservice, self.server) if isinstance(servicer, rpc.CombinerServicer): rpc.add_ControlServicer_to_server(servicer, self.server) diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 7ef22a795..3ef204b5c 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -6,7 +6,6 @@ from google.protobuf.json_format import MessageToDict from fedn.common.log_config import logger -from fedn.network.combiner.roundhandler import RoundConfig from fedn.network.state import ReducerStateToString, StringToReducerState @@ -878,7 +877,7 @@ def create_round(self, round_data): # TODO: Add check if round_id already exists self.rounds.insert_one(round_data) - def set_session_config(self, id: str, config: RoundConfig) -> None: + def set_session_config(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -889,7 +888,7 @@ def set_session_config(self, id: str, config: RoundConfig) -> None: self.sessions.update_one({"session_id": str(id)}, {"$push": {"session_config": config}}, True) # Added to accomodate new session config structure - def set_session_config_v2(self, id: str, config: RoundConfig) -> None: + def set_session_config_v2(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -916,7 +915,7 @@ def set_round_combiner_data(self, data): """ self.rounds.update_one({"round_id": str(data["round_id"])}, {"$push": {"combiners": data}}, True) - def set_round_config(self, round_id, round_config: RoundConfig): + def set_round_config(self, round_id, round_config): """Set round configuration. :param round_id: The round unique identifier diff --git a/fedn/network/storage/statestore/stores/client_store.py b/fedn/network/storage/statestore/stores/client_store.py index 6d7d5865e..c3c2e5225 100644 --- a/fedn/network/storage/statestore/stores/client_store.py +++ b/fedn/network/storage/statestore/stores/client_store.py @@ -2,10 +2,13 @@ from typing import Any, Dict, List, Tuple import pymongo +from bson import ObjectId from pymongo.database import Database from fedn.network.storage.statestore.stores.store import Store +from .shared import EntityNotFound + class Client: def __init__(self, id: str, name: str, combiner: str, combiner_preferred: str, ip: str, status: str, updated_at: str, last_seen: datetime): @@ -53,7 +56,14 @@ def add(self, item: Client)-> Tuple[bool, Any]: raise NotImplementedError("Add not implemented for ClientStore") def delete(self, id: str) -> bool: - raise NotImplementedError("Delete not implemented for ClientStore") + kwargs = { "_id": ObjectId(id) } if ObjectId.is_valid(id) else { "client_id": id } + + document = self.database[self.collection].find_one(kwargs) + + if document is None: + raise EntityNotFound(f"Entity with (id | client_id) {id} not found") + + return super().delete(document["_id"]) def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[Client]]: """List entities @@ -83,3 +93,36 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI def count(self, **kwargs) -> int: return super().count(**kwargs) + + def connected_client_count(self, combiners): + """Count the number of connected clients for each combiner. + + :param combiners: list of combiners to get data for. + :type combiners: list + :param sort_key: The key to sort by. + :type sort_key: str + :param sort_order: The sort order. + :type sort_order: pymongo.ASCENDING or pymongo.DESCENDING + :return: list of combiner data. + :rtype: list(ObjectId) + """ + try: + pipeline = ( + [ + {"$match": {"combiner": {"$in": combiners}, "status": "online"}}, + {"$group": {"_id": "$combiner", "count": {"$sum": 1}}}, + {"$project": {"id": "$_id", "count": 1, "_id": 0}} + ] + if len(combiners) > 0 + else [ + {"$match": { "status": "online"}}, + {"$group": {"_id": "$combiner", "count": {"$sum": 1}}}, + {"$project": {"id": "$_id", "count": 1, "_id": 0}} + ] + ) + + result = list(self.database[self.collection].aggregate(pipeline)) + except Exception: + result = {} + + return result diff --git a/fedn/network/storage/statestore/stores/combiner_store.py b/fedn/network/storage/statestore/stores/combiner_store.py index 28f54aa6c..5fceea1b7 100644 --- a/fedn/network/storage/statestore/stores/combiner_store.py +++ b/fedn/network/storage/statestore/stores/combiner_store.py @@ -86,7 +86,17 @@ def add(self, item: Combiner)-> Tuple[bool, Any]: raise NotImplementedError("Add not implemented for CombinerStore") def delete(self, id: str) -> bool: - raise NotImplementedError("Delete not implemented for CombinerStore") + if(ObjectId.is_valid(id)): + kwargs = { "_id": ObjectId(id)} + else: + return False + + document = self.database[self.collection].find_one(kwargs) + + if document is None: + raise EntityNotFound(f"Entity with (id) {id} not found") + + return super().delete(document["_id"]) def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[Combiner]]: """List entities diff --git a/fedn/network/storage/statestore/stores/store.py b/fedn/network/storage/statestore/stores/store.py index f1175c9f7..eb9d8b1bb 100644 --- a/fedn/network/storage/statestore/stores/store.py +++ b/fedn/network/storage/statestore/stores/store.py @@ -51,7 +51,8 @@ def add(self, item: T) -> Tuple[bool, Any]: return False, str(e) def delete(self, id: str) -> bool: - pass + result = self.database[self.collection].delete_one({"_id": ObjectId(id)}) + return result.deleted_count == 1 def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[T]]: """List entities diff --git a/pyproject.toml b/pyproject.toml index 72345c946..3a0be7aee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.13.0" +version = "0.14.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst"