diff --git a/fedn/network/api/v1/client_routes.py b/fedn/network/api/v1/client_routes.py index d8cc632fd..fb268905b 100644 --- a/fedn/network/api/v1/client_routes.py +++ b/fedn/network/api/v1/client_routes.py @@ -1,13 +1,11 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb -from fedn.network.storage.statestore.stores.client_store import ClientStore +from fedn.network.api.v1.shared import api_version, client_store, get_post_data_to_kwargs, get_typed_list_headers from fedn.network.storage.statestore.stores.shared import EntityNotFound bp = Blueprint("client", __name__, url_prefix=f"/api/{api_version}/clients") -client_store = ClientStore(mdb, "network.clients") @bp.route("/", methods=["GET"]) diff --git a/fedn/network/api/v1/combiner_routes.py b/fedn/network/api/v1/combiner_routes.py index b17898a5c..02617b7bb 100644 --- a/fedn/network/api/v1/combiner_routes.py +++ b/fedn/network/api/v1/combiner_routes.py @@ -1,7 +1,7 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb +from fedn.network.api.v1.shared import api_version, client_store, get_post_data_to_kwargs, get_typed_list_headers, mdb from fedn.network.storage.statestore.stores.combiner_store import CombinerStore from fedn.network.storage.statestore.stores.shared import EntityNotFound @@ -381,3 +381,50 @@ def delete_combiner(id: str): return jsonify({"message": f"Entity with id: {id} not found"}), 404 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/clients/count", methods=["POST"]) +@jwt_auth_required(role="admin") +def number_of_clients_connected(): + """Number of clients connected + Retrieves the number of clients connected to the combiner. + --- + tags: + - Combiners + parameters: + - name: combiners + in: body + required: true + type: object + description: Object containing the ids of the combiners + schema: + type: object + properties: + combiners: + type: string + responses: + 200: + description: A list of objects containing the number of clients connected to each combiner + schema: + type: Array + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + data = request.get_json() + combiners = data.get("combiners", "") + combiners = combiners.split(",") if combiners else [] + response = client_store.connected_client_count(combiners) + + result = { + "result": response + } + + return jsonify(result), 200 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/shared.py b/fedn/network/api/v1/shared.py index a27a6f637..0fda39c45 100644 --- a/fedn/network/api/v1/shared.py +++ b/fedn/network/api/v1/shared.py @@ -3,13 +3,15 @@ import pymongo from pymongo.database import Database -from fedn.network.api.shared import statestore_config,network_id +from fedn.network.api.shared import network_id, statestore_config +from fedn.network.storage.statestore.stores.client_store import ClientStore api_version = "v1" mc = pymongo.MongoClient(**statestore_config["mongo_config"]) mc.server_info() mdb: Database = mc[network_id] +client_store = ClientStore(mdb, "network.clients") def is_positive_integer(s): return s is not None and s.isdigit() and int(s) > 0 diff --git a/fedn/network/storage/statestore/stores/client_store.py b/fedn/network/storage/statestore/stores/client_store.py index 51f77023c..c3c2e5225 100644 --- a/fedn/network/storage/statestore/stores/client_store.py +++ b/fedn/network/storage/statestore/stores/client_store.py @@ -93,3 +93,36 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI def count(self, **kwargs) -> int: return super().count(**kwargs) + + def connected_client_count(self, combiners): + """Count the number of connected clients for each combiner. + + :param combiners: list of combiners to get data for. + :type combiners: list + :param sort_key: The key to sort by. + :type sort_key: str + :param sort_order: The sort order. + :type sort_order: pymongo.ASCENDING or pymongo.DESCENDING + :return: list of combiner data. + :rtype: list(ObjectId) + """ + try: + pipeline = ( + [ + {"$match": {"combiner": {"$in": combiners}, "status": "online"}}, + {"$group": {"_id": "$combiner", "count": {"$sum": 1}}}, + {"$project": {"id": "$_id", "count": 1, "_id": 0}} + ] + if len(combiners) > 0 + else [ + {"$match": { "status": "online"}}, + {"$group": {"_id": "$combiner", "count": {"$sum": 1}}}, + {"$project": {"id": "$_id", "count": 1, "_id": 0}} + ] + ) + + result = list(self.database[self.collection].aggregate(pipeline)) + except Exception: + result = {} + + return result