diff --git a/examples/splitlearning_titanic/api.ipynb b/examples/splitlearning_titanic/api.ipynb
index bc801c2c..43331978 100644
--- a/examples/splitlearning_titanic/api.ipynb
+++ b/examples/splitlearning_titanic/api.ipynb
@@ -2,7 +2,7 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 133,
+   "execution_count": 168,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -11,7 +11,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 134,
+   "execution_count": 169,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -22,16 +22,16 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 135,
+   "execution_count": 170,
    "metadata": {},
    "outputs": [
     {
      "data": {
       "text/plain": [
-       ""
+       ""
       ]
      },
-     "execution_count": 135,
+     "execution_count": 170,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -42,34 +42,34 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 136,
+   "execution_count": 171,
    "metadata": {},
    "outputs": [
     {
      "data": {
       "text/plain": [
        "{'count': 2,\n",
-       " 'result': [{'client_id': 'eb2bcf55-17c5-4e34-9b6f-d53b41e38cd4',\n",
-       " 'combiner': 'combinerd3486ca7',\n",
-       " 'id': '6763339936d0caaa05834846',\n",
+       " 'result': [{'client_id': 'ef5dd2e4-0660-400e-b883-2301366f3bd5',\n",
+       " 'combiner': 'combiner93d66f64',\n",
+       " 'id': '67641f327c2d2ddecce002c8',\n",
        " 'ip': '127.0.0.1',\n",
-       " 'last_seen': 'Wed, 18 Dec 2024 21:42:03 GMT',\n",
-       " 'name': 'clientb0abc7ea',\n",
+       " 'last_seen': 'Thu, 19 Dec 2024 14:27:16 GMT',\n",
+       " 'name': 'client4e582282',\n",
        " 'package': 'local',\n",
        " 'status': 'online',\n",
-       " 'updated_at': '2024-12-18 21:42:01.464088'},\n",
-       " {'client_id': '8b4a21f9-c1e6-4632-a196-6aa29b268f61',\n",
-       " 'combiner': 'combinerd3486ca7',\n",
-       " 'id': '676332f636d0caaa058346f8',\n",
+       " 'updated_at': '2024-12-19 14:27:14.648611'},\n",
+       " {'client_id': '7d50d8eb-24df-4a91-aa98-737c06a9d455',\n",
+       " 'combiner': 'combiner93d66f64',\n",
+       " 'id': '67641f267c2d2ddecce002ab',\n",
        " 'ip': '127.0.0.1',\n",
-       " 'last_seen': 'Wed, 18 Dec 2024 21:41:07 GMT',\n",
-       " 'name': 'client1aafa234',\n",
+       " 'last_seen': 'Thu, 19 Dec 2024 14:27:04 GMT',\n",
+       " 'name': 'client5c2c168f',\n",
        " 'package': 'local',\n",
        " 'status': 'online',\n",
-       " 'updated_at': '2024-12-18 21:39:18.504766'}]}"
+       " 'updated_at': '2024-12-19 14:27:02.270562'}]}"
       ]
      },
-     "execution_count": 136,
+     "execution_count": 171,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -80,7 +80,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 137,
+   "execution_count": 172,
    "metadata": {},
    "outputs": [
     {
@@ -89,7 +89,7 @@
        "{'message': 'Compute package set.', 'success': True}"
       ]
      },
-     "execution_count": 137,
+     "execution_count": 172,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -100,7 +100,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 138,
+   "execution_count": 173,
    "metadata": {},
    "outputs": [
     {
@@ -110,6 +110,7 @@
        " 'buffer_size': -1,\n",
        " 'clients_requested': 8,\n",
        " 'clients_required': 1,\n",
+       " 'committed_at': 'Thu, 19 Dec 2024 14:29:30 GMT',\n",
        " 'delete_models_storage': True,\n",
        " 'helper_type': 'splitlearninghelper',\n",
        " 'round_timeout': 60,\n",
@@ -122,7 +123,7 @@
        " 'success': True}"
       ]
      },
-     "execution_count": 138,
+     "execution_count": 173,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -141,6 +142,13 @@
     "\n",
     "client.start_splitlearning_session(**session_config)"
    ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
   }
  ],
  "metadata": {
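The notebook above drives the server side of the split-learning example through the FEDn APIClient. The sketch below mirrors that flow as a plain script. Only start_splitlearning_session(**session_config) and the values visible in the printed outputs (helper_type "splitlearninghelper", round_timeout 60) come from the notebook; the APIClient constructor arguments and the remaining session_config keys are illustrative assumptions.

from fedn import APIClient

# Assumption: a locally deployed FEDn API server; host/port are placeholders.
client = APIClient(host="localhost", port=8092)

# Assumption: request keys mirror the session config echoed back in the
# notebook output above.
session_config = {
    "helper_type": "splitlearninghelper",
    "round_timeout": 60,
    "rounds": 1,  # assumed
}

client.start_splitlearning_session(**session_config)

diff --git a/examples/splitlearning_titanic/client/backward.py b/examples/splitlearning_titanic/client/backward.py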
index efc0befe..ec7f7d98 100644
--- a/examples/splitlearning_titanic/client/backward.py
+++ b/examples/splitlearning_titanic/client/backward.py
@@ -7,7 +7,7 @@
 from torch import optim
 
 from fedn.common.log_config import logger
-from fedn.utils.helpers.helpers import get_helper, save_metadata
+from fedn.utils.helpers.helpers import get_helper
 
 dir_path = os.path.dirname(os.path.realpath(__file__))
 abs_path = os.path.abspath(dir_path)
diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py
index 4faeef6a..1d947b0b 100644
--- a/fedn/network/api/interface.py
+++ b/fedn/network/api/interface.py
@@ -7,7 +7,8 @@
 from werkzeug.security import safe_join
 from werkzeug.utils import secure_filename
 
-from fedn.common.config import FEDN_COMPUTE_PACKAGE_DIR, get_controller_config, get_network_config
+from fedn.common.config import (FEDN_COMPUTE_PACKAGE_DIR,
+                                get_controller_config, get_network_config)
 from fedn.common.log_config import logger
 from fedn.network.combiner.interfaces import CombinerUnavailableError
 from fedn.network.state import ReducerState, ReducerStateToString
diff --git a/fedn/network/clients/client_v2.py b/fedn/network/clients/client_v2.py
index 7bac37c4..7898c152 100644
--- a/fedn/network/clients/client_v2.py
+++ b/fedn/network/clients/client_v2.py
@@ -8,7 +8,8 @@
 
 from fedn.common.config import FEDN_CUSTOM_URL_PREFIX
 from fedn.common.log_config import logger
-from fedn.network.clients.fedn_client import ConnectToApiResult, FednClient, GrpcConnectionOptions
+from fedn.network.clients.fedn_client import (ConnectToApiResult, FednClient,
+                                              GrpcConnectionOptions)
 from fedn.network.combiner.modelservice import get_tmp_path
 from fedn.utils.helpers.helpers import get_helper
 
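The interface.py and client_v2.py hunks above (like the combiner.py and control.py hunks below) only rewrap long import lines. Both spellings bind exactly the same names; the parenthesized form simply lets the line wrap without backslashes:

from fedn.common.config import FEDN_COMPUTE_PACKAGE_DIR, get_controller_config, get_network_config

# equivalent, wrapped form
from fedn.common.config import (FEDN_COMPUTE_PACKAGE_DIR,
                                get_controller_config, get_network_config)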
diff --git a/fedn/network/combiner/aggregators/splitlearningagg.py b/fedn/network/combiner/aggregators/splitlearningagg.py
index 47b4c892..eced9030 100644
--- a/fedn/network/combiner/aggregators/splitlearningagg.py
+++ b/fedn/network/combiner/aggregators/splitlearningagg.py
@@ -54,7 +54,7 @@ def __init__(self, update_handler):
     def combine_models(self, helper=None, delete_models=True):
         """Concatenates client embeddings in the queue by aggregating them.
 
-        NOTE: After all embeddings are received, the embeddings need to be sorted
+        After all embeddings are received, the embeddings need to be sorted
         (consistently) by client ID.
 
         :param helper: An instance of :class: `fedn.utils.helpers.helpers.HelperBase`, ML framework specific helper, defaults to None
@@ -78,7 +78,7 @@ def combine_models(self, helper=None, delete_models=True):
             try:
                 logger.info("AGGREGATOR({}): Getting next embedding from queue.".format(self.name))
                 new_embedding = self.update_handler.next_model_update()  # NOTE: should return in format {client_id: embedding}
-
+
                 # Load model parameters and metadata
                 logger.info("AGGREGATOR({}): Loading embedding metadata.".format(self.name))
                 embedding_next, metadata = self.update_handler.load_model_update(new_embedding, helper)
@@ -103,7 +103,7 @@ def combine_models(self, helper=None, delete_models=True):
                 logger.error(tb)
 
         logger.info("splitlearning aggregator: starting calculation of gradients")
-
+
         # NOTE: When aggregating the embeddings in SplitLearning, they always need to be sorted consistently
         client_order = sorted(embeddings.keys())
 
@@ -114,19 +114,19 @@ def combine_models(self, helper=None, delete_models=True):
         concatenated_embeddings = torch.cat(ordered_embeddings, dim=1)  # to 1d tensor
 
         self.optimizer.zero_grad()
-
+
         output = self.model(concatenated_embeddings)
 
         # TODO: need to match indices of data samples to target indices in order to calculate gradients.
         # NOTE: For one epoch, depending on the batch size, multiple communications are necessary.
         # use dummy target for now
         # batch_size = concatenated_embeddings.shape[0] # TODO: check
-
+
         targets = helper.load_targets()
-
+
         loss = self.criterion(output, targets)
         loss.backward()
-
+
         self.optimizer.step()
 
         logger.info("AGGREGATOR({}): Loss: {}".format(self.name, loss))
@@ -135,6 +135,6 @@ def combine_models(self, helper=None, delete_models=True):
         gradients = {}
         for client_id, embedding in zip(client_order, ordered_embeddings):
             gradients[str(client_id)] = embedding.grad.numpy()
-
+
         logger.info("AGGREGATOR({}): Gradients are calculated.".format(self.name))
-        return gradients, data
\ No newline at end of file
+        return gradients, data
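The combine_models hunks above implement the core split-learning step: per-client embeddings are ordered deterministically (sorted by client ID), concatenated along the feature dimension, pushed through the server-side model, and after backward() each client's gradient is read off its own leaf tensor. Below is a self-contained sketch of that mechanism; the linear model, BCE loss, and random data are assumed stand-ins for the real server-side model, criterion, and helper.load_targets().

import torch

# Two clients' embeddings (batch of 8, 4 features each); leaf tensors so
# autograd accumulates a gradient on each of them individually.
embeddings = {
    "client-b": torch.randn(8, 4, requires_grad=True),
    "client-a": torch.randn(8, 4, requires_grad=True),
}

# Deterministic ordering: every round must slice the concatenated tensor
# the same way, hence sorted client IDs.
client_order = sorted(embeddings.keys())
ordered_embeddings = [embeddings[cid] for cid in client_order]
concatenated = torch.cat(ordered_embeddings, dim=1)  # shape (8, 8)

model = torch.nn.Linear(8, 1)  # stand-in for the server-side model
criterion = torch.nn.BCEWithLogitsLoss()
targets = torch.randint(0, 2, (8, 1)).float()  # stand-in for load_targets()

loss = criterion(model(concatenated), targets)
loss.backward()

# Autograd routes the gradient of the concatenated input back to each leaf
# tensor, so per-client gradients can be returned keyed by client ID.
gradients = {cid: emb.grad.numpy() for cid, emb in zip(client_order, ordered_embeddings)}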
diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py
index 31d6e725..9d4750c2 100644
--- a/fedn/network/combiner/combiner.py
+++ b/fedn/network/combiner/combiner.py
@@ -14,9 +14,13 @@
 import fedn.network.grpc.fedn_pb2 as fedn
 import fedn.network.grpc.fedn_pb2_grpc as rpc
 from fedn.common.certificate.certificate import Certificate
-from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
+from fedn.common.log_config import (logger, set_log_level_from_string,
+                                    set_log_stream)
 from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler
-from fedn.network.combiner.shared import client_store, combiner_store, prediction_store, repository, statestore, status_store, validation_store
+from fedn.network.combiner.shared import (client_store, combiner_store,
+                                          prediction_store, repository,
+                                          statestore, status_store,
+                                          validation_store)
 from fedn.network.grpc.server import Server, ServerConfig
 from fedn.network.storage.statestore.stores.shared import EntityNotFound
 
@@ -289,11 +293,9 @@ def _send_request_type(self, request_type, session_id, model_id=None, config=Non
         elif request_type == fedn.StatusType.MODEL_PREDICTION:
             # TODO: add prediction clients type
             clients = self.get_active_validators()
-        elif request_type == fedn.StatusType.FORWARD:
+        elif request_type == fedn.StatusType.FORWARD or request_type == fedn.StatusType.BACKWARD:
             clients = self.get_active_trainers()
-        elif request_type == fedn.StatusType.BACKWARD:
-            clients = self.get_active_trainers()
-
+
         for client in clients:
             request = fedn.TaskRequest()
             request.timestamp = str(datetime.now())
@@ -309,15 +311,15 @@ def _send_request_type(self, request_type, session_id, model_id=None, config=Non
                 # TODO: in prediction, request.data should also contain user-defined data/parameters
                 request.data = json.dumps({"presigned_url": presigned_url})
             elif request_type == fedn.StatusType.MODEL_UPDATE:  # noqa: SIM114
-                request.model_id = model_id
+                request.model_id = model_id
                 request.correlation_id = str(uuid.uuid4())
                 request.data = json.dumps(config)
             elif request_type == fedn.StatusType.FORWARD:  # noqa: SIM114
-                # request.model_id = model_id
+                # request.model_id = model_id
                 # request.correlation_id = str(uuid.uuid4()) # NOTE: necessary or not? TODO: rename model_id
                 request.data = json.dumps(config)
             elif request_type == fedn.StatusType.BACKWARD:
-                request.model_id = model_id
+                request.model_id = model_id
                 request.correlation_id = str(uuid.uuid4())
                 request.data = json.dumps(config)
             self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE)
diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py
index 2c9f7482..f967b80b 100644
--- a/fedn/network/combiner/roundhandler.py
+++ b/fedn/network/combiner/roundhandler.py
@@ -283,7 +283,8 @@ def _backward_pass(self, config: dict, clients: list):
         self.server.request_backward_pass(session_id=config["session_id"], gradient_id=config["model_id"], config=config, clients=clients)
         time.sleep(1)  # TODO: this is an easy hack for now. There needs to be some waiting time for the backward pass to complete.
-        # the above mechanism cannot be used, as the backward pass is not returning any model updates (update_handler.waitforit checks for aggregation on the queue)
+        # the above mechanism cannot be used, as the backward pass is not returning any model updates (update_handler.waitforit checks for aggregation on the
+        # queue)
 
         return meta
 
     def stage_model(self, model_id, timeout_retry=3, retry=2):
@@ -439,7 +440,7 @@ def execute_forward_pass(self, config):
         data = {}
         data["config"] = config
         data["round_id"] = config["round_id"]
-
+
         data["model_id"] = None  # TODO: checking
 
         clients = self._assign_round_clients(self.server.max_clients)
@@ -479,7 +480,7 @@ def execute_backward_pass(self, config):
         data["round_id"] = config["round_id"]
 
         logger.info("roundhandler execute_backward_pass: downloading model/gradient with id: {}".format(config["model_id"]))
-
+
         # Download gradients and set in temp storage.
         self.stage_model(config["model_id"])
         # Download a model from persistent storage and set in modelservice
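In the combiner hunk above, FORWARD and BACKWARD tasks now share the trainer-selection branch, and a BACKWARD request reuses the model_id field to carry the gradient ID. The helper below is hypothetical, but it assembles a request using only the TaskRequest fields that appear in the hunk:

import json
import uuid
from datetime import datetime

import fedn.network.grpc.fedn_pb2 as fedn


def build_backward_request(gradient_id, config):
    """Hypothetical helper mirroring the BACKWARD branch of _send_request_type."""
    request = fedn.TaskRequest()
    request.timestamp = str(datetime.now())
    request.model_id = gradient_id  # for BACKWARD tasks, model_id carries the gradient ID
    request.correlation_id = str(uuid.uuid4())
    request.data = json.dumps(config)
    return request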
diff --git a/fedn/network/controller/control.py b/fedn/network/controller/control.py
index 5928d565..990bccd2 100644
--- a/fedn/network/controller/control.py
+++ b/fedn/network/controller/control.py
@@ -3,7 +3,8 @@
 import time
 import uuid
 
-from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_random
+from tenacity import (retry, retry_if_exception_type, stop_after_delay,
+                      wait_random)
 
 from fedn.common.log_config import logger
 from fedn.network.combiner.interfaces import CombinerUnavailableError
diff --git a/fedn/utils/helpers/plugins/splitlearninghelper.py b/fedn/utils/helpers/plugins/splitlearninghelper.py
index b33c854e..4cd12961 100644
--- a/fedn/utils/helpers/plugins/splitlearninghelper.py
+++ b/fedn/utils/helpers/plugins/splitlearninghelper.py
@@ -53,7 +53,6 @@ def load(self, path):
             data = np.load(path)
             logger.info("SPLIT LEARNING HELPER: loaded data from {}".format(path))
             logger.info("data type: {}".format(type(data)))
-
             result_dict = {k: data[k] for k in data.files}
         except Exception as e:
@@ -65,7 +64,7 @@ def load_targets(self):
         try:
             data_path = os.environ.get("FEDN_LABELS_PATH")
         except Exception as e:
-            logger.error("FEDN_LABELS_PATH environment variable is not set. Set via export FEDN_LABELS_PATH='path/to/labels.pt'")
+            logger.error(f"FEDN_LABELS_PATH environment variable is not set. Set via export FEDN_LABELS_PATH='path/to/labels.pt'. Error: {e}")
             raise
 
         try:
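splitlearninghelper.load() relies on numpy's .npz contract: np.load() returns an NpzFile whose .files attribute lists the stored array names, which the helper repacks into a plain dict; load_targets() additionally resolves its label file from the FEDN_LABELS_PATH environment variable (export FEDN_LABELS_PATH='path/to/labels.pt'). A minimal round-trip illustrating the .npz side; the path and array name are illustrative only:

import numpy as np

# Write a dict of arrays the way a client would serialize an embedding.
np.savez("/tmp/embeddings.npz", client_a=np.ones((8, 4)))

# np.load() gives an NpzFile; .files lists the keys that were saved.
data = np.load("/tmp/embeddings.npz")
result_dict = {k: data[k] for k in data.files}  # same repacking as load()
assert result_dict["client_a"].shape == (8, 4)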