Skip to content

Commit

Permalink
list combiners pagination fix + get combiners data now post, accepts …
Browse files Browse the repository at this point in the history
…list of combiner names
  • Loading branch information
niklastheman committed Oct 30, 2023
1 parent 371bbfb commit 22b4308
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 61 deletions.
34 changes: 16 additions & 18 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,26 @@ def get_active_clients(self, combiner_id):
response = combiner.list_active_clients()
return response

def get_all_combiners(self):
def get_all_combiners(self, limit=None, skip=None):
"""Get all combiners from the statestore.
:return: All combiners as a json response.
:rtype: :class:`flask.Response`
"""
# Will return list of ObjectId
combiner_objects = self.statestore.get_combiners()
payload = {}
for object in combiner_objects:
id = object["name"]
info = {
"address": object["address"],
"fqdn": object["fqdn"],
"parent_reducer": object["parent"]["name"],
"port": object["port"],
"report": object["report"],
"updated_at": object["updated_at"],
response = self.statestore.get_combiners(limit, skip)
arr = []
for element in response["result"]:
obj = {
"name": element["name"],
"updated_at": element["updated_at"],
}
payload[id] = info

return jsonify(payload)
arr.append(obj)

result = {"result": arr, "count": response["count"]}

return jsonify(result)

def get_combiner(self, combiner_id):
"""Get a combiner from the statestore.
Expand Down Expand Up @@ -760,16 +758,16 @@ def get_plot_data(self, feature=None):

return jsonify(result)

def list_combiners_data(self, limit=None, skip=None):
def list_combiners_data(self, combiners):
"""Get combiners data.
"""

response = self.statestore.list_combiners_data(limit, skip)
response = self.statestore.list_combiners_data(combiners)

arr = []

# order list by combiner name
for element in response["result"]:
for element in response:

obj = {
"combiner": element["_id"],
Expand All @@ -778,7 +776,7 @@ def list_combiners_data(self, limit=None, skip=None):

arr.append(obj)

result = {"result": arr, "count": response["count"]}
result = {"result": arr}

return jsonify(result)

Expand Down
19 changes: 13 additions & 6 deletions fedn/fedn/network/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def list_clients():
limit = request.args.get("limit", None)
skip = request.args.get("skip", None)
status = request.args.get("status", None)

return api.get_clients(limit, skip, status)


Expand All @@ -96,7 +96,11 @@ def list_combiners():
return: All combiners as a json object.
rtype: json
"""
return api.get_all_combiners()

limit = request.args.get("limit", None)
skip = request.args.get("skip", None)

return api.get_all_combiners(limit, skip)


@app.route("/get_combiner", methods=["GET"])
Expand Down Expand Up @@ -344,17 +348,20 @@ def add_client():
return response


@app.route("/list_combiners_data", methods=["GET"])
@app.route("/list_combiners_data", methods=["POST"])
def list_combiners_data():
"""Add a client to the network.
return: The response from control.
rtype: json
"""
limit = request.args.get("limit", None)
skip = request.args.get("skip", None)

json_data = request.get_json()

# expects a list of combiner names (strings) in an array
combiners = json_data.get("combiners", None)

try:
response = api.list_combiners_data(limit=limit, skip=skip)
response = api.list_combiners_data(combiners)
except TypeError as e:
return jsonify({"success": False, "message": str(e)}), 400
return response
Expand Down
85 changes: 48 additions & 37 deletions fedn/fedn/network/statestore/mongostatestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,18 +519,36 @@ def get_combiner(self, name):
except Exception:
return None

def get_combiners(self):
def get_combiners(self, limit=None, skip=None, sort_key="updated_at", sort_order=pymongo.DESCENDING):
"""Get all combiners.
:return: list of combiners.
:rtype: list
"""

result = None
count = None

projection = {"name": True, "updated_at": True}

try:
ret = self.combiners.find()
return list(ret)
if limit is not None and skip is not None:
limit = int(limit)
skip = int(skip)
result = self.combiners.find({}, projection).limit(limit).skip(skip).sort(sort_key, sort_order)
else:
result = self.combiners.find({}, projection).sort(sort_key, sort_order)

count = self.combiners.count_documents({})

except Exception:
return None

return {
"result": result,
"count": count,
}

def set_combiner(self, combiner_data):
"""Set combiner in statestore.
Expand Down Expand Up @@ -598,60 +616,53 @@ def list_clients(self, limit=None, skip=None, status=False, sort_key="last_seen"
result = None
count = None

find = {} if status is None else {"status": status}
projection = {"_id": False, "updated_at": False}
try:
find = {} if status is None else {"status": status}
projection = {"_id": False, "updated_at": False}

if limit is not None and skip is not None:
limit = int(limit)
skip = int(skip)
result = self.clients.find(find, projection).limit(limit).skip(skip).sort(sort_key, sort_order)
else:
result = self.clients.find(find, projection).sort(sort_key, sort_order)
if limit is not None and skip is not None:
limit = int(limit)
skip = int(skip)
result = self.clients.find(find, projection).limit(limit).skip(skip).sort(sort_key, sort_order)
else:
result = self.clients.find(find, projection).sort(sort_key, sort_order)

count = self.clients.count_documents(find)

count = self.clients.count_documents(find)
except Exception as e:
print("ERROR: {}".format(e), flush=True)

return {
"result": result,
"count": count,
}

def list_combiners_data(self, limit=None, skip=None, sort_key="count", sort_order=pymongo.DESCENDING):
def list_combiners_data(self, combiners, sort_key="count", sort_order=pymongo.DESCENDING):
"""List all combiner data.
:return: list of combiner data.
:rtype: list(ObjectId)
"""

result = None
count = None

pipeline = [
{"$group": {"_id": "$combiner", "count": {"$sum": 1}}},
{"$sort": {sort_key: sort_order, "_id": pymongo.ASCENDING}}
]

if limit is not None and skip is not None:
limit = int(limit)
skip = int(skip)

pipeline.append({"$skip": skip})
pipeline.append({"$limit": limit})

result = self.clients.aggregate(pipeline)
try:

pipeline_count = [
{"$group": {"_id": "$combiner"}},
{"$group": {"_id": None, "count": {"$sum": 1}}}
]
pipeline = [
{"$match": {"combiner": {"$in": combiners}}},
{"$group": {"_id": "$combiner", "count": {"$sum": 1}}},
{"$sort": {sort_key: sort_order, "_id": pymongo.ASCENDING}}
] if combiners is not None else [
{"$group": {"_id": "$combiner", "count": {"$sum": 1}}},
{"$sort": {sort_key: sort_order, "_id": pymongo.ASCENDING}}
]

result_count = list(self.clients.aggregate(pipeline_count))
result = self.clients.aggregate(pipeline)

count = result_count[0]['count']
except Exception as e:
print("ERROR: {}".format(e), flush=True)

return {
"result": result,
"count": count,
}
return result

def update_client_status(self, client_data, status, role):
"""Set or update client status.
Expand Down

0 comments on commit 22b4308

Please sign in to comment.