From 942e1e083c2120f7b7f35828d53c6bca65c95563 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Wed, 16 Aug 2023 07:18:22 +0200 Subject: [PATCH 01/20] work in progress --- fedn/fedn/network/controller/control.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index 68e4623e1..d7e26f9c0 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -118,7 +118,7 @@ def session(self, config): self._state = ReducerState.idle def round(self, session_config, round_id): - """ Execute a single global round. + """ Execute one global round. :param session_config: The session config. :type session_config: dict From d4734d921671a37ab65c127058ada0d63e4b22ce Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Fri, 18 Aug 2023 14:08:01 +0200 Subject: [PATCH 02/20] Refactor of controller --- fedn/fedn/common/tracer/mongotracer.py | 24 ++- fedn/fedn/network/controller/control.py | 179 ++++++++++++-------- fedn/fedn/network/controller/controlbase.py | 20 ++- 3 files changed, 143 insertions(+), 80 deletions(-) diff --git a/fedn/fedn/common/tracer/mongotracer.py b/fedn/fedn/common/tracer/mongotracer.py index 92af569ea..08aa50b56 100644 --- a/fedn/fedn/common/tracer/mongotracer.py +++ b/fedn/fedn/common/tracer/mongotracer.py @@ -57,11 +57,9 @@ def new_session(self, id=None): data = {'session_id': str(id)} self.sessions.insert_one(data) - def new_round(self, id): + def new_round(self, round_data): """ Create a new session. """ - - data = {'round_id': str(id)} - self.rounds.insert_one(data) + self.rounds.insert_one(round_data) def set_session_config(self, id, config): self.sessions.update_one({'session_id': str(id)}, { @@ -75,10 +73,26 @@ def set_round_combiner_data(self, data): self.rounds.update_one({'round_id': str(data['round_id'])}, { '$push': {'combiners': data}}, True) + def set_round_config(self, round_id, round_config): + """ + + :param round_meta: + """ + self.rounds.update_one({'round_id': round_id}, { + '$set': {'round_config': round_config}}, True) + + def set_round_status(self, round_id, round_status): + """ + + :param round_meta: + """ + self.rounds.update_one({'round_id': round_id}, { + '$set': {'status': round_status}}, True) + def set_round_data(self, round_data): """ :param round_meta: """ self.rounds.update_one({'round_id': str(round_data['round_id'])}, { - '$push': {'reducer': round_data}}, True) + '$set': {'round_data': round_data}}, True) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index d7e26f9c0..f25c50ef8 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -105,14 +105,14 @@ def session(self, config): current_round = round try: - _, round_data = self.round(config, current_round) + _, round_data = self.round(config, str(current_round)) except TypeError as e: print("Could not unpack data from round: {0}".format(e), flush=True) print("CONTROL: Round completed with status {}".format( round_data['status']), flush=True) - self.tracer.set_round_data(round_data) + # self.tracer.set_round_data(round_data) # TODO: Report completion of session self._state = ReducerState.idle @@ -123,15 +123,17 @@ def round(self, session_config, round_id): :param session_config: The session config. :type session_config: dict :param round_id: The round id. - :type round_id: str(int) + :type round_id: str """ - round_data = {'round_id': round_id} + round_data = {'round_id': round_id, 'status': "Pending"} + round = self.new_round(round_data) if len(self.network.get_combiners()) < 1: print("REDUCER: No combiners connected!", flush=True) - round_data['status'] = 'Failed' + self.set_round_status(round_id, 'Failed') + # self.update_round(round_data) return None, round_data # 1. Assemble round config for this global round, @@ -144,70 +146,96 @@ def round(self, session_config, round_id): round_config['model_id'] = self.get_latest_model() round_config['helper_type'] = self.statestore.get_helper() - combiners = self.get_participating_combiners(round_config) - round_start = self.evaluate_round_start_policy(combiners) + # Get combiners that are able to participate in round, given round_config + participating_combiners = self.get_participating_combiners(round_config) + + # Check if the policy to start the round is met + round_start = self.evaluate_round_start_policy(participating_combiners) if round_start: print("CONTROL: round start policy met, participating combiners {}".format( - combiners), flush=True) + participating_combiners), flush=True) else: print("CONTROL: Round start policy not met, skipping round!", flush=True) - round_data['status'] = 'Failed' - return None + self.set_round_status(round_id, 'Failed') + return None, round_data - round_data['round_config'] = round_config + # round_data['round_config'] = round_config + self.set_round_config(round_id, round_config) # 2. Ask participating combiners to coordinate model updates - _ = self.request_model_updates(combiners) + combiner_response = self.request_model_updates(participating_combiners) + # TODO: Check response - # Wait until participating combiners have produced an updated global model. + # Wait for the round to timeout wait = 0.0 - # dict to store combiners that have successfully produced an updated model - updated = {} - # wait until all combiners have produced an updated model or until round timeout - print("CONTROL: Fetching round config (ID: {round_id}) from statestore:".format( - round_id=round_id), flush=True) - while len(updated) < len(combiners): + while wait < session_config['round_timeout']: round = self.statestore.get_round(round_id) - if round: - print("CONTROL: Round found!", flush=True) - # For each combiner in the round, check if it has produced an updated model (status == 'Success') - for combiner in round['combiners']: - print(combiner, flush=True) - if combiner['status'] == 'Success': - if combiner['name'] not in updated.keys(): - # Add combiner to updated dict - updated[combiner['name']] = combiner['model_id'] - # Print combiner status - print("CONTROL: Combiner {name} status: {status}".format( - name=combiner['name'], status=combiner['status']), flush=True) - else: - # Print every 10 seconds based on value of wait - if wait % 10 == 0: - print("CONTROL: Waiting for round to complete...", flush=True) - if wait >= session_config['round_timeout']: - print("CONTROL: Round timeout! Exiting round...", flush=True) - break - # Update wait time used for timeout + print(round, flush=True) + # print("CONTROL: Round {0} with status {1}".format(round['round_id'], round['status']), flush=True) time.sleep(1.0) wait += 1.0 - round_valid = self.evaluate_round_validity_policy(updated) - if not round_valid: - print("REDUCER CONTROL: Round invalid!", flush=True) - round_data['status'] = 'Failed' - return None, round_data - - print("CONTROL: Reducing models from combiners...", flush=True) + # Wait until participating combiners have produced an updated global model. + # wait = 0.0 + # dict to store combiners that have successfully produced an updated model + # updated = {} + + # wait until all combiners have produced an updated model or until round timeout + # print("CONTROL: Fetching round config (ID: {round_id}) from statestore:".format( + # round_id=round_id), flush=True) + + # while len(updated) < len(participating_combiners): + # # Check if combiners have reported the round to the db + # round = self.statestore.get_round(round_id) + # if round: + # # For each combiner in the round, check if it has produced an updated model (status == 'Success') + # for combiner in round['combiners']: + # print("CONTROL: Combiner {0} reported round with status {1}".format(combiner['round_id'], combiner['status']), flush=True) + # if combiner['status'] == 'Success': + # if combiner['name'] not in updated.keys(): + # # Add combiner to updated dict + # updated[combiner['name']] = combiner['model_id'] + # # Print combiner status + # print("CONTROL: Combiner {name} status: {status}".format( + # name=combiner['name'], status=combiner['status']), flush=True) + # else: + # # Print every 10 seconds based on value of wait + # if wait % 10 == 0: + # print("CONTROL: Waiting for round to complete...", flush=True) + # if wait >= session_config['round_timeout']: + # print("CONTROL: Round timeout! Exiting round...", flush=True) + # break + # # Update wait time used for timeout + # time.sleep(1.0) + # wait += 1.0 + + #round = self.statestore.get_round(round_id) + #model_updates = [] + # for combiner in round['combiners']: + # try: + # model_updates.add(combiner['model_id']) + # except: + # pass + + # round_valid = self.evaluate_round_validity_policy(updated) + # if not round_valid: + # print("REDUCER CONTROL: Round invalid!", flush=True) + # round_data['status'] = 'Failed' + # self.update_round(round_data) + # return None, round_data + + print("CONTROL: Reducing combiner level models...", flush=True) # 3. Reduce combiner models into a global model try: - model, data = self.reduce(updated) + round = self.statestore.get_round(round_id) + model, data = self.reduce(round['combiners']) round_data['reduce'] = data print("CONTROL: Done reducing models from combiners!", flush=True) except Exception as e: print("CONTROL: Failed to reduce models from combiners: {}".format( e), flush=True) - round_data['status'] = 'Failed' + self.set_round_status('Failed') return None, round_data # 6. Commit the global model to model trail @@ -221,32 +249,34 @@ def round(self, session_config, round_id): else: print("REDUCER: failed to update model in round with config {}".format( session_config), flush=True) - round_data['status'] = 'Failed' + self.set_round_status('Failed') return None, round_data - round_data['status'] = 'Success' + # round_data['status'] = 'Success' # 4. Trigger participating combiner nodes to execute a validation round for the current model - validate = session_config['validate'] - if validate: - combiner_config = copy.deepcopy(session_config) - combiner_config['round_id'] = round_id - combiner_config['model_id'] = self.get_latest_model() - combiner_config['task'] = 'validation' - combiner_config['helper_type'] = self.statestore.get_helper() - - validating_combiners = self._select_participating_combiners( - combiner_config) - - for combiner, combiner_config in validating_combiners: - try: - print("CONTROL: Submitting validation round to combiner {}".format( - combiner), flush=True) - combiner.submit(combiner_config) - except CombinerUnavailableError: - self._handle_unavailable_combiner(combiner) - pass - + # validate = session_config['validate'] + # if validate: + # combiner_config = copy.deepcopy(session_config) + # combiner_config['round_id'] = round_id + # combiner_config['model_id'] = self.get_latest_model() + # combiner_config['task'] = 'validation' + # combiner_config['helper_type'] = self.statestore.get_helper() + # + # validating_combiners = self._select_participating_combiners( + # combiner_config) + # + # for combiner, combiner_config in validating_combiners: + # try: + # print("CONTROL: Submitting validation round to combiner {}".format( + # combiner), flush=True) + # combiner.submit(combiner_config) + # except CombinerUnavailableError: + # self._handle_unavailable_combiner(combiner) + # pass + + #model_id = 'fsfsdfdsfdsfd' + self.set_round_status(round_id, 'Finished') return model_id, round_data def reduce(self, combiners): @@ -268,15 +298,16 @@ def reduce(self, combiners): print("REDUCER: No combiners to reduce!", flush=True) return model, meta - for name, model_id in combiners.items(): - + for combiner in combiners: + name = combiner['name'] + model_id = combiner['model_id'] # TODO: Handle inactive RPC error in get_model and raise specific error print("REDUCER: Fetching model ({model_id}) from combiner {name}".format( model_id=model_id, name=name), flush=True) try: tic = time.time() - combiner = self.get_combiner(name) - data = combiner.get_model(model_id) + combiner_interface = self.get_combiner(name) + data = combiner_interface.get_model(model_id) meta['time_fetch_model'] += (time.time() - tic) except Exception as e: print("REDUCER: Failed to fetch model from combiner {}: {}".format( diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 471e47a6a..70a75cac6 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -181,8 +181,26 @@ def new_session(self, config): self.tracer.new_session(id=session_id) self.tracer.set_session_config(session_id, config) + def new_round(self, round_data): + """Initialize a new round in backend db. """ + + self.tracer.new_round(round_data) + + def update_round(self, round_data): + """ Upate round in backend db. """ + self.tracer.set_round_data(round_data) + + def set_round_status(self, round_id, status): + """ Upate round in backend db. """ + self.tracer.set_round_status(round_id, status) + + def set_round_config(self, round_id, round_config): + """ Upate round in backend db. """ + self.tracer.set_round_config(round_id, round_config) + def request_model_updates(self, combiners): - """Call Combiner server RPC to get a model update. """ + """Ask Combiner server to produce a model update. """ + cl = [] for combiner, combiner_round_config in combiners: response = combiner.submit(combiner_round_config) From 3775021f0fae9165f2d3b358df6ffaafa1fb536e Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Sat, 19 Aug 2023 10:09:02 +0200 Subject: [PATCH 03/20] Refactor of controller --- fedn/fedn/network/controller/control.py | 52 ++++++++------------- fedn/fedn/network/controller/controlbase.py | 15 ++++-- 2 files changed, 30 insertions(+), 37 deletions(-) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index f25c50ef8..d93f35f29 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -83,7 +83,7 @@ def session(self, config): self._state = ReducerState.instructing - # Must be called to set info in the db + # Must be called once to set info in the db self.new_session(config) self._state = ReducerState.monitoring @@ -112,8 +112,6 @@ def session(self, config): print("CONTROL: Round completed with status {}".format( round_data['status']), flush=True) - # self.tracer.set_round_data(round_data) - # TODO: Report completion of session self._state = ReducerState.idle @@ -128,17 +126,14 @@ def round(self, session_config, round_id): """ round_data = {'round_id': round_id, 'status': "Pending"} - round = self.new_round(round_data) + self.new_round(round_data) if len(self.network.get_combiners()) < 1: - print("REDUCER: No combiners connected!", flush=True) + print("CONTROLLER: Round cannot start, no combiners connected!", flush=True) self.set_round_status(round_id, 'Failed') - # self.update_round(round_data) return None, round_data - # 1. Assemble round config for this global round, - # and check which combiners are able to participate - # in the round. + # Assemble round config for this global round round_config = copy.deepcopy(session_config) round_config['rounds'] = 1 round_config['round_id'] = round_id @@ -146,6 +141,8 @@ def round(self, session_config, round_id): round_config['model_id'] = self.get_latest_model() round_config['helper_type'] = self.statestore.get_helper() + self.set_round_config(round_id, round_config) + # Get combiners that are able to participate in round, given round_config participating_combiners = self.get_participating_combiners(round_config) @@ -160,19 +157,18 @@ def round(self, session_config, round_id): self.set_round_status(round_id, 'Failed') return None, round_data - # round_data['round_config'] = round_config - self.set_round_config(round_id, round_config) - # 2. Ask participating combiners to coordinate model updates combiner_response = self.request_model_updates(participating_combiners) # TODO: Check response - # Wait for the round to timeout + # Wait until participating combiners have produced an updated global model, + # or round times out. wait = 0.0 while wait < session_config['round_timeout']: round = self.statestore.get_round(round_id) - print(round, flush=True) - # print("CONTROL: Round {0} with status {1}".format(round['round_id'], round['status']), flush=True) + # TODO: Early exit if all combiners are Finished. + if wait % 10 == 0: + print("CONTROL: Waiting for combiners to update models...", flush=True) time.sleep(1.0) wait += 1.0 @@ -209,21 +205,12 @@ def round(self, session_config, round_id): # # Update wait time used for timeout # time.sleep(1.0) # wait += 1.0 - - #round = self.statestore.get_round(round_id) - #model_updates = [] - # for combiner in round['combiners']: - # try: - # model_updates.add(combiner['model_id']) - # except: - # pass - - # round_valid = self.evaluate_round_validity_policy(updated) - # if not round_valid: - # print("REDUCER CONTROL: Round invalid!", flush=True) - # round_data['status'] = 'Failed' - # self.update_round(round_data) - # return None, round_data + round = self.statestore.get_round(round_id) + round_valid = self.evaluate_round_validity_policy(round) + if not round_valid: + print("REDUCER CONTROL: Round invalid!", flush=True) + self.set_round_status(round_id, 'Failed') + return None, round_data print("CONTROL: Reducing combiner level models...", flush=True) # 3. Reduce combiner models into a global model @@ -235,7 +222,7 @@ def round(self, session_config, round_id): except Exception as e: print("CONTROL: Failed to reduce models from combiners: {}".format( e), flush=True) - self.set_round_status('Failed') + self.set_round_status(round_id, 'Failed') return None, round_data # 6. Commit the global model to model trail @@ -249,7 +236,7 @@ def round(self, session_config, round_id): else: print("REDUCER: failed to update model in round with config {}".format( session_config), flush=True) - self.set_round_status('Failed') + self.set_round_status(round_id, 'Failed') return None, round_data # round_data['status'] = 'Success' @@ -275,7 +262,6 @@ def round(self, session_config, round_id): # self._handle_unavailable_combiner(combiner) # pass - #model_id = 'fsfsdfdsfdsfd' self.set_round_status(round_id, 'Finished') return model_id, round_data diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 70a75cac6..eb3989d03 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -277,7 +277,7 @@ def evaluate_round_start_policy(self, combiners): else: return False - def evaluate_round_validity_policy(self, combiners): + def evaluate_round_validity_policy(self, round): """ Check if the round should be seen as valid. At the end of the round, before committing a model to the global model trail, @@ -285,10 +285,17 @@ def evaluate_round_validity_policy(self, combiners): e.g. asserting that a certain number of combiners have reported in an updated model, or that criteria on model performance have been met. """ - if combiners.keys() == []: + model_ids = [] + for combiner in round['combiners']: + try: + model_ids.append(combiner['model_id']) + except KeyError: + pass + + if len(model_ids) == 0: return False - else: - return True + + return True def _select_participating_combiners(self, compute_plan): participating_combiners = [] From d1c0474a680d7292f3a0bf683fb81bf41998414b Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 21 Aug 2023 14:53:36 +0200 Subject: [PATCH 04/20] Refactor polling in control --- fedn/fedn/common/tracer/mongotracer.py | 4 +- fedn/fedn/network/controller/control.py | 154 ++++++++++---------- fedn/fedn/network/controller/controlbase.py | 22 +-- 3 files changed, 81 insertions(+), 99 deletions(-) diff --git a/fedn/fedn/common/tracer/mongotracer.py b/fedn/fedn/common/tracer/mongotracer.py index 08aa50b56..57cfd07c8 100644 --- a/fedn/fedn/common/tracer/mongotracer.py +++ b/fedn/fedn/common/tracer/mongotracer.py @@ -89,10 +89,10 @@ def set_round_status(self, round_id, round_status): self.rounds.update_one({'round_id': round_id}, { '$set': {'status': round_status}}, True) - def set_round_data(self, round_data): + def set_round_data(self, round_id, round_data): """ :param round_meta: """ - self.rounds.update_one({'round_id': str(round_data['round_id'])}, { + self.rounds.update_one({'round_id': round_id}, { '$set': {'round_data': round_data}}, True) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index d93f35f29..de2f89245 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -2,6 +2,9 @@ import time import uuid +from tenacity import (retry, retry_if_exception_type, stop_after_delay, + wait_random) + from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.controller.controlbase import ControlBase from fedn.network.state import ReducerState @@ -49,6 +52,20 @@ def __init__(self, message): super().__init__(self.message) +class CombinersNotDoneException(Exception): + """ Exception class for when model is None """ + + def __init__(self, message): + """ Constructor method. + + :param message: The exception message. + :type message: str + + """ + self.message = message + super().__init__(self.message) + + class Control(ControlBase): """ Controller, implementing the overall global training, validation and inference logic. """ @@ -97,8 +114,6 @@ def session(self, config): # Execute the rounds in this session for round in range(1, int(config['rounds'] + 1)): # Increment the round number - - # round_id = self.new_round(session['session_id']) if last_round: current_round = last_round + round else: @@ -125,13 +140,12 @@ def round(self, session_config, round_id): """ - round_data = {'round_id': round_id, 'status': "Pending"} - self.new_round(round_data) + self.new_round({'round_id': round_id, 'status': "Pending"}) if len(self.network.get_combiners()) < 1: print("CONTROLLER: Round cannot start, no combiners connected!", flush=True) self.set_round_status(round_id, 'Failed') - return None, round_data + return None, self.statestore.get_round(round_id) # Assemble round config for this global round round_config = copy.deepcopy(session_config) @@ -150,70 +164,53 @@ def round(self, session_config, round_id): round_start = self.evaluate_round_start_policy(participating_combiners) if round_start: - print("CONTROL: round start policy met, participating combiners {}".format( - participating_combiners), flush=True) + print("CONTROL: round start policy met, {} participating combiners.".format( + len(participating_combiners)), flush=True) else: print("CONTROL: Round start policy not met, skipping round!", flush=True) self.set_round_status(round_id, 'Failed') - return None, round_data + return None, self.statestore.get_round(round_id) - # 2. Ask participating combiners to coordinate model updates - combiner_response = self.request_model_updates(participating_combiners) + # Ask participating combiners to coordinate model updates + _ = self.request_model_updates(participating_combiners) # TODO: Check response # Wait until participating combiners have produced an updated global model, # or round times out. - wait = 0.0 - while wait < session_config['round_timeout']: + + def do_if_round_times_out(result): + print("CONTROL: Round timed out!", flush=True) + + @retry(wait=wait_random(min=1.0, max=2.0), + stop=stop_after_delay(session_config['round_timeout']), + retry_error_callback=do_if_round_times_out, + retry=retry_if_exception_type(CombinersNotDoneException)) + def combiners_done(): + round = self.statestore.get_round(round_id) - # TODO: Early exit if all combiners are Finished. - if wait % 10 == 0: - print("CONTROL: Waiting for combiners to update models...", flush=True) - time.sleep(1.0) - wait += 1.0 - - # Wait until participating combiners have produced an updated global model. - # wait = 0.0 - # dict to store combiners that have successfully produced an updated model - # updated = {} - - # wait until all combiners have produced an updated model or until round timeout - # print("CONTROL: Fetching round config (ID: {round_id}) from statestore:".format( - # round_id=round_id), flush=True) - - # while len(updated) < len(participating_combiners): - # # Check if combiners have reported the round to the db - # round = self.statestore.get_round(round_id) - # if round: - # # For each combiner in the round, check if it has produced an updated model (status == 'Success') - # for combiner in round['combiners']: - # print("CONTROL: Combiner {0} reported round with status {1}".format(combiner['round_id'], combiner['status']), flush=True) - # if combiner['status'] == 'Success': - # if combiner['name'] not in updated.keys(): - # # Add combiner to updated dict - # updated[combiner['name']] = combiner['model_id'] - # # Print combiner status - # print("CONTROL: Combiner {name} status: {status}".format( - # name=combiner['name'], status=combiner['status']), flush=True) - # else: - # # Print every 10 seconds based on value of wait - # if wait % 10 == 0: - # print("CONTROL: Waiting for round to complete...", flush=True) - # if wait >= session_config['round_timeout']: - # print("CONTROL: Round timeout! Exiting round...", flush=True) - # break - # # Update wait time used for timeout - # time.sleep(1.0) - # wait += 1.0 + if 'combiners' not in round: + # TODO: use logger + print("CONTROL: Waiting for combiners to update model...", flush=True) + raise CombinersNotDoneException("Combiners have not yet reported.") + + if len(round['combiners']) < len(participating_combiners): + print("CONTROL: Waiting for combiners to update model...", flush=True) + raise CombinersNotDoneException("All combiners have not yet reported.") + + return True + + combiners_done() + round = self.statestore.get_round(round_id) round_valid = self.evaluate_round_validity_policy(round) if not round_valid: print("REDUCER CONTROL: Round invalid!", flush=True) self.set_round_status(round_id, 'Failed') - return None, round_data + return None, self.statestore.get_round(round_id) print("CONTROL: Reducing combiner level models...", flush=True) - # 3. Reduce combiner models into a global model + # Reduce combiner models into a new global model + round_data = {} try: round = self.statestore.get_round(round_id) model, data = self.reduce(round['combiners']) @@ -223,9 +220,9 @@ def round(self, session_config, round_id): print("CONTROL: Failed to reduce models from combiners: {}".format( e), flush=True) self.set_round_status(round_id, 'Failed') - return None, round_data + return None, self.statestore.get_round(round_id) - # 6. Commit the global model to model trail + # Commit the new global model to the model trail if model is not None: print("CONTROL: Committing global model to model trail...", flush=True) tic = time.time() @@ -237,33 +234,34 @@ def round(self, session_config, round_id): print("REDUCER: failed to update model in round with config {}".format( session_config), flush=True) self.set_round_status(round_id, 'Failed') - return None, round_data + return None, self.statestore.get_round(round_id) # round_data['status'] = 'Success' - # 4. Trigger participating combiner nodes to execute a validation round for the current model - # validate = session_config['validate'] - # if validate: - # combiner_config = copy.deepcopy(session_config) - # combiner_config['round_id'] = round_id - # combiner_config['model_id'] = self.get_latest_model() - # combiner_config['task'] = 'validation' - # combiner_config['helper_type'] = self.statestore.get_helper() - # - # validating_combiners = self._select_participating_combiners( - # combiner_config) - # - # for combiner, combiner_config in validating_combiners: - # try: - # print("CONTROL: Submitting validation round to combiner {}".format( - # combiner), flush=True) - # combiner.submit(combiner_config) - # except CombinerUnavailableError: - # self._handle_unavailable_combiner(combiner) - # pass + # Ask combiners to validate the new global model + validate = session_config['validate'] + if validate: + combiner_config = copy.deepcopy(session_config) + combiner_config['round_id'] = round_id + combiner_config['model_id'] = self.get_latest_model() + combiner_config['task'] = 'validation' + combiner_config['helper_type'] = self.statestore.get_helper() + + validating_combiners = self.get_participating_combiners( + combiner_config) + for combiner, combiner_config in validating_combiners: + try: + print("CONTROL: Submitting validation round to combiner {}".format( + combiner), flush=True) + combiner.submit(combiner_config) + except CombinerUnavailableError: + self._handle_unavailable_combiner(combiner) + pass + + self.set_round_data(round_id, round_data) self.set_round_status(round_id, 'Finished') - return model_id, round_data + return model_id, self.statestore.get_round(round_id) def reduce(self, combiners): """ Combine updated models from Combiner nodes into one global model. @@ -368,7 +366,7 @@ def inference_round(self, config): combiner_config['helper_type'] = self.statestore.get_framework() # Select combiners - validating_combiners = self._select_round_combiners( + validating_combiners = self.get_participating_combiners( combiner_config) # Test round start policy diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index eb3989d03..9288d52b5 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -186,9 +186,9 @@ def new_round(self, round_data): self.tracer.new_round(round_data) - def update_round(self, round_data): + def set_round_data(self, round_id, round_data): """ Upate round in backend db. """ - self.tracer.set_round_data(round_data) + self.tracer.set_round_data(round_id, round_data) def set_round_status(self, round_id, status): """ Upate round in backend db. """ @@ -278,7 +278,7 @@ def evaluate_round_start_policy(self, combiners): return False def evaluate_round_validity_policy(self, round): - """ Check if the round should be seen as valid. + """ Check if the round is valid. At the end of the round, before committing a model to the global model trail, we check if the round validity policy has been met. This can involve @@ -297,22 +297,6 @@ def evaluate_round_validity_policy(self, round): return True - def _select_participating_combiners(self, compute_plan): - participating_combiners = [] - for combiner in self.network.get_combiners(): - try: - combiner_state = combiner.report() - except CombinerUnavailableError: - self._handle_unavailable_combiner(combiner) - combiner_state = None - - if combiner_state: - is_participating = self.evaluate_round_participation_policy( - compute_plan, combiner_state) - if is_participating: - participating_combiners.append((combiner, compute_plan)) - return participating_combiners - def state(self): """ From 6fe98b231b1be480a779acce7b7c1aa16b4eb4ea Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 21 Aug 2023 16:03:13 +0200 Subject: [PATCH 05/20] Refactor polling in control --- fedn/fedn/network/controller/control.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index de2f89245..cbe754935 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -177,7 +177,6 @@ def round(self, session_config, round_id): # Wait until participating combiners have produced an updated global model, # or round times out. - def do_if_round_times_out(result): print("CONTROL: Round timed out!", flush=True) @@ -201,6 +200,17 @@ def combiners_done(): combiners_done() + # Due to the distributed nature, combiners may report late to db, + # so we need some robustness here + @retry(wait=wait_random(min=0.1, max=1.0), + retry=retry_if_exception_type(KeyError)) + def check_combiners_done_reporting(): + round = self.statestore.get_round() + combiners = round['combiners'] + return combiners + + _ = check_combiners_done_reporting() + round = self.statestore.get_round(round_id) round_valid = self.evaluate_round_validity_policy(round) if not round_valid: From 3fd0b29f9ba7edb44bb524f017f3203dab963453 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Tue, 22 Aug 2023 15:33:31 +0200 Subject: [PATCH 06/20] Refactor polling in control --- fedn/fedn/network/controller/control.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index cbe754935..30967d457 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -200,12 +200,13 @@ def combiners_done(): combiners_done() - # Due to the distributed nature, combiners may report late to db, - # so we need some robustness here + # Due to the distributed nature of the computation, there might be a + # delay before combiners have reported the round data to the db, + # so we need some robustness here. @retry(wait=wait_random(min=0.1, max=1.0), retry=retry_if_exception_type(KeyError)) def check_combiners_done_reporting(): - round = self.statestore.get_round() + round = self.statestore.get_round(round_id) combiners = round['combiners'] return combiners @@ -246,8 +247,6 @@ def check_combiners_done_reporting(): self.set_round_status(round_id, 'Failed') return None, self.statestore.get_round(round_id) - # round_data['status'] = 'Success' - # Ask combiners to validate the new global model validate = session_config['validate'] if validate: From c5ef76c6dfce55b22b644aa55a76e1de32391166 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Wed, 30 Aug 2023 12:50:54 +0200 Subject: [PATCH 07/20] Functioning --- fedn/fedn/network/controller/control.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index 3f17f7614..88b95f44a 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -97,13 +97,8 @@ def session(self, config): return self._state = ReducerState.instructing - -<<<<<<< HEAD - # Must be called once to set info in the db -======= # Must be called to set info in the db config['committed_at'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ->>>>>>> develop self.new_session(config) if not self.statestore.get_latest_model(): @@ -119,10 +114,6 @@ def session(self, config): # Execute the rounds in this session for round in range(1, int(config['rounds'] + 1)): # Increment the round number -<<<<<<< HEAD -======= - ->>>>>>> develop if last_round: current_round = last_round + round else: @@ -145,12 +136,8 @@ def round(self, session_config, round_id): :param session_config: The session config. :type session_config: dict :param round_id: The round id. -<<<<<<< HEAD :type round_id: str -======= - :type round_id: str(int) ->>>>>>> develop """ self.new_round({'round_id': round_id, 'status': "Pending"}) From e7894bc5109e220fb96291de4654a799f2d1c684 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 25 Sep 2023 09:02:32 +0200 Subject: [PATCH 08/20] start on new simulation example --- examples/simulation/client/entrypoint | 58 +++++++++++++++++++++++++++ examples/simulation/client/fedn.yaml | 5 +++ examples/simulation/run_clients.py | 16 ++++++++ 3 files changed, 79 insertions(+) create mode 100644 examples/simulation/client/entrypoint create mode 100644 examples/simulation/client/fedn.yaml create mode 100644 examples/simulation/run_clients.py diff --git a/examples/simulation/client/entrypoint b/examples/simulation/client/entrypoint new file mode 100644 index 000000000..3f660fac1 --- /dev/null +++ b/examples/simulation/client/entrypoint @@ -0,0 +1,58 @@ +#!./.simulation/bin/python +import collections +import math +import os +import random + +import fire +import numpy + +from fedn.utils.helpers import get_helper, save_metadata, save_metrics + +HELPER_MODULE = 'numpyarrayhelper' +N_SAMPLES = 1000 +DELAY = 10 + + +def init_seed(out_path='seed.npz'): + """ Initialize seed model. + + :param out_path: The path to save the seed model to. + :type out_path: str + """ + # Init and save + model = numpy.ones(size=(1, N_SAMPLES)) + helper = get_helper(HELPER_MODULE) + helper.save(model, out_path) + + +def train(in_model_path, out_model_path): + """ Train model. + + :param in_model_path: The path to the input model. + :type in_model_path: str + :param out_model_path: The path to save the output model to. + :type out_model_path: str + """ + + model = numpy.ones(size=(1, N_SAMPLES)) + time.sleep(DELAY+DELAY*random.random()) + + # Metadata needed for aggregation server side + metadata = { + 'num_examples': N_SAMPLES, + } + + # Save JSON metadata file + save_metadata(metadata, out_model_path) + + # Save model update + helper = get_helper(HELPER_MODULE) + helper.save(model, out_model_path) + + +if __name__ == '__main__': + fire.Fire({ + 'init_seed': init_seed, + 'train': train, + }) diff --git a/examples/simulation/client/fedn.yaml b/examples/simulation/client/fedn.yaml new file mode 100644 index 000000000..29c475270 --- /dev/null +++ b/examples/simulation/client/fedn.yaml @@ -0,0 +1,5 @@ +entry_points: + train: + command: /venv/bin/python entrypoint train $ENTRYPOINT_OPTS + validate: + command: /venv/bin/python entrypoint validate $ENTRYPOINT_OPTS \ No newline at end of file diff --git a/examples/simulation/run_clients.py b/examples/simulation/run_clients.py new file mode 100644 index 000000000..42fc5dc7c --- /dev/null +++ b/examples/simulation/run_clients.py @@ -0,0 +1,16 @@ +#!./.simulation/bin/python +import os + +from fedn.network.clients import Client + +API_SERVER = 'localhost:8092' + + +config = {'remote_compute_context': False, + 'validator': False, + 'trainer': True, + 'init': 'client.yaml', + } + +client = Client(config) +client.run() From 78474861b8ffe544040477447c6a4628fbef8cce Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Wed, 18 Oct 2023 00:00:54 +0200 Subject: [PATCH 09/20] update --- examples/mnist-keras/bin/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/mnist-keras/bin/build.sh b/examples/mnist-keras/bin/build.sh index 18cdb5128..44eda61df 100755 --- a/examples/mnist-keras/bin/build.sh +++ b/examples/mnist-keras/bin/build.sh @@ -5,4 +5,4 @@ set -e client/entrypoint init_seed # Make compute package -tar -czvf package.tgz client \ No newline at end of file +tar -czvf package.tgz client From a8ffd6c657bef26a2d67325b276e25fb91881f20 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Thu, 19 Oct 2023 18:41:41 +0200 Subject: [PATCH 10/20] Updated test --- .ci/tests/examples/wait_for.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/tests/examples/wait_for.py b/.ci/tests/examples/wait_for.py index 7fa75506d..99e95429c 100644 --- a/.ci/tests/examples/wait_for.py +++ b/.ci/tests/examples/wait_for.py @@ -30,7 +30,7 @@ def _test_rounds(n_rounds): client = pymongo.MongoClient( "mongodb://fedn_admin:password@localhost:6534") collection = client['fedn-network']['control']['rounds'] - query = {'reducer.status': 'Success'} + query = {'status': 'Finished'} n = collection.count_documents(query) client.close() _eprint(f'Succeded rounds: {n}.') From f70e1d35cff433b3bf77f976f0c54e81d3f8c139 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Thu, 19 Oct 2023 18:46:05 +0200 Subject: [PATCH 11/20] Fix typos --- .ci/tests/examples/wait_for.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.ci/tests/examples/wait_for.py b/.ci/tests/examples/wait_for.py index 99e95429c..49d8bb6ae 100644 --- a/.ci/tests/examples/wait_for.py +++ b/.ci/tests/examples/wait_for.py @@ -18,7 +18,7 @@ def _retry(try_func, **func_args): for _ in range(RETRIES): is_success = try_func(**func_args) if is_success: - _eprint('Sucess.') + _eprint('Success.') return True _eprint(f'Sleeping for {SLEEP}.') sleep(SLEEP) @@ -42,7 +42,7 @@ def _test_nodes(n_nodes, node_type, reducer_host='localhost', reducer_port='8090 resp = requests.get( f'http://{reducer_host}:{reducer_port}/netgraph', verify=False) except Exception as e: - _eprint(f'Reques exception econuntered: {e}.') + _eprint(f'Reques exception enconuntered: {e}.') return False if resp.status_code == 200: gr = json.loads(resp.content) @@ -50,7 +50,7 @@ def _test_nodes(n_nodes, node_type, reducer_host='localhost', reducer_port='8090 'status') == 'active' for values in gr['nodes']) _eprint(f'Active {node_type}s: {n}.') return n == n_nodes - _eprint(f'Reducer returned {resp.status_code}.') + _eprint(f'Controller returned {resp.status_code}.') return False From 4b7c28eead2e55e8b371801d1864e9e8c4151130 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Thu, 19 Oct 2023 18:49:20 +0200 Subject: [PATCH 12/20] Removed accidentally committed files --- examples/simulation/client/entrypoint | 58 --------------------------- examples/simulation/client/fedn.yaml | 5 --- examples/simulation/run_clients.py | 16 -------- 3 files changed, 79 deletions(-) delete mode 100644 examples/simulation/client/entrypoint delete mode 100644 examples/simulation/client/fedn.yaml delete mode 100644 examples/simulation/run_clients.py diff --git a/examples/simulation/client/entrypoint b/examples/simulation/client/entrypoint deleted file mode 100644 index 3f660fac1..000000000 --- a/examples/simulation/client/entrypoint +++ /dev/null @@ -1,58 +0,0 @@ -#!./.simulation/bin/python -import collections -import math -import os -import random - -import fire -import numpy - -from fedn.utils.helpers import get_helper, save_metadata, save_metrics - -HELPER_MODULE = 'numpyarrayhelper' -N_SAMPLES = 1000 -DELAY = 10 - - -def init_seed(out_path='seed.npz'): - """ Initialize seed model. - - :param out_path: The path to save the seed model to. - :type out_path: str - """ - # Init and save - model = numpy.ones(size=(1, N_SAMPLES)) - helper = get_helper(HELPER_MODULE) - helper.save(model, out_path) - - -def train(in_model_path, out_model_path): - """ Train model. - - :param in_model_path: The path to the input model. - :type in_model_path: str - :param out_model_path: The path to save the output model to. - :type out_model_path: str - """ - - model = numpy.ones(size=(1, N_SAMPLES)) - time.sleep(DELAY+DELAY*random.random()) - - # Metadata needed for aggregation server side - metadata = { - 'num_examples': N_SAMPLES, - } - - # Save JSON metadata file - save_metadata(metadata, out_model_path) - - # Save model update - helper = get_helper(HELPER_MODULE) - helper.save(model, out_model_path) - - -if __name__ == '__main__': - fire.Fire({ - 'init_seed': init_seed, - 'train': train, - }) diff --git a/examples/simulation/client/fedn.yaml b/examples/simulation/client/fedn.yaml deleted file mode 100644 index 29c475270..000000000 --- a/examples/simulation/client/fedn.yaml +++ /dev/null @@ -1,5 +0,0 @@ -entry_points: - train: - command: /venv/bin/python entrypoint train $ENTRYPOINT_OPTS - validate: - command: /venv/bin/python entrypoint validate $ENTRYPOINT_OPTS \ No newline at end of file diff --git a/examples/simulation/run_clients.py b/examples/simulation/run_clients.py deleted file mode 100644 index 42fc5dc7c..000000000 --- a/examples/simulation/run_clients.py +++ /dev/null @@ -1,16 +0,0 @@ -#!./.simulation/bin/python -import os - -from fedn.network.clients import Client - -API_SERVER = 'localhost:8092' - - -config = {'remote_compute_context': False, - 'validator': False, - 'trainer': True, - 'init': 'client.yaml', - } - -client = Client(config) -client.run() From 26fe052802b35f6c4b0f4e3b55f2eb9ad44b546f Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Thu, 19 Oct 2023 19:15:02 +0200 Subject: [PATCH 13/20] update api --- fedn/fedn/network/api/interface.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index f05222e87..4d2d7d84c 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -594,7 +594,6 @@ def get_round(self, round_id): return jsonify({'success': False, 'message': 'Round not found.'}) payload = { 'round_id': round_object['round_id'], - 'reducer': round_object['reducer'], 'combiners': round_object['combiners'], } return jsonify(payload) From 06cd806fa599f7359b2ecef5e7dae3b18f59f9d4 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 30 Oct 2023 14:33:53 +0100 Subject: [PATCH 14/20] Updates after code review --- fedn/fedn/common/tracer/mongotracer.py | 21 ++++++++++++++++----- fedn/fedn/network/controller/control.py | 4 ++-- fedn/fedn/network/controller/controlbase.py | 8 ++++---- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/fedn/fedn/common/tracer/mongotracer.py b/fedn/fedn/common/tracer/mongotracer.py index 57cfd07c8..8d53f60b4 100644 --- a/fedn/fedn/common/tracer/mongotracer.py +++ b/fedn/fedn/common/tracer/mongotracer.py @@ -50,15 +50,25 @@ def drop_status(self): if self.status: self.status.drop() - def new_session(self, id=None): - """ Create a new session. """ + def create_session(self, id=None): + """ Create a new session. + + :param id: The ID of the created session. + :type id: uuid, str + + """ if not id: id = uuid.uuid4() data = {'session_id': str(id)} self.sessions.insert_one(data) - def new_round(self, round_data): - """ Create a new session. """ + def create_round(self, round_data): + """ Create a new round. + + :param round_data: Dictionary with round data. + :type round_data: dict + """ + # TODO: Add check if round_id already exists self.rounds.insert_one(round_data) def set_session_config(self, id, config): @@ -68,7 +78,8 @@ def set_session_config(self, id, config): def set_round_combiner_data(self, data): """ - :param round_meta: + :param data: The combiner data + :type data: dict """ self.rounds.update_one({'round_id': str(data['round_id'])}, { '$push': {'combiners': data}}, True) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index 88b95f44a..3bc16b114 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -99,7 +99,7 @@ def session(self, config): self._state = ReducerState.instructing # Must be called to set info in the db config['committed_at'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - self.new_session(config) + self.create_session(config) if not self.statestore.get_latest_model(): print("No model in model chain, please provide a seed model!", flush=True) @@ -140,7 +140,7 @@ def round(self, session_config, round_id): """ - self.new_round({'round_id': round_id, 'status': "Pending"}) + self.create_round({'round_id': round_id, 'status': "Pending"}) if len(self.network.get_combiners()) < 1: print("CONTROLLER: Round cannot start, no combiners connected!", flush=True) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index aa6b1c456..b58952837 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -180,7 +180,7 @@ def get_compute_package(self, compute_package=''): else: return None - def new_session(self, config): + def create_session(self, config): """ Initialize a new session in backend db. """ if "session_id" not in config.keys(): @@ -189,13 +189,13 @@ def new_session(self, config): else: session_id = config['session_id'] - self.tracer.new_session(id=session_id) + self.tracer.create_session(id=session_id) self.tracer.set_session_config(session_id, config) - def new_round(self, round_data): + def create_round(self, round_data): """Initialize a new round in backend db. """ - self.tracer.new_round(round_data) + self.tracer.create_round(round_data) def set_round_data(self, round_id, round_data): """ Upate round in backend db. """ From 9a21972a676b1c51a6c0042103546c83d276651c Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Thu, 2 Nov 2023 16:50:41 +0100 Subject: [PATCH 15/20] Resolved merge conflicts --- fedn/fedn/common/tracer/mongotracer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fedn/fedn/common/tracer/mongotracer.py b/fedn/fedn/common/tracer/mongotracer.py index c6d0f90bf..aa5c0810b 100644 --- a/fedn/fedn/common/tracer/mongotracer.py +++ b/fedn/fedn/common/tracer/mongotracer.py @@ -109,8 +109,6 @@ def set_round_data(self, round_id, round_data): """ self.rounds.update_one({'round_id': round_id}, { '$set': {'round_data': round_data}}, True) - self.rounds.update_one({'round_id': str(round_data['round_id'])}, { - '$push': {'reducer': round_data}}, True) def update_client_status(self, client_name, status): """ Update client status in statestore. From adca222d8533a18f7ede7b75adf482c10303d6ff Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Thu, 2 Nov 2023 16:55:52 +0100 Subject: [PATCH 16/20] Updated docstrings --- fedn/fedn/network/controller/controlbase.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index a3cbc917f..80483efd7 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -214,11 +214,23 @@ def create_round(self, round_data): self.tracer.create_round(round_data) def set_round_data(self, round_id, round_data): - """ Upate round in backend db. """ + """ Set round data. + + :param round_id: The round unique identifier + :type round_id: str + :param round_data: The status + :type status: dict + """ self.tracer.set_round_data(round_id, round_data) def set_round_status(self, round_id, status): - """ Upate round in backend db. """ + """ Set the round round stats. + + :param round_id: The round unique identifier + :type round_id: str + :param status: The status + :type status: str + """ self.tracer.set_round_status(round_id, status) def set_round_config(self, round_id, round_config): From 0bd0a5daa150fb4de7fab3831e6b3eb56101b446 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Sat, 4 Nov 2023 08:40:05 +0100 Subject: [PATCH 17/20] Fixed docstrings --- fedn/fedn/network/controller/controlbase.py | 42 ++++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 80483efd7..045faa9c1 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -234,11 +234,21 @@ def set_round_status(self, round_id, status): self.tracer.set_round_status(round_id, status) def set_round_config(self, round_id, round_config): - """ Upate round in backend db. """ + """ Upate round in backend db. + + :param round_id: The round unique identifier + :type round_id: str + :param round_config: The round configuration + :type round_config: dict + """ self.tracer.set_round_config(round_id, round_config) def request_model_updates(self, combiners): - """Ask Combiner server to produce a model update. """ + """Ask Combiner server to produce a model update. + + :param combiners: A list of combiners + :type combiners: tuple (combiner, comboner_round_config) + """ cl = [] for combiner, combiner_round_config in combiners: response = combiner.submit(combiner_round_config) @@ -246,7 +256,15 @@ def request_model_updates(self, combiners): return cl def commit(self, model_id, model=None, session_id=None): - """Commit a model to the global model trail. The model commited becomes the lastest consensus model.""" + """Commit a model to the global model trail. The model commited becomes the lastest consensus model. + + :param model_id: Unique identifier for the model to commit. + :type model_id: str (uuid) + :param model: The model object to commit + :type model: BytesIO + :param session_id: Unique identifier for the session + :type session_id: str + """ helper = self.get_helper() if model is not None: @@ -318,7 +336,13 @@ def evaluate_round_participation_policy( return False def evaluate_round_start_policy(self, combiners): - """Check if the policy to start a round is met.""" + """Check if the policy to start a round is met. + + :param combiners: A list of combiners + :type combiners: list + :return: True if the round policy is mer, otherwise False + :rtype: bool + """ if len(combiners) > 0: return True else: @@ -331,6 +355,11 @@ def evaluate_round_validity_policy(self, round): we check if the round validity policy has been met. This can involve e.g. asserting that a certain number of combiners have reported in an updated model, or that criteria on model performance have been met. + + :param round: The round object + :rtype round: dict + :return: True if the policy is met, otherwise False + :rtype: bool """ model_ids = [] for combiner in round['combiners']: @@ -345,8 +374,9 @@ def evaluate_round_validity_policy(self, round): return True def state(self): - """ + """ Get the current state of the controller - : return: + :return: The state + :rype: str """ return self._state From 142df57ac441ded1b4afb9f8d3f747395124e61a Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Sat, 4 Nov 2023 09:13:04 +0100 Subject: [PATCH 18/20] Fixes --- fedn/fedn/common/storage/s3/miniorepo.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fedn/fedn/common/storage/s3/miniorepo.py b/fedn/fedn/common/storage/s3/miniorepo.py index 9341704e6..154cea7e9 100644 --- a/fedn/fedn/common/storage/s3/miniorepo.py +++ b/fedn/fedn/common/storage/s3/miniorepo.py @@ -62,11 +62,13 @@ def __init__(self, config): self.create_bucket(self.bucket) def create_bucket(self, bucket_name): - """ + """ Create a new bucket. If bucket exists, do nothing. - :param bucket_name: + :param bucket_name: The name of the bucket + :type bucket_name: str """ found = self.client.bucket_exists(bucket_name) + if not found: try: self.client.make_bucket(bucket_name) From aa6ab1989605fb41f124414ec5c5587a1a1ab44d Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Sat, 4 Nov 2023 11:21:43 +0100 Subject: [PATCH 19/20] Fixed code check --- fedn/fedn/network/controller/controlbase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 045faa9c1..fab6a2027 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -234,7 +234,7 @@ def set_round_status(self, round_id, status): self.tracer.set_round_status(round_id, status) def set_round_config(self, round_id, round_config): - """ Upate round in backend db. + """ Upate round in backend db. :param round_id: The round unique identifier :type round_id: str From 5494b1c9ac6db2bdb11df89da359991ccfa3db63 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Sun, 5 Nov 2023 20:39:37 +0100 Subject: [PATCH 20/20] use setter --- fedn/fedn/network/controller/control.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index 10cc6394a..615edb3b5 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -276,7 +276,7 @@ def check_combiners_done_reporting(): self.set_round_status(round_id, 'Failed') return None, self.statestore.get_round(round_id) - round_data["status"] = "Success" + self.set_round_status(round_id, 'Success') # 4. Trigger participating combiner nodes to execute a validation round for the current model validate = session_config["validate"]