Skip to content

Commit

Permalink
Merged with develop
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanhellander committed Nov 13, 2023
2 parents cddf888 + ae06b84 commit 3965989
Show file tree
Hide file tree
Showing 15 changed files with 1,916 additions and 1,026 deletions.
38 changes: 24 additions & 14 deletions .ci/tests/examples/wait_for.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def _retry(try_func, **func_args):
for _ in range(RETRIES):
is_success = try_func(**func_args)
if is_success:
_eprint('Sucess.')
_eprint('Success.')
return True
_eprint(f'Sleeping for {SLEEP}.')
sleep(SLEEP)
Expand All @@ -30,28 +30,38 @@ def _test_rounds(n_rounds):
client = pymongo.MongoClient(
"mongodb://fedn_admin:password@localhost:6534")
collection = client['fedn-network']['control']['rounds']
query = {'reducer.status': 'Success'}
query = {'status': 'Finished'}
n = collection.count_documents(query)
client.close()
_eprint(f'Succeded rounds: {n}.')
return n == n_rounds


def _test_nodes(n_nodes, node_type, reducer_host='localhost', reducer_port='8090'):
def _test_nodes(n_nodes, node_type, reducer_host='localhost', reducer_port='8092'):
try:
resp = requests.get(
f'http://{reducer_host}:{reducer_port}/netgraph', verify=False)

endpoint = "list_clients" if node_type == "client" else "list_combiners"

response = requests.get(
f'http://{reducer_host}:{reducer_port}/{endpoint}', verify=False)

if response.status_code == 200:

data = json.loads(response.content)

count = 0
if node_type == "client":
arr = data.get('result')
count = sum(element.get('status') == "online" for element in arr)
else:
count = data.get('count')

_eprint(f'Active {node_type}s: {count}.')
return count == n_nodes

except Exception as e:
_eprint(f'Reques exception econuntered: {e}.')
_eprint(f'Request exception enconuntered: {e}.')
return False
if resp.status_code == 200:
gr = json.loads(resp.content)
n = sum(values.get('type') == node_type and values.get(
'status') == 'active' for values in gr['nodes'])
_eprint(f'Active {node_type}s: {n}.')
return n == n_nodes
_eprint(f'Reducer returned {resp.status_code}.')
return False


def rounds(n_rounds=3):
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ To connect a client that uses the data partition 'data/clients/1/mnist.pt':
-v $PWD/data/clients/1:/var/data \
-e ENTRYPOINT_OPTS=--data_path=/var/data/mnist.pt \
--network=fedn_default \
ghcr.io/scaleoutsystems/fedn/fedn:develop-mnist-pytorch run client -in client.yaml --name client1
ghcr.io/scaleoutsystems/fedn/fedn:master-mnist-pytorch run client -in client.yaml --name client1
You are now ready to start training the model at http://localhost:8090/control.

Expand Down
2 changes: 1 addition & 1 deletion examples/mnist-keras/bin/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
client/entrypoint init_seed

# Make compute package
tar -czvf package.tgz client
tar -czvf package.tgz client
6 changes: 4 additions & 2 deletions fedn/fedn/common/storage/s3/miniorepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ def __init__(self, config):
self.create_bucket(self.bucket)

def create_bucket(self, bucket_name):
"""
""" Create a new bucket. If bucket exists, do nothing.
:param bucket_name:
:param bucket_name: The name of the bucket
:type bucket_name: str
"""
found = self.client.bucket_exists(bucket_name)

if not found:
try:
self.client.make_bucket(bucket_name)
Expand Down
61 changes: 51 additions & 10 deletions fedn/fedn/common/tracer/mongotracer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
from datetime import datetime

from google.protobuf.json_format import MessageToDict

Expand All @@ -18,6 +19,7 @@ def __init__(self, mongo_config, network_id):
self.rounds = self.mdb['control.rounds']
self.sessions = self.mdb['control.sessions']
self.validations = self.mdb['control.validations']
self.clients = self.mdb['network.clients']
except Exception as e:
print("FAILED TO CONNECT TO MONGO, {}".format(e), flush=True)
self.status = None
Expand Down Expand Up @@ -50,18 +52,26 @@ def drop_status(self):
if self.status:
self.status.drop()

def new_session(self, id=None):
""" Create a new session. """
def create_session(self, id=None):
""" Create a new session.
:param id: The ID of the created session.
:type id: uuid, str
"""
if not id:
id = uuid.uuid4()
data = {'session_id': str(id)}
self.sessions.insert_one(data)

def new_round(self, id):
""" Create a new session. """
def create_round(self, round_data):
""" Create a new round.
data = {'round_id': str(id)}
self.rounds.insert_one(data)
:param round_data: Dictionary with round data.
:type round_data: dict
"""
# TODO: Add check if round_id already exists
self.rounds.insert_one(round_data)

def set_session_config(self, id, config):
self.sessions.update_one({'session_id': str(id)}, {
Expand All @@ -70,15 +80,46 @@ def set_session_config(self, id, config):
def set_round_combiner_data(self, data):
"""
:param round_meta:
:param data: The combiner data
:type data: dict
"""
self.rounds.update_one({'round_id': str(data['round_id'])}, {
'$push': {'combiners': data}}, True)

def set_round_data(self, round_data):
def set_round_config(self, round_id, round_config):
"""
:param round_meta:
"""
self.rounds.update_one({'round_id': str(round_data['round_id'])}, {
'$push': {'reducer': round_data}}, True)
self.rounds.update_one({'round_id': round_id}, {
'$set': {'round_config': round_config}}, True)

def set_round_status(self, round_id, round_status):
"""
:param round_meta:
"""
self.rounds.update_one({'round_id': round_id}, {
'$set': {'status': round_status}}, True)

def set_round_data(self, round_id, round_data):
"""
:param round_meta:
"""
self.rounds.update_one({'round_id': round_id}, {
'$set': {'round_data': round_data}}, True)

def update_client_status(self, client_name, status):
""" Update client status in statestore.
:param client_name: The client name
:type client_name: str
:param status: The client status
:type status: str
:return: None
"""
datetime_now = datetime.now()
filter_query = {"name": client_name}

update_query = {"$set": {"last_seen": datetime_now, "status": status}}
self.clients.update_one(filter_query, update_query)
Loading

0 comments on commit 3965989

Please sign in to comment.