diff --git a/.devcontainer/devcontainer.json.tpl b/.devcontainer/devcontainer.json.tpl new file mode 100644 index 000000000..cdf276df5 --- /dev/null +++ b/.devcontainer/devcontainer.json.tpl @@ -0,0 +1,27 @@ +{ + "name": "devcontainer", + "dockerFile": "Dockerfile", + "context": "..", + "remoteUser": "default", + // "workspaceFolder": "/fedn", + // "workspaceMount": "source=/path/to/fedn,target=/fedn,type=bind,consistency=default", + "extensions": [ + "ms-azuretools.vscode-docker", + "ms-python.python", + "exiasr.hadolint", + "yzhang.markdown-all-in-one", + "ms-python.isort" + ], + "mounts": [ + "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind,consistency=default", + ], + "runArgs": [ + "--net=host" + ], + "build": { + "args": { + "BASE_IMG": "python:3.9" + } + } +} + diff --git a/examples/mnist-keras/requirements.txt b/examples/mnist-keras/requirements.txt index 18b9e6e6a..749cf78a8 100644 --- a/examples/mnist-keras/requirements.txt +++ b/examples/mnist-keras/requirements.txt @@ -1,3 +1,3 @@ -tensorflow==2.9.3 +tensorflow==2.13.1 fire==0.3.1 docker==6.1.1 \ No newline at end of file diff --git a/fedn/fedn/common/net/grpc/fedn.proto b/fedn/fedn/common/net/grpc/fedn.proto index ff0ee293c..1980889ca 100644 --- a/fedn/fedn/common/net/grpc/fedn.proto +++ b/fedn/fedn/common/net/grpc/fedn.proto @@ -195,17 +195,10 @@ message ControlResponse { repeated Parameter parameter = 2; } -message ReportResponse { - Client sender = 1; - repeated Parameter parameter = 2; -} - service Control { rpc Start(ControlRequest) returns (ControlResponse); rpc Stop(ControlRequest) returns (ControlResponse); - rpc Configure(ControlRequest) returns (ReportResponse); - rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse); - rpc Report(ControlRequest) returns (ReportResponse); + rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse); } service Reducer { @@ -253,9 +246,7 @@ service Combiner { rpc ModelValidationRequestStream (ClientAvailableMessage) returns (stream ModelValidationRequest); rpc ModelValidationStream (ClientAvailableMessage) returns (stream ModelValidation); - rpc SendModelUpdateRequest (ModelUpdateRequest) returns (Response); rpc SendModelUpdate (ModelUpdate) returns (Response); - rpc SendModelValidationRequest (ModelValidationRequest) returns (Response); rpc SendModelValidation (ModelValidation) returns (Response); } diff --git a/fedn/fedn/common/net/grpc/fedn_pb2.py b/fedn/fedn/common/net/grpc/fedn_pb2.py index fa4fbb16d..daca249d8 100644 --- a/fedn/fedn/common/net/grpc/fedn_pb2.py +++ b/fedn/fedn/common/net/grpc/fedn_pb2.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fedn/common/net/grpc/fedn.proto +# source: fedn.proto """Generated protocol buffer code.""" -from google.protobuf.internal import enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -15,312 +13,80 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x66\x65\x64n/common/net/grpc/fedn.proto\x12\x04grpc\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\x8c\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.grpc.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.grpc.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xab\x01\n\x12ModelUpdateRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xaf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xc5\x01\n\x16ModelValidationRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x14\n\x0cis_inference\x18\x08 \x01(\x08\"\xa8\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.grpc.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.grpc.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"R\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\r.grpc.Channel\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.grpc.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.grpc.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.grpc.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"R\n\x0eReportResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.grpc.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*\x86\x01\n\x07\x43hannel\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x19\n\x15MODEL_UPDATE_REQUESTS\x10\x01\x12\x11\n\rMODEL_UPDATES\x10\x02\x12\x1d\n\x19MODEL_VALIDATION_REQUESTS\x10\x03\x12\x15\n\x11MODEL_VALIDATIONS\x10\x04\x12\n\n\x06STATUS\x10\x05*F\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse0\x01\x32\xa9\x02\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x33\n\x04Stop\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x37\n\tConfigure\x12\x14.grpc.ControlRequest\x1a\x14.grpc.ReportResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x34\n\x06Report\x12\x14.grpc.ControlRequest\x1a\x14.grpc.ReportResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.grpc.GetGlobalModelRequest\x1a\x1c.grpc.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x0c.grpc.Status0\x01\x12*\n\nSendStatus\x12\x0c.grpc.Status\x1a\x0e.grpc.Response\x12?\n\x11ListActiveClients\x12\x18.grpc.ListClientsRequest\x1a\x10.grpc.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.grpc.ConnectionRequest\x1a\x18.grpc.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.grpc.Heartbeat\x1a\x0e.grpc.Response\x12\x37\n\x0eReassignClient\x12\x15.grpc.ReassignRequest\x1a\x0e.grpc.Response\x12\x39\n\x0fReconnectClient\x12\x16.grpc.ReconnectRequest\x1a\x0e.grpc.Response2\xda\x04\n\x08\x43ombiner\x12T\n\x18ModelUpdateRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x18.grpc.ModelUpdateRequest0\x01\x12\x46\n\x11ModelUpdateStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x11.grpc.ModelUpdate0\x01\x12\\\n\x1cModelValidationRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x1c.grpc.ModelValidationRequest0\x01\x12N\n\x15ModelValidationStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x15.grpc.ModelValidation0\x01\x12\x42\n\x16SendModelUpdateRequest\x12\x18.grpc.ModelUpdateRequest\x1a\x0e.grpc.Response\x12\x34\n\x0fSendModelUpdate\x12\x11.grpc.ModelUpdate\x1a\x0e.grpc.Response\x12J\n\x1aSendModelValidationRequest\x12\x1c.grpc.ModelValidationRequest\x1a\x0e.grpc.Response\x12<\n\x13SendModelValidation\x12\x15.grpc.ModelValidation\x1a\x0e.grpc.Responseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nfedn.proto\x12\x04grpc\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\x8c\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.grpc.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.grpc.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xab\x01\n\x12ModelUpdateRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xaf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xc5\x01\n\x16ModelValidationRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x14\n\x0cis_inference\x18\x08 \x01(\x08\"\xa8\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.grpc.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.grpc.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"R\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\r.grpc.Channel\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.grpc.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.grpc.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.grpc.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.grpc.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*\x86\x01\n\x07\x43hannel\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x19\n\x15MODEL_UPDATE_REQUESTS\x10\x01\x12\x11\n\rMODEL_UPDATES\x10\x02\x12\x1d\n\x19MODEL_VALIDATION_REQUESTS\x10\x03\x12\x15\n\x11MODEL_VALIDATIONS\x10\x04\x12\n\n\x06STATUS\x10\x05*F\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse0\x01\x32\xba\x01\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x33\n\x04Stop\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.grpc.GetGlobalModelRequest\x1a\x1c.grpc.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x0c.grpc.Status0\x01\x12*\n\nSendStatus\x12\x0c.grpc.Status\x1a\x0e.grpc.Response\x12?\n\x11ListActiveClients\x12\x18.grpc.ListClientsRequest\x1a\x10.grpc.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.grpc.ConnectionRequest\x1a\x18.grpc.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.grpc.Heartbeat\x1a\x0e.grpc.Response\x12\x37\n\x0eReassignClient\x12\x15.grpc.ReassignRequest\x1a\x0e.grpc.Response\x12\x39\n\x0fReconnectClient\x12\x16.grpc.ReconnectRequest\x1a\x0e.grpc.Response2\xca\x03\n\x08\x43ombiner\x12T\n\x18ModelUpdateRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x18.grpc.ModelUpdateRequest0\x01\x12\x46\n\x11ModelUpdateStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x11.grpc.ModelUpdate0\x01\x12\\\n\x1cModelValidationRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x1c.grpc.ModelValidationRequest0\x01\x12N\n\x15ModelValidationStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x15.grpc.ModelValidation0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.grpc.ModelUpdate\x1a\x0e.grpc.Response\x12<\n\x13SendModelValidation\x12\x15.grpc.ModelValidation\x1a\x0e.grpc.Responseb\x06proto3') -_STATUSTYPE = DESCRIPTOR.enum_types_by_name['StatusType'] -StatusType = enum_type_wrapper.EnumTypeWrapper(_STATUSTYPE) -_CHANNEL = DESCRIPTOR.enum_types_by_name['Channel'] -Channel = enum_type_wrapper.EnumTypeWrapper(_CHANNEL) -_MODELSTATUS = DESCRIPTOR.enum_types_by_name['ModelStatus'] -ModelStatus = enum_type_wrapper.EnumTypeWrapper(_MODELSTATUS) -_ROLE = DESCRIPTOR.enum_types_by_name['Role'] -Role = enum_type_wrapper.EnumTypeWrapper(_ROLE) -_COMMAND = DESCRIPTOR.enum_types_by_name['Command'] -Command = enum_type_wrapper.EnumTypeWrapper(_COMMAND) -_CONNECTIONSTATUS = DESCRIPTOR.enum_types_by_name['ConnectionStatus'] -ConnectionStatus = enum_type_wrapper.EnumTypeWrapper(_CONNECTIONSTATUS) -LOG = 0 -MODEL_UPDATE_REQUEST = 1 -MODEL_UPDATE = 2 -MODEL_VALIDATION_REQUEST = 3 -MODEL_VALIDATION = 4 -INFERENCE = 5 -DEFAULT = 0 -MODEL_UPDATE_REQUESTS = 1 -MODEL_UPDATES = 2 -MODEL_VALIDATION_REQUESTS = 3 -MODEL_VALIDATIONS = 4 -STATUS = 5 -OK = 0 -IN_PROGRESS = 1 -IN_PROGRESS_OK = 2 -FAILED = 3 -WORKER = 0 -COMBINER = 1 -REDUCER = 2 -OTHER = 3 -IDLE = 0 -START = 1 -PAUSE = 2 -STOP = 3 -RESET = 4 -REPORT = 5 -NOT_ACCEPTING = 0 -ACCEPTING = 1 -TRY_AGAIN_LATER = 2 - - -_RESPONSE = DESCRIPTOR.message_types_by_name['Response'] -_STATUS = DESCRIPTOR.message_types_by_name['Status'] -_MODELUPDATEREQUEST = DESCRIPTOR.message_types_by_name['ModelUpdateRequest'] -_MODELUPDATE = DESCRIPTOR.message_types_by_name['ModelUpdate'] -_MODELVALIDATIONREQUEST = DESCRIPTOR.message_types_by_name['ModelValidationRequest'] -_MODELVALIDATION = DESCRIPTOR.message_types_by_name['ModelValidation'] -_MODELREQUEST = DESCRIPTOR.message_types_by_name['ModelRequest'] -_MODELRESPONSE = DESCRIPTOR.message_types_by_name['ModelResponse'] -_GETGLOBALMODELREQUEST = DESCRIPTOR.message_types_by_name['GetGlobalModelRequest'] -_GETGLOBALMODELRESPONSE = DESCRIPTOR.message_types_by_name['GetGlobalModelResponse'] -_HEARTBEAT = DESCRIPTOR.message_types_by_name['Heartbeat'] -_CLIENTAVAILABLEMESSAGE = DESCRIPTOR.message_types_by_name['ClientAvailableMessage'] -_LISTCLIENTSREQUEST = DESCRIPTOR.message_types_by_name['ListClientsRequest'] -_CLIENTLIST = DESCRIPTOR.message_types_by_name['ClientList'] -_CLIENT = DESCRIPTOR.message_types_by_name['Client'] -_REASSIGNREQUEST = DESCRIPTOR.message_types_by_name['ReassignRequest'] -_RECONNECTREQUEST = DESCRIPTOR.message_types_by_name['ReconnectRequest'] -_PARAMETER = DESCRIPTOR.message_types_by_name['Parameter'] -_CONTROLREQUEST = DESCRIPTOR.message_types_by_name['ControlRequest'] -_CONTROLRESPONSE = DESCRIPTOR.message_types_by_name['ControlResponse'] -_REPORTRESPONSE = DESCRIPTOR.message_types_by_name['ReportResponse'] -_CONNECTIONREQUEST = DESCRIPTOR.message_types_by_name['ConnectionRequest'] -_CONNECTIONRESPONSE = DESCRIPTOR.message_types_by_name['ConnectionResponse'] -_STATUS_LOGLEVEL = _STATUS.enum_types_by_name['LogLevel'] -Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), { - 'DESCRIPTOR' : _RESPONSE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.Response) - }) -_sym_db.RegisterMessage(Response) - -Status = _reflection.GeneratedProtocolMessageType('Status', (_message.Message,), { - 'DESCRIPTOR' : _STATUS, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.Status) - }) -_sym_db.RegisterMessage(Status) - -ModelUpdateRequest = _reflection.GeneratedProtocolMessageType('ModelUpdateRequest', (_message.Message,), { - 'DESCRIPTOR' : _MODELUPDATEREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ModelUpdateRequest) - }) -_sym_db.RegisterMessage(ModelUpdateRequest) - -ModelUpdate = _reflection.GeneratedProtocolMessageType('ModelUpdate', (_message.Message,), { - 'DESCRIPTOR' : _MODELUPDATE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ModelUpdate) - }) -_sym_db.RegisterMessage(ModelUpdate) - -ModelValidationRequest = _reflection.GeneratedProtocolMessageType('ModelValidationRequest', (_message.Message,), { - 'DESCRIPTOR' : _MODELVALIDATIONREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ModelValidationRequest) - }) -_sym_db.RegisterMessage(ModelValidationRequest) - -ModelValidation = _reflection.GeneratedProtocolMessageType('ModelValidation', (_message.Message,), { - 'DESCRIPTOR' : _MODELVALIDATION, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ModelValidation) - }) -_sym_db.RegisterMessage(ModelValidation) - -ModelRequest = _reflection.GeneratedProtocolMessageType('ModelRequest', (_message.Message,), { - 'DESCRIPTOR' : _MODELREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ModelRequest) - }) -_sym_db.RegisterMessage(ModelRequest) - -ModelResponse = _reflection.GeneratedProtocolMessageType('ModelResponse', (_message.Message,), { - 'DESCRIPTOR' : _MODELRESPONSE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ModelResponse) - }) -_sym_db.RegisterMessage(ModelResponse) - -GetGlobalModelRequest = _reflection.GeneratedProtocolMessageType('GetGlobalModelRequest', (_message.Message,), { - 'DESCRIPTOR' : _GETGLOBALMODELREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.GetGlobalModelRequest) - }) -_sym_db.RegisterMessage(GetGlobalModelRequest) - -GetGlobalModelResponse = _reflection.GeneratedProtocolMessageType('GetGlobalModelResponse', (_message.Message,), { - 'DESCRIPTOR' : _GETGLOBALMODELRESPONSE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.GetGlobalModelResponse) - }) -_sym_db.RegisterMessage(GetGlobalModelResponse) - -Heartbeat = _reflection.GeneratedProtocolMessageType('Heartbeat', (_message.Message,), { - 'DESCRIPTOR' : _HEARTBEAT, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.Heartbeat) - }) -_sym_db.RegisterMessage(Heartbeat) - -ClientAvailableMessage = _reflection.GeneratedProtocolMessageType('ClientAvailableMessage', (_message.Message,), { - 'DESCRIPTOR' : _CLIENTAVAILABLEMESSAGE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ClientAvailableMessage) - }) -_sym_db.RegisterMessage(ClientAvailableMessage) - -ListClientsRequest = _reflection.GeneratedProtocolMessageType('ListClientsRequest', (_message.Message,), { - 'DESCRIPTOR' : _LISTCLIENTSREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ListClientsRequest) - }) -_sym_db.RegisterMessage(ListClientsRequest) - -ClientList = _reflection.GeneratedProtocolMessageType('ClientList', (_message.Message,), { - 'DESCRIPTOR' : _CLIENTLIST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ClientList) - }) -_sym_db.RegisterMessage(ClientList) - -Client = _reflection.GeneratedProtocolMessageType('Client', (_message.Message,), { - 'DESCRIPTOR' : _CLIENT, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.Client) - }) -_sym_db.RegisterMessage(Client) - -ReassignRequest = _reflection.GeneratedProtocolMessageType('ReassignRequest', (_message.Message,), { - 'DESCRIPTOR' : _REASSIGNREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ReassignRequest) - }) -_sym_db.RegisterMessage(ReassignRequest) - -ReconnectRequest = _reflection.GeneratedProtocolMessageType('ReconnectRequest', (_message.Message,), { - 'DESCRIPTOR' : _RECONNECTREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ReconnectRequest) - }) -_sym_db.RegisterMessage(ReconnectRequest) - -Parameter = _reflection.GeneratedProtocolMessageType('Parameter', (_message.Message,), { - 'DESCRIPTOR' : _PARAMETER, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.Parameter) - }) -_sym_db.RegisterMessage(Parameter) - -ControlRequest = _reflection.GeneratedProtocolMessageType('ControlRequest', (_message.Message,), { - 'DESCRIPTOR' : _CONTROLREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ControlRequest) - }) -_sym_db.RegisterMessage(ControlRequest) - -ControlResponse = _reflection.GeneratedProtocolMessageType('ControlResponse', (_message.Message,), { - 'DESCRIPTOR' : _CONTROLRESPONSE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ControlResponse) - }) -_sym_db.RegisterMessage(ControlResponse) - -ReportResponse = _reflection.GeneratedProtocolMessageType('ReportResponse', (_message.Message,), { - 'DESCRIPTOR' : _REPORTRESPONSE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ReportResponse) - }) -_sym_db.RegisterMessage(ReportResponse) - -ConnectionRequest = _reflection.GeneratedProtocolMessageType('ConnectionRequest', (_message.Message,), { - 'DESCRIPTOR' : _CONNECTIONREQUEST, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ConnectionRequest) - }) -_sym_db.RegisterMessage(ConnectionRequest) - -ConnectionResponse = _reflection.GeneratedProtocolMessageType('ConnectionResponse', (_message.Message,), { - 'DESCRIPTOR' : _CONNECTIONRESPONSE, - '__module__' : 'fedn.common.net.grpc.fedn_pb2' - # @@protoc_insertion_point(class_scope:grpc.ConnectionResponse) - }) -_sym_db.RegisterMessage(ConnectionResponse) - -_MODELSERVICE = DESCRIPTOR.services_by_name['ModelService'] -_CONTROL = DESCRIPTOR.services_by_name['Control'] -_REDUCER = DESCRIPTOR.services_by_name['Reducer'] -_CONNECTOR = DESCRIPTOR.services_by_name['Connector'] -_COMBINER = DESCRIPTOR.services_by_name['Combiner'] +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'fedn_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _STATUSTYPE._serialized_start=2412 - _STATUSTYPE._serialized_end=2544 - _CHANNEL._serialized_start=2547 - _CHANNEL._serialized_end=2681 - _MODELSTATUS._serialized_start=2683 - _MODELSTATUS._serialized_end=2753 - _ROLE._serialized_start=2755 - _ROLE._serialized_end=2811 - _COMMAND._serialized_start=2813 - _COMMAND._serialized_end=2887 - _CONNECTIONSTATUS._serialized_start=2889 - _CONNECTIONSTATUS._serialized_end=2962 - _RESPONSE._serialized_start=41 - _RESPONSE._serialized_end=99 - _STATUS._serialized_start=102 - _STATUS._serialized_end=370 - _STATUS_LOGLEVEL._serialized_start=304 - _STATUS_LOGLEVEL._serialized_end=370 - _MODELUPDATEREQUEST._serialized_start=373 - _MODELUPDATEREQUEST._serialized_end=544 - _MODELUPDATE._serialized_start=547 - _MODELUPDATE._serialized_end=722 - _MODELVALIDATIONREQUEST._serialized_start=725 - _MODELVALIDATIONREQUEST._serialized_end=922 - _MODELVALIDATION._serialized_start=925 - _MODELVALIDATION._serialized_end=1093 - _MODELREQUEST._serialized_start=1096 - _MODELREQUEST._serialized_end=1233 - _MODELRESPONSE._serialized_start=1235 - _MODELRESPONSE._serialized_end=1328 - _GETGLOBALMODELREQUEST._serialized_start=1330 - _GETGLOBALMODELREQUEST._serialized_end=1415 - _GETGLOBALMODELRESPONSE._serialized_start=1417 - _GETGLOBALMODELRESPONSE._serialized_end=1521 - _HEARTBEAT._serialized_start=1523 - _HEARTBEAT._serialized_end=1564 - _CLIENTAVAILABLEMESSAGE._serialized_start=1566 - _CLIENTAVAILABLEMESSAGE._serialized_end=1653 - _LISTCLIENTSREQUEST._serialized_start=1655 - _LISTCLIENTSREQUEST._serialized_end=1737 - _CLIENTLIST._serialized_start=1739 - _CLIENTLIST._serialized_end=1781 - _CLIENT._serialized_start=1783 - _CLIENT._serialized_end=1831 - _REASSIGNREQUEST._serialized_start=1833 - _REASSIGNREQUEST._serialized_end=1942 - _RECONNECTREQUEST._serialized_start=1944 - _RECONNECTREQUEST._serialized_end=2043 - _PARAMETER._serialized_start=2045 - _PARAMETER._serialized_end=2084 - _CONTROLREQUEST._serialized_start=2086 - _CONTROLREQUEST._serialized_end=2170 - _CONTROLRESPONSE._serialized_start=2172 - _CONTROLRESPONSE._serialized_end=2242 - _REPORTRESPONSE._serialized_start=2244 - _REPORTRESPONSE._serialized_end=2326 - _CONNECTIONREQUEST._serialized_start=2328 - _CONNECTIONREQUEST._serialized_end=2347 - _CONNECTIONRESPONSE._serialized_start=2349 - _CONNECTIONRESPONSE._serialized_end=2409 - _MODELSERVICE._serialized_start=2964 - _MODELSERVICE._serialized_end=3086 - _CONTROL._serialized_start=3089 - _CONTROL._serialized_end=3386 - _REDUCER._serialized_start=3388 - _REDUCER._serialized_end=3474 - _CONNECTOR._serialized_start=3477 - _CONNECTOR._serialized_end=3904 - _COMBINER._serialized_start=3907 - _COMBINER._serialized_end=4509 + _globals['_STATUSTYPE']._serialized_start=2307 + _globals['_STATUSTYPE']._serialized_end=2439 + _globals['_CHANNEL']._serialized_start=2442 + _globals['_CHANNEL']._serialized_end=2576 + _globals['_MODELSTATUS']._serialized_start=2578 + _globals['_MODELSTATUS']._serialized_end=2648 + _globals['_ROLE']._serialized_start=2650 + _globals['_ROLE']._serialized_end=2706 + _globals['_COMMAND']._serialized_start=2708 + _globals['_COMMAND']._serialized_end=2782 + _globals['_CONNECTIONSTATUS']._serialized_start=2784 + _globals['_CONNECTIONSTATUS']._serialized_end=2857 + _globals['_RESPONSE']._serialized_start=20 + _globals['_RESPONSE']._serialized_end=78 + _globals['_STATUS']._serialized_start=81 + _globals['_STATUS']._serialized_end=349 + _globals['_STATUS_LOGLEVEL']._serialized_start=283 + _globals['_STATUS_LOGLEVEL']._serialized_end=349 + _globals['_MODELUPDATEREQUEST']._serialized_start=352 + _globals['_MODELUPDATEREQUEST']._serialized_end=523 + _globals['_MODELUPDATE']._serialized_start=526 + _globals['_MODELUPDATE']._serialized_end=701 + _globals['_MODELVALIDATIONREQUEST']._serialized_start=704 + _globals['_MODELVALIDATIONREQUEST']._serialized_end=901 + _globals['_MODELVALIDATION']._serialized_start=904 + _globals['_MODELVALIDATION']._serialized_end=1072 + _globals['_MODELREQUEST']._serialized_start=1075 + _globals['_MODELREQUEST']._serialized_end=1212 + _globals['_MODELRESPONSE']._serialized_start=1214 + _globals['_MODELRESPONSE']._serialized_end=1307 + _globals['_GETGLOBALMODELREQUEST']._serialized_start=1309 + _globals['_GETGLOBALMODELREQUEST']._serialized_end=1394 + _globals['_GETGLOBALMODELRESPONSE']._serialized_start=1396 + _globals['_GETGLOBALMODELRESPONSE']._serialized_end=1500 + _globals['_HEARTBEAT']._serialized_start=1502 + _globals['_HEARTBEAT']._serialized_end=1543 + _globals['_CLIENTAVAILABLEMESSAGE']._serialized_start=1545 + _globals['_CLIENTAVAILABLEMESSAGE']._serialized_end=1632 + _globals['_LISTCLIENTSREQUEST']._serialized_start=1634 + _globals['_LISTCLIENTSREQUEST']._serialized_end=1716 + _globals['_CLIENTLIST']._serialized_start=1718 + _globals['_CLIENTLIST']._serialized_end=1760 + _globals['_CLIENT']._serialized_start=1762 + _globals['_CLIENT']._serialized_end=1810 + _globals['_REASSIGNREQUEST']._serialized_start=1812 + _globals['_REASSIGNREQUEST']._serialized_end=1921 + _globals['_RECONNECTREQUEST']._serialized_start=1923 + _globals['_RECONNECTREQUEST']._serialized_end=2022 + _globals['_PARAMETER']._serialized_start=2024 + _globals['_PARAMETER']._serialized_end=2063 + _globals['_CONTROLREQUEST']._serialized_start=2065 + _globals['_CONTROLREQUEST']._serialized_end=2149 + _globals['_CONTROLRESPONSE']._serialized_start=2151 + _globals['_CONTROLRESPONSE']._serialized_end=2221 + _globals['_CONNECTIONREQUEST']._serialized_start=2223 + _globals['_CONNECTIONREQUEST']._serialized_end=2242 + _globals['_CONNECTIONRESPONSE']._serialized_start=2244 + _globals['_CONNECTIONRESPONSE']._serialized_end=2304 + _globals['_MODELSERVICE']._serialized_start=2859 + _globals['_MODELSERVICE']._serialized_end=2981 + _globals['_CONTROL']._serialized_start=2984 + _globals['_CONTROL']._serialized_end=3170 + _globals['_REDUCER']._serialized_start=3172 + _globals['_REDUCER']._serialized_end=3258 + _globals['_CONNECTOR']._serialized_start=3261 + _globals['_CONNECTOR']._serialized_end=3688 + _globals['_COMBINER']._serialized_start=3691 + _globals['_COMBINER']._serialized_end=4149 # @@protoc_insertion_point(module_scope) diff --git a/fedn/fedn/common/net/grpc/fedn_pb2_grpc.py b/fedn/fedn/common/net/grpc/fedn_pb2_grpc.py index 9590e2b5c..6484fea30 100644 --- a/fedn/fedn/common/net/grpc/fedn_pb2_grpc.py +++ b/fedn/fedn/common/net/grpc/fedn_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from fedn.common.net.grpc import fedn_pb2 as fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2 +from . import fedn_pb2 as fedn__pb2 class ModelServiceStub(object): @@ -15,15 +15,15 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.Upload = channel.stream_unary( - '/grpc.ModelService/Upload', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, - ) + '/grpc.ModelService/Upload', + request_serializer=fedn__pb2.ModelRequest.SerializeToString, + response_deserializer=fedn__pb2.ModelResponse.FromString, + ) self.Download = channel.unary_stream( - '/grpc.ModelService/Download', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, - ) + '/grpc.ModelService/Download', + request_serializer=fedn__pb2.ModelRequest.SerializeToString, + response_deserializer=fedn__pb2.ModelResponse.FromString, + ) class ModelServiceServicer(object): @@ -44,59 +44,60 @@ def Download(self, request, context): def add_ModelServiceServicer_to_server(servicer, server): rpc_method_handlers = { - 'Upload': grpc.stream_unary_rpc_method_handler( - servicer.Upload, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelResponse.SerializeToString, - ), - 'Download': grpc.unary_stream_rpc_method_handler( - servicer.Download, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelResponse.SerializeToString, - ), + 'Upload': grpc.stream_unary_rpc_method_handler( + servicer.Upload, + request_deserializer=fedn__pb2.ModelRequest.FromString, + response_serializer=fedn__pb2.ModelResponse.SerializeToString, + ), + 'Download': grpc.unary_stream_rpc_method_handler( + servicer.Download, + request_deserializer=fedn__pb2.ModelRequest.FromString, + response_serializer=fedn__pb2.ModelResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'grpc.ModelService', rpc_method_handlers) + 'grpc.ModelService', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. + + class ModelService(object): """Missing associated documentation comment in .proto file.""" @staticmethod def Upload(request_iterator, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.stream_unary(request_iterator, target, '/grpc.ModelService/Upload', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ModelRequest.SerializeToString, + fedn__pb2.ModelResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Download(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_stream(request, target, '/grpc.ModelService/Download', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ModelRequest.SerializeToString, + fedn__pb2.ModelResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) class ControlStub(object): @@ -109,30 +110,20 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.Start = channel.unary_unary( - '/grpc.Control/Start', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - ) + '/grpc.Control/Start', + request_serializer=fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=fedn__pb2.ControlResponse.FromString, + ) self.Stop = channel.unary_unary( - '/grpc.Control/Stop', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - ) - self.Configure = channel.unary_unary( - '/grpc.Control/Configure', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReportResponse.FromString, - ) + '/grpc.Control/Stop', + request_serializer=fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=fedn__pb2.ControlResponse.FromString, + ) self.FlushAggregationQueue = channel.unary_unary( - '/grpc.Control/FlushAggregationQueue', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - ) - self.Report = channel.unary_unary( - '/grpc.Control/Report', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReportResponse.FromString, - ) + '/grpc.Control/FlushAggregationQueue', + request_serializer=fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=fedn__pb2.ControlResponse.FromString, + ) class ControlServicer(object): @@ -150,146 +141,91 @@ def Stop(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def Configure(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def FlushAggregationQueue(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def Report(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def add_ControlServicer_to_server(servicer, server): rpc_method_handlers = { - 'Start': grpc.unary_unary_rpc_method_handler( - servicer.Start, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - 'Stop': grpc.unary_unary_rpc_method_handler( - servicer.Stop, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - 'Configure': grpc.unary_unary_rpc_method_handler( - servicer.Configure, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReportResponse.SerializeToString, - ), - 'FlushAggregationQueue': grpc.unary_unary_rpc_method_handler( - servicer.FlushAggregationQueue, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - 'Report': grpc.unary_unary_rpc_method_handler( - servicer.Report, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReportResponse.SerializeToString, - ), + 'Start': grpc.unary_unary_rpc_method_handler( + servicer.Start, + request_deserializer=fedn__pb2.ControlRequest.FromString, + response_serializer=fedn__pb2.ControlResponse.SerializeToString, + ), + 'Stop': grpc.unary_unary_rpc_method_handler( + servicer.Stop, + request_deserializer=fedn__pb2.ControlRequest.FromString, + response_serializer=fedn__pb2.ControlResponse.SerializeToString, + ), + 'FlushAggregationQueue': grpc.unary_unary_rpc_method_handler( + servicer.FlushAggregationQueue, + request_deserializer=fedn__pb2.ControlRequest.FromString, + response_serializer=fedn__pb2.ControlResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Control', rpc_method_handlers) + 'grpc.Control', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. + + class Control(object): """Missing associated documentation comment in .proto file.""" @staticmethod def Start(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Control/Start', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ControlRequest.SerializeToString, + fedn__pb2.ControlResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Stop(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Control/Stop', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def Configure(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Control/Configure', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReportResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ControlRequest.SerializeToString, + fedn__pb2.ControlResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def FlushAggregationQueue(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Control/FlushAggregationQueue', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def Report(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Control/Report', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReportResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ControlRequest.SerializeToString, + fedn__pb2.ControlResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) class ReducerStub(object): @@ -302,10 +238,10 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.GetGlobalModel = channel.unary_unary( - '/grpc.Reducer/GetGlobalModel', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, - ) + '/grpc.Reducer/GetGlobalModel', + request_serializer=fedn__pb2.GetGlobalModelRequest.SerializeToString, + response_deserializer=fedn__pb2.GetGlobalModelResponse.FromString, + ) class ReducerServicer(object): @@ -320,37 +256,38 @@ def GetGlobalModel(self, request, context): def add_ReducerServicer_to_server(servicer, server): rpc_method_handlers = { - 'GetGlobalModel': grpc.unary_unary_rpc_method_handler( - servicer.GetGlobalModel, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.SerializeToString, - ), + 'GetGlobalModel': grpc.unary_unary_rpc_method_handler( + servicer.GetGlobalModel, + request_deserializer=fedn__pb2.GetGlobalModelRequest.FromString, + response_serializer=fedn__pb2.GetGlobalModelResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Reducer', rpc_method_handlers) + 'grpc.Reducer', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. + + class Reducer(object): """Missing associated documentation comment in .proto file.""" @staticmethod def GetGlobalModel(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Reducer/GetGlobalModel', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.GetGlobalModelRequest.SerializeToString, + fedn__pb2.GetGlobalModelResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) class ConnectorStub(object): @@ -363,40 +300,40 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.AllianceStatusStream = channel.unary_stream( - '/grpc.Connector/AllianceStatusStream', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Status.FromString, - ) + '/grpc.Connector/AllianceStatusStream', + request_serializer=fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn__pb2.Status.FromString, + ) self.SendStatus = channel.unary_unary( - '/grpc.Connector/SendStatus', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Status.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Connector/SendStatus', + request_serializer=fedn__pb2.Status.SerializeToString, + response_deserializer=fedn__pb2.Response.FromString, + ) self.ListActiveClients = channel.unary_unary( - '/grpc.Connector/ListActiveClients', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientList.FromString, - ) + '/grpc.Connector/ListActiveClients', + request_serializer=fedn__pb2.ListClientsRequest.SerializeToString, + response_deserializer=fedn__pb2.ClientList.FromString, + ) self.AcceptingClients = channel.unary_unary( - '/grpc.Connector/AcceptingClients', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, - ) + '/grpc.Connector/AcceptingClients', + request_serializer=fedn__pb2.ConnectionRequest.SerializeToString, + response_deserializer=fedn__pb2.ConnectionResponse.FromString, + ) self.SendHeartbeat = channel.unary_unary( - '/grpc.Connector/SendHeartbeat', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Connector/SendHeartbeat', + request_serializer=fedn__pb2.Heartbeat.SerializeToString, + response_deserializer=fedn__pb2.Response.FromString, + ) self.ReassignClient = channel.unary_unary( - '/grpc.Connector/ReassignClient', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Connector/ReassignClient', + request_serializer=fedn__pb2.ReassignRequest.SerializeToString, + response_deserializer=fedn__pb2.Response.FromString, + ) self.ReconnectClient = channel.unary_unary( - '/grpc.Connector/ReconnectClient', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Connector/ReconnectClient', + request_serializer=fedn__pb2.ReconnectRequest.SerializeToString, + response_deserializer=fedn__pb2.Response.FromString, + ) class ConnectorServicer(object): @@ -452,169 +389,170 @@ def ReconnectClient(self, request, context): def add_ConnectorServicer_to_server(servicer, server): rpc_method_handlers = { - 'AllianceStatusStream': grpc.unary_stream_rpc_method_handler( - servicer.AllianceStatusStream, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Status.SerializeToString, - ), - 'SendStatus': grpc.unary_unary_rpc_method_handler( - servicer.SendStatus, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Status.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'ListActiveClients': grpc.unary_unary_rpc_method_handler( - servicer.ListActiveClients, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ListClientsRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientList.SerializeToString, - ), - 'AcceptingClients': grpc.unary_unary_rpc_method_handler( - servicer.AcceptingClients, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ConnectionRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ConnectionResponse.SerializeToString, - ), - 'SendHeartbeat': grpc.unary_unary_rpc_method_handler( - servicer.SendHeartbeat, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Heartbeat.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'ReassignClient': grpc.unary_unary_rpc_method_handler( - servicer.ReassignClient, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReassignRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'ReconnectClient': grpc.unary_unary_rpc_method_handler( - servicer.ReconnectClient, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReconnectRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), + 'AllianceStatusStream': grpc.unary_stream_rpc_method_handler( + servicer.AllianceStatusStream, + request_deserializer=fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn__pb2.Status.SerializeToString, + ), + 'SendStatus': grpc.unary_unary_rpc_method_handler( + servicer.SendStatus, + request_deserializer=fedn__pb2.Status.FromString, + response_serializer=fedn__pb2.Response.SerializeToString, + ), + 'ListActiveClients': grpc.unary_unary_rpc_method_handler( + servicer.ListActiveClients, + request_deserializer=fedn__pb2.ListClientsRequest.FromString, + response_serializer=fedn__pb2.ClientList.SerializeToString, + ), + 'AcceptingClients': grpc.unary_unary_rpc_method_handler( + servicer.AcceptingClients, + request_deserializer=fedn__pb2.ConnectionRequest.FromString, + response_serializer=fedn__pb2.ConnectionResponse.SerializeToString, + ), + 'SendHeartbeat': grpc.unary_unary_rpc_method_handler( + servicer.SendHeartbeat, + request_deserializer=fedn__pb2.Heartbeat.FromString, + response_serializer=fedn__pb2.Response.SerializeToString, + ), + 'ReassignClient': grpc.unary_unary_rpc_method_handler( + servicer.ReassignClient, + request_deserializer=fedn__pb2.ReassignRequest.FromString, + response_serializer=fedn__pb2.Response.SerializeToString, + ), + 'ReconnectClient': grpc.unary_unary_rpc_method_handler( + servicer.ReconnectClient, + request_deserializer=fedn__pb2.ReconnectRequest.FromString, + response_serializer=fedn__pb2.Response.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Connector', rpc_method_handlers) + 'grpc.Connector', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. + + class Connector(object): """Missing associated documentation comment in .proto file.""" @staticmethod def AllianceStatusStream(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_stream(request, target, '/grpc.Connector/AllianceStatusStream', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Status.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn__pb2.Status.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def SendStatus(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Connector/SendStatus', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Status.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.Status.SerializeToString, + fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ListActiveClients(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Connector/ListActiveClients', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientList.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ListClientsRequest.SerializeToString, + fedn__pb2.ClientList.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def AcceptingClients(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Connector/AcceptingClients', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ConnectionRequest.SerializeToString, + fedn__pb2.ConnectionResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def SendHeartbeat(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Connector/SendHeartbeat', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.Heartbeat.SerializeToString, + fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ReassignClient(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Connector/ReassignClient', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ReassignRequest.SerializeToString, + fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ReconnectClient(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Connector/ReconnectClient', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ReconnectRequest.SerializeToString, + fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) class CombinerStub(object): @@ -627,45 +565,35 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.ModelUpdateRequestStream = channel.unary_stream( - '/grpc.Combiner/ModelUpdateRequestStream', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, - ) + '/grpc.Combiner/ModelUpdateRequestStream', + request_serializer=fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn__pb2.ModelUpdateRequest.FromString, + ) self.ModelUpdateStream = channel.unary_stream( - '/grpc.Combiner/ModelUpdateStream', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - ) + '/grpc.Combiner/ModelUpdateStream', + request_serializer=fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn__pb2.ModelUpdate.FromString, + ) self.ModelValidationRequestStream = channel.unary_stream( - '/grpc.Combiner/ModelValidationRequestStream', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, - ) + '/grpc.Combiner/ModelValidationRequestStream', + request_serializer=fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn__pb2.ModelValidationRequest.FromString, + ) self.ModelValidationStream = channel.unary_stream( - '/grpc.Combiner/ModelValidationStream', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, - ) - self.SendModelUpdateRequest = channel.unary_unary( - '/grpc.Combiner/SendModelUpdateRequest', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Combiner/ModelValidationStream', + request_serializer=fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn__pb2.ModelValidation.FromString, + ) self.SendModelUpdate = channel.unary_unary( - '/grpc.Combiner/SendModelUpdate', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) - self.SendModelValidationRequest = channel.unary_unary( - '/grpc.Combiner/SendModelValidationRequest', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidationRequest.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Combiner/SendModelUpdate', + request_serializer=fedn__pb2.ModelUpdate.SerializeToString, + response_deserializer=fedn__pb2.Response.FromString, + ) self.SendModelValidation = channel.unary_unary( - '/grpc.Combiner/SendModelValidation', - request_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - response_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - ) + '/grpc.Combiner/SendModelValidation', + request_serializer=fedn__pb2.ModelValidation.SerializeToString, + response_deserializer=fedn__pb2.Response.FromString, + ) class CombinerServicer(object): @@ -696,24 +624,12 @@ def ModelValidationStream(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def SendModelUpdateRequest(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def SendModelUpdate(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def SendModelValidationRequest(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def SendModelValidation(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -723,188 +639,145 @@ def SendModelValidation(self, request, context): def add_CombinerServicer_to_server(servicer, server): rpc_method_handlers = { - 'ModelUpdateRequestStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelUpdateRequestStream, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.SerializeToString, - ), - 'ModelUpdateStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelUpdateStream, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - ), - 'ModelValidationRequestStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelValidationRequestStream, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidationRequest.SerializeToString, - ), - 'ModelValidationStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelValidationStream, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - ), - 'SendModelUpdateRequest': grpc.unary_unary_rpc_method_handler( - servicer.SendModelUpdateRequest, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'SendModelUpdate': grpc.unary_unary_rpc_method_handler( - servicer.SendModelUpdate, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'SendModelValidationRequest': grpc.unary_unary_rpc_method_handler( - servicer.SendModelValidationRequest, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'SendModelValidation': grpc.unary_unary_rpc_method_handler( - servicer.SendModelValidation, - request_deserializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, - response_serializer=fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), + 'ModelUpdateRequestStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelUpdateRequestStream, + request_deserializer=fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn__pb2.ModelUpdateRequest.SerializeToString, + ), + 'ModelUpdateStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelUpdateStream, + request_deserializer=fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn__pb2.ModelUpdate.SerializeToString, + ), + 'ModelValidationRequestStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelValidationRequestStream, + request_deserializer=fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn__pb2.ModelValidationRequest.SerializeToString, + ), + 'ModelValidationStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelValidationStream, + request_deserializer=fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn__pb2.ModelValidation.SerializeToString, + ), + 'SendModelUpdate': grpc.unary_unary_rpc_method_handler( + servicer.SendModelUpdate, + request_deserializer=fedn__pb2.ModelUpdate.FromString, + response_serializer=fedn__pb2.Response.SerializeToString, + ), + 'SendModelValidation': grpc.unary_unary_rpc_method_handler( + servicer.SendModelValidation, + request_deserializer=fedn__pb2.ModelValidation.FromString, + response_serializer=fedn__pb2.Response.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Combiner', rpc_method_handlers) + 'grpc.Combiner', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. + + class Combiner(object): """Missing associated documentation comment in .proto file.""" @staticmethod def ModelUpdateRequestStream(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateRequestStream', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn__pb2.ModelUpdateRequest.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ModelUpdateStream(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateStream', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn__pb2.ModelUpdate.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ModelValidationRequestStream(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationRequestStream', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn__pb2.ModelValidationRequest.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ModelValidationStream(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationStream', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def SendModelUpdateRequest(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelUpdateRequest', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn__pb2.ModelValidation.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def SendModelUpdate(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelUpdate', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def SendModelValidationRequest(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelValidationRequest', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidationRequest.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ModelUpdate.SerializeToString, + fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def SendModelValidation(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelValidation', - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - fedn_dot_common_dot_net_dot_grpc_dot_fedn__pb2.Response.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + fedn__pb2.ModelValidation.SerializeToString, + fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/fedn/fedn/common/net/grpc/server.py b/fedn/fedn/common/net/grpc/server.py index 03acb9610..e7289117f 100644 --- a/fedn/fedn/common/net/grpc/server.py +++ b/fedn/fedn/common/net/grpc/server.py @@ -3,6 +3,8 @@ import grpc import fedn.common.net.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) class Server: @@ -10,6 +12,9 @@ class Server: def __init__(self, servicer, modelservicer, config): + set_log_level_from_string(config.get('verbosity', "INFO")) + set_log_stream(config.get('logfile', None)) + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=350)) self.certificate = None @@ -25,22 +30,21 @@ def __init__(self, servicer, modelservicer, config): rpc.add_ControlServicer_to_server(servicer, self.server) if config['secure']: - print("Creating secure gRPCS server using certificate: {config['certificate']}", flush=True) + logger.info(f'Creating secure gRPCS server using certificate: {config["certificate"]}') server_credentials = grpc.ssl_server_credentials( ((config['key'], config['certificate'],),)) self.server.add_secure_port( '[::]:' + str(config['port']), server_credentials) else: - print("Creating insecure gRPC server", flush=True) + logger.info("Creating insecure gRPC server") self.server.add_insecure_port('[::]:' + str(config['port'])) def start(self): - """ Start gRPC server. """ - - print("Server started", flush=True) + """ Start the gRPC server.""" + logger.info("gRPC Server started") self.server.start() def stop(self): - """ Stop gRPC server. """ - + """ Stop the gRPC server.""" + logger.info("gRPC Server stopped") self.server.stop(0) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index e56462493..3cea06668 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -35,19 +35,6 @@ def _to_dict(self): data = {"name": self.name} return data - def _get_combiner_report(self, combiner_id): - """Get report response from combiner. - - :param combiner_id: The combiner id to get report response from. - :type combiner_id: str - ::return: The report response from combiner. - ::rtype: dict - """ - # Get CombinerInterface (fedn.network.combiner.inferface.CombinerInterface) for combiner_id - combiner = self.control.network.get_combiner(combiner_id) - report = combiner.report - return report - def _allowed_file_extension( self, filename, ALLOWED_EXTENSIONS={"gz", "bz2", "tar", "zip", "tgz"} ): @@ -91,30 +78,6 @@ def get_clients(self, limit=None, skip=None, status=False): return jsonify(result) - def get_active_clients(self, combiner_id): - """Get all active clients, i.e that are assigned to a combiner. - A report request to the combiner is neccessary to determine if a client is active or not. - - :param combiner_id: The combiner id to get active clients for. - :type combiner_id: str - :return: All active clients as a json response. - :rtype: :class:`flask.Response` - """ - # Get combiner interface object - combiner = self.control.network.get_combiner(combiner_id) - if combiner is None: - return ( - jsonify( - { - "success": False, - "message": f"Combiner {combiner_id} not found.", - } - ), - 404, - ) - response = combiner.list_active_clients() - return response - def get_all_combiners(self, limit=None, skip=None): """Get all combiners from the statestore. @@ -154,7 +117,6 @@ def get_combiner(self, combiner_id): "fqdn": object["fqdn"], "parent_reducer": object["parent"]["name"], "port": object["port"], - "report": object["report"], "updated_at": object["updated_at"], } payload[id] = info @@ -832,12 +794,20 @@ def start_session( {"success": False, "message": "A session is already running."} ) + # Check that initial (seed) model is set + if not self.statestore.get_initial_model(): + return jsonify( + { + "success": False, + "message": "No initial model set. Set initial model before starting session.", + } + ) + # Check available clients per combiner clients_available = 0 for combiner in self.control.network.get_combiners(): try: - combiner_state = combiner.report() - nr_active_clients = combiner_state["nr_active_clients"] + nr_active_clients = len(combiner.list_active_clients()) clients_available = clients_available + int(nr_active_clients) except CombinerUnavailableError as e: # TODO: Handle unavailable combiner, stop session or continue? diff --git a/fedn/fedn/network/api/network.py b/fedn/fedn/network/api/network.py index 6fcaad053..a4d4c7299 100644 --- a/fedn/fedn/network/api/network.py +++ b/fedn/fedn/network/api/network.py @@ -1,7 +1,6 @@ import base64 -from fedn.network.combiner.interfaces import (CombinerInterface, - CombinerUnavailableError) +from fedn.network.combiner.interfaces import CombinerInterface from fedn.network.loadbalancer.leastpacked import LeastPacked __all__ = 'Network', @@ -154,18 +153,3 @@ def get_client_info(self): :rtype: list(ObjectId) """ return self.statestore.list_clients() - - def describe(self): - """ Describe the network. - - :return: The network description - :rtype: dict - """ - network = [] - for combiner in self.get_combiners(): - try: - network.append(combiner.report()) - except CombinerUnavailableError: - # TODO, do better here. - pass - return network diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index b45a71c91..4769c371e 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -441,29 +441,37 @@ def _listen_to_model_update_request_stream(self): # Add client to metadata self._add_grpc_metadata('client', self.name) - while True: + while self._attached: try: for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=self.metadata): + if request: + logger.debug("Received model update request from combiner: {}.".format(request)) if request.sender.role == fedn.COMBINER: # Process training request self._send_status("Received model update request.", log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request) + logger.info("Received model update request.") self.inbox.put(('train', request)) - if not self._attached: - return except grpc.RpcError as e: - _ = e.code() - except grpc.RpcError: - # TODO: make configurable - timeout = 5 - time.sleep(timeout) - except Exception: - raise + # Handle gRPC errors + status_code = e.code() + if status_code == grpc.StatusCode.UNAVAILABLE: + logger.warning("GRPC server unavailable during model update request stream. Retrying.") + # Retry after a delay + time.sleep(5) + else: + # Log the error and continue + logger.error(f"An error occurred during model update request stream: {e}") - if not self._attached: - return + except Exception as ex: + # Handle other exceptions + logger.error(f"An error occurred during model update request stream: {ex}") + + # Detach if not attached + if not self._attached: + return def _listen_to_model_validation_request_stream(self): """Subscribe to the model validation request stream. @@ -479,17 +487,27 @@ def _listen_to_model_validation_request_stream(self): try: for request in self.combinerStub.ModelValidationRequestStream(r, metadata=self.metadata): # Process validation request - _ = request.model_id - self._send_status("Recieved model validation request.", log_level=fedn.Status.AUDIT, - type=fedn.StatusType.MODEL_VALIDATION_REQUEST, request=request) + model_id = request.model_id + self._send_status("Received model validation request for model_id {}".format(model_id), + log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_VALIDATION_REQUEST, + request=request) + logger.info("Received model validation request for model_id {}".format(model_id)) self.inbox.put(('validate', request)) - except grpc.RpcError: - # TODO: make configurable - timeout = 5 - time.sleep(timeout) - except Exception: - raise + except grpc.RpcError as e: + # Handle gRPC errors + status_code = e.code() + if status_code == grpc.StatusCode.UNAVAILABLE: + logger.warning("GRPC server unavailable during model validation request stream. Retrying.") + # Retry after a delay + time.sleep(5) + else: + # Log the error and continue + logger.error(f"An error occurred during model validation request stream: {e}") + + except Exception as ex: + # Handle other exceptions + logger.error(f"An error occurred during model validation request stream: {ex}") if not self._attached: return diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index bcbb699e2..b74390c72 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -3,7 +3,7 @@ import queue from abc import ABC, abstractmethod -import fedn.common.net.grpc.fedn_pb2 as fedn +from fedn.common.log_config import logger AGGREGATOR_PLUGIN_PATH = "fedn.network.combiner.aggregators.{}" @@ -60,8 +60,7 @@ def on_model_update(self, model_update): :type model_id: str """ try: - self.server.report_status("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id), - log_level=fedn.Status.INFO) + logger.info("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id)) # Validate the update and metadata valid_update = self._validate_model_update(model_update) @@ -69,10 +68,9 @@ def on_model_update(self, model_update): # Push the model update to the processing queue self.model_updates.put(model_update) else: - self.server.report_status("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) + logger.warning("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) except Exception as e: - self.server.report_status("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e), - log_level=fedn.Status.WARNING) + logger.error("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e)) pass def _validate_model_update(self, model_update): @@ -86,7 +84,7 @@ def _validate_model_update(self, model_update): # TODO: Validate the metadata to check that it contains all variables assumed by the aggregator. data = json.loads(model_update.meta)['training_metadata'] if 'num_examples' not in data.keys(): - self.server.report_status("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) + logger.error("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) return False return True diff --git a/fedn/fedn/network/combiner/aggregators/fedavg.py b/fedn/fedn/network/combiner/aggregators/fedavg.py index 0cd15b66a..ae479676d 100644 --- a/fedn/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/fedn/network/combiner/aggregators/fedavg.py @@ -1,4 +1,4 @@ -import fedn.common.net.grpc.fedn_pb2 as fedn +from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase @@ -50,14 +50,14 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete nr_aggregated_models = 0 total_examples = 0 - self.server.report_status( + logger.info( "AGGREGATOR({}): Aggregating model updates... ".format(self.name)) while not self.model_updates.empty(): try: # Get next model from queue model_next, metadata, model_id = self.next_model_update(helper) - self.server.report_status( + logger.info( "AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_id, metadata)) # Increment total number of examples @@ -73,16 +73,15 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete # Delete model from storage if delete_models: self.modelservice.models.delete(model_id) - self.server.report_status( + logger.info( "AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_id)) self.model_updates.task_done() except Exception as e: - self.server.report_status( + logger.error( "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) self.model_updates.task_done() data['nr_aggregated_models'] = nr_aggregated_models - self.server.report_status("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models), - log_level=fedn.Status.INFO) + logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models)) return model, data diff --git a/fedn/fedn/network/combiner/interfaces.py b/fedn/fedn/network/combiner/interfaces.py index 6dfb0428d..98494984a 100644 --- a/fedn/fedn/network/combiner/interfaces.py +++ b/fedn/fedn/network/combiner/interfaces.py @@ -4,7 +4,6 @@ from io import BytesIO import grpc -from google.protobuf.json_format import MessageToJson import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc @@ -137,11 +136,6 @@ def to_dict(self): data['certificate'] = str(cert_b64).split('\'')[1] data['key'] = str(key_b64).split('\'')[1] - try: - data['report'] = self.report() - except CombinerUnavailableError: - data['report'] = None - return data def to_json(self): @@ -176,55 +170,6 @@ def get_key(self): else: return None - def report(self): - """ Recieve a status report from the combiner. - - :return: A dictionary describing the combiner state. - :rtype: dict - :raises CombinerUnavailableError: If the combiner is unavailable. - """ - channel = Channel(self.address, self.port, - self.certificate).get_channel() - control = rpc.ControlStub(channel) - request = fedn.ControlRequest() - try: - response = control.Report(request) - data = {} - for p in response.parameter: - data[p.key] = p.value - return data - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: - raise CombinerUnavailableError - else: - raise - - def configure(self, config=None): - """ Configure the combiner. Set the parameters in config at the server. - - :param config: A dictionary containing parameters. - :type config: dict - """ - if not config: - config = self.config - channel = Channel(self.address, self.port, - self.certificate).get_channel() - control = rpc.ControlStub(channel) - - request = fedn.ControlRequest() - for key, value in config.items(): - p = request.parameter.add() - p.key = key - p.value = str(value) - - try: - control.Configure(request) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: - raise CombinerUnavailableError - else: - raise - def flush_model_update_queue(self): """ Reset the model update queue on the combiner. """ @@ -322,9 +267,12 @@ def allowing_clients(self): return False - def list_active_clients(self): + def list_active_clients(self, queue=1): """ List active clients. + :param queue: The channel (queue) to use (optional). Default is 1 = MODEL_UPDATE_REQUESTS channel. + see :class:`fedn.common.net.grpc.fedn_pb2.Channel` + :type channel: int :return: A list of active clients. :rtype: json """ @@ -332,6 +280,7 @@ def list_active_clients(self): self.certificate).get_channel() control = rpc.ConnectorStub(channel) request = fedn.ListClientsRequest() + request.channel = queue try: response = control.ListActiveClients(request) except grpc.RpcError as e: @@ -339,4 +288,4 @@ def list_active_clients(self): raise CombinerUnavailableError else: raise - return MessageToJson(response) + return response.client diff --git a/fedn/fedn/network/combiner/modelservice.py b/fedn/fedn/network/combiner/modelservice.py index 933253e70..1d15bf10b 100644 --- a/fedn/fedn/network/combiner/modelservice.py +++ b/fedn/fedn/network/combiner/modelservice.py @@ -4,6 +4,7 @@ import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import logger from fedn.network.storage.models.tempmodelstorage import TempModelStorage CHUNK_SIZE = 1024 * 1024 @@ -141,7 +142,7 @@ def Upload(self, request_iterator, context): :return: A model response object. :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ModelResponse` """ - + logger.debug("grpc.ModelService.Upload: Called") result = None for request in request_iterator: if request.status == fedn.ModelStatus.IN_PROGRESS: @@ -167,12 +168,13 @@ def Download(self, request, context): :return: A model response iterator. :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ModelResponse` """ + logger.debug("grpc.ModelService.Download: Called") try: if self.models.get_model_metadata(request.id) != fedn.ModelStatus.OK: - print("Error file is not ready", flush=True) + logger.error("Error file is not ready") yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED) except Exception: - print("Error file does not exist: {}".format(request.id), flush=True) + logger.error("Error file does not exist: {}".format(request.id)) yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED) try: @@ -185,4 +187,4 @@ def Download(self, request, context): return yield fedn.ModelResponse(id=request.id, data=piece, status=fedn.ModelStatus.IN_PROGRESS) except Exception as e: - print("Downloading went wrong: {} {}".format(request.id, e), flush=True) + logger.error("Downloading went wrong: {} {}".format(request.id, e)) diff --git a/fedn/fedn/network/combiner/round.py b/fedn/fedn/network/combiner/round.py index 1414f54ac..769220c52 100644 --- a/fedn/fedn/network/combiner/round.py +++ b/fedn/fedn/network/combiner/round.py @@ -4,6 +4,7 @@ import time import uuid +from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator from fedn.utils.helpers import get_helper @@ -49,8 +50,8 @@ def push_round_config(self, round_config): round_config['_job_id'] = str(uuid.uuid4()) self.round_configs.put(round_config) except Exception: - self.server.report_status( - "ROUNDCONTROL: Failed to push round config.", flush=True) + logger.warning( + "ROUNDCONTROL: Failed to push round config.") raise return round_config['_job_id'] @@ -68,7 +69,7 @@ def load_model_update(self, helper, model_id): try: model = self.modelservice.load_model_from_BytesIO(model_str.getbuffer(), helper) except IOError: - self.server.report_status( + logger.warning( "AGGREGATOR({}): Failed to load model!".format(self.name)) else: raise ModelUpdateError("Failed to load model.") @@ -95,8 +96,8 @@ def load_model_update_str(self, model_id, retry=3): while tries < retry: tries += 1 if not model_str or sys.getsizeof(model_str) == 80: - self.server.report_status( - "ROUNDCONTROL: Model download failed. retrying", flush=True) + logger.warning( + "ROUNDCONTROL: Model download failed. retrying") time.sleep(1) model_str = self.modelservice.get_model(model_id) @@ -139,7 +140,7 @@ def _training_round(self, config, clients): :rtype: model, dict """ - self.server.report_status( + logger.info( "ROUNDCONTROL: Initiating training round, participating clients: {}".format(clients)) meta = {} @@ -165,7 +166,7 @@ def _training_round(self, config, clients): try: helper = get_helper(config['helper_type']) - print("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage']), flush=True) + logger.info("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage'])) if config['delete_models_storage'] == 'True': delete_models = True else: @@ -173,7 +174,7 @@ def _training_round(self, config, clients): model, data = self.aggregator.combine_models(helper=helper, delete_models=delete_models) except Exception as e: - print("AGGREGATION FAILED AT COMBINER! {}".format(e), flush=True) + logger.warning("AGGREGATION FAILED AT COMBINER! {}".format(e)) meta['time_combination'] = time.time() - tic meta['aggregation_time'] = data @@ -204,9 +205,9 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): # If the model is already in memory at the server we do not need to do anything. if self.modelservice.models.exist(model_id): - print("ROUNDCONTROL: Model already exists in memory, skipping model staging.", flush=True) + logger.info("ROUNDCONTROL: Model already exists in memory, skipping model staging.") return - print("ROUNDCONTROL: Model Staging, fetching model from storage...", flush=True) + logger.info("ROUNDCONTROL: Model Staging, fetching model from storage...") # If not, download it and stage it in memory at the combiner. tries = 0 while True: @@ -215,15 +216,13 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): if model: break except Exception: - self.server.report_status("ROUNDCONTROL: Could not fetch model from storage backend, retrying.", - flush=True) + logger.info("ROUNDCONTROL: Could not fetch model from storage backend, retrying.") time.sleep(timeout_retry) tries += 1 if tries > retry: - self.server.report_status( - "ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id), flush=True) + logger.info( + "ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id)) raise - return self.modelservice.set_model(model, model_id) @@ -243,8 +242,8 @@ def _assign_round_clients(self, n, type="trainers"): elif type == "trainers": clients = self.server.get_active_trainers() else: - self.server.report_status( - "ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type), flush=True) + logger.info( + "ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type)) raise # If the number of requested trainers exceeds the number of available, use all available. @@ -275,9 +274,9 @@ def _check_nr_round_clients(self, config, timeout=0.0): if active >= int(config['clients_requested']): return True else: - self.server.report_status("waiting for {} clients to get started, currently: {}".format( + logger.info("waiting for {} clients to get started, currently: {}".format( int(config['clients_requested']) - active, - active), flush=True) + active)) if t >= timeout: if active >= int(config['clients_required']): return True @@ -296,7 +295,7 @@ def execute_validation_round(self, round_config): :type round_config: dict """ model_id = round_config['model_id'] - self.server.report_status( + logger.info( "COMBINER orchestrating validation of model {}".format(model_id)) self.stage_model(model_id) validators = self._assign_round_clients( @@ -312,8 +311,8 @@ def execute_training_round(self, config): :rtype: dict """ - self.server.report_status( - "ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id']), flush=True) + logger.info( + "ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id'])) data = {} data['config'] = config @@ -327,7 +326,7 @@ def execute_training_round(self, config): data['data'] = meta if model is None: - self.server.report_status( + logger.warning( "\t Failed to update global model in round {0}!".format(config['round_id'])) if model is not None: @@ -339,8 +338,8 @@ def execute_training_round(self, config): a.close() data['model_id'] = model_id - self.server.report_status( - "ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']), flush=True) + logger.info( + "ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id'])) return data @@ -371,14 +370,14 @@ def run(self, polling_interval=1.0): elif round_config['task'] == 'validation' or round_config['task'] == 'inference': self.execute_validation_round(round_config) else: - self.server.report_status( - "ROUNDCONTROL: Round config contains unkown task type.", flush=True) + logger.warning( + "ROUNDCONTROL: Round config contains unkown task type.") else: round_meta = {} round_meta['status'] = "Failed" round_meta['reason'] = "Failed to meet client allocation requirements for this round config." - self.server.report_status( - "ROUNDCONTROL: {0}".format(round_meta['reason']), flush=True) + logger.warning( + "ROUNDCONTROL: {0}".format(round_meta['reason'])) self.round_configs.task_done() except queue.Empty: diff --git a/fedn/fedn/network/combiner/server.py b/fedn/fedn/network/combiner/server.py index 0ae1e5bde..4cd877890 100644 --- a/fedn/fedn/network/combiner/server.py +++ b/fedn/fedn/network/combiner/server.py @@ -12,6 +12,8 @@ import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) from fedn.common.net.grpc.server import Server from fedn.network.combiner.connect import ConnectorCombiner, Status from fedn.network.combiner.modelservice import ModelService @@ -58,6 +60,9 @@ class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, def __init__(self, config): """ Initialize Combiner server.""" + set_log_level_from_string(config.get('verbosity', "INFO")) + set_log_stream(config.get('logfile', None)) + # Client queues self.clients = {} @@ -83,26 +88,23 @@ def __init__(self, config): secure=config['secure'], verify=config['verify']) - response = None while True: - # announce combiner to discover service + # Announce combiner to discover service status, response = announce_client.announce() if status == Status.TryAgain: - print(response, flush=True) + logger.info(response) time.sleep(5) - continue - if status == Status.Assigned: + elif status == Status.Assigned: announce_config = response - print( - "COMBINER {0}: Announced successfully".format(self.id), flush=True) + logger.info("COMBINER {0}: Announced successfully".format(self.id)) break - if status == Status.UnAuthorized: - print(response, flush=True) - print("Status.UnAuthorized", flush=True) + elif status == Status.UnAuthorized: + logger.info(response) + logger.info("Status.UnAuthorized") sys.exit("Exiting: Unauthorized") - if status == Status.UnMatchedConfig: - print(response, flush=True) - print("Status.UnMatchedConfig", flush=True) + elif status == Status.UnMatchedConfig: + logger.info(response) + logger.info("Status.UnMatchedConfig") sys.exit("Exiting: Missing config") cert = announce_config['certificate'] @@ -137,6 +139,9 @@ def __init__(self, config): # Start thread for round controller threading.Thread(target=self.control.run, daemon=True).start() + # Start thread for client status updates: TODO: Should be configurable + threading.Thread(target=self._deamon_thread_client_status, daemon=True).start() + # Start the gRPC server self.server.start() @@ -154,23 +159,6 @@ def __whoami(self, client, instance): client.role = role_to_proto_role(instance.role) return client - def report_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, flush=True): - """ Report status of the combiner. - - :param msg: the message to report - :type msg: str - :param log_level: the log level to report at - :type log_level: :class:`fedn.common.net.grpc.fedn_pb2.Status` - :param type: the type of status to report - :type type: :class:`fedn.common.net.grpc.fedn_pb2.Status.Type` - :param request: the request to report status for - :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param flush: whether to flush the message to stdout - :type flush: bool - """ - print("{}:COMBINER({}):{} {}".format(datetime.now().strftime( - '%Y-%m-%d %H:%M:%S'), self.id, log_level, msg), flush=flush) - def request_model_update(self, config, clients=[]): """ Ask clients to update the current global model. @@ -180,25 +168,30 @@ def request_model_update(self, config, clients=[]): :type clients: list """ - + # The request to be added to the client queue request = fedn.ModelUpdateRequest() - self.__whoami(request.sender, self) request.model_id = config['model_id'] request.correlation_id = str(uuid.uuid4()) request.timestamp = str(datetime.now()) request.data = json.dumps(config) + request.sender.name = self.id + request.sender.role = fedn.COMBINER + if len(clients) == 0: clients = self.get_active_trainers() for client in clients: - request.receiver.name = client.name + request.receiver.name = client request.receiver.role = fedn.WORKER - _ = self.SendModelUpdateRequest(request, self) - # TODO: Check response + self._put_request_to_client_queue(request, fedn.Channel.MODEL_UPDATE_REQUESTS) - print("COMBINER: Sent model update request for model {} to clients {}".format( - request.model_id, clients), flush=True) + if len(clients) < 20: + logger.info("Sent model update request for model {} to clients {}".format( + request.model_id, clients)) + else: + logger.info("Sent model update request for model {} to {} clients".format( + request.model_id, len(clients))) def request_model_validation(self, model_id, config, clients=[]): """ Ask clients to validate the current global model. @@ -211,11 +204,10 @@ def request_model_validation(self, model_id, config, clients=[]): :type clients: list """ - + # The request to be added to the client queue request = fedn.ModelValidationRequest() - self.__whoami(request.sender, self) request.model_id = model_id - request.correlation_id = str(uuid.uuid4()) + request.correlation_id = str(uuid.uuid4()) # Obsolete? request.timestamp = str(datetime.now()) request.is_inference = (config['task'] == 'inference') @@ -223,26 +215,16 @@ def request_model_validation(self, model_id, config, clients=[]): clients = self.get_active_validators() for client in clients: - request.receiver.name = client.name + request.receiver.name = client request.receiver.role = fedn.WORKER - self.SendModelValidationRequest(request, self) - - print("COMBINER: Sent validation request for model {} to clients {}".format( - model_id, clients), flush=True) - - def _list_clients(self, channel): - """ List active clients on a channel. + self._put_request_to_client_queue(request, fedn.Channel.MODEL_VALIDATION_REQUESTS) - :param channel: the channel to list clients for, for example MODEL_UPDATE_REQUESTS - :type channel: :class:`fedn.common.net.grpc.fedn_pb2.Channel` - :return: the list of active clients - :rtype: list - """ - request = fedn.ListClientsRequest() - self.__whoami(request.sender, self) - request.channel = channel - clients = self.ListActiveClients(request, self) - return clients.client + if len(clients) < 20: + logger.info("Sent model validation request for model {} to clients {}".format( + request.model_id, clients)) + else: + logger.info("Sent model validation request for model {} to {} clients".format( + request.model_id, len(clients))) def get_active_trainers(self): """ Get a list of active trainers. @@ -250,7 +232,7 @@ def get_active_trainers(self): :return: the list of active trainers :rtype: list """ - trainers = self._list_clients(fedn.Channel.MODEL_UPDATE_REQUESTS) + trainers = self._list_active_clients(fedn.Channel.MODEL_UPDATE_REQUESTS) return trainers def get_active_validators(self): @@ -259,7 +241,7 @@ def get_active_validators(self): :return: the list of active validators :rtype: list """ - validators = self._list_clients(fedn.Channel.MODEL_VALIDATION_REQUESTS) + validators = self._list_active_clients(fedn.Channel.MODEL_VALIDATION_REQUESTS) return validators def nr_active_trainers(self): @@ -270,14 +252,6 @@ def nr_active_trainers(self): """ return len(self.get_active_trainers()) - def nr_active_validators(self): - """ Get the number of active validators. - - :return: the number of active validators - :rtype: int - """ - return len(self.get_active_validators()) - #################################################################################################################### def __join_client(self, client): @@ -287,7 +261,8 @@ def __join_client(self, client): :type client: :class:`fedn.common.net.grpc.fedn_pb2.Client` """ if client.name not in self.clients.keys(): - self.clients[client.name] = {"lastseen": datetime.now()} + # The status is set to offline by default, and will be updated once _list_active_clients is called. + self.clients[client.name] = {"lastseen": datetime.now(), "status": "offline"} def _subscribe_client_to_queue(self, client, queue_name): """ Subscribe a client to the queue. @@ -318,46 +293,81 @@ def __get_queue(self, client, queue_name): except KeyError: raise - def _send_request(self, request, queue_name): - """ Send a request to a client. + def _list_subscribed_clients(self, queue_name): + """ List all clients subscribed to a queue. - :param request: the request to send - :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param queue_name: the name of the queue to send the request to + :param queue_name: the name of the queue :type queue_name: str + :return: a list of client names + :rtype: list """ - self.__route_request_to_client(request, request.receiver, queue_name) + subscribed_clients = [] + for name, client in self.clients.items(): + if queue_name in client.keys(): + subscribed_clients.append(name) + return subscribed_clients - def _broadcast_request(self, request, queue_name): - """ Publish a request to all subscribed members. + def _list_active_clients(self, channel): + """ List all clients that have sent a status message in the last 10 seconds. - :param request: the request to send - :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param queue_name: the name of the queue to send the request to - :type queue_name: str + :param channel: the name of the channel + :type channel: str + :return: a list of client names + :rtype: list """ - active_clients = self._list_active_clients() - for client in active_clients: - self.clients[client.name][queue_name].put(request) + # Temporary dict to store client status + clients = { + "active_clients": [], + "update_active_clients": [], + "update_offline_clients": [], + } + for client in self._list_subscribed_clients(channel): + status = self.clients[client]["status"] + now = datetime.now() + then = self.clients[client]["lastseen"] + if (now - then) < timedelta(seconds=10): + clients["active_clients"].append(client) + # If client has changed status, update statestore + if status == "offline": + self.clients[client]["status"] = "online" + clients["update_active_clients"].append(client) + else: + # If client has changed status, update statestore + if status == "online": + self.clients[client]["status"] = "offline" + clients["update_offline_clients"].append(client) + # Update statestore with client status + if len(clients["update_active_clients"]) > 0: + self.statestore.update_client_status(clients["update_active_clients"], "online") + if len(clients["update_offline_clients"]) > 0: + self.statestore.update_client_status(clients["update_offline_clients"], "offline") + + return clients["active_clients"] + + def _deamon_thread_client_status(self, timeout=10): + """ Deamon thread that checks for inactive clients and updates statestore. """ + while True: + time.sleep(timeout) + # TODO: Also update validation clients + self._list_active_clients(fedn.Channel.MODEL_UPDATE_REQUESTS) - def __route_request_to_client(self, request, client, queue_name): - """ Route a request to a client. + def _put_request_to_client_queue(self, request, queue_name): + """ Get a client specific queue and add a request to it. + The client is identified by the request.receiver. :param request: the request to send :type request: :class:`fedn.common.net.grpc.fedn_pb2.Request` - :param client: the client to send the request to - :type client: :class:`fedn.common.net.grpc.fedn_pb2.Client` :param queue_name: the name of the queue to send the request to :type queue_name: str - - :raises Exception: if the request could not be routed, direct cause of KeyError in __get_queue """ try: - q = self.__get_queue(client, queue_name) + q = self.__get_queue(request.receiver, queue_name) q.put(request) - except Exception: - print("Failed to route request to client: {} {}", - request.receiver, queue_name) + except Exception as e: + logger.error("Failed to put request to client queue {} for client {}: {}".format( + queue_name, + request.receiver.name, + str(e))) raise def _send_status(self, status): @@ -369,17 +379,11 @@ def _send_status(self, status): self.statestore.report_status(status) - def __register_heartbeat(self, client): - """ Register a client if first time connecting. Update heartbeat timestamp. + def _flush_model_update_queue(self): + """Clear the model update queue (aggregator). - :param client: the client to register - :type client: :class:`fedn.common.net.grpc.fedn_pb2.Client` + :return: True if successful, else False """ - self.__join_client(client) - self.clients[client.name]["lastseen"] = datetime.now() - - def flush_model_update_queue(self): - """Clear the model update queue (aggregator). """ q = self.control.aggregator.model_updates try: @@ -388,7 +392,8 @@ def flush_model_update_queue(self): q.all_tasks_done.notify_all() q.unfinished_tasks = 0 return True - except Exception: + except Exception as e: + logger.error("Failed to flush model update queue: %s", str(e)) return False ##################################################################################################################### @@ -405,13 +410,16 @@ def Start(self, control: fedn.ControlRequest, context): :return: the control response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` """ - print("\nRECIEVED **START** from Controller {}\n".format(control.command), flush=True) + logger.info("grpc.Combiner.Start: Starting round") config = {} for parameter in control.parameter: config.update({parameter.key: parameter.value}) + logger.debug("grpc.Combiner.Start: Round config {}".format(config)) + job_id = self.control.push_round_config(config) + logger.info("grcp.Combiner.Start: Pushed round config (job_id): {}".format(job_id)) response = fedn.ControlResponse() p = response.parameter.add() @@ -420,25 +428,6 @@ def Start(self, control: fedn.ControlRequest, context): return response - # RPCs related to remote configuration of the server, round controller, - # aggregator and their states. - - def Configure(self, control: fedn.ControlRequest, context): - """ Configure the Combiner. - - :param control: the control request - :type control: :class:`fedn.common.net.grpc.fedn_pb2.ControlRequest` - :param context: the context (unused) - :type context: :class:`grpc._server._Context` - :return: the control response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` - """ - for parameter in control.parameter: - setattr(self, parameter.key, parameter.value) - - response = fedn.ControlResponse() - return response - def FlushAggregationQueue(self, control: fedn.ControlRequest, context): """ Flush the queue. @@ -449,8 +438,8 @@ def FlushAggregationQueue(self, control: fedn.ControlRequest, context): :return: the control response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` """ - - status = self.flush_model_update_queue() + logger.debug("grpc.Combiner.FlushAggregationQueue: Called") + status = self._flush_model_update_queue() response = fedn.ControlResponse() if status: @@ -473,75 +462,13 @@ def Stop(self, control: fedn.ControlRequest, context): :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` """ response = fedn.ControlResponse() - print("\n RECIEVED **STOP** from Controller\n", flush=True) - return response - - def Report(self, control: fedn.ControlRequest, context): - """ Describe current state of the Combiner. - - :param control: the control request - :type control: :class:`fedn.common.net.grpc.fedn_pb2.ControlRequest` - :param context: the context (unused) - :type context: :class:`grpc._server._Context` - :return: the control response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse` - """ - - response = fedn.ControlResponse() - self.report_status("\n RECIEVED **REPORT** from Controller\n", - log_level=fedn.Status.INFO) - - control_state = self.control.aggregator.get_state() - self.report_status("Aggregator state: {}".format(control_state), log_level=fedn.Status.INFO) - p = response.parameter.add() - for key, value in control_state.items(): - p.key = str(key) - p.value = str(value) - - active_trainers = self.get_active_trainers() - p = response.parameter.add() - p.key = "nr_active_trainers" - p.value = str(len(active_trainers)) - - active_validators = self.get_active_validators() - p = response.parameter.add() - p.key = "nr_active_validators" - p.value = str(len(active_validators)) - - active_trainers_ = self.get_active_trainers() - active_trainers = [] - for client in active_trainers_: - active_trainers.append(client) - p = response.parameter.add() - p.key = "active_trainers" - p.value = str(active_trainers) - - active_validators_ = self.get_active_validators() - active_validators = [] - for client in active_validators_: - active_validators.append(client) - p = response.parameter.add() - p.key = "active_validators" - p.value = str(active_validators) - - p = response.parameter.add() - p.key = "nr_active_clients" - p.value = str(len(active_trainers)+len(active_validators)) - - p = response.parameter.add() - p.key = "nr_unprocessed_compute_plans" - p.value = str(self.control.round_configs.qsize()) - - p = response.parameter.add() - p.key = "name" - p.value = str(self.id) - + logger.info("grpc.Combiner.Stop: Called") return response ##################################################################################################################### def SendStatus(self, status: fedn.Status, context): - """ A client stream RPC endpoint that accepts status messages. + """ A client RPC endpoint that accepts status messages. :param status: the status message :type status: :class:`fedn.common.net.grpc.fedn_pb2.Status` @@ -550,48 +477,13 @@ def SendStatus(self, status: fedn.Status, context): :return: the response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` """ - + logger.debug("grpc.Combiner.SendStatus: Called") self._send_status(status) response = fedn.Response() response.response = "Status received." return response - def _list_subscribed_clients(self, queue_name): - """ List all clients subscribed to a queue. - - :param queue_name: the name of the queue - :type queue_name: str - :return: a list of client names - :rtype: list - """ - subscribed_clients = [] - for name, client in self.clients.items(): - if queue_name in client.keys(): - subscribed_clients.append(name) - return subscribed_clients - - def _list_active_clients(self, channel): - """ List all clients that have sent a status message in the last 10 seconds. - - :param channel: the name of the channel - :type channel: str - :return: a list of client names - :rtype: list - """ - active_clients = [] - for client in self._list_subscribed_clients(channel): - # This can break with different timezones. - now = datetime.now() - then = self.clients[client]["lastseen"] - # TODO: move the heartbeat timeout to config. - if (now - then) < timedelta(seconds=10): - active_clients.append(client) - return active_clients - - def _drop_inactive_clients(self): - """ TODO: Not implemented. Clean up clients that have missed the heartbeat. """ - def ListActiveClients(self, request: fedn.ListClientsRequest, context): """ RPC endpoint that returns a ClientList containing the names of all active clients. An active client has sent a status message / responded to a heartbeat @@ -606,6 +498,11 @@ def ListActiveClients(self, request: fedn.ListClientsRequest, context): """ clients = fedn.ClientList() active_clients = self._list_active_clients(request.channel) + nr_active_clients = len(active_clients) + if nr_active_clients < 20: + logger.info("grpc.Combiner.ListActiveClients: Active clients: {}".format(active_clients)) + else: + logger.info("grpc.Combiner.ListActiveClients: Number active clients: {}".format(nr_active_clients)) for client in active_clients: clients.client.append(fedn.Client(name=client, role=fedn.WORKER)) @@ -636,7 +533,7 @@ def AcceptingClients(self, request: fedn.ConnectionRequest, context): return response except Exception as e: - print("Combiner not properly configured! {}".format(e), flush=True) + logger.error("Combiner not properly configured! {}".format(e), flush=True) raise response.status = fedn.ConnectionStatus.TRY_AGAIN_LATER @@ -653,7 +550,12 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context): :return: the response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` """ - self.__register_heartbeat(heartbeat.sender) + logger.debug("GRPC: Received heartbeat from {}".format(heartbeat.sender.name)) + # Update the clients dict with the last seen timestamp. + client = heartbeat.sender + self.__join_client(client) + self.clients[client.name]["lastseen"] = datetime.now() + response = fedn.Response() response.sender.name = heartbeat.sender.name response.sender.role = heartbeat.sender.role @@ -701,7 +603,7 @@ def ModelUpdateRequestStream(self, response, context): metadata = context.invocation_metadata() if metadata: metadata = dict(metadata) - print("\nClient connected: {}\n".format(metadata['client']), flush=True) + logger.info("grpc.Combiner.ModelUpdateRequestStream: Client connected: {}\n".format(metadata['client'])) status = fedn.Status( status="Client {} connecting to ModelUpdateRequestStream.".format(client.name)) @@ -716,15 +618,20 @@ def ModelUpdateRequestStream(self, response, context): self._send_status(status) - self.statestore.update_client_status(client.name, "online") - + # Keep track of the time context has been active + start_time = time.time() while context.is_active(): + # Check if the context has been active for more than 10 seconds + if time.time() - start_time > 10: + self.clients[client.name]["lastseen"] = datetime.now() + # Reset the start time + start_time = time.time() try: yield q.get(timeout=1.0) except queue.Empty: pass - - self.statestore.update_client_status(client.name, "offline") + except Exception as e: + logger.error("Error in ModelUpdateRequestStream: {}".format(e)) def ModelValidationStream(self, update, context): """ Model validation stream RPC endpoint. Update status for client is connecting to stream. @@ -782,23 +689,6 @@ def ModelValidationRequestStream(self, response, context): except queue.Empty: pass - def SendModelUpdateRequest(self, request, context): - """ Send a model update request. - - :param request: the request - :type request: :class:`fedn.common.net.grpc.fedn_pb2.ModelUpdateRequest` - :param context: the context - :type context: :class:`grpc._server._Context` - :return: the response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` - """ - self._send_request(request, fedn.Channel.MODEL_UPDATE_REQUESTS) - - response = fedn.Response() - response.response = "RECEIVED ModelUpdateRequest from client {}".format( - request.sender.name) - return response # TODO Fill later - def SendModelUpdate(self, request, context): """ Send a model update response. @@ -816,23 +706,6 @@ def SendModelUpdate(self, request, context): response, response.sender.name) return response # TODO Fill later - def SendModelValidationRequest(self, request, context): - """ Send a model validation request. - - :param request: the request - :type request: :class:`fedn.common.net.grpc.fedn_pb2.ModelValidationRequest` - :param context: the context - :type context: :class:`grpc._server._Context` - :return: the response - :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` - """ - self._send_request(request, fedn.Channel.MODEL_VALIDATION_REQUESTS) - - response = fedn.Response() - response.response = "RECEIVED ModelValidationRequest from client {}".format( - request.sender.name) - return response # TODO Fill later - def register_model_validation(self, validation): """Register a model validation. @@ -852,8 +725,7 @@ def SendModelValidation(self, request, context): :return: the response :rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response` """ - self.report_status("Recieved ModelValidation from {}".format(request.sender.name), - log_level=fedn.Status.INFO) + logger.info("Recieved ModelValidation from {}".format(request.sender.name)) self.register_model_validation(request) @@ -867,8 +739,8 @@ def SendModelValidation(self, request, context): def run(self): """ Start the server.""" - print("COMBINER: {} started, ready for requests. ".format( - self.id), flush=True) + logger.info("COMBINER: {} started, ready for gRPC requests.".format( + self.id)) try: while True: signal.pause() diff --git a/fedn/fedn/network/combiner/server_tests.py b/fedn/fedn/network/combiner/server_tests.py new file mode 100644 index 000000000..e69de29bb diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index b68d830f1..d8bb38556 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -294,35 +294,26 @@ def get_participating_combiners(self, combiner_round_config): combiners = [] for combiner in self.network.get_combiners(): try: - combiner_state = combiner.report() + # Current gRPC endpoint only returns active clients (both trainers and validators) + nr_active_clients = len(combiner.list_active_clients()) except CombinerUnavailableError: self._handle_unavailable_combiner(combiner) - combiner_state = None + continue - if combiner_state is not None: - is_participating = self.evaluate_round_participation_policy( - combiner_round_config, combiner_state - ) - if is_participating: - combiners.append((combiner, combiner_round_config)) + is_participating = self.evaluate_round_participation_policy( + combiner_round_config, nr_active_clients + ) + if is_participating: + combiners.append((combiner, combiner_round_config)) return combiners def evaluate_round_participation_policy( - self, compute_plan, combiner_state + self, compute_plan, nr_active_clients ): """Evaluate policy for combiner round-participation. A combiner participates if it is responsive and reports enough active clients to participate in the round. """ - - if compute_plan["task"] == "training": - nr_active_clients = int(combiner_state["nr_active_trainers"]) - elif compute_plan["task"] == "validation": - nr_active_clients = int(combiner_state["nr_active_validators"]) - else: - print("Invalid task type!", flush=True) - return False - if int(compute_plan["clients_required"]) <= nr_active_clients: return True else: diff --git a/fedn/fedn/network/loadbalancer/leastpacked.py b/fedn/fedn/network/loadbalancer/leastpacked.py index 9e4aaba0d..cac7bba54 100644 --- a/fedn/fedn/network/loadbalancer/leastpacked.py +++ b/fedn/fedn/network/loadbalancer/leastpacked.py @@ -3,7 +3,7 @@ class LeastPacked(LoadBalancerBase): - """ Load balancer that selects the combiner with the least number of attached clients. + """ Load balancer that selects the combiner with the least number of attached training clients. :param network: A handle to the network. :type network: class: `fedn.network.api.network.Network` @@ -23,12 +23,13 @@ def find_combiner(self): for combiner in self.network.get_combiners(): try: if combiner.allowing_clients(): - combiner_state = combiner.report() + # Using default default Channel = 1, MODEL_UPDATE_REQUESTS + nr_active_clients = len(combiner.list_active_clients()) if not min_clients: - min_clients = combiner_state['nr_active_clients'] + min_clients = nr_active_clients selected_combiner = combiner - elif combiner_state['nr_active_clients'] < min_clients: - min_clients = combiner_state['nr_active_clients'] + elif nr_active_clients < min_clients: + min_clients = nr_active_clients selected_combiner = combiner except CombinerUnavailableError: pass diff --git a/fedn/fedn/network/storage/statestore/mongostatestore.py b/fedn/fedn/network/storage/statestore/mongostatestore.py index 2e9207a56..2e5220b6c 100644 --- a/fedn/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/fedn/network/storage/statestore/mongostatestore.py @@ -808,9 +808,8 @@ def set_round_data(self, round_id, round_data): self.rounds.update_one({'round_id': round_id}, { '$set': {'round_data': round_data}}, True) - def update_client_status(self, client_name, status): - """Update client status in statestore. - + def update_client_status(self, clients, status): + """ Update client status in statestore. :param client_name: The client name :type client_name: str :param status: The client status @@ -818,7 +817,7 @@ def update_client_status(self, client_name, status): :return: None """ datetime_now = datetime.now() - filter_query = {"name": client_name} + filter_query = {"name": {"$in": clients}} update_query = {"$set": {"last_seen": datetime_now, "status": status}} - self.clients.update_one(filter_query, update_query) + self.clients.update_many(filter_query, update_query) diff --git a/fedn/setup.py b/fedn/setup.py index 62888ce09..e973cb7c8 100644 --- a/fedn/setup.py +++ b/fedn/setup.py @@ -15,7 +15,7 @@ "urllib3>=1.26.4", "minio", "python-slugify", - "grpcio~=1.48.0", + "grpcio~=1.57.0", "grpcio-tools", "numpy>=1.21.6", "protobuf",