Skip to content

Commit

Permalink
- add GDC centre-id support
Browse files Browse the repository at this point in the history
- add Pub/Sub workflow for reporting
- fix record links updates (enumerating all GBs)
  • Loading branch information
tomkralidis committed Nov 26, 2023
1 parent 0a266e5 commit a2012e5
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 20 deletions.
1 change: 1 addition & 0 deletions docker/mosquitto/acl.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
user everyone
topic read #
topic read gdc-reports/#

user _USERNAME
Expand Down
4 changes: 2 additions & 2 deletions docker/pywis-pubsub.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ subscribe_topics:

qos: 1

verify_data: true
verify_data: false

validate_message: true
validate_message: false

hook: wis2_gdc.hook.DiscoveryMetadataHook
3 changes: 2 additions & 1 deletion docker/wis2-gdc-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ LABEL maintainer="Tom Kralidis <[email protected]>"
ENV PYGEOAPI_CONFIG=/pygeoapi/local.config.yml
ENV PYGEOAPI_OPENAPI=/pygeoapi/local.openapi.yml

RUN pip3 install pywcmp && \
RUN pip3 install "pywcmp>=0.7" && \
pywcmp --version && \
pywcmp bundle sync

COPY ./wis2-gdc.yml /pygeoapi/local.config.yml
Expand Down
8 changes: 7 additions & 1 deletion docker/wis2-gdc.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
export WIS2_GDC_API_URL=http://localhost
export WIS2_GDC_API_URL_DOCKER=http://wis2-gdc-api
export WIS2_GDC_BACKEND_TYPE=Elasticsearch
export WIS2_GDC_BACKEND_CONNECTION=http://elasticsearch:9200/wis2-discovery-metadata
export WIS2_GDC_BROKER_URL=mqtt://wis2-gdc:wis2-gdc@mosquitto:1883
export WIS2_GDC_GB=mqtts://everyone:[email protected]:8883
export WIS2_GDC_CENTRE_ID=ca-eccc-msc-gdc
export WIS2_GDC_GB=mqtt://everyone:everyone@mosquitto:1883
export WIS2_GDC_GB_TOPIC=origin/a/wis2/+/+/metadata/#

# global broker links
export WIS2_GDC_GB_LINK_METEOFRANCE=Météo-France,mqtt://everyone:[email protected]:8883
export WIS2_GDC_GB_LINK_CMA=China Meteorological Agency,mqtt://everyone:[email protected]:8883
6 changes: 6 additions & 0 deletions wis2-gdc.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
export WIS2_GDC_API_URL=http://localhost
export WIS2_GDC_API_URL_DOCKER=http://wis2-gdc-api
export WIS2_GDC_BACKEND_TYPE=Elasticsearch
export WIS2_GDC_BACKEND_CONNECTION=http://localhost:9200/wis2-discovery-metadata
export WIS2_GDC_BROKER_URL=mqtt://wis2-gdc:wis2-gdc@localhost:1883
export WIS2_GDC_CENTRE_ID=ca-eccc-msc-gdc
export WIS2_GDC_GB=mqtt://everyone:everyone@localhost:1883
export WIS2_GDC_GB_TOPIC=origin/a/wis2/+/+/metadata/#

# global broker links
export WIS2_GDC_GB_LINK_METEOFRANCE=Météo-France,mqtt://everyone:[email protected]:8883
export WIS2_GDC_GB_LINK_CMA=China Meteorological Agency,mqtt://everyone:[email protected]:8883
11 changes: 11 additions & 0 deletions wis2_gdc/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,14 @@
BACKEND_TYPE = os.environ.get('WIS2_GDC_BACKEND_TYPE')
BACKEND_CONNECTION = os.environ.get('WIS2_GDC_BACKEND_CONNECTION')
BROKER_URL = os.environ.get('WIS2_GDC_BROKER_URL')
CENTRE_ID = os.environ.get('WIS2_GDC_CENTRE_ID')

if None in [API_URL, API_URL_DOCKER, BACKEND_TYPE,
BACKEND_CONNECTION, BROKER_URL, CENTRE_ID]:
raise EnvironmentError('Environment variables not set!')

GB_LINKS = []

for key, value in os.environ.items():
if key.startswith('WIS2_GDC_GB_LINK'):
GB_LINKS.append(value.split(','))
92 changes: 77 additions & 15 deletions wis2_gdc/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@
#
###############################################################################

from copy import deepcopy
import json
import logging
from pathlib import Path

import click

from pywcmp.wcmp2.ets import WMOCoreMetadataProfileTestSuite2
from pywcmp.wcmp2.kpi import WMOCoreMetadataProfileKeyPerformanceIndicators
from pywis_pubsub import cli_options
from pywis_pubsub.mqtt import MQTTPubSubClient

from wis2_gdc.backend import BACKENDS
from wis2_gdc.env import BACKEND_TYPE, BACKEND_CONNECTION, BROKER_URL
from wis2_gdc.env import (BACKEND_TYPE, BACKEND_CONNECTION, BROKER_URL,
GB_LINKS)

LOGGER = logging.getLogger(__name__)

Expand All @@ -56,42 +59,62 @@ def register(self, metadata: dict) -> None:
"""

self.metadata = metadata
self.centre_id = self.metadata['id'].split(':')[3]
topic = f"report/a/wis2/{self.centre_id}"

LOGGER.debug(f'Metadata: {self.metadata}')

LOGGER.info('Running ETS')
ets_results = self._run_ets()
if ets_results['ets-report']['summary']['FAILED'] > 0:
LOGGER.debug('ETS errors; metadata not published')

LOGGER.info('Publishing ETS report to broker')
self.broker.pub(topic, json.dumps(ets_results))

try:
if ets_results['ets-report']['summary']['FAILED'] > 0:
LOGGER.warning('ETS errors; metadata not published')
return
except KeyError: # validation error
pass
# LOGGER.debug('Validation errors; metadata not published')
# return

LOGGER.info('Updating links')
self.update_record_links()

LOGGER.info(f'Publishing metadata to {BACKEND_TYPE} ({BACKEND_CONNECTION})') # noqa
self._publish()

LOGGER.info('Running ETS')
kpi_results = self._run_kpi()
LOGGER.info('Publishing KPI report to broker')
self.broker.pub(topic, json.dumps(kpi_results))

def _run_ets(self) -> dict:
"""
Helper function to run ETS
:returns: `dict` of ETS results
"""

LOGGER.info('Running ETS')
ts = WMOCoreMetadataProfileTestSuite2(self.metadata)
try:
results = ts.run_tests(fail_on_schema_validation=True)
LOGGER.info('Publishing ETS report to broker')
topic = f"gdc-reports/ets/{self.metadata['id']}"
self.broker.pub(topic, json.dumps(results))
return results
except Exception as err:
LOGGER.error(err)
ts = WMOCoreMetadataProfileTestSuite2(self.metadata)
return ts.run_tests(fail_on_schema_validation=True)
except ValueError as err:
return {'description': f'Failed validation: {err}'}

def _run_kpi(self):
def _run_kpi(self) -> dict:
"""
Helper function to run KPI
:returns: `dict` of KPI results
"""

LOGGER.info('Running KPI')
pass
try:
kpis = WMOCoreMetadataProfileKeyPerformanceIndicators(self.metadata) # noqa
return kpis.evaluate()
except Exception as err:
return {'description': f'Failed validation: {err}'}

def _publish(self):
"""
Expand All @@ -105,6 +128,45 @@ def _publish(self):
LOGGER.info('Saving metadata to backend')
backend.save(self.metadata)

def update_record_links(self) -> None:
"""
Update Global Service links
:returns: `None` (self.metadata updated inline)
"""

def is_wis2_mqtt_link(link) -> bool:
if link['href'].startswith('mqtt'):
if (link.get('wmo:topic', '').startswith('origin/a/wis2') or
link.get('channel', '').startswith('origin/a/wis2')):
LOGGER.debug('Found MQTT link')
return True

return False

for count, value in enumerate(self.metadata['links']):
if is_wis2_mqtt_link(value):
LOGGER.debug('Adjusting MQTT link')
channel = value.get('wmo:topic', value.get('channel'))

new_link = value
del new_link['wmo:topic']

new_link['rel'] = 'items'
new_link['channel'] = channel.replace('origin', 'cache')
new_link['type'] = 'application/geo+json'

del self.metadata['links'][count]

for gb_link in GB_LINKS:
gb_link_to_add = deepcopy(new_link)
title = f'Notifications from {gb_link[0]} Global Broker'
gb_link_to_add['title'] = title
gb_link_to_add['href'] = gb_link[1]

LOGGER.debug(f'Adding new link: {gb_link_to_add}')
self.metadata['links'].append(gb_link_to_add)


@click.command()
@click.pass_context
Expand Down
2 changes: 1 addition & 1 deletion wis2_gdc/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def archive_metadata(url: str, archive_zipfile: str) -> None:
Archive all discovery metadata from a GDC to an archive zipfile
:param url: `str` of GDC API URL
:archive_zipfile: `str` of filename of zipfile
:param archive_zipfile: `str` of filename of zipfile
:returns: `None`
"""
Expand Down

0 comments on commit a2012e5

Please sign in to comment.