
Commit

work in progress
Wrede committed Nov 30, 2023
1 parent 91a635e commit 475d9a2
Showing 10 changed files with 549 additions and 916 deletions.
8 changes: 1 addition & 7 deletions fedn/fedn/common/net/grpc/fedn.proto
@@ -195,16 +195,10 @@ message ControlResponse {
   repeated Parameter parameter = 2;
 }

-message ReportResponse {
-  Client sender = 1;
-  repeated Parameter parameter = 2;
-}
-
 service Control {
   rpc Start(ControlRequest) returns (ControlResponse);
   rpc Stop(ControlRequest) returns (ControlResponse);
-  rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse);
-  rpc Report(ControlRequest) returns (ReportResponse);
+  rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse);
 }

 service Reducer {
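For context, the Control service now exposes only Start, Stop and FlushAggregationQueue; the Report RPC and its ReportResponse message are gone. A minimal sketch of driving the slimmed-down service from Python through the regenerated stubs (the address, the ControlStub class name produced by the gRPC codegen, and the "rounds" parameter are assumptions for illustration):

import grpc

import fedn.common.net.grpc.fedn_pb2 as fedn
import fedn.common.net.grpc.fedn_pb2_grpc as rpc

# Hypothetical combiner address; in a deployment this comes from the combiner config.
channel = grpc.insecure_channel("localhost:12080")
control = rpc.ControlStub(channel)

# ControlRequest carries key/value parameters, which Combiner.Start reads into its round config.
request = fedn.ControlRequest()
parameter = request.parameter.add()
parameter.key = "rounds"      # illustrative parameter only
parameter.value = "1"

response = control.Start(request)
control.FlushAggregationQueue(fedn.ControlRequest())   # Report() is no longer available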
382 changes: 74 additions & 308 deletions fedn/fedn/common/net/grpc/fedn_pb2.py

Large diffs are not rendered by default.

995 changes: 434 additions & 561 deletions fedn/fedn/common/net/grpc/fedn_pb2_grpc.py

Large diffs are not rendered by default.

22 changes: 12 additions & 10 deletions fedn/fedn/common/net/grpc/server.py
@@ -3,15 +3,20 @@
 import grpc

 import fedn.common.net.grpc.fedn_pb2_grpc as rpc
+from fedn.common.log_config import (logger, set_log_level_from_string,
+                                    set_log_stream)


 class Server:
     """
     Server class for gRPC server.
     """

     def __init__(self, servicer, modelservicer, config):

+        set_log_level_from_string(config.get('verbosity', "INFO"))
+        set_log_stream(config.get('logfile', None))
+
         self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=350))
         self.certificate = None

@@ -27,24 +32,21 @@ def __init__(self, servicer, modelservicer, config):
         rpc.add_ControlServicer_to_server(servicer, self.server)

         if config['secure']:
-            print("Creating secure gRPCS server using certificate: {config['certificate']}", flush=True)
+            logger.info(f'Creating secure gRPCS server using certificate: {config["certificate"]}')
             server_credentials = grpc.ssl_server_credentials(
                 ((config['key'], config['certificate'],),))
             self.server.add_secure_port(
                 '[::]:' + str(config['port']), server_credentials)
         else:
-            print("Creating insecure gRPC server", flush=True)
+            logger.info("Creating insecure gRPC server")
             self.server.add_insecure_port('[::]:' + str(config['port']))

     def start(self):
-        """
-        """
-        print("Server started", flush=True)
+        """ Start the gRPC server."""
+        logger.info("gRPC Server started")
         self.server.start()

     def stop(self):
-        """
-        """
+        """ Stop the gRPC server."""
+        logger.info("gRPC Server stopped")
         self.server.stop(0)
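The constructor now routes the combiner's verbosity settings through the shared logger before the gRPC server is built. A small, self-contained sketch of those two calls in isolation (the config values are illustrative; the helpers are the ones imported at the top of this file):

from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream

# Illustrative config; 'verbosity' and 'logfile' are the two keys the constructor reads.
config = {'verbosity': 'DEBUG', 'logfile': None}

set_log_level_from_string(config.get('verbosity', "INFO"))  # e.g. "DEBUG", "INFO", "WARNING"
set_log_stream(config.get('logfile', None))                 # None here means no log file was configured

logger.info("Logging configured for the combiner gRPC server")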
1 change: 0 additions & 1 deletion fedn/fedn/network/api/interface.py
@@ -78,7 +78,6 @@ def get_clients(self, limit=None, skip=None, status=False):

         return jsonify(result)

-
     def get_all_combiners(self, limit=None, skip=None):
         """Get all combiners from the statestore.
2 changes: 1 addition & 1 deletion fedn/fedn/network/combiner/interfaces.py
@@ -270,7 +270,7 @@ def allowing_clients(self):
     def list_active_clients(self, queue=1):
         """ List active clients.
-        :param queue: The channel (queque) to use (optional). Default is 1 = MODEL_UPDATE_REQUESTS channel.
+        :param queue: The channel (queue) to use (optional). Default is 1 = MODEL_UPDATE_REQUESTS channel.
         see :class:`fedn.common.net.grpc.fedn_pb2.Channel`
         :type channel: int
         :return: A list of active clients.
10 changes: 6 additions & 4 deletions fedn/fedn/network/combiner/modelservice.py
@@ -4,6 +4,7 @@

 import fedn.common.net.grpc.fedn_pb2 as fedn
 import fedn.common.net.grpc.fedn_pb2_grpc as rpc
+from fedn.common.log_config import logger
 from fedn.common.storage.models.tempmodelstorage import TempModelStorage

 CHUNK_SIZE = 1024 * 1024
@@ -141,7 +142,7 @@ def Upload(self, request_iterator, context):
         :return: A model response object.
         :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ModelResponse`
         """
-
+        logger.info("grpc.ModelService.Upload: Called")
         result = None
         for request in request_iterator:
             if request.status == fedn.ModelStatus.IN_PROGRESS:
@@ -167,12 +168,13 @@ def Download(self, request, context):
         :return: A model response iterator.
         :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ModelResponse`
         """
+        logger.info("grpc.ModelService.Download: Called")
         try:
             if self.models.get_meta(request.id) != fedn.ModelStatus.OK:
-                print("Error file is not ready", flush=True)
+                logger.warning("Error file is not ready")
                 yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)
         except Exception:
-            print("Error file does not exist: {}".format(request.id), flush=True)
+            logger.error("Error file does not exist: {}".format(request.id))
             yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)

         try:
@@ -185,4 +187,4 @@ def Download(self, request, context):
                         return
                     yield fedn.ModelResponse(id=request.id, data=piece, status=fedn.ModelStatus.IN_PROGRESS)
         except Exception as e:
-            print("Downloading went wrong: {} {}".format(request.id, e), flush=True)
+            logger.error("Downloading went wrong: {} {}".format(request.id, e))
7 changes: 3 additions & 4 deletions fedn/fedn/network/combiner/round.py
@@ -4,9 +4,9 @@
 import time
 import uuid

+from fedn.common.log_config import logger
 from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator
 from fedn.utils.helpers import get_helper
-from fedn.common.log_config import logger


 class ModelUpdateError(Exception):
@@ -207,7 +207,7 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):
         if self.modelservice.models.exist(model_id):
             logger.info("ROUNDCONTROL: Model already exists in memory, skipping model staging.")
             return
-        print("ROUNDCONTROL: Model Staging, fetching model from storage...")
+        logger.info("ROUNDCONTROL: Model Staging, fetching model from storage...")
         # If not, download it and stage it in memory at the combiner.
         tries = 0
         while True:
@@ -216,8 +216,7 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):
                 if model:
                     break
             except Exception:
-                logger.info("ROUNDCONTROL: Could not fetch model from storage backend, retrying.",
-                            flush=True)
+                logger.info("ROUNDCONTROL: Could not fetch model from storage backend, retrying.")
                 time.sleep(timeout_retry)
                 tries += 1
                 if tries > retry:
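stage_model keeps retrying the storage fetch, sleeping timeout_retry seconds between attempts and giving up after retry tries. The same retry pattern as a self-contained sketch (fetch_fn and the RuntimeError are stand-ins, and the loop is bounded slightly more strictly than the original for clarity):

import logging
import time

logger = logging.getLogger("example")

def fetch_with_retry(fetch_fn, timeout_retry=3, retry=2):
    """Call fetch_fn until it returns a truthy value, retrying a bounded number of times."""
    tries = 0
    while True:
        try:
            model = fetch_fn()
            if model:
                return model
        except Exception:
            logger.info("Could not fetch model from storage backend, retrying.")
        tries += 1
        if tries > retry:
            raise RuntimeError("Giving up after {} attempts".format(tries))
        time.sleep(timeout_retry)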
34 changes: 16 additions & 18 deletions fedn/fedn/network/combiner/server.py
@@ -244,14 +244,6 @@ def nr_active_trainers(self):
         """
         return len(self.get_active_trainers())

-    def nr_active_validators(self):
-        """ Get the number of active validators.
-        :return: the number of active validators
-        :rtype: int
-        """
-        return len(self.get_active_validators())
-
    ####################################################################################################################

     def __join_client(self, client):
@@ -383,13 +375,16 @@ def Start(self, control: fedn.ControlRequest, context):
         :return: the control response
         :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse`
         """
-        print("\nRECIEVED **START** from Controller {}\n".format(control.command), flush=True)
+        logger.info("grpc.Combiner.Start: Starting round")

         config = {}
         for parameter in control.parameter:
             config.update({parameter.key: parameter.value})

+        logger.debug("grpc.Combiner.Start: Round config {}".format(config))
+
         job_id = self.control.push_round_config(config)
+        logger.info("grpc.Combiner.Start: Pushed round config (job_id): {}".format(job_id))

         response = fedn.ControlResponse()
         p = response.parameter.add()
@@ -408,7 +403,7 @@ def FlushAggregationQueue(self, control: fedn.ControlRequest, context):
         :return: the control response
         :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse`
         """
-
+        logger.info("grpc.Combiner.FlushAggregationQueue: Called")
         status = self._flush_model_update_queue()

         response = fedn.ControlResponse()
@@ -432,7 +427,7 @@ def Stop(self, control: fedn.ControlRequest, context):
         :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse`
         """
         response = fedn.ControlResponse()
-        print("\n RECIEVED **STOP** from Controller\n", flush=True)
+        logger.info("grpc.Combiner.Stop: Called")
         return response

    #####################################################################################################################
@@ -447,7 +442,7 @@ def SendStatus(self, status: fedn.Status, context):
         :return: the response
         :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response`
         """
-
+        logger.debug("grpc.Combiner.SendStatus: Called")
         self._send_status(status)

         response = fedn.Response()
@@ -468,8 +463,11 @@ def ListActiveClients(self, request: fedn.ListClientsRequest, context):
         """
         clients = fedn.ClientList()
         active_clients = self._list_active_clients(request.channel)
-        logger.info("Active clients: {}".format(active_clients))
-        logger.info("All clients: {}".format(self.clients))
+        nr_active_clients = len(active_clients)
+        if nr_active_clients < 20:
+            logger.info("grpc.Combiner.ListActiveClients: Active clients: {}".format(active_clients))
+        else:
+            logger.info("grpc.Combiner.ListActiveClients: Number active clients: {}".format(nr_active_clients))

         for client in active_clients:
             clients.client.append(fedn.Client(name=client, role=fedn.WORKER))
@@ -500,7 +498,7 @@ def AcceptingClients(self, request: fedn.ConnectionRequest, context):
                 return response

         except Exception as e:
-            print("Combiner not properly configured! {}".format(e), flush=True)
+            logger.error("Combiner not properly configured! {}".format(e))
            raise

        response.status = fedn.ConnectionStatus.TRY_AGAIN_LATER
@@ -572,7 +570,7 @@ def ModelUpdateRequestStream(self, response, context):
         metadata = context.invocation_metadata()
         if metadata:
             metadata = dict(metadata)
-            print("\nClient connected: {}\n".format(metadata['client']), flush=True)
+            logger.info("grpc.Combiner.ModelUpdateRequestStream: Client connected: {}".format(metadata['client']))

         status = fedn.Status(
             status="Client {} connecting to ModelUpdateRequestStream.".format(client.name))
@@ -703,8 +701,8 @@ def SendModelValidation(self, request, context):
     def run(self):
         """ Start the server."""

-        print("COMBINER: {} started, ready for requests. ".format(
-            self.id), flush=True)
+        logger.info("COMBINER: {} started, ready for gRPC requests.".format(
+            self.id))
         try:
             while True:
                 signal.pause()
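Most of the changes above swap print() for the shared logger; the ListActiveClients hunk also adds a guard so that the full client list is only logged for small deployments. That guard in isolation (threshold of 20 as in the diff, client names made up):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example")

def log_active_clients(active_clients, threshold=20):
    """Log the full list for small deployments, only the count for large ones."""
    nr_active_clients = len(active_clients)
    if nr_active_clients < threshold:
        logger.info("Active clients: {}".format(active_clients))
    else:
        logger.info("Number of active clients: {}".format(nr_active_clients))

log_active_clients(["client-1", "client-2"])                      # logs the names
log_active_clients(["client-{}".format(i) for i in range(50)])    # logs only the count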
4 changes: 2 additions & 2 deletions fedn/fedn/network/loadbalancer/leastpacked.py
@@ -3,7 +3,7 @@


 class LeastPacked(LoadBalancerBase):
-    """ Load balancer that selects the combiner with the least number of attached clients.
+    """ Load balancer that selects the combiner with the least number of attached training clients.
     :param network: A handle to the network.
     :type network: class: `fedn.network.api.network.Network`
@@ -23,7 +23,7 @@ def find_combiner(self):
         for combiner in self.network.get_combiners():
             try:
                 if combiner.allowing_clients():
-                    # Using default default Channel = 1
+                    # Using default Channel = 1, MODEL_UPDATE_REQUESTS
                     nr_active_clients = len(combiner.list_active_clients())
                     if not min_clients:
                         min_clients = nr_active_clients
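LeastPacked walks all combiners and picks the one reporting the fewest active training clients (channel 1, MODEL_UPDATE_REQUESTS). Stripped of the network and gRPC plumbing, the selection reduces to a minimum over client counts, roughly:

def find_least_packed(combiner_client_counts):
    """Return the name of the combiner with the fewest attached training clients, or None."""
    optimal = None
    min_clients = None
    for combiner, nr_active_clients in combiner_client_counts.items():
        if min_clients is None or nr_active_clients < min_clients:
            min_clients = nr_active_clients
            optimal = combiner
    return optimal

# Usage with made-up counts:
print(find_least_packed({"combiner-a": 12, "combiner-b": 4, "combiner-c": 9}))  # -> combiner-b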
