diff --git a/fedn/cli/main.py b/fedn/cli/main.py index cc33c579b..d004c9605 100644 --- a/fedn/cli/main.py +++ b/fedn/cli/main.py @@ -1,10 +1,5 @@ -import logging - import click -logging.basicConfig(format='%(asctime)s [%(filename)s:%(lineno)d] %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p') # , level=logging.DEBUG) - CONTEXT_SETTINGS = dict( # Support -h as a shortcut for --help help_option_names=['-h', '--help'], diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 119b8de45..024748f52 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -106,9 +106,12 @@ def run_cmd(ctx): help='Set logfile for client log to file.') @click.option('--heartbeat-interval', required=False, default=2) @click.option('--reconnect-after-missed-heartbeat', required=False, default=30) +@click.option('--verbosity', required=False, default='INFO', type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], case_sensitive=False)) +@click.option('--theme', required=False, default='default', type=click.Choice(['dark', 'light', 'vibrant', 'default'], case_sensitive=False)) @click.pass_context def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_package, force_ssl, dry_run, secure, preshared_cert, - verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat): + verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat, + verbosity, theme): """ :param ctx: @@ -127,6 +130,8 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa :param logfile: :param hearbeat_interval :param reconnect_after_missed_heartbeat + :param verbosity + :param theme :return: """ remote = False if local_package else True @@ -134,7 +139,7 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa 'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure, 'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner, 'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval, - 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat} + 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity, 'theme': theme} if init: apply_config(config) diff --git a/fedn/fedn/common/color_handler.py b/fedn/fedn/common/color_handler.py new file mode 100644 index 000000000..f5a3aa18f --- /dev/null +++ b/fedn/fedn/common/color_handler.py @@ -0,0 +1,58 @@ +import logging +from termcolor import colored + + +class ColorizingStreamHandler(logging.StreamHandler): + dark_theme = { + 'DEBUG': 'white', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red', + } + + light_theme = { + 'DEBUG': 'black', + 'INFO': 'blue', + 'WARNING': 'magenta', + 'ERROR': 'red', + 'CRITICAL': 'red', + } + + vibrant_theme = { + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red', + } + + def __init__(self, theme='dark'): + super().__init__() + self.set_theme(theme) + + def set_theme(self, theme): + if theme == 'dark': + self.color_map = self.dark_theme + elif theme == 'light': + self.color_map = self.light_theme + elif theme == 'vibrant': + self.color_map = self.vibrant_theme + elif theme == 'default': + self.color_map = {} # No color applied + else: + self.color_map = {} # No color applied + + def emit(self, record): + try: + # Separate the log level from the message + level = '[{}]'.format(record.levelname) + color = self.color_map.get(record.levelname, 'white') + colored_level = colored(level, color) + + # Combine the colored log level with the rest of the message + message = self.format(record).replace(level, colored_level) + self.stream.write(message + "\n") + self.flush() + except Exception: + self.handleError(record) \ No newline at end of file diff --git a/fedn/fedn/common/log_config.py b/fedn/fedn/common/log_config.py new file mode 100644 index 000000000..407861b53 --- /dev/null +++ b/fedn/fedn/common/log_config.py @@ -0,0 +1,49 @@ +import logging +import logging.config +import urllib3 +from fedn.common.color_handler import ColorizingStreamHandler +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +logging.getLogger("urllib3").setLevel(logging.ERROR) + +handler = ColorizingStreamHandler(theme='dark') +logger = logging.getLogger() +logger.addHandler(handler) +logger.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') +handler.setFormatter(formatter) + + +def set_log_level_from_string(level_str): + """ + Set the log level based on a string input. + """ + # Mapping of string representation to logging constants + level_mapping = { + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'DEBUG': logging.DEBUG, + } + + # Get the logging level from the mapping + level = level_mapping.get(level_str.upper()) + + if not level: + raise ValueError(f"Invalid log level: {level_str}") + + # Set the log level + logger.setLevel(level) + + +def set_theme_from_string(theme_str): + """ + Set the logging color theme based on a string input. + """ + # Check if the theme string is valid + valid_themes = ['dark', 'light', 'vibrant', 'default'] + if theme_str.lower() not in valid_themes: + raise ValueError(f"Invalid theme: {theme_str}. Valid themes are: {', '.join(valid_themes)}") + + # Set the theme for the ColorizingStreamHandler + handler.set_theme(theme_str.lower()) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 9851b32ef..387422660 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -19,12 +19,13 @@ import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import logger, set_log_level_from_string, set_theme_from_string from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers import get_helper -from fedn.utils.logger import Logger + CHUNK_SIZE = 1024 * 1024 VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$' @@ -49,13 +50,22 @@ class Client: def __init__(self, config): """Initialize the client.""" - + print(""" _____ _ _ ______ ______ _____ + / ____| | | | | | ____| ____| __ \ + | (___ ___ __ _| | ___ ___ _ _| |_ | |__ | |__ | | | |_ __ + \___ \ / __/ _` | |/ _ \/ _ \| | | | __| | __| | __| | | | | '_ \ + ____) | (_| (_| | | __/ (_) | |_| | |_ | | | |____| |__| | | | | + |_____/ \___\__,_|_|\___|\___/ \__,_|\__| |_| |______|_____/|_| |_| +""") self.state = None self.error_state = False self._attached = False self._missed_heartbeat = 0 self.config = config + set_log_level_from_string(config.get('verbosity', "INFO")) + set_theme_from_string(config.get('theme', 'default')) + self.connector = ConnectorClient(host=config['discover_host'], port=config['discover_port'], token=config['token'], @@ -76,8 +86,8 @@ def __init__(self, config): self.run_path = os.path.join(os.getcwd(), dirname) os.mkdir(self.run_path) - self.logger = Logger( - to_file=config['logfile'], file_path=self.run_path) + # self.logger = Logger( + # to_file=config['logfile'], file_path=self.run_path) self.started_at = datetime.now() self.logs = [] @@ -90,8 +100,8 @@ def __init__(self, config): self._initialize_helper(client_config) if not self.helper: - print("Failed to retrive helper class settings! {}".format( - client_config), flush=True) + logger.warning("Failed to retrive helper class settings: {}".format( + client_config)) self._subscribe_to_combiner(config) @@ -104,27 +114,26 @@ def _assign(self): :rtype: dict """ - print("Asking for assignment!", flush=True) + logger.info("Initiating assignment request.") while True: status, response = self.connector.assign() if status == Status.TryAgain: - print(response, flush=True) + logger.info(response) time.sleep(5) continue if status == Status.Assigned: client_config = response break if status == Status.UnAuthorized: - print(response, flush=True) + logger.warning(response) sys.exit("Exiting: Unauthorized") if status == Status.UnMatchedConfig: - print(response, flush=True) + logger.warning(response) sys.exit("Exiting: UnMatchedConfig") time.sleep(5) - print(".", end=' ', flush=True) - print("Got assigned!", flush=True) - print("Received combiner config: {}".format(client_config), flush=True) + logger.info("Assignment successfully received.") + logger.info("Received combiner configuration: {}".format(client_config)) return client_config def _connect(self, client_config): @@ -143,10 +152,10 @@ def _connect(self, client_config): host = client_config['fqdn'] # assuming https if fqdn is used port = 443 - print(f"CLIENT: Connecting to combiner host: {host}:{port}", flush=True) + logger.info(f"Initiating connection to combiner host at: {host}:{port}") if client_config['certificate']: - print("CLIENT: using certificate from Reducer for GRPC channel") + logger.info("Utilizing CA certificate for GRPC channel authentication.") secure = True cert = base64.b64decode( client_config['certificate']) # .decode('utf-8') @@ -154,13 +163,13 @@ def _connect(self, client_config): channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True - print("CLIENT: using root certificate from environment variable for GRPC channel") + logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], 'rb') as f: credentials = grpc.ssl_channel_credentials(f.read()) channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif self.config['secure']: secure = True - print("CLIENT: using CA certificate for GRPC channel") + logger.info("Using CA certificate for GRPC channel.") cert = ssl.get_server_certificate((host, port)) credentials = grpc.ssl_channel_credentials(cert.encode('utf-8')) @@ -171,7 +180,7 @@ def _connect(self, client_config): else: channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) else: - print("CLIENT: using insecure GRPC channel") + logger.info("Using insecure GRPC channel.") if port == 443: port = 80 channel = grpc.insecure_channel("{}:{}".format( @@ -184,13 +193,11 @@ def _connect(self, client_config): self.combinerStub = rpc.CombinerStub(channel) self.modelStub = rpc.ModelServiceStub(channel) - print("Client: {} connected {} to {}:{}".format(self.name, - "SECURED" if secure else "INSECURE", - host, - port), - flush=True) + logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", + host, + port)) - print("Client: Using {} compute package.".format( + logger.info("Using {} compute package.".format( client_config["package"])) def _disconnect(self): @@ -201,7 +208,7 @@ def _detach(self): """Detach from the FEDn network (disconnect from combiner)""" # Setting _attached to False will make all processing threads return if not self._attached: - print("Client is not attached.", flush=True) + logger.info("Client is not attached.") self._attached = False # Close gRPC connection to combiner @@ -211,7 +218,7 @@ def _attach(self): """Attach to the FEDn network (connect to combiner)""" # Ask controller for a combiner and connect to that combiner. if self._attached: - print("Client is already attached. ", flush=True) + logger.info("Client is already attached. ") return None client_config = self._assign() @@ -284,19 +291,16 @@ def _initialize_dispatcher(self, config): if retval: break time.sleep(60) - print("No compute package available... retrying in 60s Trying {} more times.".format( - tries), flush=True) + logger.warning("Compute package not available. Retrying in 60 seconds. {} attempts remaining.".format(tries)) tries -= 1 if retval: if 'checksum' not in config: - print( - "\nWARNING: Skipping security validation of local package!, make sure you trust the package source.\n", - flush=True) + logger.warning("Bypassing security validation for local package. Ensure the package source is trusted.") else: checks_out = pr.validate(config['checksum']) if not checks_out: - print("Validation was enforced and invalid, client closing!") + logger.critical("Validation of local package failed. Client terminating.") self.error_state = True return @@ -305,10 +309,12 @@ def _initialize_dispatcher(self, config): self.dispatcher = pr.dispatcher(self.run_path) try: - print("Running Dispatcher for entrypoint: startup", flush=True) + logger.info("Initiating Dispatcher with entrypoint set to: startup") self.dispatcher.run_cmd("startup") except KeyError: pass + except Exception as e: + logger.error(f"Caught exception: {type(e).__name__}") else: # TODO: Deprecate dispatch_config = {'entry_points': @@ -481,11 +487,14 @@ def _process_training_request(self, model_id): outpath = self.helper.get_tmp_path() tic = time.time() # TODO: Check return status, fail gracefully + self.dispatcher.run_cmd("train {} {}".format(inpath, outpath)) + meta['exec_training'] = time.time() - tic tic = time.time() out_model = None + with open(outpath, "rb") as fr: out_model = io.BytesIO(fr.read()) @@ -509,6 +518,11 @@ def _process_training_request(self, model_id): updated_model_id = None meta = {'status': 'failed', 'error': str(e)} + # Push model update to combiner server + updated_model_id = uuid.uuid4() + self.set_model(out_model, str(updated_model_id)) + meta['upload_model'] = time.time() - tic + self.state = ClientState.idle return updated_model_id, meta @@ -549,7 +563,7 @@ def _process_validation_request(self, model_id, is_inference): os.unlink(outpath) except Exception as e: - print("Validation failed with exception {}".format(e), flush=True) + logger.warning("Validation failed with exception {}".format(e)) raise self.state = ClientState.idle return None @@ -659,8 +673,9 @@ def _send_heartbeat(self, update_frequency=2.0): self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code() - print("CLIENT heartbeat: GRPC ERROR {} retrying..".format( - status_code.name), flush=True) + logger.warning("CLIENT heartbeat: GRPC ERROR {} retrying..".format( + status_code.name)) + logger.debug(e) self._handle_combiner_failure() time.sleep(update_frequency) @@ -705,18 +720,15 @@ def run(self): time.sleep(1) cnt += 1 if self.state != old_state: - print("{}:CLIENT in {} state".format(datetime.now().strftime( - '%Y-%m-%d %H:%M:%S'), ClientStateToString(self.state)), flush=True) + logger.info("Client in {} state.".format(ClientStateToString(self.state))) if cnt > 5: - print("{}:CLIENT active".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S')), flush=True) cnt = 0 if not self._attached: - print("Detatched from combiner.", flush=True) + logger.info("Detached from combiner.") # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. s self._attach() self._subscribe_to_combiner(self.config) if self.error_state: return except KeyboardInterrupt: - print("Ok, exiting..") + logger.info("Shutting down.") diff --git a/fedn/fedn/network/clients/connect.py b/fedn/fedn/network/clients/connect.py index 2f8acfa8d..07329ee3a 100644 --- a/fedn/fedn/network/clients/connect.py +++ b/fedn/fedn/network/clients/connect.py @@ -8,6 +8,7 @@ import requests +from fedn.common.log_config import logger class Status(enum.Enum): """ Enum for representing the status of a client assignment.""" @@ -63,8 +64,7 @@ def __init__(self, host, port, token, name, remote_package, force_ssl=False, ver self.connect_string = "{}{}".format( self.prefix, self.host) - print("\n\nsetting the connection string to {}\n\n".format( - self.connect_string), flush=True) + logger.info("Setting connection string to {}.".format(self.connect_string)) def assign(self): """ diff --git a/fedn/fedn/network/clients/package.py b/fedn/fedn/network/clients/package.py index d6c91ccba..a8e4fe1df 100644 --- a/fedn/fedn/network/clients/package.py +++ b/fedn/fedn/network/clients/package.py @@ -11,7 +11,7 @@ from fedn.utils.checksum import sha from fedn.utils.dispatcher import Dispatcher - +from fedn.common.log_config import logger class PackageRuntime: """ PackageRuntime is used to download, validate and unpack compute packages. @@ -65,7 +65,7 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): try: self.pkg_name = params['filename'] except KeyError: - print("No package returned!", flush=True) + logger.error("No package returned.") return None r.raise_for_status() with open(os.path.join(self.pkg_path, self.pkg_name), 'wb') as f: @@ -85,7 +85,7 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): try: self.checksum = data['checksum'] except Exception: - print("Could not extract checksum!") + logger.error("Could not extract checksum.") return True @@ -102,7 +102,7 @@ def validate(self, expected_checksum): file_checksum = str(sha(os.path.join(self.pkg_path, self.pkg_name))) if self.checksum == self.expected_checksum == file_checksum: - print("Package validated {}".format(self.checksum)) + logger.info("Package validated {}".format(self.checksum)) return True else: return False @@ -165,8 +165,8 @@ def dispatcher(self, run_path): self.dispatch_config = cfg except Exception: - print( - "Error trying to load and unpack dispatcher config - trying default", flush=True) + logger.error( + "Error trying to load and unpack dispatcher config - trying default") dispatcher = Dispatcher(self.dispatch_config, run_path) diff --git a/fedn/fedn/network/combiner/connect.py b/fedn/fedn/network/combiner/connect.py index de705a56c..30446c35f 100644 --- a/fedn/fedn/network/combiner/connect.py +++ b/fedn/fedn/network/combiner/connect.py @@ -8,6 +8,7 @@ import requests +from fedn.common.log_config import logger class Status(enum.Enum): """ Enum for representing the status of a combiner announcement.""" @@ -83,8 +84,7 @@ def __init__(self, host, port, myhost, fqdn, myport, token, name, secure=False, self.connect_string = "{}{}".format( self.prefix, self.host) - print("\n\nsetting the connection string to {}\n\n".format( - self.connect_string), flush=True) + logger.info("Setting connection string to {}".format(self.connect_string)) def announce(self): """ diff --git a/fedn/fedn/network/dashboard/restservice.py b/fedn/fedn/network/dashboard/restservice.py index 3c272349e..6e44897a1 100644 --- a/fedn/fedn/network/dashboard/restservice.py +++ b/fedn/fedn/network/dashboard/restservice.py @@ -182,7 +182,7 @@ def check_configured_response(self): if not self.control.idle(): return jsonify({'status': 'retry', 'package': self.package, - 'msg': "Conroller is not in idle state, try again later. "}) + 'msg': "Controller is not in idle state, try again later. "}) return None def check_configured(self): diff --git a/fedn/fedn/utils/dispatcher.py b/fedn/fedn/utils/dispatcher.py index 3fe0a3fc1..a3aab7d8e 100644 --- a/fedn/fedn/utils/dispatcher.py +++ b/fedn/fedn/utils/dispatcher.py @@ -1,8 +1,5 @@ -import logging - from fedn.utils.process import run_process - -logger = logging.getLogger(__name__) +from fedn.common.log_config import logger class Dispatcher: @@ -39,11 +36,9 @@ def run_cmd(self, cmd_type): # add the corresponding process defined in project.yaml and append arguments from invoked command args = shell + [' '.join(cmd + args)] - # print("trying to run process {} with args {}".format(cmd, args)) run_process(args=args, cwd=self.project_dir) - logger.info('DONE RUNNING {}'.format(cmd_type)) + logger.info('Done executing {}'.format(cmd_type)) except IndexError: - message = "No such argument or configuration to run!" + message = "No such argument or configuration to run." logger.error(message) - print(message, flush=True) diff --git a/fedn/fedn/utils/logger.py b/fedn/fedn/utils/logger.py deleted file mode 100644 index 563012996..000000000 --- a/fedn/fedn/utils/logger.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging -import os - - -class Logger: - """ Logger class for Fedn. - - :param log_level: The log level. - :type log_level: int - :param to_file: The name of the file to log to. - :type to_file: str - :param file_path: The path to the log file. - :type file_path: str - """ - - def __init__(self, log_level=logging.DEBUG, to_file='', file_path=os.getcwd()): - """ Initialize the logger.""" - root = logging.getLogger() - root.setLevel(log_level) - - # sh = logging.StreamHandler(sys.stdout) - sh = logging.StreamHandler() - sh.setLevel(log_level) - log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - formatter = logging.Formatter(log_format) - sh.setFormatter(formatter) - root.addHandler(sh) - - if to_file != '': - fh = logging.FileHandler(os.path.join( - file_path, '{}'.format(to_file))) - fh.setFormatter(logging.Formatter(log_format)) - root.addHandler(fh) diff --git a/fedn/fedn/utils/process.py b/fedn/fedn/utils/process.py index bd31f9441..95e6eaa63 100644 --- a/fedn/fedn/utils/process.py +++ b/fedn/fedn/utils/process.py @@ -1,7 +1,6 @@ -import logging import subprocess -logger = logging.getLogger() +from fedn.common.log_config import logger def run_process(args, cwd): @@ -25,7 +24,7 @@ def check_io(): while True: output = status.stdout.readline().decode() if output: - logger.log(logging.INFO, output) + logger.info(output) else: break diff --git a/fedn/setup.py b/fedn/setup.py index 0adcd8e8e..f7c9fa0bb 100644 --- a/fedn/setup.py +++ b/fedn/setup.py @@ -31,7 +31,8 @@ "plotly", "pandas", "bokeh<3.0.0", - "networkx" + "networkx", + "termcolor" ], license='Apache 2.0', zip_safe=False,