From ee4d74a2475a819be07c99de181c80805cb8641d Mon Sep 17 00:00:00 2001 From: Stefan Hellander Date: Tue, 5 Dec 2023 16:28:04 +0100 Subject: [PATCH] Removed telemetry stuff from this branch --- fedn/cli/run_cmd.py | 6 +- fedn/fedn/common/log_config.py | 92 ----------------------------- fedn/fedn/network/clients/client.py | 70 +++++----------------- 3 files changed, 17 insertions(+), 151 deletions(-) diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 5f9d93a05..32df0537a 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -108,11 +108,10 @@ def run_cmd(ctx): @click.option('--heartbeat-interval', required=False, default=2) @click.option('--reconnect-after-missed-heartbeat', required=False, default=30) @click.option('--verbosity', required=False, default='INFO', type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], case_sensitive=False)) -@click.option('--telemetry', required=False, default=False) @click.pass_context def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_package, force_ssl, dry_run, secure, preshared_cert, verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat, - verbosity, telemetry): + verbosity): """ :param ctx: @@ -132,7 +131,6 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa :param hearbeat_interval :param reconnect_after_missed_heartbeat :param verbosity - :param telemetry :return: """ remote = False if local_package else True @@ -145,7 +143,7 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa 'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure, 'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner, 'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval, - 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity, 'telemetry': telemetry} + 'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity} if init: apply_config(config) diff --git a/fedn/fedn/common/log_config.py b/fedn/fedn/common/log_config.py index a9c0f24e9..47579cb82 100644 --- a/fedn/fedn/common/log_config.py +++ b/fedn/fedn/common/log_config.py @@ -1,60 +1,8 @@ import logging import logging.config -from functools import wraps import urllib3 -try: - import os - import platform - import socket - - import psutil - from opentelemetry import trace - from opentelemetry.exporter.jaeger.thrift import JaegerExporter - from opentelemetry.sdk.resources import Resource - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.semconv.resource import ResourceAttributes - - telemetry_enabled = True -except ImportError: - telemetry_enabled = False - - -def get_system_info(): - system_info = [ - ["os.name", os.name], - ["platform.system", platform.system()], - ["platform.release", platform.release()], - ["hostname", socket.gethostname()], - ["ip_address", socket.gethostbyname(socket.gethostname())], - ["cpu_count", psutil.cpu_count(logical=True)], - ["total_memory", psutil.virtual_memory().total], - ["total_disk", psutil.disk_usage('/').total], - ] - return system_info - - -# Configure the tracer to export traces to Jaeger -resource = Resource.create({ResourceAttributes.SERVICE_NAME: "FEDn Client"}) -tracer_provider = TracerProvider(resource=resource) -trace.set_tracer_provider(tracer_provider) - -# Create a JaegerExporter -jaeger_exporter = JaegerExporter( - agent_host_name='localhost', - agent_port=6831, -) - -# Add the Jaeger exporter to the tracer provider -tracer_provider.add_span_processor( - BatchSpanProcessor(jaeger_exporter) -) - -tracer = None - - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logging.getLogger("urllib3").setLevel(logging.ERROR) @@ -66,46 +14,6 @@ def get_system_info(): handler.setFormatter(formatter) -def add_trace(name=""): - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - self = args[0] - name = func.__name__ - if tracer: - - with tracer.start_as_current_span(name) as span: - # print("name={}....{}".format(name, attributes)) - if self.trace_attribs: - for attrib in self.trace_attribs: - span.set_attribute(attrib[0], attrib[1]) - # system_attribs = get_system_info() - # print(system_attribs) - # for attrib in system_attribs: - # span.set_attribute(attrib[0], attrib[1]) - return func(*args, **kwargs) - else: - return func(*args, **kwargs) - return wrapper - return decorator - - -def get_tracer(): - global tracer - return tracer - - -def enable_tracing(): - global tracer - tracer = trace.get_tracer(__name__) - - -def log_remote(server='localhost:8000', path='/log'): - http_handler = logging.handlers.HTTPHandler(server, '/log', method='POST') - http_handler.setLevel(logging.WARNING) - logger.addHandler(http_handler) - - def set_log_level_from_string(level_str): """ Set the log level based on a string input. diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 903158867..e0836d3aa 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -2,6 +2,7 @@ import io import json import os +import platform import queue import re import socket @@ -14,16 +15,17 @@ from distutils.dir_util import copy_tree from io import BytesIO +import GPUtil import grpc +import psutil from cryptography.hazmat.primitives.serialization import Encoding from google.protobuf.json_format import MessageToJson from OpenSSL import SSL import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc -from fedn.common.log_config import (add_trace, enable_tracing, get_tracer, - log_remote, logger, - set_log_level_from_string, set_log_stream) +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString @@ -33,13 +35,6 @@ CHUNK_SIZE = 1024 * 1024 VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$' -import os -import platform -import socket - -import GPUtil -import psutil - def get_system_info(): gpus = GPUtil.getGPUs() @@ -58,6 +53,7 @@ def get_system_info(): } return system_info, gpu_info + class GrpcAuth(grpc.AuthMetadataPlugin): def __init__(self, key): self._key = key @@ -65,6 +61,7 @@ def __init__(self, key): def __call__(self, context, callback): callback((('authorization', f'Token {self._key}'),), None) + class Client: """FEDn Client. Service running on client/datanodes in a federation, recieving and handling model update and model validation requests. @@ -85,31 +82,15 @@ def __init__(self, config): set_log_level_from_string(config.get('verbosity', "INFO")) set_log_stream(config.get('logfile', None)) - if config.get('telemetry', False): - log_remote() - enable_tracing() - proj = config['discover_host'].split('/')[1] - self.trace_attribs = [["project", proj], ["client_name", config["name"]]] - system_info, gpu_info = get_system_info() - print(system_info) - with get_tracer().start_as_current_span("TelemetryInit") as span: - for key, value in system_info.items(): - span.set_attribute(key, value) - print(gpu_info) - for attrib in gpu_info: - span.set_attribute(attrib[0], attrib[1]) - for attrib in self.trace_attribs: - span.set_attribute(attrib[0], attrib[1]) - self.connector = ConnectorClient(host=config['discover_host'], - port=config['discover_port'], - token=config['token'], - name=config['name'], - remote_package=config['remote_compute_context'], - force_ssl=config['force_ssl'], - verify=config['verify'], - combiner=config['preferred_combiner'], - id=config['client_id']) + port=config['discover_port'], + token=config['token'], + name=config['name'], + remote_package=config['remote_compute_context'], + force_ssl=config['force_ssl'], + verify=config['verify'], + combiner=config['preferred_combiner'], + id=config['client_id']) # Validate client name match = re.search(VALID_NAME_REGEX, config['name']) @@ -140,7 +121,6 @@ def __init__(self, config): self.state = ClientState.idle - @add_trace() def _assign(self): """Contacts the controller and asks for combiner assignment. @@ -170,7 +150,6 @@ def _assign(self): logger.info("Received combiner configuration: {}".format(client_config)) return client_config - @add_trace() def _add_grpc_metadata(self, key, value): """Add metadata for gRPC calls. @@ -193,7 +172,6 @@ def _add_grpc_metadata(self, key, value): # Set metadata using tuple concatenation self.metadata += ((key, value),) - @add_trace() def _get_ssl_certificate(self, domain, port=443): context = SSL.Context(SSL.SSLv23_METHOD) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -208,7 +186,6 @@ def _get_ssl_certificate(self, domain, port=443): cert = cert.to_cryptography().public_bytes(Encoding.PEM).decode() return cert - @add_trace() def _connect(self, client_config): """Connect to assigned combiner. @@ -276,12 +253,10 @@ def _connect(self, client_config): logger.info("Using {} compute package.".format( client_config["package"])) - @add_trace() def _disconnect(self): """Disconnect from the combiner.""" self.channel.close() - @add_trace() def _detach(self): """Detach from the FEDn network (disconnect from combiner)""" # Setting _attached to False will make all processing threads return @@ -292,7 +267,6 @@ def _detach(self): # Close gRPC connection to combiner self._disconnect() - @add_trace() def _attach(self): """Attach to the FEDn network (connect to combiner)""" # Ask controller for a combiner and connect to that combiner. @@ -307,7 +281,6 @@ def _attach(self): self._attached = True return client_config - @add_trace() def _initialize_helper(self, client_config): """Initialize the helper class for the client. @@ -321,7 +294,6 @@ def _initialize_helper(self, client_config): if 'helper_type' in client_config.keys(): self.helper = get_helper(client_config['helper_type']) - @add_trace() def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads. @@ -346,7 +318,6 @@ def _subscribe_to_combiner(self, config): # Start processing the client message inbox threading.Thread(target=self.process_request, daemon=True).start() - @add_trace() def _initialize_dispatcher(self, config): """ Initialize the dispatcher for the client. @@ -408,7 +379,6 @@ def _initialize_dispatcher(self, config): copy_tree(from_path, self.run_path) self.dispatcher = Dispatcher(dispatch_config, self.run_path) - @add_trace() def get_model(self, id): """Fetch a model from the assigned combiner. Downloads the model update object via a gRPC streaming channel. @@ -433,7 +403,6 @@ def get_model(self, id): return data - @add_trace() def set_model(self, model, id): """Send a model update to the assigned combiner. Uploads the model updated object via a gRPC streaming channel, Upload. @@ -480,7 +449,6 @@ def upload_request_generator(mdl): return result - @add_trace() def _listen_to_model_update_request_stream(self): """Subscribe to the model update request stream. @@ -518,7 +486,6 @@ def _listen_to_model_update_request_stream(self): if not self._attached: return - @add_trace() def _listen_to_model_validation_request_stream(self): """Subscribe to the model validation request stream. @@ -548,7 +515,6 @@ def _listen_to_model_validation_request_stream(self): if not self._attached: return - @add_trace() def _process_training_request(self, model_id): """Process a training (model update) request. @@ -614,7 +580,6 @@ def _process_training_request(self, model_id): return updated_model_id, meta - @add_trace() def _process_validation_request(self, model_id, is_inference): """Process a validation request. @@ -659,7 +624,6 @@ def _process_validation_request(self, model_id, is_inference): self.state = ClientState.idle return validation - @add_trace() def process_request(self): """Process training and validation tasks. """ while True: @@ -740,14 +704,12 @@ def process_request(self): except queue.Empty: pass - @add_trace() def _handle_combiner_failure(self): """ Register failed combiner connection.""" self._missed_heartbeat += 1 if self._missed_heartbeat > self.config['reconnect_after_missed_heartbeat']: self._detach() - @add_trace() def _send_heartbeat(self, update_frequency=2.0): """Send a heartbeat to the combiner. @@ -773,7 +735,6 @@ def _send_heartbeat(self, update_frequency=2.0): if not self._attached: return - @add_trace() def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None): """Send status message. @@ -803,7 +764,6 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None) status.status)) _ = self.connectorStub.SendStatus(status, metadata=self.metadata) - @add_trace() def run(self): """ Run the client. """ try: