Skip to content

Commit

Permalink
fixing ruff linting
Browse files Browse the repository at this point in the history
  • Loading branch information
FrankJonasmoelle committed Dec 19, 2024
1 parent 1afb9d2 commit 6e857bf
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 50 deletions.
54 changes: 31 additions & 23 deletions examples/splitlearning_titanic/api.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 133,
"execution_count": 168,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -11,7 +11,7 @@
},
{
"cell_type": "code",
"execution_count": 134,
"execution_count": 169,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -22,16 +22,16 @@
},
{
"cell_type": "code",
"execution_count": 135,
"execution_count": 170,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<fedn.network.api.client.APIClient at 0x107cd93d0>"
"<fedn.network.api.client.APIClient at 0x1162a05e0>"
]
},
"execution_count": 135,
"execution_count": 170,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -42,34 +42,34 @@
},
{
"cell_type": "code",
"execution_count": 136,
"execution_count": 171,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'count': 2,\n",
" 'result': [{'client_id': 'eb2bcf55-17c5-4e34-9b6f-d53b41e38cd4',\n",
" 'combiner': 'combinerd3486ca7',\n",
" 'id': '6763339936d0caaa05834846',\n",
" 'result': [{'client_id': 'ef5dd2e4-0660-400e-b883-2301366f3bd5',\n",
" 'combiner': 'combiner93d66f64',\n",
" 'id': '67641f327c2d2ddecce002c8',\n",
" 'ip': '127.0.0.1',\n",
" 'last_seen': 'Wed, 18 Dec 2024 21:42:03 GMT',\n",
" 'name': 'clientb0abc7ea',\n",
" 'last_seen': 'Thu, 19 Dec 2024 14:27:16 GMT',\n",
" 'name': 'client4e582282',\n",
" 'package': 'local',\n",
" 'status': 'online',\n",
" 'updated_at': '2024-12-18 21:42:01.464088'},\n",
" {'client_id': '8b4a21f9-c1e6-4632-a196-6aa29b268f61',\n",
" 'combiner': 'combinerd3486ca7',\n",
" 'id': '676332f636d0caaa058346f8',\n",
" 'updated_at': '2024-12-19 14:27:14.648611'},\n",
" {'client_id': '7d50d8eb-24df-4a91-aa98-737c06a9d455',\n",
" 'combiner': 'combiner93d66f64',\n",
" 'id': '67641f267c2d2ddecce002ab',\n",
" 'ip': '127.0.0.1',\n",
" 'last_seen': 'Wed, 18 Dec 2024 21:41:07 GMT',\n",
" 'name': 'client1aafa234',\n",
" 'last_seen': 'Thu, 19 Dec 2024 14:27:04 GMT',\n",
" 'name': 'client5c2c168f',\n",
" 'package': 'local',\n",
" 'status': 'online',\n",
" 'updated_at': '2024-12-18 21:39:18.504766'}]}"
" 'updated_at': '2024-12-19 14:27:02.270562'}]}"
]
},
"execution_count": 136,
"execution_count": 171,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -80,7 +80,7 @@
},
{
"cell_type": "code",
"execution_count": 137,
"execution_count": 172,
"metadata": {},
"outputs": [
{
Expand All @@ -89,7 +89,7 @@
"{'message': 'Compute package set.', 'success': True}"
]
},
"execution_count": 137,
"execution_count": 172,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -100,7 +100,7 @@
},
{
"cell_type": "code",
"execution_count": 138,
"execution_count": 173,
"metadata": {},
"outputs": [
{
Expand All @@ -110,6 +110,7 @@
" 'buffer_size': -1,\n",
" 'clients_requested': 8,\n",
" 'clients_required': 1,\n",
" 'committed_at': 'Thu, 19 Dec 2024 14:29:30 GMT',\n",
" 'delete_models_storage': True,\n",
" 'helper_type': 'splitlearninghelper',\n",
" 'round_timeout': 60,\n",
Expand All @@ -122,7 +123,7 @@
" 'success': True}"
]
},
"execution_count": 138,
"execution_count": 173,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -141,6 +142,13 @@
"\n",
"client.start_splitlearning_session(**session_config)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
2 changes: 1 addition & 1 deletion examples/splitlearning_titanic/client/backward.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from torch import optim

from fedn.common.log_config import logger
from fedn.utils.helpers.helpers import get_helper, save_metadata
from fedn.utils.helpers.helpers import get_helper

dir_path = os.path.dirname(os.path.realpath(__file__))
abs_path = os.path.abspath(dir_path)
Expand Down
3 changes: 2 additions & 1 deletion fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from werkzeug.security import safe_join
from werkzeug.utils import secure_filename

from fedn.common.config import FEDN_COMPUTE_PACKAGE_DIR, get_controller_config, get_network_config
from fedn.common.config import (FEDN_COMPUTE_PACKAGE_DIR,
get_controller_config, get_network_config)
from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerUnavailableError
from fedn.network.state import ReducerState, ReducerStateToString
Expand Down
3 changes: 2 additions & 1 deletion fedn/network/clients/client_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from fedn.common.config import FEDN_CUSTOM_URL_PREFIX
from fedn.common.log_config import logger
from fedn.network.clients.fedn_client import ConnectToApiResult, FednClient, GrpcConnectionOptions
from fedn.network.clients.fedn_client import (ConnectToApiResult, FednClient,
GrpcConnectionOptions)
from fedn.network.combiner.modelservice import get_tmp_path
from fedn.utils.helpers.helpers import get_helper

Expand Down
18 changes: 9 additions & 9 deletions fedn/network/combiner/aggregators/splitlearningagg.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, update_handler):
def combine_models(self, helper=None, delete_models=True):
"""Concatenates client embeddings in the queue by aggregating them.
NOTE: After all embeddings are received, the embeddings need to be sorted
After all embeddings are received, the embeddings need to be sorted
(consistently) by client ID.
:param helper: An instance of :class: `fedn.utils.helpers.helpers.HelperBase`, ML framework specific helper, defaults to None
Expand All @@ -78,7 +78,7 @@ def combine_models(self, helper=None, delete_models=True):
try:
logger.info("AGGREGATOR({}): Getting next embedding from queue.".format(self.name))
new_embedding = self.update_handler.next_model_update() # NOTE: should return in format {client_id: embedding}

# Load model parameters and metadata
logger.info("AGGREGATOR({}): Loading embedding metadata.".format(self.name))
embedding_next, metadata = self.update_handler.load_model_update(new_embedding, helper)
Expand All @@ -103,7 +103,7 @@ def combine_models(self, helper=None, delete_models=True):
logger.error(tb)

logger.info("splitlearning aggregator: starting calculation of gradients")

# NOTE: When aggregating the embeddings in SplitLearning, they always need to be sorted consistently
client_order = sorted(embeddings.keys())

Expand All @@ -114,19 +114,19 @@ def combine_models(self, helper=None, delete_models=True):
concatenated_embeddings = torch.cat(ordered_embeddings, dim=1) # to 1d tensor

self.optimizer.zero_grad()

output = self.model(concatenated_embeddings)

# TODO: need to match indices of data samples to target indices in order to calculate gradients.
# NOTE: For one epoch, depending on the batch size, multiple communications are necessary.
# use dummy target for now
# batch_size = concatenated_embeddings.shape[0] # TODO: check

targets = helper.load_targets()

loss = self.criterion(output, targets)
loss.backward()

self.optimizer.step()

logger.info("AGGREGATOR({}): Loss: {}".format(self.name, loss))
Expand All @@ -135,6 +135,6 @@ def combine_models(self, helper=None, delete_models=True):
gradients = {}
for client_id, embedding in zip(client_order, ordered_embeddings):
gradients[str(client_id)] = embedding.grad.numpy()

logger.info("AGGREGATOR({}): Gradients are calculated.".format(self.name))
return gradients, data
return gradients, data
20 changes: 11 additions & 9 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
import fedn.network.grpc.fedn_pb2 as fedn
import fedn.network.grpc.fedn_pb2_grpc as rpc
from fedn.common.certificate.certificate import Certificate
from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
from fedn.common.log_config import (logger, set_log_level_from_string,
set_log_stream)
from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler
from fedn.network.combiner.shared import client_store, combiner_store, prediction_store, repository, statestore, status_store, validation_store
from fedn.network.combiner.shared import (client_store, combiner_store,
prediction_store, repository,
statestore, status_store,
validation_store)
from fedn.network.grpc.server import Server, ServerConfig
from fedn.network.storage.statestore.stores.shared import EntityNotFound

Expand Down Expand Up @@ -289,11 +293,9 @@ def _send_request_type(self, request_type, session_id, model_id=None, config=Non
elif request_type == fedn.StatusType.MODEL_PREDICTION:
# TODO: add prediction clients type
clients = self.get_active_validators()
elif request_type == fedn.StatusType.FORWARD:
elif request_type == fedn.StatusType.FORWARD or request_type == fedn.StatusType.BACKWARD:
clients = self.get_active_trainers()
elif request_type == fedn.StatusType.BACKWARD:
clients = self.get_active_trainers()


for client in clients:
request = fedn.TaskRequest()
request.timestamp = str(datetime.now())
Expand All @@ -309,15 +311,15 @@ def _send_request_type(self, request_type, session_id, model_id=None, config=Non
# TODO: in prediction, request.data should also contain user-defined data/parameters
request.data = json.dumps({"presigned_url": presigned_url})
elif request_type == fedn.StatusType.MODEL_UPDATE: # noqa: SIM114
request.model_id = model_id
request.model_id = model_id
request.correlation_id = str(uuid.uuid4())
request.data = json.dumps(config)
elif request_type == fedn.StatusType.FORWARD: # noqa: SIM114
# request.model_id = model_id
# request.model_id = model_id
# request.correlation_id = str(uuid.uuid4()) # NOTE: necessary or not? TODO: rename model_id
request.data = json.dumps(config)
elif request_type == fedn.StatusType.BACKWARD:
request.model_id = model_id
request.model_id = model_id
request.correlation_id = str(uuid.uuid4())
request.data = json.dumps(config)
self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE)
Expand Down
7 changes: 4 additions & 3 deletions fedn/network/combiner/roundhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ def _backward_pass(self, config: dict, clients: list):
self.server.request_backward_pass(session_id=config["session_id"], gradient_id=config["model_id"], config=config, clients=clients)

time.sleep(1) # TODO: this is an easy hack for now. There needs to be some waiting time for the backward pass to complete.
# the above mechanism cannot be used, as the backward pass is not returning any model updates (update_handler.waitforit checks for aggregation on the queue)
# the above mechanism cannot be used, as the backward pass is not returning any model updates (update_handler.waitforit checks for aggregation on the
# queue)
return meta

def stage_model(self, model_id, timeout_retry=3, retry=2):
Expand Down Expand Up @@ -439,7 +440,7 @@ def execute_forward_pass(self, config):
data = {}
data["config"] = config
data["round_id"] = config["round_id"]

data["model_id"] = None # TODO: checking

clients = self._assign_round_clients(self.server.max_clients)
Expand Down Expand Up @@ -479,7 +480,7 @@ def execute_backward_pass(self, config):
data["round_id"] = config["round_id"]

logger.info("roundhandler execute_backward_pass: downloading model/gradient with id: {}".format(config["model_id"]))

# Download gradients and set in temp storage.
self.stage_model(config["model_id"]) # Download a model from persistent storage and set in modelservice

Expand Down
3 changes: 2 additions & 1 deletion fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import time
import uuid

from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_random
from tenacity import (retry, retry_if_exception_type, stop_after_delay,
wait_random)

from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerUnavailableError
Expand Down
3 changes: 1 addition & 2 deletions fedn/utils/helpers/plugins/splitlearninghelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def load(self, path):
data = np.load(path)
logger.info("SPLIT LEARNING HELPER: loaded data from {}".format(path))
logger.info("data type: {}".format(type(data)))

result_dict = {k: data[k] for k in data.files}
return result_dict
except Exception as e:
Expand All @@ -65,7 +64,7 @@ def load_targets(self):
try:
data_path = os.environ.get("FEDN_LABELS_PATH")
except Exception as e:
logger.error("FEDN_LABELS_PATH environment variable is not set. Set via export FEDN_LABELS_PATH='path/to/labels.pt'")
logger.error(f"FEDN_LABELS_PATH environment variable is not set. Set via export FEDN_LABELS_PATH='path/to/labels.pt', {e}")
raise

try:
Expand Down

0 comments on commit 6e857bf

Please sign in to comment.