Skip to content

Commit

Permalink
clients: log when clients i online and not + api list clients sorted …
Browse files Browse the repository at this point in the history
…and by status
  • Loading branch information
niklastheman committed Oct 27, 2023
1 parent 8212430 commit b4c21df
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 21 deletions.
7 changes: 3 additions & 4 deletions fedn/fedn/common/tracer/mongotracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,11 @@ def set_round_data(self, round_data):
'$push': {'reducer': round_data}}, True)


def update_client_timestamp(self, client_name):
def update_client_status(self, client_name, status):
datetime_now = datetime.now()

filter_query = {"name": client_name} # Replace with the desired name

# Define the update operation
update_query = {"$set": {"last_seen": datetime_now}} # Replace with the property and value to update
update_query = {"$set": {"last_seen": datetime_now, "status": status}} # Replace with the property and value to update

self.clients.update_one(filter_query, update_query)
8 changes: 4 additions & 4 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ def _allowed_file_extension(
and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
)

def get_clients(self, limit=None, skip=None, active_only=False):
def get_clients(self, limit=None, skip=None, status=False):
"""Get all clients from the statestore.
:return: All clients as a json response.
:rtype: :class:`flask.Response`
"""
# Will return list of ObjectId
response = self.statestore.list_clients(limit, skip, active_only)
response = self.statestore.list_clients(limit, skip, status)

arr = []

Expand All @@ -80,13 +80,13 @@ def get_clients(self, limit=None, skip=None, active_only=False):
"combiner": element["combiner"],
"combiner_preferred": element["combiner_preferred"],
"ip": element["ip"],
"updated_at": element["updated_at"],
"status": element["status"],
"last_seen": element["last_seen"],
}

arr.append(obj)

result = {"result": , "count": response["count"]}
result = {"result": arr, "count": response["count"]}

return jsonify(result)

Expand Down
12 changes: 4 additions & 8 deletions fedn/fedn/network/api/server.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from flask import Flask, jsonify, request

from fedn.common.config import (
get_controller_config,
get_modelstorage_config,
get_network_config,
get_statestore_config,
)
from fedn.common.config import (get_controller_config, get_modelstorage_config,
get_network_config, get_statestore_config)
from fedn.network.api.interface import API
from fedn.network.controller.control import Control
from fedn.network.statestore.mongostatestore import MongoStateStore
Expand Down Expand Up @@ -72,9 +68,9 @@ def list_clients():

limit = request.args.get("limit", None)
skip = request.args.get("skip", None)
active_only = request.args.get("active_only", None)
status = request.args.get("status", None)

return api.get_clients(limit, skip, active_only)
return api.get_clients(limit, skip, status)


@app.route("/get_active_clients", methods=["GET"])
Expand Down
7 changes: 6 additions & 1 deletion fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ def __init__(self, config):
break
if status == Status.UnAuthorized:
print(response, flush=True)
print("Status.UnAuthorized", flush=True)
sys.exit("Exiting: Unauthorized")
if status == Status.UnMatchedConfig:
print(response, flush=True)
print("Status.UnMatchedConfig", flush=True)
sys.exit("Exiting: Missing config")

cert = announce_config['certificate']
Expand Down Expand Up @@ -712,14 +714,17 @@ def ModelUpdateRequestStream(self, response, context):

self._send_status(status)

self.tracer.update_client_timestamp(client.name)
self.tracer.update_client_status(client.name, "online")

while context.is_active():
try:
yield q.get(timeout=1.0)
except queue.Empty:
pass

self.tracer.update_client_status(client.name, "offline")


def ModelValidationStream(self, update, context):
""" Model validation stream RPC endpoint. Update status for client is connecting to stream.
Expand Down
11 changes: 7 additions & 4 deletions fedn/fedn/network/statestore/mongostatestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ def get_client(self, name):
except Exception:
return None

def list_clients(self, limit=None, skip=None, active_only=False):
def list_clients(self, limit=None, skip=None, status=False, sort_key="last_seen", sort_order=pymongo.DESCENDING):
"""List all clients registered on the network.
:return: list of clients.
Expand All @@ -594,14 +594,17 @@ def list_clients(self, limit=None, skip=None, active_only=False):
result = None
count = None

find = {} if status is None else {"status": status}
projection = {"_id": False, "updated_at": False}

if limit is not None and skip is not None:
limit = int(limit)
skip = int(skip)
result = self.clients.find().limit(limit).skip(skip)
result = self.clients.find(find, projection).limit(limit).skip(skip).sort(sort_key, sort_order)
else:
result = self.clients.find()
result = self.clients.find(find, projection).sort(sort_key, sort_order)

count = self.clients.count_documents({})
count = self.clients.count_documents(find)

return {
"result": result,
Expand Down

0 comments on commit b4c21df

Please sign in to comment.