Skip to content

Commit

Permalink
Controller monitors combiners
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanhellander committed Jul 8, 2024
1 parent 6e75dfa commit a707481
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 11 deletions.
15 changes: 13 additions & 2 deletions fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

from fedn.common.config import get_controller_config, get_network_config
from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerInterface, CombinerUnavailableError
from fedn.network.combiner.interfaces import (CombinerInterface,
CombinerUnavailableError)
from fedn.network.state import ReducerState, ReducerStateToString
from fedn.utils.checksum import sha

Expand Down Expand Up @@ -83,13 +84,15 @@ def get_all_combiners(self, limit=None, skip=None):
:rtype: :class:`flask.Response`
"""
# Will return list of ObjectId
projection = {"name": True, "updated_at": True}
projection = {"name": True, "updated_at": True, "status": True}
response = self.statestore.get_combiners(limit, skip, projection=projection)
arr = []
for element in response["result"]:
print(element, flush=True)
obj = {
"name": element["name"],
"updated_at": element["updated_at"],
"status": element["status"]
}

arr.append(obj)
Expand Down Expand Up @@ -553,6 +556,14 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por

return jsonify(payload)

def update_combiner(self, combiner):
try:
self.statestore.set_combiner(combiner)
except Exception as e:
logger.error(e)
return jsonify({"success": False, "message": "Combiner not updated."})
return jsonify({"success": True, "message": "Combiner updated."})

def add_client(self, client_id, preferred_combiner, remote_addr, name):
"""Add a client to the network.
Expand Down
7 changes: 6 additions & 1 deletion fedn/network/api/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ def get_combiners(self):
cert = None
key = None

combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"]))
if c["status"]:
status = c["status"]
else:
status = "offline"

combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"], status=status))

return combiners

Expand Down
62 changes: 62 additions & 0 deletions fedn/network/api/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
import threading
import time

import grpc
from flask import Flask, jsonify, request
from grpc_health.v1 import health_pb2, health_pb2_grpc

from fedn.common.config import get_controller_config
from fedn.network.api.auth import jwt_auth_required
Expand Down Expand Up @@ -625,8 +629,66 @@ def list_combiners_data():
if custom_url_prefix:
app.add_url_rule(f"{custom_url_prefix}/list_combiners_data", view_func=list_combiners_data, methods=["POST"])


def check_health(host, fqdn, port):
# Server address and port
server_address = f"{fqdn}:{port}"

# Create the gRPC channel
channel = grpc.secure_channel(server_address, grpc.ssl_channel_credentials())

# Add the metadata
metadata = [('grpc-server', host)]

# Create the health check stub
stub = health_pb2_grpc.HealthStub(channel)

# Create the health check request
request = health_pb2.HealthCheckRequest(service='')

try:
# Make the health check request
response = stub.Check(request, metadata=metadata)
return response.status
except grpc.RpcError:
print('Health check failed.')

return 0


def monitor_combiners():
while True:
with app.app_context():
try:
combiners = control.network.get_combiners()
for combiner in combiners:
host = combiner.address
fqdn = combiner.fqdn
try:
status = combiner.status
except:
status = "offline"
port = '443'
if check_health(host, fqdn, port) == 1:
combiner_status = "online"
else:
combiner_status = "offline"
if combiner_status != status:
combiner.status = combiner_status
api.update_combiner(combiner.to_dict())

except Exception as e:
print(f"Error while monitoring combiners: {e}", flush=True)

time.sleep(5)


if __name__ == "__main__":
config = get_controller_config()
port = config["port"]
debug = config["debug"]
# Start thread that monitors combiner status
monitoring_thread = threading.Thread(target=monitor_combiners, daemon=True)
monitoring_thread.start()

app.run(debug=debug, port=port, host="0.0.0.0")
3 changes: 2 additions & 1 deletion fedn/network/api/v1/combiner_routes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from flask import Blueprint, jsonify, request

from fedn.network.api.auth import jwt_auth_required
from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb
from fedn.network.api.v1.shared import (api_version, get_post_data_to_kwargs,
get_typed_list_headers, mdb)
from fedn.network.storage.statestore.stores.combiner_store import CombinerStore
from fedn.network.storage.statestore.stores.shared import EntityNotFound

Expand Down
24 changes: 18 additions & 6 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

import fedn.network.grpc.fedn_pb2 as fedn
import fedn.network.grpc.fedn_pb2_grpc as rpc
from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
from fedn.common.log_config import (logger, set_log_level_from_string,
set_log_stream)
from fedn.network.combiner.connect import ConnectorCombiner, Status
from fedn.network.combiner.modelservice import ModelService
from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler
Expand Down Expand Up @@ -701,9 +702,20 @@ def SendModelValidation(self, request, context):
def run(self):
"""Start the server."""
logger.info("COMBINER: {} started, ready for gRPC requests.".format(self.id))
try:
while True:
signal.pause()
except (KeyboardInterrupt, SystemExit):
pass

def handle_signal(signum, frame):
logger.warning(f"Received signal {signum}, shutting down gracefully.")
self.server.stop(0) # Add a timeout if necessary
logger.warning("Server stopped.")
exit(0)

signal.signal(signal.SIGINT, handle_signal) # Handle Ctrl-C
signal.signal(signal.SIGTERM, handle_signal) # Handle Docker stop

# try:
logger.warning("STARTING UP")
while True:
signal.pause()
# except (KeyboardInterrupt, SystemExit):
# pass
self.server.stop()
4 changes: 3 additions & 1 deletion fedn/network/combiner/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class CombinerInterface:
:type config: dict
"""

def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None):
def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None, status="offline"):
"""Initialize the combiner interface."""
self.parent = parent
self.name = name
Expand All @@ -91,6 +91,7 @@ def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None
self.certificate = certificate
self.key = key
self.ip = ip
self.status = status

if not config:
self.config = {"max_clients": 8}
Expand Down Expand Up @@ -124,6 +125,7 @@ def to_dict(self):
"certificate": None,
"key": None,
"config": self.config,
"status": self.status
}

if self.certificate:
Expand Down

0 comments on commit a707481

Please sign in to comment.