Skip to content

Commit

Permalink
Merge branch 'master' into feature/android
Browse files Browse the repository at this point in the history
  • Loading branch information
niklastheman committed Nov 17, 2023
2 parents b316303 + 36cf7f7 commit 1c0ee37
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 183 deletions.
6 changes: 3 additions & 3 deletions .ci/tests/examples/wait_for.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def _retry(try_func, **func_args):
for _ in range(RETRIES):
is_success = try_func(**func_args)
if is_success:
_eprint('Sucess.')
_eprint('Success.')
return True
_eprint(f'Sleeping for {SLEEP}.')
sleep(SLEEP)
Expand All @@ -30,7 +30,7 @@ def _test_rounds(n_rounds):
client = pymongo.MongoClient(
"mongodb://fedn_admin:password@localhost:6534")
collection = client['fedn-network']['control']['rounds']
query = {'reducer.status': 'Success'}
query = {'status': 'Finished'}
n = collection.count_documents(query)
client.close()
_eprint(f'Succeded rounds: {n}.')
Expand Down Expand Up @@ -60,7 +60,7 @@ def _test_nodes(n_nodes, node_type, reducer_host='localhost', reducer_port='8092
return count == n_nodes

except Exception as e:
_eprint(f'Reques exception econuntered: {e}.')
_eprint(f'Request exception enconuntered: {e}.')
return False


Expand Down
1 change: 1 addition & 0 deletions .readthedocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ python:
install:
- method: pip
path: ./fedn
- requirements: docs/requirements.txt
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Base image
ARG BASE_IMG=python:3.9-slim
ARG BASE_IMG=python:3.10-slim
FROM $BASE_IMG

# Requirements (use MNIST Keras as default)
Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand All @@ -89,7 +89,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand All @@ -110,7 +110,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand All @@ -127,7 +127,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
author = 'Scaleout Systems AB'

# The full version, including alpha/beta/rc tags
release = '0.4.1'
release = '0.6.0'

# Add any Sphinx extension module names here, as strings
extensions = [
Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sphinx-rtd-theme
2 changes: 1 addition & 1 deletion examples/mnist-keras/bin/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
client/entrypoint init_seed

# Make compute package
tar -czvf package.tgz client
tar -czvf package.tgz client
6 changes: 4 additions & 2 deletions fedn/fedn/common/storage/s3/miniorepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ def __init__(self, config):
self.create_bucket(self.bucket)

def create_bucket(self, bucket_name):
"""
""" Create a new bucket. If bucket exists, do nothing.
:param bucket_name:
:param bucket_name: The name of the bucket
:type bucket_name: str
"""
found = self.client.bucket_exists(bucket_name)

if not found:
try:
self.client.make_bucket(bucket_name)
Expand Down
45 changes: 35 additions & 10 deletions fedn/fedn/common/tracer/mongotracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,26 @@ def drop_status(self):
if self.status:
self.status.drop()

def new_session(self, id=None):
""" Create a new session. """
def create_session(self, id=None):
""" Create a new session.
:param id: The ID of the created session.
:type id: uuid, str
"""
if not id:
id = uuid.uuid4()
data = {'session_id': str(id)}
self.sessions.insert_one(data)

def new_round(self, id):
""" Create a new session. """
def create_round(self, round_data):
""" Create a new round.
data = {'round_id': str(id)}
self.rounds.insert_one(data)
:param round_data: Dictionary with round data.
:type round_data: dict
"""
# TODO: Add check if round_id already exists
self.rounds.insert_one(round_data)

def set_session_config(self, id, config):
self.sessions.update_one({'session_id': str(id)}, {
Expand All @@ -72,18 +80,35 @@ def set_session_config(self, id, config):
def set_round_combiner_data(self, data):
"""
:param round_meta:
:param data: The combiner data
:type data: dict
"""
self.rounds.update_one({'round_id': str(data['round_id'])}, {
'$push': {'combiners': data}}, True)

def set_round_data(self, round_data):
def set_round_config(self, round_id, round_config):
"""
:param round_meta:
"""
self.rounds.update_one({'round_id': round_id}, {
'$set': {'round_config': round_config}}, True)

def set_round_status(self, round_id, round_status):
"""
:param round_meta:
"""
self.rounds.update_one({'round_id': round_id}, {
'$set': {'status': round_status}}, True)

def set_round_data(self, round_id, round_data):
"""
:param round_meta:
"""
self.rounds.update_one({'round_id': str(round_data['round_id'])}, {
'$push': {'reducer': round_data}}, True)
self.rounds.update_one({'round_id': round_id}, {
'$set': {'round_data': round_data}}, True)

def update_client_status(self, client_name, status):
""" Update client status in statestore.
Expand Down
5 changes: 0 additions & 5 deletions fedn/fedn/network/api/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import uuid

import requests

__all__ = ['APIClient']
Expand Down Expand Up @@ -137,9 +135,6 @@ def start_session(self, session_id=None, round_timeout=180, rounds=5, round_buff
:return: A dict with success or failure message and session config.
:rtype: dict
"""
# If session id is None, generate a random session id.
if session_id is None:
session_id = str(uuid.uuid4())
response = requests.post(self._get_url('start_session'), json={
'session_id': session_id,
'round_timeout': round_timeout,
Expand Down
8 changes: 4 additions & 4 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import os
import threading
import uuid
from io import BytesIO

from flask import jsonify, send_from_directory
Expand Down Expand Up @@ -707,9 +708,8 @@ def get_round(self, round_id):
if round_object is None:
return jsonify({"success": False, "message": "Round not found."})
payload = {
"round_id": round_object["round_id"],
"reducer": round_object["reducer"],
"combiners": round_object["combiners"],
'round_id': round_object['round_id'],
'combiners': round_object['combiners'],
}
return jsonify(payload)

Expand Down Expand Up @@ -864,7 +864,7 @@ def start_session(

# Setup session config
session_config = {
"session_id": session_id,
"session_id": session_id if session_id else str(uuid.uuid4()),
"round_timeout": round_timeout,
"buffer_size": round_buffer_size,
"model_id": model_id,
Expand Down
64 changes: 53 additions & 11 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import queue
import re
import ssl
import socket
import sys
import tempfile
import threading
Expand All @@ -15,7 +15,9 @@
from io import BytesIO

import grpc
from cryptography.hazmat.primitives.serialization import Encoding
from google.protobuf.json_format import MessageToJson
from OpenSSL import SSL

import fedn.common.net.grpc.fedn_pb2 as fedn
import fedn.common.net.grpc.fedn_pb2_grpc as rpc
Expand Down Expand Up @@ -127,6 +129,42 @@ def _assign(self):
print("Received combiner config: {}".format(client_config), flush=True)
return client_config

def _add_grpc_metadata(self, key, value):
"""Add metadata for gRPC calls.
:param key: The key of the metadata.
:type key: str
:param value: The value of the metadata.
:type value: str
"""
# Check if metadata exists and add if not
if not hasattr(self, 'metadata'):
self.metadata = ()

# Check if metadata key already exists and replace value if so
for i, (k, v) in enumerate(self.metadata):
if k == key:
# Replace value
self.metadata = self.metadata[:i] + ((key, value),) + self.metadata[i + 1:]
return

# Set metadata using tuple concatenation
self.metadata += ((key, value),)

def _get_ssl_certificate(self, domain, port=443):
context = SSL.Context(SSL.SSLv23_METHOD)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((domain, port))
ssl_sock = SSL.Connection(context, sock)
ssl_sock.set_tlsext_host_name(domain.encode())
ssl_sock.set_connect_state()
ssl_sock.do_handshake()
cert = ssl_sock.get_peer_certificate()
ssl_sock.close()
sock.close()
cert = cert.to_cryptography().public_bytes(Encoding.PEM).decode()
return cert

def _connect(self, client_config):
"""Connect to assigned combiner.
Expand All @@ -137,6 +175,9 @@ def _connect(self, client_config):

# TODO use the client_config['certificate'] for setting up secure comms'
host = client_config['host']
# Add host to gRPC metadata
self._add_grpc_metadata('grpc-server', host)
print("CLIENT: Using metadata: {}".format(self.metadata), flush=True)
port = client_config['port']
secure = False
if client_config['fqdn'] is not None:
Expand All @@ -161,7 +202,7 @@ def _connect(self, client_config):
elif self.config['secure']:
secure = True
print("CLIENT: using CA certificate for GRPC channel")
cert = ssl.get_server_certificate((host, port))
cert = self._get_ssl_certificate(host, port=port)

credentials = grpc.ssl_channel_credentials(cert.encode('utf-8'))
if self.config['token']:
Expand Down Expand Up @@ -331,7 +372,7 @@ def get_model(self, id):
"""
data = BytesIO()

for part in self.modelStub.Download(fedn.ModelRequest(id=id)):
for part in self.modelStub.Download(fedn.ModelRequest(id=id), metadata=self.metadata):

if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
Expand Down Expand Up @@ -386,7 +427,7 @@ def upload_request_generator(mdl):
if not b:
break

result = self.modelStub.Upload(upload_request_generator(bt))
result = self.modelStub.Upload(upload_request_generator(bt), metadata=self.metadata)

return result

Expand All @@ -400,11 +441,12 @@ def _listen_to_model_update_request_stream(self):
r = fedn.ClientAvailableMessage()
r.sender.name = self.name
r.sender.role = fedn.WORKER
metadata = [('client', r.sender.name)]
# Add client to metadata
self._add_grpc_metadata('client', self.name)

while True:
try:
for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=metadata):
for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=self.metadata):
if request.sender.role == fedn.COMBINER:
# Process training request
self._send_status("Received model update request.", log_level=fedn.Status.AUDIT,
Expand Down Expand Up @@ -438,7 +480,7 @@ def _listen_to_model_validation_request_stream(self):
r.sender.role = fedn.WORKER
while True:
try:
for request in self.combinerStub.ModelValidationRequestStream(r):
for request in self.combinerStub.ModelValidationRequestStream(r, metadata=self.metadata):
# Process validation request
_ = request.model_id
self._send_status("Recieved model validation request.", log_level=fedn.Status.AUDIT,
Expand Down Expand Up @@ -589,7 +631,7 @@ def process_request(self):
update.correlation_id = request.correlation_id
update.meta = json.dumps(meta)
# TODO: Check responses
_ = self.combinerStub.SendModelUpdate(update)
_ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata)
self._send_status("Model update completed.", log_level=fedn.Status.AUDIT,
type=fedn.StatusType.MODEL_UPDATE, request=update)

Expand Down Expand Up @@ -618,7 +660,7 @@ def process_request(self):
validation.timestamp = self.str
validation.correlation_id = request.correlation_id
_ = self.combinerStub.SendModelValidation(
validation)
validation, metadata=self.metadata)

# Set status type
if request.is_inference:
Expand Down Expand Up @@ -655,7 +697,7 @@ def _send_heartbeat(self, update_frequency=2.0):
heartbeat = fedn.Heartbeat(sender=fedn.Client(
name=self.name, role=fedn.WORKER))
try:
self.connectorStub.SendHeartbeat(heartbeat)
self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata)
self._missed_heartbeat = 0
except grpc.RpcError as e:
status_code = e.code()
Expand Down Expand Up @@ -694,7 +736,7 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None)
self.logs.append(
"{} {} LOG LEVEL {} MESSAGE {}".format(str(datetime.now()), status.sender.name, status.log_level,
status.status))
_ = self.connectorStub.SendStatus(status)
_ = self.connectorStub.SendStatus(status, metadata=self.metadata)

def run(self):
""" Run the client. """
Expand Down
Loading

0 comments on commit 1c0ee37

Please sign in to comment.