Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrede committed Dec 1, 2023
1 parent 475d9a2 commit 68cdeda
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 22 deletions.
11 changes: 10 additions & 1 deletion fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,11 +794,20 @@ def start_session(
{"success": False, "message": "A session is already running."}
)

# Check that initial (seed) model is set
if not self.statestore.get_initial_model():
return jsonify(
{
"success": False,
"message": "No initial model set. Set initial model before starting session.",
}
)

# Check available clients per combiner
clients_available = 0
for combiner in self.control.network.get_combiners():
try:
nr_active_clients = len(combiner.get_active_clients())
nr_active_clients = len(combiner.list_active_clients())
clients_available = clients_available + int(nr_active_clients)
except CombinerUnavailableError as e:
# TODO: Handle unavailable combiner, stop session or continue?
Expand Down
11 changes: 4 additions & 7 deletions fedn/fedn/network/combiner/aggregators/aggregatorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import queue
from abc import ABC, abstractmethod

import fedn.common.net.grpc.fedn_pb2 as fedn
from fedn.common.log_config import logger

AGGREGATOR_PLUGIN_PATH = "fedn.network.combiner.aggregators.{}"
Expand Down Expand Up @@ -61,19 +60,17 @@ def on_model_update(self, model_update):
:type model_id: str
"""
try:
logger.info("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id),
log_level=fedn.Status.INFO)
logger.info("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id))

# Validate the update and metadata
valid_update = self._validate_model_update(model_update)
if valid_update:
# Push the model update to the processing queue
self.model_updates.put(model_update)
else:
logger.info("AGGREGATOR({}): Invalid model update, skipping.".format(self.name))
logger.warning("AGGREGATOR({}): Invalid model update, skipping.".format(self.name))
except Exception as e:
logger.info("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e),
log_level=fedn.Status.WARNING)
logger.error("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e))
pass

def _validate_model_update(self, model_update):
Expand All @@ -87,7 +84,7 @@ def _validate_model_update(self, model_update):
# TODO: Validate the metadata to check that it contains all variables assumed by the aggregator.
data = json.loads(model_update.meta)['training_metadata']
if 'num_examples' not in data.keys():
logger.info("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name))
logger.error("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name))
return False
return True

Expand Down
8 changes: 3 additions & 5 deletions fedn/fedn/network/combiner/aggregators/fedavg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import fedn.common.net.grpc.fedn_pb2 as fedn
from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase
from fedn.common.log_config import logger
from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase


class Aggregator(AggregatorBase):
Expand Down Expand Up @@ -78,12 +77,11 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete
"AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_id))
self.model_updates.task_done()
except Exception as e:
logger.info(
logger.error(
"AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e))
self.model_updates.task_done()

data['nr_aggregated_models'] = nr_aggregated_models

logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models),
log_level=fedn.Status.INFO)
logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models))
return model, data
7 changes: 5 additions & 2 deletions fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ def request_model_update(self, config, clients=[]):
# The request to be added to the client queue
request = fedn.ModelUpdateRequest()
request.model_id = config['model_id']
request.correlation_id = str(uuid.uuid4()) # Obesolete?
request.correlation_id = str(uuid.uuid4())
request.timestamp = str(datetime.now())
request.data = json.dumps(config)

if len(clients) == 0:
clients = self.get_active_trainers()

for client in clients:
request.receiver.name = client.name
request.receiver.name = client
request.receiver.role = fedn.WORKER
self._put_request_to_client_queue(request, fedn.Channel.MODEL_UPDATE_REQUESTS)

Expand Down Expand Up @@ -592,6 +592,9 @@ def ModelUpdateRequestStream(self, response, context):
yield q.get(timeout=1.0)
except queue.Empty:
pass
except Exception as e:
logger.error("Error in ModelUpdateRequestStream: {}".format(e))
break

self.tracer.update_client_status(client.name, "offline")

Expand Down
13 changes: 6 additions & 7 deletions fedn/fedn/network/controller/controlbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,13 @@ def get_participating_combiners(self, combiner_round_config):
nr_active_clients = len(combiner.list_active_clients())
except CombinerUnavailableError:
self._handle_unavailable_combiner(combiner)
combiner_state = None
continue

if combiner_state is not None:
is_participating = self.evaluate_round_participation_policy(
combiner_round_config, nr_active_clients
)
if is_participating:
combiners.append((combiner, combiner_round_config))
is_participating = self.evaluate_round_participation_policy(
combiner_round_config, nr_active_clients
)
if is_participating:
combiners.append((combiner, combiner_round_config))
return combiners

def evaluate_round_participation_policy(
Expand Down

0 comments on commit 68cdeda

Please sign in to comment.