diff --git a/fedn/cli/main.py b/fedn/cli/main.py index cc33c579b..d004c9605 100644 --- a/fedn/cli/main.py +++ b/fedn/cli/main.py @@ -1,10 +1,5 @@ -import logging - import click -logging.basicConfig(format='%(asctime)s [%(filename)s:%(lineno)d] %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p') # , level=logging.DEBUG) - CONTEXT_SETTINGS = dict( # Support -h as a shortcut for --help help_option_names=['-h', '--help'], diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 119b8de45..c0c442c3d 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -1,4 +1,3 @@ -import time import uuid import click @@ -102,13 +101,15 @@ def run_cmd(ctx): @click.option('-tr', '--trainer', required=False, default=True) @click.option('-in', '--init', required=False, default=None, help='Set to a filename to (re)init client from file state.') -@click.option('-l', '--logfile', required=False, default='{}-client.log'.format(time.strftime("%Y%m%d-%H%M%S")), +@click.option('-l', '--logfile', required=False, default=None, help='Set logfile for client log to file.') @click.option('--heartbeat-interval', required=False, default=2) @click.option('--reconnect-after-missed-heartbeat', required=False, default=30) +@click.option('--verbosity', required=False, default='INFO', type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], case_sensitive=False)) @click.pass_context def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_package, force_ssl, dry_run, secure, preshared_cert, - verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat): + verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat, + verbosity): """ :param ctx: @@ -127,6 +128,7 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa :param logfile: :param hearbeat_interval :param reconnect_after_missed_heartbeat + :param verbosity :return: """ remote = False if local_package else True @@ -134,7 +136,7 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa 'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure, 'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner, 'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval, - 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat} + 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity} if init: apply_config(config) diff --git a/fedn/fedn/common/log_config.py b/fedn/fedn/common/log_config.py new file mode 100644 index 000000000..47579cb82 --- /dev/null +++ b/fedn/fedn/common/log_config.py @@ -0,0 +1,56 @@ +import logging +import logging.config + +import urllib3 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +logging.getLogger("urllib3").setLevel(logging.ERROR) + +handler = logging.StreamHandler() +logger = logging.getLogger() +logger.addHandler(handler) +logger.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') +handler.setFormatter(formatter) + + +def set_log_level_from_string(level_str): + """ + Set the log level based on a string input. + """ + # Mapping of string representation to logging constants + level_mapping = { + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'DEBUG': logging.DEBUG, + } + + # Get the logging level from the mapping + level = level_mapping.get(level_str.upper()) + + if not level: + raise ValueError(f"Invalid log level: {level_str}") + + # Set the log level + logger.setLevel(level) + + +def set_log_stream(log_file): + """ + Redirect the log stream to a specified file, if log_file is set. + """ + if not log_file: + return + + # Remove existing handlers + for h in logger.handlers[:]: + logger.removeHandler(h) + + # Create a FileHandler + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + + # Add the file handler to the logger + logger.addHandler(file_handler) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index e27616925..44b8e8a4e 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -21,12 +21,13 @@ import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers import get_helper -from fedn.utils.logger import Logger CHUNK_SIZE = 1024 * 1024 VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$' @@ -51,13 +52,15 @@ class Client: def __init__(self, config): """Initialize the client.""" - self.state = None self.error_state = False self._attached = False self._missed_heartbeat = 0 self.config = config + set_log_level_from_string(config.get('verbosity', "INFO")) + set_log_stream(config.get('logfile', None)) + self.connector = ConnectorClient(host=config['discover_host'], port=config['discover_port'], token=config['token'], @@ -78,8 +81,6 @@ def __init__(self, config): self.run_path = os.path.join(os.getcwd(), dirname) os.mkdir(self.run_path) - self.logger = Logger( - to_file=config['logfile'], file_path=self.run_path) self.started_at = datetime.now() self.logs = [] @@ -92,8 +93,8 @@ def __init__(self, config): self._initialize_helper(client_config) if not self.helper: - print("Failed to retrive helper class settings! {}".format( - client_config), flush=True) + logger.warning("Failed to retrieve helper class settings: {}".format( + client_config)) self._subscribe_to_combiner(config) @@ -106,27 +107,26 @@ def _assign(self): :rtype: dict """ - print("Asking for assignment!", flush=True) + logger.info("Initiating assignment request.") while True: status, response = self.connector.assign() if status == Status.TryAgain: - print(response, flush=True) + logger.info(response) time.sleep(5) continue if status == Status.Assigned: client_config = response break if status == Status.UnAuthorized: - print(response, flush=True) + logger.critical(response) sys.exit("Exiting: Unauthorized") if status == Status.UnMatchedConfig: - print(response, flush=True) + logger.critical(response) sys.exit("Exiting: UnMatchedConfig") time.sleep(5) - print(".", end=' ', flush=True) - print("Got assigned!", flush=True) - print("Received combiner config: {}".format(client_config), flush=True) + logger.info("Assignment successfully received.") + logger.info("Received combiner configuration: {}".format(client_config)) return client_config def _add_grpc_metadata(self, key, value): @@ -177,17 +177,17 @@ def _connect(self, client_config): host = client_config['host'] # Add host to gRPC metadata self._add_grpc_metadata('grpc-server', host) - print("CLIENT: Using metadata: {}".format(self.metadata), flush=True) + logger.info("Client using metadata: {}.".format(self.metadata)) port = client_config['port'] secure = False if client_config['fqdn'] is not None: host = client_config['fqdn'] # assuming https if fqdn is used port = 443 - print(f"CLIENT: Connecting to combiner host: {host}:{port}", flush=True) + logger.info(f"Initiating connection to combiner host at: {host}:{port}") if client_config['certificate']: - print("CLIENT: using certificate from Reducer for GRPC channel") + logger.info("Utilizing CA certificate for GRPC channel authentication.") secure = True cert = base64.b64decode( client_config['certificate']) # .decode('utf-8') @@ -195,13 +195,13 @@ def _connect(self, client_config): channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True - print("CLIENT: using root certificate from environment variable for GRPC channel") + logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], 'rb') as f: credentials = grpc.ssl_channel_credentials(f.read()) channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif self.config['secure']: secure = True - print("CLIENT: using CA certificate for GRPC channel") + logger.info("Using CA certificate for GRPC channel.") cert = self._get_ssl_certificate(host, port=port) credentials = grpc.ssl_channel_credentials(cert.encode('utf-8')) @@ -212,7 +212,7 @@ def _connect(self, client_config): else: channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) else: - print("CLIENT: using insecure GRPC channel") + logger.info("Using insecure GRPC channel.") if port == 443: port = 80 channel = grpc.insecure_channel("{}:{}".format( @@ -225,13 +225,11 @@ def _connect(self, client_config): self.combinerStub = rpc.CombinerStub(channel) self.modelStub = rpc.ModelServiceStub(channel) - print("Client: {} connected {} to {}:{}".format(self.name, - "SECURED" if secure else "INSECURE", - host, - port), - flush=True) + logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", + host, + port)) - print("Client: Using {} compute package.".format( + logger.info("Using {} compute package.".format( client_config["package"])) def _disconnect(self): @@ -242,7 +240,7 @@ def _detach(self): """Detach from the FEDn network (disconnect from combiner)""" # Setting _attached to False will make all processing threads return if not self._attached: - print("Client is not attached.", flush=True) + logger.info("Client is not attached.") self._attached = False # Close gRPC connection to combiner @@ -252,7 +250,7 @@ def _attach(self): """Attach to the FEDn network (connect to combiner)""" # Ask controller for a combiner and connect to that combiner. if self._attached: - print("Client is already attached. ", flush=True) + logger.info("Client is already attached. ") return None client_config = self._assign() @@ -325,19 +323,16 @@ def _initialize_dispatcher(self, config): if retval: break time.sleep(60) - print("No compute package available... retrying in 60s Trying {} more times.".format( - tries), flush=True) + logger.warning("Compute package not available. Retrying in 60 seconds. {} attempts remaining.".format(tries)) tries -= 1 if retval: if 'checksum' not in config: - print( - "\nWARNING: Skipping security validation of local package!, make sure you trust the package source.\n", - flush=True) + logger.warning("Bypassing security validation for local package. Ensure the package source is trusted.") else: checks_out = pr.validate(config['checksum']) if not checks_out: - print("Validation was enforced and invalid, client closing!") + logger.critical("Validation of local package failed. Client terminating.") self.error_state = True return @@ -346,10 +341,12 @@ def _initialize_dispatcher(self, config): self.dispatcher = pr.dispatcher(self.run_path) try: - print("Running Dispatcher for entrypoint: startup", flush=True) + logger.info("Initiating Dispatcher with entrypoint set to: startup") self.dispatcher.run_cmd("startup") except KeyError: pass + except Exception as e: + logger.error(f"Caught exception: {type(e).__name__}") else: # TODO: Deprecate dispatch_config = {'entry_points': @@ -523,11 +520,14 @@ def _process_training_request(self, model_id): outpath = self.helper.get_tmp_path() tic = time.time() # TODO: Check return status, fail gracefully + self.dispatcher.run_cmd("train {} {}".format(inpath, outpath)) + meta['exec_training'] = time.time() - tic tic = time.time() out_model = None + with open(outpath, "rb") as fr: out_model = io.BytesIO(fr.read()) @@ -546,11 +546,15 @@ def _process_training_request(self, model_id): os.unlink(outpath+'-metadata') except Exception as e: - print("ERROR could not process training request due to error: {}".format( - e), flush=True) + logger.error("Could not process training request due to error: {}".format(e)) updated_model_id = None meta = {'status': 'failed', 'error': str(e)} + # Push model update to combiner server + updated_model_id = uuid.uuid4() + self.set_model(out_model, str(updated_model_id)) + meta['upload_model'] = time.time() - tic + self.state = ClientState.idle return updated_model_id, meta @@ -591,7 +595,7 @@ def _process_validation_request(self, model_id, is_inference): os.unlink(outpath) except Exception as e: - print("Validation failed with exception {}".format(e), flush=True) + logger.warning("Validation failed with exception {}".format(e)) raise self.state = ClientState.idle return None @@ -701,8 +705,9 @@ def _send_heartbeat(self, update_frequency=2.0): self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code() - print("CLIENT heartbeat: GRPC ERROR {} retrying..".format( - status_code.name), flush=True) + logger.warning("Client heartbeat: GRPC error, {}. Retrying.".format( + status_code.name)) + logger.debug(e) self._handle_combiner_failure() time.sleep(update_frequency) @@ -747,18 +752,15 @@ def run(self): time.sleep(1) cnt += 1 if self.state != old_state: - print("{}:CLIENT in {} state".format(datetime.now().strftime( - '%Y-%m-%d %H:%M:%S'), ClientStateToString(self.state)), flush=True) + logger.info("Client in {} state.".format(ClientStateToString(self.state))) if cnt > 5: - print("{}:CLIENT active".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S')), flush=True) cnt = 0 if not self._attached: - print("Detatched from combiner.", flush=True) + logger.info("Detached from combiner.") # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. s self._attach() self._subscribe_to_combiner(self.config) if self.error_state: return except KeyboardInterrupt: - print("Ok, exiting..") + logger.info("Shutting down.") diff --git a/fedn/fedn/network/clients/connect.py b/fedn/fedn/network/clients/connect.py index 2f8acfa8d..478844d26 100644 --- a/fedn/fedn/network/clients/connect.py +++ b/fedn/fedn/network/clients/connect.py @@ -8,6 +8,8 @@ import requests +from fedn.common.log_config import logger + class Status(enum.Enum): """ Enum for representing the status of a client assignment.""" @@ -63,8 +65,7 @@ def __init__(self, host, port, token, name, remote_package, force_ssl=False, ver self.connect_string = "{}{}".format( self.prefix, self.host) - print("\n\nsetting the connection string to {}\n\n".format( - self.connect_string), flush=True) + logger.info("Setting connection string to {}.".format(self.connect_string)) def assign(self): """ diff --git a/fedn/fedn/network/clients/package.py b/fedn/fedn/network/clients/package.py index d6c91ccba..d56296de8 100644 --- a/fedn/fedn/network/clients/package.py +++ b/fedn/fedn/network/clients/package.py @@ -9,6 +9,7 @@ import requests import yaml +from fedn.common.log_config import logger from fedn.utils.checksum import sha from fedn.utils.dispatcher import Dispatcher @@ -65,7 +66,7 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): try: self.pkg_name = params['filename'] except KeyError: - print("No package returned!", flush=True) + logger.error("No package returned.") return None r.raise_for_status() with open(os.path.join(self.pkg_path, self.pkg_name), 'wb') as f: @@ -85,7 +86,7 @@ def download(self, host, port, token, force_ssl=False, secure=False, name=None): try: self.checksum = data['checksum'] except Exception: - print("Could not extract checksum!") + logger.error("Could not extract checksum.") return True @@ -102,7 +103,7 @@ def validate(self, expected_checksum): file_checksum = str(sha(os.path.join(self.pkg_path, self.pkg_name))) if self.checksum == self.expected_checksum == file_checksum: - print("Package validated {}".format(self.checksum)) + logger.info("Package validated {}".format(self.checksum)) return True else: return False @@ -125,7 +126,7 @@ def unpack(self): f = tarfile.open(os.path.join( self.pkg_path, self.pkg_name), 'r:bz2') else: - print( + logger.error( "Failed to unpack compute package, no pkg_name set." "Has the reducer been configured with a compute package?" ) @@ -137,11 +138,10 @@ def unpack(self): if f: f.extractall() - print("Successfully extracted compute package content in {}".format( - self.dir), flush=True) + logger.info("Successfully extracted compute package content in {}".format(self.dir)) return True except Exception: - print("Error extracting files!") + logger.error("Error extracting files.") return False def dispatcher(self, run_path): @@ -165,8 +165,8 @@ def dispatcher(self, run_path): self.dispatch_config = cfg except Exception: - print( - "Error trying to load and unpack dispatcher config - trying default", flush=True) + logger.error( + "Error trying to load and unpack dispatcher config - trying default") dispatcher = Dispatcher(self.dispatch_config, run_path) diff --git a/fedn/fedn/network/combiner/connect.py b/fedn/fedn/network/combiner/connect.py index de705a56c..4c1c94266 100644 --- a/fedn/fedn/network/combiner/connect.py +++ b/fedn/fedn/network/combiner/connect.py @@ -8,6 +8,8 @@ import requests +from fedn.common.log_config import logger + class Status(enum.Enum): """ Enum for representing the status of a combiner announcement.""" @@ -83,8 +85,7 @@ def __init__(self, host, port, myhost, fqdn, myport, token, name, secure=False, self.connect_string = "{}{}".format( self.prefix, self.host) - print("\n\nsetting the connection string to {}\n\n".format( - self.connect_string), flush=True) + logger.info("Setting connection string to {}".format(self.connect_string)) def announce(self): """ diff --git a/fedn/fedn/network/dashboard/restservice.py b/fedn/fedn/network/dashboard/restservice.py index 14e7266bb..808bcb272 100644 --- a/fedn/fedn/network/dashboard/restservice.py +++ b/fedn/fedn/network/dashboard/restservice.py @@ -190,7 +190,7 @@ def check_configured_response(self): { "status": "retry", "package": self.package, - "msg": "Conroller is not in idle state, try again later. ", + "msg": "Controller is not in idle state, try again later. ", } ) return None diff --git a/fedn/fedn/utils/dispatcher.py b/fedn/fedn/utils/dispatcher.py index f4b2ee44a..c7ace0ab5 100644 --- a/fedn/fedn/utils/dispatcher.py +++ b/fedn/fedn/utils/dispatcher.py @@ -1,10 +1,8 @@ import os -import logging +from fedn.common.log_config import logger from fedn.utils.process import run_process -logger = logging.getLogger(__name__) - class Dispatcher: """ Dispatcher class for compute packages. @@ -43,11 +41,9 @@ def run_cmd(self, cmd_type): # add the corresponding process defined in project.yaml and append arguments from invoked command args = shell + [' '.join(cmd + args)] - print("trying to run process {} with args {}".format(cmd, args)) run_process(args=args, cwd=self.project_dir) - logger.info('DONE RUNNING {}'.format(cmd_type)) + logger.info('Done executing {}'.format(cmd_type)) except IndexError: - message = "No such argument or configuration to run!" + message = "No such argument or configuration to run." logger.error(message) - print(message, flush=True) diff --git a/fedn/fedn/utils/logger.py b/fedn/fedn/utils/logger.py deleted file mode 100644 index 563012996..000000000 --- a/fedn/fedn/utils/logger.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging -import os - - -class Logger: - """ Logger class for Fedn. - - :param log_level: The log level. - :type log_level: int - :param to_file: The name of the file to log to. - :type to_file: str - :param file_path: The path to the log file. - :type file_path: str - """ - - def __init__(self, log_level=logging.DEBUG, to_file='', file_path=os.getcwd()): - """ Initialize the logger.""" - root = logging.getLogger() - root.setLevel(log_level) - - # sh = logging.StreamHandler(sys.stdout) - sh = logging.StreamHandler() - sh.setLevel(log_level) - log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - formatter = logging.Formatter(log_format) - sh.setFormatter(formatter) - root.addHandler(sh) - - if to_file != '': - fh = logging.FileHandler(os.path.join( - file_path, '{}'.format(to_file))) - fh.setFormatter(logging.Formatter(log_format)) - root.addHandler(fh) diff --git a/fedn/fedn/utils/process.py b/fedn/fedn/utils/process.py index bd31f9441..95e6eaa63 100644 --- a/fedn/fedn/utils/process.py +++ b/fedn/fedn/utils/process.py @@ -1,7 +1,6 @@ -import logging import subprocess -logger = logging.getLogger() +from fedn.common.log_config import logger def run_process(args, cwd): @@ -25,7 +24,7 @@ def check_io(): while True: output = status.stdout.readline().decode() if output: - logger.log(logging.INFO, output) + logger.info(output) else: break