diff --git a/examples/mnist-pytorch/client/fedn.yaml b/examples/mnist-pytorch/client/fedn.yaml index 29c475270..e5d3b2166 100644 --- a/examples/mnist-pytorch/client/fedn.yaml +++ b/examples/mnist-pytorch/client/fedn.yaml @@ -1,5 +1,5 @@ entry_points: train: - command: /venv/bin/python entrypoint train $ENTRYPOINT_OPTS + command: python entrypoint train $ENTRYPOINT_OPTS validate: - command: /venv/bin/python entrypoint validate $ENTRYPOINT_OPTS \ No newline at end of file + command: python entrypoint validate $ENTRYPOINT_OPTS \ No newline at end of file diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 55f93eba8..3c017c729 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -107,9 +107,12 @@ def run_cmd(ctx): help='Set logfile for client log to file.') @click.option('--heartbeat-interval', required=False, default=2) @click.option('--reconnect-after-missed-heartbeat', required=False, default=30) +@click.option('--verbosity', required=False, default='INFO', type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], case_sensitive=False)) +@click.option('--theme', required=False, default='default', type=click.Choice(['dark', 'light', 'vibrant', 'default'], case_sensitive=False)) @click.pass_context def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_package, force_ssl, dry_run, secure, preshared_cert, - verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat): + verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat, + verbosity, theme): """ :param ctx: @@ -128,6 +131,8 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa :param logfile: :param hearbeat_interval :param reconnect_after_missed_heartbeat + :param verbosity + :param theme :return: """ remote = False if local_package else True @@ -135,7 +140,7 @@ def client_cmd(ctx, discoverhost, discoverport, token, 
name, client_id, local_pa 'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure, 'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner, 'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval, - 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat} + 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity, 'theme': theme} if init: parse_client_config(config) diff --git a/fedn/fedn/client.py b/fedn/fedn/client.py index f1abfc3db..d8eec61b3 100644 --- a/fedn/fedn/client.py +++ b/fedn/fedn/client.py @@ -24,9 +24,10 @@ from fedn.common.control.package import PackageRuntime from fedn.common.net.connect import ConnectorClient, Status from fedn.common.net.web.client import page, style +from fedn.common.log_config import logger, set_log_level_from_string, set_theme_from_string from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers import get_helper -from fedn.utils.logger import Logger + CHUNK_SIZE = 1024 * 1024 VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$' @@ -62,13 +63,23 @@ def __init__(self, config): the discovery service (controller) and settings governing e.g. client-combiner assignment behavior. 
""" - + print("""\n _____ _ _ ______ ______ _____ + / ____| | | | | | ____| ____| __ \ + | (___ ___ __ _| | ___ ___ _ _| |_ | |__ | |__ | | | |_ __ + \___ \ / __/ _` | |/ _ \/ _ \| | | | __| | __| | __| | | | | '_ \ + ____) | (_| (_| | | __/ (_) | |_| | |_ | | | |____| |__| | | | | + |_____/ \___\__,_|_|\___|\___/ \__,_|\__| |_| |______|_____/|_| |_| + + """) self.state = None self.error_state = False self._attached = False self._missed_heartbeat = 0 self.config = config + set_log_level_from_string(config.get('verbosity', "INFO")) + set_theme_from_string(config.get('theme', 'default')) + self.connector = ConnectorClient(host=config['discover_host'], port=config['discover_port'], token=config['token'], @@ -89,8 +100,8 @@ def __init__(self, config): self.run_path = os.path.join(os.getcwd(), dirname) os.mkdir(self.run_path) - self.logger = Logger( - to_file=config['logfile'], file_path=self.run_path) + # self.logger = Logger( + # to_file=config['logfile'], file_path=self.run_path) self.started_at = datetime.now() self.logs = [] @@ -103,8 +114,8 @@ def __init__(self, config): self._initialize_helper(client_config) if not self.helper: - print("Failed to retrive helper class settings! {}".format( - client_config), flush=True) + logger.warning("Failed to retrive helper class settings: {}".format( + client_config)) self._subscribe_to_combiner(config) @@ -113,7 +124,7 @@ def __init__(self, config): def _detach(self): # Setting _attached to False will make all processing threads return if not self._attached: - print("Client is not attached.", flush=True) + logger.info("Client is not attached.") self._attached = False # Close gRPC connection to combiner @@ -123,7 +134,7 @@ def _attach(self): """ """ # Ask controller for a combiner and connect to that combiner. if self._attached: - print("Client is already attached. ", flush=True) + logger.info("Client is already attached. 
") return None client_config = self._assign() @@ -178,19 +189,16 @@ def _initialize_dispatcher(self, config): if retval: break time.sleep(60) - print("No compute package available... retrying in 60s Trying {} more times.".format( - tries), flush=True) + logger.warning("Compute package not available. Retrying in 60 seconds. {} attempts remaining.".format(tries)) tries -= 1 if retval: if 'checksum' not in config: - print( - "\nWARNING: Skipping security validation of local package!, make sure you trust the package source.\n", - flush=True) + logger.warning("Bypassing security validation for local package. Ensure the package source is trusted.") else: checks_out = pr.validate(config['checksum']) if not checks_out: - print("Validation was enforced and invalid, client closing!") + logger.critical("Validation of local package failed. Client terminating.") self.error_state = True return @@ -199,10 +207,12 @@ def _initialize_dispatcher(self, config): self.dispatcher = pr.dispatcher(self.run_path) try: - print("Running Dispatcher for entrypoint: startup", flush=True) + logger.info("Initiating Dispatcher with entrypoint set to: startup") self.dispatcher.run_cmd("startup") except KeyError: pass + except Exception as e: + logger.error(f"Caught exception: {type(e).__name__}") else: # TODO: Deprecate dispatch_config = {'entry_points': @@ -217,26 +227,26 @@ def _assign(self): """Contacts the controller and asks for combiner assignment. 
""" - print("Asking for assignment!", flush=True) + logger.info("Initiating assignment request.") while True: status, response = self.connector.assign() if status == Status.TryAgain: - print(response, flush=True) + logger.info(response) time.sleep(5) continue if status == Status.Assigned: client_config = response break if status == Status.UnAuthorized: - print(response, flush=True) + logger.error(response) sys.exit("Exiting: Unauthorized") if status == Status.UnMatchedConfig: - print(response, flush=True) + logger.error(response) sys.exit("Exiting: UnMatchedConfig") time.sleep(5) - print(".", end=' ', flush=True) + # print(".", end=' ') - print("Got assigned!", flush=True) + logger.info("Assignment successfully received.") return client_config def _connect(self, client_config): @@ -258,10 +268,10 @@ def _connect(self, client_config): host = client_config['fqdn'] # assuming https if fqdn is used port = 443 - print(f"CLIENT: Connecting to combiner host: {host}:{port}", flush=True) + logger.info(f"Initiating connection to combiner host at: {host}:{port}") if client_config['certificate']: - print("CLIENT: using certificate from Reducer for GRPC channel") + logger.info("Using certificate from Reducer for GRPC channel") secure = True cert = base64.b64decode( client_config['certificate']) # .decode('utf-8') @@ -269,13 +279,13 @@ def _connect(self, client_config): channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True - print("CLIENT: using root certificate from environment variable for GRPC channel") + logger.info("Using root certificate from environment variable for GRPC channel") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], 'rb') as f: credentials = grpc.ssl_channel_credentials(f.read()) channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif self.config['secure']: secure = True - print("CLIENT: using CA certificate for GRPC channel") + logger.info("Utilizing CA 
certificate for GRPC channel authentication.") cert = ssl.get_server_certificate((host, port)) credentials = grpc.ssl_channel_credentials(cert.encode('utf-8')) @@ -286,7 +296,7 @@ def _connect(self, client_config): else: channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) else: - print("CLIENT: using insecure GRPC channel") + logger.warning("Using insecure GRPC channel") if port == 443: port = 80 channel = grpc.insecure_channel("{}:{}".format( @@ -299,13 +309,12 @@ def _connect(self, client_config): self.orchestrator = rpc.CombinerStub(channel) self.models = rpc.ModelServiceStub(channel) - print("Client: {} connected {} to {}:{}".format(self.name, - "SECURED" if secure else "INSECURE", - host, - port), - flush=True) + logger.info("{} successfully established {} connection to {}:{}".format(self.name, + "secured" if secure else "insecure", + host, + port)) - print("Client: Using {} compute package.".format( + logger.info("Using {} compute package.".format( client_config["package"])) def _disconnect(self): @@ -407,7 +416,7 @@ def _listen_to_model_update_request_stream(self): # TODO: make configurable timeout = 5 # print("CLIENT __listen_to_model_update_request_stream: GRPC ERROR {} retrying in {}..".format( - # status_code.name, timeout), flush=True) + # status_code.name, timeout)) time.sleep(timeout) except Exception: raise @@ -434,7 +443,7 @@ def _listen_to_model_validation_request_stream(self): # TODO: make configurable timeout = 5 # print("CLIENT __listen_to_model_validation_request_stream: GRPC ERROR {} retrying in {}..".format( - # status_code.name, timeout), flush=True) + # status_code.name, timeout)) time.sleep(timeout) except Exception: raise @@ -528,40 +537,67 @@ def _process_training_request(self, model_id): "\t Starting processing of training request for model_id {}".format(model_id)) self.state = ClientState.training - try: - meta = {} - tic = time.time() - mdl = self.get_model(str(model_id)) - meta['fetch_model'] = time.time() - 
tic + # try: + meta = {} + tic = time.time() + mdl = self.get_model(str(model_id)) + meta['fetch_model'] = time.time() - tic - inpath = self.helper.get_tmp_path() + inpath = self.helper.get_tmp_path() + try: with open(inpath, 'wb') as fh: fh.write(mdl.getbuffer()) + except FileNotFoundError: + logger.error(f"File {inpath} not found.") + except PermissionError: + logger.error(f"Permission denied for writing to {inpath}.") + except IOError as e: + logger.error(f"An IO error occurred: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") - outpath = self.helper.get_tmp_path() - tic = time.time() - # TODO: Check return status, fail gracefully + outpath = self.helper.get_tmp_path() + tic = time.time() + # TODO: Check return status, fail gracefully + try: self.dispatcher.run_cmd("train {} {}".format(inpath, outpath)) - meta['exec_training'] = time.time() - tic + except Exception as e: + logger.error("Failed to launch training.") + logger.debug(e) + logger.debug(type(e).__name__) + meta['exec_training'] = time.time() - tic - tic = time.time() - out_model = None + tic = time.time() + out_model = None + try: with open(outpath, "rb") as fr: out_model = io.BytesIO(fr.read()) - - # Push model update to combiner server - updated_model_id = uuid.uuid4() - self.set_model(out_model, str(updated_model_id)) - meta['upload_model'] = time.time() - tic - - os.unlink(inpath) - os.unlink(outpath) - + except FileNotFoundError: + logger.error(f"File {outpath} not found.") + except PermissionError: + logger.error(f"Permission denied for writing to {outpath}.") + except IOError as e: + logger.error(f"An IO error occurred: {e}") except Exception as e: - print("ERROR could not process training request due to error: {}".format( - e), flush=True) - updated_model_id = None - meta = {'status': 'failed', 'error': str(e)} + logger.error(f"An unexpected error occurred: {e}") + logger.error(type(e).__name__) + # Push model update to combiner server + updated_model_id = 
uuid.uuid4() + self.set_model(out_model, str(updated_model_id)) + meta['upload_model'] = time.time() - tic + + os.unlink(inpath) + os.unlink(outpath) + # except FileNotFoundError as e: + # print("File not found.") + # except Exception as e: + # print("ERROR could not process training request due to error: {}".format( + # e)) + # print(type(e).__name__) + # print(inpath) + # print(outpath) + # updated_model_id = None + # meta = {'status': 'failed', 'error': str(e)} self.state = ClientState.idle @@ -588,7 +624,7 @@ def _process_validation_request(self, model_id): os.unlink(outpath) except Exception as e: - print("Validation failed with exception {}".format(e), flush=True) + logger.warning("Validation failed with exception {}".format(e)) raise self.state = ClientState.idle return None @@ -622,8 +658,9 @@ def _send_heartbeat(self, update_frequency=2.0): self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code() - print("CLIENT heartbeat: GRPC ERROR {} retrying..".format( - status_code.name), flush=True) + logger.warning("CLIENT heartbeat: GRPC ERROR {} retrying..".format( + status_code.name)) + logger.debug(e) self._handle_combiner_failure() time.sleep(update_frequency) @@ -684,18 +721,18 @@ def run(self): time.sleep(1) cnt += 1 if self.state != old_state: - print("{}:CLIENT in {} state".format(datetime.now().strftime( - '%Y-%m-%d %H:%M:%S'), ClientStateToString(self.state)), flush=True) + logger.info("Client in {} state.".format(ClientStateToString(self.state))) if cnt > 5: - print("{}:CLIENT active".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S')), flush=True) + # logger.info("CLIENT active.") + # print("{}:CLIENT active".format( + # datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) cnt = 0 if not self._attached: - print("Detatched from combiner.", flush=True) + logger.info("Detached from combiner.") # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. 
s self._attach() self._subscribe_to_combiner(self.config) if self.error_state: return except KeyboardInterrupt: - print("Ok, exiting..") + logger.info("Shutting down.") diff --git a/fedn/fedn/clients/reducer/restservice.py b/fedn/fedn/clients/reducer/restservice.py index 432c6fa92..49eb06976 100644 --- a/fedn/fedn/clients/reducer/restservice.py +++ b/fedn/fedn/clients/reducer/restservice.py @@ -178,7 +178,7 @@ def check_configured_response(self): if not self.control.idle(): return jsonify({'status': 'retry', 'package': self.package, - 'msg': "Conroller is not in idle state, try again later. "}) + 'msg': "Controller is not in idle state, try again later. "}) return None def check_configured(self): diff --git a/fedn/fedn/common/color_handler.py b/fedn/fedn/common/color_handler.py new file mode 100644 index 000000000..f5a3aa18f --- /dev/null +++ b/fedn/fedn/common/color_handler.py @@ -0,0 +1,58 @@ +import logging +from termcolor import colored + + +class ColorizingStreamHandler(logging.StreamHandler): + dark_theme = { + 'DEBUG': 'white', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red', + } + + light_theme = { + 'DEBUG': 'black', + 'INFO': 'blue', + 'WARNING': 'magenta', + 'ERROR': 'red', + 'CRITICAL': 'red', + } + + vibrant_theme = { + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red', + } + + def __init__(self, theme='dark'): + super().__init__() + self.set_theme(theme) + + def set_theme(self, theme): + if theme == 'dark': + self.color_map = self.dark_theme + elif theme == 'light': + self.color_map = self.light_theme + elif theme == 'vibrant': + self.color_map = self.vibrant_theme + elif theme == 'default': + self.color_map = {} # No color applied + else: + self.color_map = {} # No color applied + + def emit(self, record): + try: + # Separate the log level from the message + level = '[{}]'.format(record.levelname) + color = self.color_map.get(record.levelname, 'white') + colored_level = 
colored(level, color) + + # Combine the colored log level with the rest of the message + message = self.format(record).replace(level, colored_level) + self.stream.write(message + "\n") + self.flush() + except Exception: + self.handleError(record) \ No newline at end of file diff --git a/fedn/fedn/common/control/package.py b/fedn/fedn/common/control/package.py index b7f3a3471..3e1f8e62b 100644 --- a/fedn/fedn/common/control/package.py +++ b/fedn/fedn/common/control/package.py @@ -9,7 +9,7 @@ from fedn.utils.checksum import sha from fedn.utils.dispatcher import Dispatcher - +from fedn.common.log_config import logger class Package: """ @@ -76,7 +76,7 @@ def upload(self): # print("going to send {}".format(data),flush=True) f = open(os.path.join(os.path.dirname( self.file_path), self.package_file), 'rb') - print("Sending the following file {}".format(f.read()), flush=True) + logger.info("Sending the following file {}".format(f.read())) f.seek(0, 0) files = {'file': f} try: @@ -85,13 +85,10 @@ def upload(self): # data=data, headers={'Authorization': 'Token {}'.format(self.reducer_token)}) except Exception as e: - print("failed to put execution context to reducer. {}".format( - e), flush=True) + logger.error("Failed to put execution context to reducer. 
{}".format(e)) finally: f.close() - print("Upload 4 ", flush=True) - class PackageRuntime: """ @@ -141,7 +138,7 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): try: self.pkg_name = params['filename'] except KeyError: - print("No package returned!", flush=True) + logger.error("No package returned.") return None r.raise_for_status() with open(os.path.join(self.pkg_path, self.pkg_name), 'wb') as f: @@ -161,7 +158,7 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): try: self.checksum = data['checksum'] except Exception: - print("Could not extract checksum!") + logger.error("Could not extract checksum.") return True @@ -183,7 +180,7 @@ def validate(self, expected_checksum): # return True if self.checksum == self.expected_checksum == file_checksum: - print("Package validated {}".format(self.checksum)) + logger.info("Package validated {}".format(self.checksum)) return True else: return False @@ -204,7 +201,7 @@ def unpack(self): f = tarfile.open(os.path.join( self.pkg_path, self.pkg_name), 'r:bz2') else: - print( + logger.warning( "Failed to unpack compute package, no pkg_name set. 
Has the reducer been configured with a compute package?") os.getcwd() @@ -213,10 +210,10 @@ def unpack(self): if f: f.extractall() - print("Successfully extracted compute package content in {}".format( - self.dir), flush=True) + logger.info("Successfully extracted compute package content in {}".format( + self.dir)) except Exception: - print("Error extracting files!") + logger.error("Error extracting files!") def dispatcher(self, run_path): """ @@ -237,8 +234,8 @@ def dispatcher(self, run_path): self.dispatch_config = cfg except Exception: - logger.error( - "Error trying to load and unpack dispatcher config - trying default") + logger.error( + "Error trying to load and unpack dispatcher config - trying default") dispatcher = Dispatcher(self.dispatch_config, run_path) diff --git a/fedn/fedn/common/log_config.py b/fedn/fedn/common/log_config.py new file mode 100644 index 000000000..407861b53 --- /dev/null +++ b/fedn/fedn/common/log_config.py @@ -0,0 +1,49 @@ +import logging +import logging.config +import urllib3 +from fedn.common.color_handler import ColorizingStreamHandler +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +logging.getLogger("urllib3").setLevel(logging.ERROR) + +handler = ColorizingStreamHandler(theme='dark') +logger = logging.getLogger() +logger.addHandler(handler) +logger.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') +handler.setFormatter(formatter) + + +def set_log_level_from_string(level_str): + """ + Set the log level based on a string input. 
+ """ + # Mapping of string representation to logging constants + level_mapping = { + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'DEBUG': logging.DEBUG, + } + + # Get the logging level from the mapping + level = level_mapping.get(level_str.upper()) + + if not level: + raise ValueError(f"Invalid log level: {level_str}") + + # Set the log level + logger.setLevel(level) + + +def set_theme_from_string(theme_str): + """ + Set the logging color theme based on a string input. + """ + # Check if the theme string is valid + valid_themes = ['dark', 'light', 'vibrant', 'default'] + if theme_str.lower() not in valid_themes: + raise ValueError(f"Invalid theme: {theme_str}. Valid themes are: {', '.join(valid_themes)}") + + # Set the theme for the ColorizingStreamHandler + handler.set_theme(theme_str.lower()) diff --git a/fedn/fedn/common/net/connect.py b/fedn/fedn/common/net/connect.py index 0a7cf4051..165427e7e 100644 --- a/fedn/fedn/common/net/connect.py +++ b/fedn/fedn/common/net/connect.py @@ -1,7 +1,7 @@ import enum import requests as r - +from fedn.common.log_config import logger class State(enum.Enum): Disconnected = 0 @@ -45,8 +45,7 @@ def __init__(self, host, port, token, name, remote_package, force_ssl=False, ver self.connect_string = "{}{}".format( self.prefix, self.host) - print("\n\nsetting the connection string to {}\n\n".format( - self.connect_string), flush=True) + logger.info("Established connection string to {}.".format(self.connect_string)) def state(self): """ @@ -77,7 +76,7 @@ def assign(self): allow_redirects=True, headers={'Authorization': 'Token {}'.format(self.token)}) except Exception as e: - print('***** {}'.format(e), flush=True) + logger.error('***** {}'.format(e)) return Status.Unassigned, {} if retval.status_code == 401: @@ -131,8 +130,7 @@ def __init__(self, host, port, myhost, fqdn, myport, token, name, secure=False, self.connect_string = "{}{}".format( self.prefix, self.host) - 
print("\n\nsetting the connection string to {}\n\n".format( - self.connect_string), flush=True) + logger.info("Established connection string to {}.".format(self.connect_string)) def state(self): """ diff --git a/fedn/fedn/utils/dispatcher.py b/fedn/fedn/utils/dispatcher.py index 63743e6a6..856466173 100644 --- a/fedn/fedn/utils/dispatcher.py +++ b/fedn/fedn/utils/dispatcher.py @@ -1,3 +1,4 @@ +import platform import logging from fedn.utils.process import run_process @@ -28,11 +29,14 @@ def run_cmd(self, cmd_type): args = cmdsandargs[1:] # shell (this could be a venv, TODO: parametrize) - shell = ['/bin/sh', '-c'] + if platform.system() == "Windows": + shell = ['powershell.exe', '-Command'] + else: + shell = ['/bin/sh', '-c'] # add the corresponding process defined in project.yaml and append arguments from invoked command args = shell + [' '.join(cmd + args)] - # print("trying to run process {} with args {}".format(cmd, args)) + logger.debug("trying to run process {} with args {}".format(cmd, args)) run_process(args=args, cwd=self.project_dir) logger.info('DONE RUNNING {}'.format(cmd_type))