From 9a70e26bf6874cc1ae6d0daf99d1433bb8a8bda0 Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Thu, 12 Dec 2024 07:55:36 -0500 Subject: [PATCH] add WME support for reporting --- wis2-gdc-management/wis2_gdc/registrar.py | 7 ++- wis2-gdc-management/wis2_gdc/wme.py | 56 +++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 wis2-gdc-management/wis2_gdc/wme.py diff --git a/wis2-gdc-management/wis2_gdc/registrar.py b/wis2-gdc-management/wis2_gdc/registrar.py index 79664c6..592919c 100644 --- a/wis2-gdc-management/wis2_gdc/registrar.py +++ b/wis2-gdc-management/wis2_gdc/registrar.py @@ -40,6 +40,7 @@ BACKEND_CONNECTION, BROKER_URL, CENTRE_ID, GB_LINKS, PUBLISH_REPORTS, REJECT_ON_FAILING_ETS, RUN_KPI) +from wis2_gdc.wme import generate_wme LOGGER = logging.getLogger(__name__) @@ -193,7 +194,8 @@ def register(self, metadata: Union[dict, str], topic: str = None) -> None: if PUBLISH_REPORTS: LOGGER.info('Publishing ETS report to broker') - self.broker.pub(publish_report_topic, json.dumps(ets_results)) + wme = generate_wme(self.centre_id, 'ets', ets_results) + self.broker.pub(publish_report_topic, json.dumps(wme)) if failed_ets: self._process_record_metric( @@ -227,7 +229,8 @@ def register(self, metadata: Union[dict, str], topic: str = None) -> None: if PUBLISH_REPORTS and 'summary' in kpi_results: LOGGER.info('Publishing KPI report to broker') - self.broker.pub(publish_report_topic, json.dumps(kpi_results)) + wme = generate_wme(self.centre_id, 'kpi', kpi_results) + self.broker.pub(publish_report_topic, json.dumps(wme)) kpi_labels = [self.metadata['id']] + centre_id_labels diff --git a/wis2-gdc-management/wis2_gdc/wme.py b/wis2-gdc-management/wis2_gdc/wme.py new file mode 100644 index 0000000..1cd5d40 --- /dev/null +++ b/wis2-gdc-management/wis2_gdc/wme.py @@ -0,0 +1,56 @@ +############################################################################### +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +############################################################################### + +from datetime import datetime, UTC +import logging +import uuid + +from wis2_gdc.env import CENTRE_ID + +LOGGER = logging.getLogger(__name__) + +DATASCHEMAS = { + 'ets': 'https://raw.githubusercontent.com/wmo-im/wis2-monitoring-events/refs/heads/main/schemas/wcmp2-ets-bundled.json', # noqa + 'kpi': 'https://raw.githubusercontent.com/wmo-im/wis2-monitoring-events/refs/heads/main/schemas/wcmp2-kpi-bundled.json' # noqa +} + + +def generate_wme(subject: str, report_type: str, data: dict) -> dict: + """ + Generate WIS2 Monitoring Event Message of WCMP2 report + + :param subject: `str` of centre-id being reported + :param report_type: `str` of WCMP2 report type (default is ets) + + :returns: `dict` of WMEM + """ + + return { + 'specversion': '1.0', + 'type': 'int.wmo.wis2.wme.report.wcmp2.{report_type}', + 'source': CENTRE_ID, + 'subject': subject, + 'id': str(uuid.uuid4()), + 'time': datetime.now(UTC).strftime('%Y-%m-%dT%H:%M:%SZ'), + 'datacontenttype': 'application/json', + 'dataschema': DATASCHEMAS[report_type], + 'data': data + }