From c4b3c4e471450d8015413322bbe94f6854991bdc Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 4 Jan 2024 13:35:18 +0100 Subject: [PATCH] =?UTF-8?q?Feature/SK-344=20|=C2=A0Add=20compute=20package?= =?UTF-8?q?=20registry=20(#495)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Started on new set compute package * set compute package updated * list compute packages added * added properties to /get_package * name and description added to compute package * simplified compute package object response (get_package) * include name and desc in compute package list response * include id in result for compute packages * added id property to compute packages to enable tracking active easier * set active compute package * list_models can include active property * select correct file name * set active model * set CURRENT model * Removed comment * lint fix * Added api test for list_compute_packages --- fedn/fedn/network/api/client.py | 14 +- fedn/fedn/network/api/interface.py | 236 ++++++++++++++---- fedn/fedn/network/api/server.py | 50 +++- fedn/fedn/network/api/tests.py | 13 + fedn/fedn/network/controller/controlbase.py | 3 +- .../storage/statestore/mongostatestore.py | 152 +++++++++-- 6 files changed, 394 insertions(+), 74 deletions(-) diff --git a/fedn/fedn/network/api/client.py b/fedn/fedn/network/api/client.py index 58fc27304..4fe0cfe4a 100644 --- a/fedn/fedn/network/api/client.py +++ b/fedn/fedn/network/api/client.py @@ -169,7 +169,7 @@ def get_session(self, session_id): response = requests.get(self._get_url(f'get_session?session_id={session_id}'), self.verify) return response.json() - def set_package(self, path, helper): + def set_package(self, path: str, helper: str, name: str = None, description: str = None): """ Set the compute package in the statestore. :param path: The file path of the compute package to set. 
@@ -180,7 +180,8 @@ def set_package(self, path, helper): :rtype: dict """ with open(path, 'rb') as file: - response = requests.post(self._get_url('set_package'), files={'file': file}, data={'helper': helper}, verify=self.verify) + response = requests.post(self._get_url('set_package'), files={'file': file}, data={ + 'helper': helper, 'name': name, 'description': description}, verify=self.verify) return response.json() def get_package(self): @@ -192,6 +193,15 @@ def get_package(self): response = requests.get(self._get_url('get_package'), verify=self.verify) return response.json() + def list_compute_packages(self): + """ Get all compute packages from the statestore. + + :return: All compute packages with info. + :rtype: dict + """ + response = requests.get(self._get_url('list_compute_packages'), verify=self.verify) + return response.json() + def download_package(self, path): """ Download the compute package. diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 885f2b0ce..9e6eb0bb3 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -42,14 +42,15 @@ def _allowed_file_extension( :param filename: The filename to check. :type filename: str - :return: True if file extension is allowed, else False. - :rtype: bool + :return: True and extension str if file extension is allowed, else False and None. + :rtype: Tuple (bool, str) """ + if "." in filename: + extension = filename.rsplit(".", 1)[1].lower() + if extension in ALLOWED_EXTENSIONS: + return (True, extension) - return ( - "." in filename - and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS - ) + return (False, None) def get_clients(self, limit=None, skip=None, status=False): """Get all clients from the statestore. 
@@ -169,7 +170,24 @@ def get_session(self, session_id): payload[id] = info return jsonify(payload) - def set_compute_package(self, file, helper_type): + def set_active_compute_package(self, id: str): + + success = self.statestore.set_active_compute_package(id) + + if not success: + return ( + jsonify( + { + "success": False, + "message": "Failed to set compute package.", + } + ), + 400, + ) + + return jsonify({"success": True, "message": "Compute package set."}) + + def set_compute_package(self, file, helper_type: str, name: str = None, description: str = None): """Set the compute package in the statestore. :param file: The compute package to set. @@ -178,31 +196,54 @@ def set_compute_package(self, file, helper_type): :rtype: :class:`flask.Response` """ - if file and self._allowed_file_extension(file.filename): - filename = secure_filename(file.filename) - # TODO: make configurable, perhaps in config.py or package.py - file_path = os.path.join("/app/client/package/", filename) - file.save(file_path) + if ( + self.control.state() == ReducerState.instructing + or self.control.state() == ReducerState.monitoring + ): + return ( + jsonify( + { + "success": False, + "message": "Reducer is in instructing or monitoring state." + "Cannot set compute package.", + } + ), + 400, + ) - if ( - self.control.state() == ReducerState.instructing - or self.control.state() == ReducerState.monitoring - ): - return ( - jsonify( - { - "success": False, - "message": "Reducer is in instructing or monitoring state." 
- "Cannot set compute package.", - } - ), - 400, - ) + if file is None: + return ( + jsonify( + { + "success": False, + "message": "No file provided.", + } + ), + 404, + ) + + success, extension = self._allowed_file_extension(file.filename) + + if not success: + return ( + jsonify( + { + "success": False, + "message": f"File extension {extension} not allowed.", + } + ), + 404, + ) + + file_name = file.filename + storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") + + file_path = os.path.join("/app/client/package/", storage_file_name) + file.save(file_path) - self.control.set_compute_package(filename, file_path) - self.statestore.set_helper(helper_type) + self.control.set_compute_package(storage_file_name, file_path) + success = self.statestore.set_compute_package(file_name, storage_file_name, helper_type, name, description) - success = self.statestore.set_compute_package(filename) if not success: return ( jsonify( @@ -213,6 +254,7 @@ def set_compute_package(self, file, helper_type): ), 400, ) + return jsonify({"success": True, "message": "Compute package set."}) def _get_compute_package_name(self): @@ -227,7 +269,7 @@ def _get_compute_package_name(self): return None, message else: try: - name = package_objects["filename"] + name = package_objects["storage_file_name"] except KeyError as e: message = "No compute package found. Key error." print(e) @@ -240,22 +282,87 @@ def get_compute_package(self): :return: The compute package as a json response. 
:rtype: :class:`flask.Response` """ - package_object = self.statestore.get_compute_package() - if package_object is None: + result = self.statestore.get_compute_package() + if result is None: return ( jsonify( {"success": False, "message": "No compute package found."} ), 404, ) - payload = {} - id = str(package_object["_id"]) - info = { - "filename": package_object["filename"], - "helper": package_object["helper"], + + obj = { + "id": result["id"] if "id" in result else "", + "file_name": result["file_name"], + "helper": result["helper"], + "committed_at": result["committed_at"], + "storage_file_name": result["storage_file_name"] if "storage_file_name" in result else "", + "name": result["name"] if "name" in result else "", + "description": result["description"] if "description" in result else "", } - payload[id] = info - return jsonify(payload) + + return jsonify(obj) + + def list_compute_packages(self, limit: str = None, skip: str = None, include_active: str = None): + """Get paginated list of compute packages from the statestore. + + :return: All compute packages as a json response. 
+ :rtype: :class:`flask.Response` + """ + + if limit is not None and skip is not None: + limit = int(limit) + skip = int(skip) + + include_active: bool = include_active == "true" + + result = self.statestore.list_compute_packages(limit, skip) + if result is None: + return ( + jsonify( + {"success": False, "message": "No compute packages found."} + ), + 404, + ) + + active_package_id: str = None + + if include_active: + active_package = self.statestore.get_compute_package() + + if active_package is not None: + active_package_id = active_package["id"] if "id" in active_package else "" + + if include_active: + arr = [ + { + "id": element["id"] if "id" in element else "", + "file_name": element["file_name"], + "helper": element["helper"], + "committed_at": element["committed_at"], + "storage_file_name": element["storage_file_name"] if "storage_file_name" in element else "", + "name": element["name"] if "name" in element else "", + "description": element["description"] if "description" in element else "", + "active": "id" in element and element["id"] == active_package_id, + } + for element in result["result"] + ] + else: + arr = [ + { + "id": element["id"] if "id" in element else "", + "file_name": element["file_name"], + "helper": element["helper"], + "committed_at": element["committed_at"], + "storage_file_name": element["storage_file_name"] if "storage_file_name" in element else "", + "name": element["name"] if "name" in element else "", + "description": element["description"] if "description" in element else "", + } + for element in result["result"] + ] + + result = {"result": arr, "count": result["count"]} + return jsonify(result) def download_compute_package(self, name): """Download the compute package. @@ -595,7 +702,30 @@ def get_latest_model(self): {"success": False, "message": "No initial model set."} ) - def get_models(self, session_id=None, limit=None, skip=None): + def set_current_model(self, model_id: str): + """Set the active model in the statestore. 
+ + :param model_id: The model id to set. + :type model_id: str + :return: A json response with success or failure message. + :rtype: :class:`flask.Response` + """ + success = self.statestore.set_current_model(model_id) + + if not success: + return ( + jsonify( + { + "success": False, + "message": "Failed to set active model.", + } + ), + 400, + ) + + return jsonify({"success": True, "message": "Active model set."}) + + def get_models(self, session_id: str = None, limit: str = None, skip: str = None, include_active: str = None): result = self.statestore.list_models(session_id, limit, skip) if result is None: @@ -604,10 +734,30 @@ def get_models(self, session_id=None, limit=None, skip=None): 404, ) - arr = [] + include_active: bool = include_active == "true" - for model in result["result"]: - arr.append(model) + if include_active: + + latest_model = self.statestore.get_latest_model() + + arr = [ + { + "committed_at": element["committed_at"], + "model": element["model"], + "session_id": element["session_id"], + "active": element["model"] == latest_model, + } + for element in result["result"] + ] + else: + arr = [ + { + "committed_at": element["committed_at"], + "model": element["model"], + "session_id": element["session_id"], + } + for element in result["result"] + ] result = {"result": arr, "count": result["count"]} diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index c564af3d9..0b385c566 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -45,8 +45,9 @@ def list_models(): session_id = request.args.get("session_id", None) limit = request.args.get("limit", None) skip = request.args.get("skip", None) + include_active = request.args.get("include_active", None) - return api.get_models(session_id, limit, skip) + return api.get_models(session_id, limit, skip, include_active) @app.route("/delete_model_trail", methods=["GET", "POST"]) @@ -183,6 +184,12 @@ def get_session(): return api.get_session(session_id) 
+@app.route("/set_active_package", methods=["PUT"]) +def set_active_package(): + id = request.args.get("id", None) + return api.set_active_compute_package(id) + + @app.route("/set_package", methods=["POST"]) def set_package(): """ Set the compute package in the statestore. @@ -198,6 +205,9 @@ def set_package(): rtype: json """ helper_type = request.form.get("helper", None) + name = request.form.get("name", None) + description = request.form.get("description", None) + if helper_type is None: return ( jsonify({"success": False, "message": "Missing helper type."}), @@ -207,7 +217,7 @@ def set_package(): file = request.files["file"] except KeyError: return jsonify({"success": False, "message": "Missing file."}), 400 - return api.set_compute_package(file=file, helper_type=helper_type) + return api.set_compute_package(file=file, helper_type=helper_type, name=name, description=description) @app.route("/get_package", methods=["GET"]) @@ -219,6 +229,24 @@ def get_package(): return api.get_compute_package() +@app.route("/list_compute_packages", methods=["GET"]) +def list_compute_packages(): + """Get the compute package from the statestore. + return: The compute package as a json object. + rtype: json + """ + + limit = request.args.get("limit", None) + skip = request.args.get("skip", None) + include_active = request.args.get("include_active", None) + + return api.list_compute_packages( + limit=limit, + skip=skip, + include_active=include_active + ) + + @app.route("/download_package", methods=["GET"]) def download_package(): """Download the compute package. @@ -244,6 +272,24 @@ def get_latest_model(): return api.get_latest_model() +@app.route("/set_current_model", methods=["PUT"]) +def set_current_model(): + """Set the initial model in the statestore and upload to model repository. + Usage with curl: + curl -k -X PUT + -F id= + http://localhost:8092/set_current_model + + param: id: The model id to set. + type: id: str + return: boolean. 
+ rtype: json + """ + id = request.args.get("id", None) + if id is None: + return jsonify({"success": False, "message": "Missing model id."}), 400 + return api.set_current_model(id) + # Get initial model endpoint diff --git a/fedn/fedn/network/api/tests.py b/fedn/fedn/network/api/tests.py index 7395d9bdf..cf9e27b3e 100644 --- a/fedn/fedn/network/api/tests.py +++ b/fedn/fedn/network/api/tests.py @@ -160,6 +160,19 @@ def test_list_combiners(self): # Assert api.get_all_combiners was called fedn.network.api.server.api.get_all_combiners.assert_called_once_with() + def test_list_compute_packages(self): + """ Test list_compute_packages endpoint. """ + # Mock api.list_compute_packages + return_value = {"test": "test"} + fedn.network.api.server.api.list_compute_packages = MagicMock(return_value=return_value) + # Make request + response = self.app.get('/list_combiners') + # Assert response + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json, return_value) + # Assert api.list_compute_packages was called + fedn.network.api.server.api.list_compute_packages.assert_called_once_with() + def test_list_rounds(self): """ Test list_rounds endpoint. 
""" # Mock api.get_all_rounds diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index d8bb38556..90a86e3bd 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -160,7 +160,7 @@ def get_compute_package_name(self): definition = self.statestore.get_compute_package() if definition: try: - package_name = definition["filename"] + package_name = definition["storage_file_name"] return package_name except (IndexError, KeyError): print( @@ -174,7 +174,6 @@ def get_compute_package_name(self): def set_compute_package(self, filename, path): """Persist the configuration for the compute package.""" self.model_repository.set_compute_package(filename, path) - self.statestore.set_compute_package(filename) def get_compute_package(self, compute_package=""): """ diff --git a/fedn/fedn/network/storage/statestore/mongostatestore.py b/fedn/fedn/network/storage/statestore/mongostatestore.py index 389325e10..fe6f93c51 100644 --- a/fedn/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/fedn/network/storage/statestore/mongostatestore.py @@ -57,6 +57,8 @@ def __init__(self, network_id, config): self.clients = None raise + self.init_index() + def connect(self): """ Establish client connection to MongoDB. @@ -75,6 +77,9 @@ def connect(self): except Exception: raise + def init_index(self): + self.package.create_index([("id", pymongo.DESCENDING)]) + def is_inited(self): """Check if the statestore is intialized. @@ -242,6 +247,32 @@ def get_latest_model(self): except (KeyError, IndexError): return None + def set_current_model(self, model_id: str): + """Set the current model in statestore. + + :param model_id: The model id. 
+ :type model_id: str + :return: + """ + + try: + + committed_at = datetime.now() + + existing_model = self.model.find_one({"key": "models", "model": model_id}) + + if existing_model is not None: + + self.model.update_one( + {"key": "current_model"}, {"$set": {"model": model_id, "committed_at": committed_at, "session_id": None}}, True + ) + + return True + except Exception as e: + print("ERROR: {}".format(e), flush=True) + + return False + def get_latest_round(self): """Get the id of the most recent round. @@ -283,34 +314,68 @@ def get_validations(self, **kwargs): result = self.control.validations.find(kwargs) return result - def set_compute_package(self, filename): + def set_active_compute_package(self, id: str): + """Set the active compute package in statestore. + + :param id: The id of the compute package (not document _id). + :type id: str + :return: True if successful. + :rtype: bool + """ + + try: + + find = {"id": id} + projection = {"_id": False, "key": False} + + doc = self.control.package.find_one(find, projection) + + if doc is None: + return False + + doc["key"] = "active" + + self.control.package.replace_one( + {"key": "active"}, doc + ) + + except Exception as e: + print("ERROR: {}".format(e), flush=True) + return False + + return True + + def set_compute_package(self, file_name: str, storage_file_name: str, helper_type: str, name: str = None, description: str = None): """Set the active compute package in statestore. - :param filename: The filename of the compute package. - :type filename: str + :param file_name: The file_name of the compute package. + :type file_name: str :return: True if successful. 
:rtype: bool """ + + obj = { + "file_name": file_name, + "storage_file_name": storage_file_name, + "helper": helper_type, + "committed_at": datetime.now(), + "name": name, + "description": description, + "id": str(uuid.uuid4()), + } + self.control.package.update_one( {"key": "active"}, { - "$set": { - "filename": filename, - "committed_at": str(datetime.now()), - } - }, - True, - ) - self.control.package.update_one( - {"key": "package_trail"}, - { - "$push": { - "filename": filename, - "committed_at": str(datetime.now()), - } + "$set": obj }, True, ) + + trail_obj = {**{"key": "package_trail"}, **obj} + + self.control.package.insert_one(trail_obj) + return True def get_compute_package(self): @@ -319,17 +384,54 @@ def get_compute_package(self): :return: The active compute package. :rtype: ObjectID """ - ret = self.control.package.find({"key": "active"}) try: - retcheck = ret[0] - if ( - retcheck is None or retcheck == "" or retcheck == " " - ): # ugly check for empty string - return None - return retcheck - except (KeyError, IndexError): + + find = {"key": "active"} + projection = {"key": False, "_id": False} + ret = self.control.package.find_one(find, projection) + return ret + except Exception as e: + print("ERROR: {}".format(e), flush=True) + return None + + def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="committed_at", sort_order=pymongo.DESCENDING): + """List compute packages in the statestore (paginated). + + :param limit: The maximum number of compute packages to return. + :type limit: int + :param skip: The number of compute packages to skip. + :type skip: int + :param sort_key: The key to sort by. + :type sort_key: str + :param sort_order: The sort order. + :type sort_order: pymongo.ASCENDING or pymongo.DESCENDING + :return: Dictionary of compute packages in result and count. 
+ :rtype: dict + """ + + result = None + count = None + + find_option = {"key": "package_trail"} + projection = {"key": False, "_id": False} + + try: + if limit is not None and skip is not None: + result = self.control.package.find(find_option, projection).limit(limit).skip(skip).sort(sort_key, sort_order) + else: + result = self.control.package.find(find_option, projection).sort(sort_key, sort_order) + + count = self.control.package.count_documents(find_option) + + except Exception as e: + print("ERROR: {}".format(e), flush=True) return None + return { + "result": result or [], + "count": count or 0, + } + def set_helper(self, helper): """Set the active helper package in statestore.