From a70748116af3fbb0d1179598fbafbfe09c0af262 Mon Sep 17 00:00:00 2001 From: Stefan Hellander Date: Mon, 8 Jul 2024 11:44:49 +0200 Subject: [PATCH] Controller monitors combiners --- fedn/network/api/interface.py | 15 ++++++- fedn/network/api/network.py | 7 ++- fedn/network/api/server.py | 62 ++++++++++++++++++++++++++ fedn/network/api/v1/combiner_routes.py | 3 +- fedn/network/combiner/combiner.py | 24 +++++++--- fedn/network/combiner/interfaces.py | 4 +- 6 files changed, 104 insertions(+), 11 deletions(-) diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 9936e0bc0..9a1ff3447 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -10,7 +10,8 @@ from fedn.common.config import get_controller_config, get_network_config from fedn.common.log_config import logger -from fedn.network.combiner.interfaces import CombinerInterface, CombinerUnavailableError +from fedn.network.combiner.interfaces import (CombinerInterface, + CombinerUnavailableError) from fedn.network.state import ReducerState, ReducerStateToString from fedn.utils.checksum import sha @@ -83,13 +84,15 @@ def get_all_combiners(self, limit=None, skip=None): :rtype: :class:`flask.Response` """ # Will return list of ObjectId - projection = {"name": True, "updated_at": True} + projection = {"name": True, "updated_at": True, "status": True} response = self.statestore.get_combiners(limit, skip, projection=projection) arr = [] for element in response["result"]: + print(element, flush=True) obj = { "name": element["name"], "updated_at": element["updated_at"], + "status": element["status"] } arr.append(obj) @@ -553,6 +556,14 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por return jsonify(payload) + def update_combiner(self, combiner): + try: + self.statestore.set_combiner(combiner) + except Exception as e: + logger.error(e) + return jsonify({"success": False, "message": "Combiner not updated."}) + return jsonify({"success": True, "message": "Combiner updated."}) + def add_client(self, client_id, preferred_combiner, remote_addr, name): """Add a client to the network. diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index 5e2f2ef91..da9d7d7af 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -54,7 +54,12 @@ def get_combiners(self): cert = None key = None - combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"])) + if c["status"]: + status = c["status"] + else: + status = "offline" + + combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"], status=status)) return combiners diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index c9e54ff87..d32105dc3 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -1,6 +1,10 @@ import os +import threading +import time +import grpc from flask import Flask, jsonify, request +from grpc_health.v1 import health_pb2, health_pb2_grpc from fedn.common.config import get_controller_config from fedn.network.api.auth import jwt_auth_required @@ -625,8 +629,66 @@ def list_combiners_data(): if custom_url_prefix: app.add_url_rule(f"{custom_url_prefix}/list_combiners_data", view_func=list_combiners_data, methods=["POST"]) + +def check_health(host, fqdn, port): + # Server address and port + server_address = f"{fqdn}:{port}" + + # Create the gRPC channel + channel = grpc.secure_channel(server_address, grpc.ssl_channel_credentials()) + + # Add the metadata + metadata = [('grpc-server', host)] + + # Create the health check stub + stub = health_pb2_grpc.HealthStub(channel) + + # Create the health check request + request = health_pb2.HealthCheckRequest(service='') + + try: + # Make the health check request + response = stub.Check(request, metadata=metadata) + return response.status + except grpc.RpcError: + print('Health check failed.') + + return 0 + + +def monitor_combiners(): + while True: + with app.app_context(): + try: + combiners = control.network.get_combiners() + for combiner in combiners: + host = combiner.address + fqdn = combiner.fqdn + try: + status = combiner.status + except: + status = "offline" + port = '443' + if check_health(host, fqdn, port) == 1: + combiner_status = "online" + else: + combiner_status = "offline" + if combiner_status != status: + combiner.status = combiner_status + api.update_combiner(combiner.to_dict()) + + except Exception as e: + print(f"Error while monitoring combiners: {e}", flush=True) + + time.sleep(5) + + if __name__ == "__main__": config = get_controller_config() port = config["port"] debug = config["debug"] + # Start thread that monitors combiner status + monitoring_thread = threading.Thread(target=monitor_combiners, daemon=True) + monitoring_thread.start() + app.run(debug=debug, port=port, host="0.0.0.0") diff --git a/fedn/network/api/v1/combiner_routes.py b/fedn/network/api/v1/combiner_routes.py index 9210a7e30..538117cfe 100644 --- a/fedn/network/api/v1/combiner_routes.py +++ b/fedn/network/api/v1/combiner_routes.py @@ -1,7 +1,8 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb +from fedn.network.api.v1.shared import (api_version, get_post_data_to_kwargs, + get_typed_list_headers, mdb) from fedn.network.storage.statestore.stores.combiner_store import CombinerStore from fedn.network.storage.statestore.stores.shared import EntityNotFound diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 7f7b93548..cdc78e27b 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -12,7 +12,8 @@ import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc -from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) from fedn.network.combiner.connect import ConnectorCombiner, Status from fedn.network.combiner.modelservice import ModelService from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler @@ -701,9 +702,20 @@ def SendModelValidation(self, request, context): def run(self): """Start the server.""" logger.info("COMBINER: {} started, ready for gRPC requests.".format(self.id)) - try: - while True: - signal.pause() - except (KeyboardInterrupt, SystemExit): - pass + + def handle_signal(signum, frame): + logger.warning(f"Received signal {signum}, shutting down gracefully.") + self.server.stop(0) # Add a timeout if necessary + logger.warning("Server stopped.") + exit(0) + + signal.signal(signal.SIGINT, handle_signal) # Handle Ctrl-C + signal.signal(signal.SIGTERM, handle_signal) # Handle Docker stop + + # try: + logger.warning("STARTING UP") + while True: + signal.pause() + # except (KeyboardInterrupt, SystemExit): + # pass self.server.stop() diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index 935b75442..1ed16aac9 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -81,7 +81,7 @@ class CombinerInterface: :type config: dict """ - def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None): + def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None, status="offline"): """Initialize the combiner interface.""" self.parent = parent self.name = name @@ -91,6 +91,7 @@ def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None self.certificate = certificate self.key = key self.ip = ip + self.status = status if not config: self.config = {"max_clients": 8} @@ -124,6 +125,7 @@ def to_dict(self): "certificate": None, "key": None, "config": self.config, + "status": self.status } if self.certificate: