diff --git a/wis2-grep-api/wis2_grep.py b/wis2-grep-api/wis2_grep.py index 005aa34..8b71d20 100644 --- a/wis2-grep-api/wis2_grep.py +++ b/wis2-grep-api/wis2_grep.py @@ -30,20 +30,30 @@ import json import logging import os -import requests +import uuid from pywis_pubsub.mqtt import MQTTPubSubClient +import requests from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError API_ENDPOINT = 'http://localhost/collections/wis2-notification-messages/items' BROKER_URL = os.environ['WIS2_GREP_BROKER_URL'] +CENTRE_ID = os.environ['WIS2_GREP_CENTRE_ID'] MQTT_CLIENT = MQTTPubSubClient(BROKER_URL) LOGGER = logging.getLogger(__name__) +GB_LINKS = [] + +for key, value in os.environ.items(): + if key.startswith('WIS2_GREP_GB_LINK'): + # centre_id, url, title = value.rsplit(',', 2) + GB_LINKS.append(value.split(',', 2)) + + #: Process metadata and description PROCESS_METADATA = { 'version': '0.1.0', @@ -96,9 +106,10 @@ }, 'subscriber-id': { 'title': 'Subscriber id', - 'description': 'identifier of subscribe, used in response topic', + 'description': 'UUID of subscriber, used in response topic', 'schema': { - 'type': 'string' + 'type': 'string', + 'format': 'uuid' }, 'minOccurs': 1, 'maxOccurs': 1, @@ -117,32 +128,35 @@ 'type': 'string', 'description': 'Result of subscription request' }, - 'subscription': { - 'type': 'object', - 'required': [ - 'href', - 'rel' - ], - 'properties': { - 'href': { - 'type': 'string', - 'example': 'http://data.example.com/buildings/123' # noqa - }, - 'rel': { - 'type': 'string', - 'example': 'alternate' - }, - 'type': { - 'type': 'string', - 'example': 'application/geo+json' - }, - 'title': { - 'type': 'string', - 'example': 'Trierer Strasse 70, 53115 Bonn' - }, - 'channel': { - 'type': 'string', - 'description': 'topic to subscribe to for broker workflow' # noqa + 'subscriptions': { + 'type': 'array', + 'items': { + 'type': 'object', + 'required': [ + 'href', + 'rel' + ], + 'properties': { + 'href': { + 'type': 'string', + 'example': 'http://data.example.com/buildings/123' # noqa + }, + 'rel': { + 'type': 'string', + 'example': 'alternate' + }, + 'type': { + 'type': 'string', + 'example': 'application/geo+json' + }, + 'title': { + 'type': 'string', + 'example': 'Trierer Strasse 70, 53115 Bonn' + }, + 'channel': { + 'type': 'string', + 'description': 'topic to subscribe to for broker workflow' # noqa + } } } } @@ -154,7 +168,7 @@ 'inputs': { 'topic': 'origin/a/wis2/fr-meteofrance', 'datetime': '2024-06-10T03:00:00Z/2024-06-10T06:00:00Z', - 'subscriber-id': 'foobar123' + 'subscriber-id': 'a30c829b-0ee3-4e4f-bc1f-1f784465b20e' } } } @@ -180,21 +194,32 @@ def execute(self, data): topic = data.get('topic') subscriber_id = data.get('subscriber-id') + LOGGER.debug('Sanitizing topic') + api_topic = topic.replace('/#', '').replace('+', '*') + if None in [datetime_, topic, subscriber_id]: msg = 'datetime/topic/subscriber-id required' + LOGGER.error(msg) raise ProcessorExecuteError(msg) + try: + LOGGER.debug('Validating subscriber-id') + uuid.UUID(subscriber_id) + except ValueError: + raise ProcessorExecuteError('Invalid UUID') + outputs = {} - pub_topic = f'replay/a/wis2/{subscriber_id}' + pub_topic = f'replay/a/wis2/{CENTRE_ID}/{subscriber_id}' api_params = { # 'datetime': datetime_ - 'topic': topic, + 'topic': api_topic, } try: - r = requests.get(API_ENDPOINT, params=api_params).json() + r = requests.get(API_ENDPOINT, params=api_params) r.raise_for_status() + r = r.json() except requests.exceptions.HTTPError as err: LOGGER.error(err) outputs['status'] = 'failed' @@ -202,13 +227,16 @@ def execute(self, data): return 'application/json', outputs outputs['status'] = 'successful' - outputs['subscription'] = { - 'rel': 'items', - 'type': 'application/geo+json', - 'href': BROKER_URL, - 'title': 'User-defined notifications', - 'channel': pub_topic - } + outputs['subscriptions'] = [] + + for gb_link in GB_LINKS: + outputs['subscriptions'].append({ + 'rel': 'items', + 'type': 'application/geo+json', + 'href': gb_link[1], + 'title': gb_link[2], + 'channel': pub_topic + }) for feature in r['features']: MQTT_CLIENT.pub(pub_topic, json.dumps(feature)) diff --git a/wis2-grep.env b/wis2-grep.env index 9c2cb40..8fc62d5 100644 --- a/wis2-grep.env +++ b/wis2-grep.env @@ -6,3 +6,9 @@ export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notificat export WIS2_GREP_BROKER_URL=mqtt://wis2-grep:wis2-grep@wis2-grep-broker:1883 export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay export WIS2_GREP_GB=mqtts://everyone:everyone@globalbroker.meteo.fr:8883 + +# global broker links +export WIS2_GREP_GB_LINK_METEOFRANCE="fr-meteo-france-global-broker,mqtts://everyone:everyone@globalbroker.meteo.fr:8883,Météo-France, Global Broker Service" +export WIS2_GREP_GB_LINK_CMA="cn-cma-global-broker,mqtts://everyone:everyone@gb.wis.cma.cn:8883,China Meteorological Agency, Global Broker Service" +export WIS2_GREP_GB_LINK_NOAA="us-noaa-nws-global-broker,mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883,National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service" +export WIS2_GREP_GB_LINK_INMET="br-inmet-global-broker,mqtts://everyone:everyone@globalbroker.inmet.gov.br:8883,Instituto Nacional de Meteorologia (Brazil), Global Broker Service"