From 7c81674651254d86dab3d291617a0a13ac7d83ca Mon Sep 17 00:00:00 2001 From: Stefan Hellander Date: Tue, 5 Dec 2023 12:14:38 +0100 Subject: [PATCH] More clean-up --- fedn/cli/run_cmd.py | 39 +------------ fedn/fedn/common/config.py | 55 +++++++++++++++++-- .../common/storage/filesystem/filesystem.py | 7 +-- fedn/fedn/network/api/interface.py | 35 ++++-------- fedn/fedn/network/api/server.py | 52 ++++-------------- fedn/fedn/network/clients/package.py | 9 ++- fedn/fedn/network/combiner/server.py | 11 ++-- fedn/fedn/network/controller/controlbase.py | 16 +++--- fedn/fedn/network/loadbalancer/leastpacked.py | 3 - 9 files changed, 93 insertions(+), 134 deletions(-) diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 1809c9155..cc41177f9 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -3,6 +3,7 @@ import click import yaml +from fedn.common.config import get_default_config from fedn.common.exceptions import InvalidClientConfig from fedn.network.api.server import Controller from fedn.network.clients.client import Client @@ -184,37 +185,7 @@ def dashboard_cmd(ctx, host, port, secret_key, local_package, name, init): fedn_config = get_statestore_config_from_file(config['init']) except Exception as e: print('Failed to read config from settings file, trying default values.', flush=True) - fedn_config = { - "statestore": { - "type": "MongoDB", - "mongo_config": { - "username": "admin", - "password": "admin", - "host": "localhost", - "port": 27017 - } - }, - "network_id": "fedn-network", - "controller": { - "host": "localhost", - "port": 8092, - "debug": True - }, - "storage": { - "storage_type": "S3", - "storage_config": { - "storage_hostname": "localhost", - "storage_port": 9100, - "storage_access_key": "admin", - "storage_secret_key": "password", - "storage_bucket": "fedn-models", - "context_bucket": "fedn-context", - "storage_secure_mode": False - } - } - } - # print(e, flush=True) - # exit(-1) + fedn_config = get_default_config() if not remote: _ = check_helper_config_file(fedn_config) @@ -324,11 +295,5 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, def controller_cmd(ctx): """ """ - # config = {'discover_host': discoverhost, 'discover_port': discoverport, 'token': token, 'host': host, - # 'port': port, 'fqdn': fqdn, 'name': name, 'secure': secure, 'verify': verify, 'max_clients': max_clients, - # 'init': init, 'aggregator': aggregator} - - # if config['init']: - # apply_config(config) controller = Controller() controller.run() \ No newline at end of file diff --git a/fedn/fedn/common/config.py b/fedn/fedn/common/config.py index 264e27ce6..6d57205fa 100644 --- a/fedn/fedn/common/config.py +++ b/fedn/fedn/common/config.py @@ -7,6 +7,43 @@ global STATESTORE_CONFIG global MODELSTORAGE_CONFIG +def get_default_config(): + statestore_config = { + "statestore": { + "type": "MongoDB", + "mongo_config": { + "username": "admin", + "password": "admin", + "host": "localhost", + "port": 27017 + } + } + } + network_config = { + "network_id": "fedn-network" + } + controller_config = { + "controller": { + "host": "localhost", + "port": 8092, + "debug": True + } + } + storage_path = os.path.expanduser("~/.fedn/files") + storage_config = { + "storage": { + "storage_type": "filesystem", + "storage_config": { + "storage_path": storage_path + } + } + } + fedn_config = {} + fedn_config.update(statestore_config) + fedn_config.update(network_config) + fedn_config.update(controller_config) + fedn_config.update(storage_config) + return fedn_config def get_environment_config(): """ Get the configuration from environment variables. @@ -14,10 +51,8 @@ def get_environment_config(): global STATESTORE_CONFIG global MODELSTORAGE_CONFIG - STATESTORE_CONFIG = os.environ.get('STATESTORE_CONFIG', - '/workspaces/fedn/config/settings-reducer.yaml.template') - MODELSTORAGE_CONFIG = os.environ.get('MODELSTORAGE_CONFIG', - '/workspaces/fedn/config/settings-reducer.yaml.template') + STATESTORE_CONFIG = os.environ.get('STATESTORE_CONFIG', None) + MODELSTORAGE_CONFIG = os.environ.get('MODELSTORAGE_CONFIG', None) def get_statestore_config(file=None): @@ -30,8 +65,12 @@ def get_statestore_config(file=None): """ if file is None: get_environment_config() + if STATESTORE_CONFIG: file = STATESTORE_CONFIG - + else: + fedn_config = get_default_config() + return fedn_config['statestore'] + with open(file, 'r') as config_file: try: settings = dict(yaml.safe_load(config_file)) @@ -50,7 +89,13 @@ def get_modelstorage_config(file=None): """ if file is None: get_environment_config() + + if MODELSTORAGE_CONFIG: file = MODELSTORAGE_CONFIG + else: + fedn_config = get_default_config() + return fedn_config['storage'] + with open(file, 'r') as config_file: try: settings = dict(yaml.safe_load(config_file)) diff --git a/fedn/fedn/common/storage/filesystem/filesystem.py b/fedn/fedn/common/storage/filesystem/filesystem.py index 5fb2dbea0..23ec48455 100644 --- a/fedn/fedn/common/storage/filesystem/filesystem.py +++ b/fedn/fedn/common/storage/filesystem/filesystem.py @@ -2,7 +2,7 @@ import uuid class LocalFileSystemModelRepository: - def __init__(self, directory='./fedn-files'): + def __init__(self, directory='./'): self.directory = directory if not os.path.exists(directory): os.makedirs(directory) @@ -44,9 +44,6 @@ def set_model(self, model, is_file=True): def set_compute_package(self, name, compute_package, is_file=True): package_path = self.get_model_path(name) if is_file: - print("MMMMMMMMMMMMMMMMMMM") - print(package_path) - print(compute_package) with open(compute_package, 'rb') as src, open(package_path, 'wb') as dst: dst.write(src.read()) else: @@ -54,9 +51,7 @@ def set_compute_package(self, name, compute_package, is_file=True): file.write(compute_package) def get_compute_package(self, compute_package): - print("GETTING COMPUTE PACKAGE") package_path = self.get_model_path(compute_package) - print(package_path) if os.path.exists(package_path): with open(package_path, 'rb') as file: return file.read() diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 030c88916..59e4934dc 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -24,6 +24,10 @@ class API: def __init__(self, statestore, control): self.statestore = statestore self.control = control + # TODO: make configurable, perhaps in config.py or package.py + self.local_path = os.path.expanduser("~/.fedn/tmp_dir") + if not os.path.exists(self.local_path): + os.makedirs(self.local_path) self.name = "api" def _to_dict(self): @@ -218,14 +222,7 @@ def set_compute_package(self, file, helper_type): if file and self._allowed_file_extension(file.filename): filename = secure_filename(file.filename) - # TODO: make configurable, perhaps in config.py or package.py - file_path = os.path.join(os.getcwd(), filename) - print("ASDASDASDASDASD") - print(file_path) - file.seek(0, 2) # seeks the end of the file - filesize = file.tell() # tell at which byte we are - print(filesize) - file.seek(0) + file_path = os.path.join(self.local_path, filename) file.save(file_path) if ( @@ -270,15 +267,12 @@ def _get_compute_package_name(self): message = "No compute package found." return None, message else: - print("<><><><><><><>><><><><><") try: - print(package_objects) name = package_objects["filename"] except KeyError as e: message = "No compute package found. Key error." print(e) return None, message - print("SUCCESS <><><<><><><><><><><") return name, "success" def get_compute_package(self): @@ -317,26 +311,19 @@ def download_compute_package(self, name): try: mutex = threading.Lock() mutex.acquire() - # TODO: make configurable, perhaps in config.py or package.py - print("SENDING >?>?>?>?>>?>?>?>?>?>?>?>?>") - print("{}{}".format(os.getcwd()+"./", name)) return send_from_directory( - os.getcwd()+"/./", name, as_attachment=True + self.local_path, name, as_attachment=True ) except Exception as err: - print("IN EXCEPTION >?>?>?>?>?>?>?>?>?>") - print(err) try: data = self.control.get_compute_package(name) - # TODO: make configurable, perhaps in config.py or package.py - file_path = os.path.join("./", name) + file_path = os.path.join(self.local_path, name) with open(file_path, "wb") as fh: fh.write(data) - # TODO: make configurable, perhaps in config.py or package.py return send_from_directory( - "./", name, as_attachment=True + self.local_path, name, as_attachment=True ) - except Exception: + except Exception as err: raise finally: mutex.release() @@ -354,9 +341,7 @@ def _create_checksum(self, name=None): name, message = self._get_compute_package_name() if name is None: return False, message, "" - file_path = os.path.join( - ".//", name - ) # TODO: make configurable, perhaps in config.py or package.py + file_path = os.path.join(self.local_path, name) try: sum = str(sha(file_path)) except FileNotFoundError: diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index a52ead6dd..90f50096a 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -1,7 +1,7 @@ from flask import Flask, jsonify, request from fedn.common.config import (get_controller_config, get_modelstorage_config, - get_network_config, get_statestore_config) + get_network_config, get_statestore_config, get_default_config) from fedn.common.log_config import logger from fedn.network.api.interface import API from fedn.network.controller.control import Control @@ -15,36 +15,20 @@ def __init__(self): statestore_config = get_statestore_config() except FileNotFoundError as err: logger.debug("No statestore config, using default values.") - statestore_config = { - "type": "MongoDB", - "mongo_config": { - "username": "admin", - "password": "admin", - "host": "localhost", - "port": 27017 - } - } + fedn_config = get_default_config() + statestore_config = fedn_config['statestore'] try: network_id = get_network_config() - except FileNotFoundError as err: + except (FileNotFoundError, TypeError) as err: logger.debug("No network config found, using default values.") - network_id = "fedn_network" + fedn_config = get_default_config() + network_id = fedn_config['network_id'] try: modelstorage_config = get_modelstorage_config() - except FileNotFoundError as err: + except (FileNotFoundError, TypeError) as err: logger.debug("No model storage config found, using default values.") - modelstorage_config = { - "storage_type": "filesystem", - "storage_config": { - "storage_hostname": "localhost", - "storage_port": 9100, - "storage_access_key": "admin", - "storage_secret_key": "password", - "storage_bucket": "fedn-models", - "context_bucket": "fedn-context", - "storage_secure_mode": False - } - } + fedn_config = get_default_config() + modelstorage_config = fedn_config['storage_config'] statestore = MongoStateStore( network_id, statestore_config["mongo_config"], modelstorage_config ) @@ -241,13 +225,6 @@ def set_package(): ) try: file = request.files["file"] - print(file) - print(file.content_length) - file.seek(0, 2) # seeks the end of the file - filesize = file.tell() # tell at which byte we are - print(filesize) - file.seek(0) - file.save("testest.tgz") except KeyError: return jsonify({"success": False, "message": "Missing file."}), 400 return self.api.set_compute_package(file=file, helper_type=helper_type) @@ -268,9 +245,7 @@ def download_package(): return: The compute package as a json object. rtype: json """ - print("HERE") name = request.args.get("name", None) - print(name) return self.api.download_compute_package(name) @@ -429,13 +404,10 @@ def get_plot_data(): def run(self): try: config = get_controller_config() - except FileNotFoundError as err: + except (FileNotFoundError, TypeError) as err: logger.debug("Found no controller config, using default values.") - config = { - "host": "localhost", - "port": 8092, - "debug": True - } + fedn_config = get_default_config() + config = fedn_config['controller'] port = config["port"] debug = config["debug"] self.app.run(debug=debug, port=port, host="0.0.0.0") diff --git a/fedn/fedn/network/clients/package.py b/fedn/fedn/network/clients/package.py index c947bc85b..61fdc80ed 100644 --- a/fedn/fedn/network/clients/package.py +++ b/fedn/fedn/network/clients/package.py @@ -59,11 +59,10 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): if name: logger.debug("Downloading package with name: {}.".format(name)) path = path + "?name={}".format(name) - else: - logger.critical("No name set for compute package.") - logger.debug("Name: {}.".format(name)) - logger.debug("Path: {}://{}".format(scheme, path)) - # sys.exit(1) + # else: + # logger.critical("No name set for compute package.") + # logger.debug("Name: {}.".format(name)) + # logger.debug("Path: {}".format(path)) with requests.get(path, stream=True, verify=False, headers={'Authorization': 'Token {}'.format(token)}) as r: if 200 <= r.status_code < 204: diff --git a/fedn/fedn/network/combiner/server.py b/fedn/fedn/network/combiner/server.py index 38235691e..ddb4d5d37 100644 --- a/fedn/fedn/network/combiner/server.py +++ b/fedn/fedn/network/combiner/server.py @@ -120,11 +120,12 @@ def __init__(self, config): 'key': key} # Set up model repository - # if announce_config['storage']['storage_type'] == 'filesystem': - self.repository = LocalFileSystemModelRepository() - # else: - # self.repository = S3ModelRepository( - # announce_config['storage']['storage_config']) + if announce_config['storage']['storage_type'] == 'filesystem': + storage_path = announce_config['storage']['storage_config'].get('storage_path', './') + self.repository = LocalFileSystemModelRepository(storage_path) + else: + self.repository = S3ModelRepository( + announce_config['storage']['storage_config']) # Create gRPC server self.server = Server(self, self.modelservice, grpc_config) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index cc297641e..5012e26e5 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -67,13 +67,13 @@ def __init__(self, statestore): ) raise MisconfiguredStorageBackend() print(storage_config) - if storage_config["storage_type"] == "S3" or True: - # self.model_repository = S3ModelRepository( - # storage_config["storage_config"] - # ) - # elif storage_config['storage_type'] == "filesystem": - # print("Using local filesystem for storage.") - self.model_repository = LocalFileSystemModelRepository() + if storage_config["storage_type"] == "S3": + self.model_repository = S3ModelRepository( + storage_config["storage_config"] + ) + elif storage_config['storage_type'] == "filesystem": + storage_path = storage_config['storage_config'].get('storage_path', './') + self.model_repository = LocalFileSystemModelRepository(storage_path) else: print( "REDUCER CONTROL: Unsupported storage backend, exiting.", @@ -276,7 +276,7 @@ def commit(self, model_id, model=None, session_id=None): "CONTROL: Saving model file temporarily to disk...", flush=True ) outfile_name = helper.save(model) - print("CONTROL: Uploading model to Minio...", flush=True) + print("CONTROL: Moving model to permanent storage...", flush=True) model_id = self.model_repository.set_model( outfile_name, is_file=True ) diff --git a/fedn/fedn/network/loadbalancer/leastpacked.py b/fedn/fedn/network/loadbalancer/leastpacked.py index 2888042e3..1e9a2f881 100644 --- a/fedn/fedn/network/loadbalancer/leastpacked.py +++ b/fedn/fedn/network/loadbalancer/leastpacked.py @@ -19,12 +19,9 @@ def find_combiner(self): """ min_clients = None selected_combiner = None - print("HERE") for combiner in self.network.get_combiners(): - print("COMB") try: if combiner.allowing_clients(): - print("ALLOW") combiner_state = combiner.report() if not min_clients: min_clients = combiner_state['nr_active_clients']