Skip to content

Commit

Permalink
Feature/SK-669 | Store session id on status (event) and validation objects (#528)
Browse files Browse the repository at this point in the history

* client status log stores session id

* validations stores session id
  • Loading branch information
niklastheman authored Feb 15, 2024
1 parent 7cef838 commit d5bcfba
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 76 deletions.
30 changes: 18 additions & 12 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def _listen_to_task_stream(self):
if request.sender.role == fedn.COMBINER:
# Process training request
self._send_status("Received model update request.", log_level=fedn.Status.AUDIT,
type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request)
type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request, sesssion_id=request.session_id)
logger.info("Received model update request of type {} for model_id {}".format(request.type, request.model_id))

if request.type == fedn.StatusType.MODEL_UPDATE and self.config['trainer']:
Expand Down Expand Up @@ -463,17 +463,19 @@ def _listen_to_task_stream(self):
if not self._attached:
return

def _process_training_request(self, model_id):
def _process_training_request(self, model_id: str, session_id: str = None):
"""Process a training (model update) request.
:param model_id: The model id of the model to be updated.
:type model_id: str
:param session_id: The id of the current session
:type session_id: str
:return: The model id of the updated model, or None if the update failed. And a dict with metadata.
:rtype: tuple
"""

self._send_status(
"\t Starting processing of training request for model_id {}".format(model_id))
"\t Starting processing of training request for model_id {}".format(model_id), sesssion_id=session_id)
self.state = ClientState.training

try:
Expand Down Expand Up @@ -526,13 +528,15 @@ def _process_training_request(self, model_id):

return updated_model_id, meta

def _process_validation_request(self, model_id, is_inference):
def _process_validation_request(self, model_id: str, is_inference: bool, session_id: str = None):
"""Process a validation request.
:param model_id: The model id of the model to be validated.
:type model_id: str
:param is_inference: True if the validation is an inference request, False if it is a validation request.
:type is_inference: bool
:param session_id: The id of the current session.
:type session_id: str
:return: The validation metrics, or None if validation failed.
:rtype: dict
"""
Expand All @@ -543,7 +547,7 @@ def _process_validation_request(self, model_id, is_inference):
cmd = 'validate'

self._send_status(
f"Processing {cmd} request for model_id {model_id}")
f"Processing {cmd} request for model_id {model_id}", sesssion_id=session_id)
self.state = ClientState.validating
try:
model = self.get_model_from_combiner(str(model_id))
Expand Down Expand Up @@ -586,7 +590,7 @@ def process_request(self):
tic = time.time()
self.state = ClientState.training
model_id, meta = self._process_training_request(
request.model_id)
request.model_id, session_id=request.session_id)
processing_time = time.time()-tic
meta['processing_time'] = processing_time
meta['config'] = request.data
Expand All @@ -606,19 +610,19 @@ def process_request(self):
# TODO: Check responses
_ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata)
self._send_status("Model update completed.", log_level=fedn.Status.AUDIT,
type=fedn.StatusType.MODEL_UPDATE, request=update)
type=fedn.StatusType.MODEL_UPDATE, request=update, sesssion_id=request.session_id)

else:
self._send_status("Client {} failed to complete model update.",
log_level=fedn.Status.WARNING,
request=request)
request=request, sesssion_id=request.session_id)
self.state = ClientState.idle
self.inbox.task_done()

elif task_type == 'validate':
self.state = ClientState.validating
metrics = self._process_validation_request(
request.model_id, False)
request.model_id, False, request.session_id)

if metrics is not None:
# Send validation
Expand All @@ -631,17 +635,18 @@ def process_request(self):
validation.data = json.dumps(metrics)
validation.timestamp.GetCurrentTime()
validation.correlation_id = request.correlation_id
validation.session_id = request.session_id

_ = self.combinerStub.SendModelValidation(
validation, metadata=self.metadata)

status_type = fedn.StatusType.MODEL_VALIDATION

self._send_status("Model validation completed.", log_level=fedn.Status.AUDIT,
type=status_type, request=validation)
type=status_type, request=validation, sesssion_id=request.session_id)
else:
self._send_status("Client {} failed to complete model validation.".format(self.name),
log_level=fedn.Status.WARNING, request=request)
log_level=fedn.Status.WARNING, request=request, sesssion_id=request.session_id)

self.state = ClientState.idle
self.inbox.task_done()
Expand Down Expand Up @@ -679,7 +684,7 @@ def _send_heartbeat(self, update_frequency=2.0):
if not self._attached:
return

def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None):
def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None):
"""Send status message.
:param msg: The message to send.
Expand All @@ -697,6 +702,7 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None)
status.sender.role = fedn.WORKER
status.log_level = log_level
status.status = str(msg)
status.session_id = sesssion_id
if type is not None:
status.type = type

Expand Down
2 changes: 2 additions & 0 deletions fedn/fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def request_model_update(self, config, clients=[]):
request.timestamp = str(datetime.now())
request.data = json.dumps(config)
request.type = fedn.StatusType.MODEL_UPDATE
request.session_id = config['session_id']

request.sender.name = self.id
request.sender.role = fedn.COMBINER
Expand Down Expand Up @@ -215,6 +216,7 @@ def request_model_validation(self, model_id, config, clients=[]):

request.sender.name = self.id
request.sender.role = fedn.COMBINER
request.session_id = config["session_id"]

if len(clients) == 0:
clients = self.get_active_validators()
Expand Down
1 change: 1 addition & 0 deletions fedn/fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def round(self, session_config, round_id):
round_config["rounds"] = 1
round_config["round_id"] = round_id
round_config["task"] = "training"
round_config["session_id"] = session_config["session_id"]

self.set_round_config(round_id, round_config)

Expand Down
2 changes: 2 additions & 0 deletions fedn/fedn/network/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ message Status {
google.protobuf.Timestamp timestamp = 6;
StatusType type = 7;
string extra = 8;
string session_id = 9;

}

Expand Down Expand Up @@ -74,6 +75,7 @@ message ModelValidation {
string correlation_id = 5;
google.protobuf.Timestamp timestamp = 6;
string meta = 7;
string session_id = 8;
}

enum ModelStatus {
Expand Down
Loading

0 comments on commit d5bcfba

Please sign in to comment.