Skip to content

Commit

Permalink
adjust response payload to MQTT subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkralidis committed Jun 13, 2024
1 parent 2bb8bf5 commit d353a08
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 40 deletions.
108 changes: 68 additions & 40 deletions wis2-grep-api/wis2_grep.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,30 @@
import json
import logging
import os
import requests
import uuid

from pywis_pubsub.mqtt import MQTTPubSubClient
import requests

from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError

API_ENDPOINT = 'http://localhost/collections/wis2-notification-messages/items'

BROKER_URL = os.environ['WIS2_GREP_BROKER_URL']
CENTRE_ID = os.environ['WIS2_GREP_CENTRE_ID']

MQTT_CLIENT = MQTTPubSubClient(BROKER_URL)

LOGGER = logging.getLogger(__name__)

GB_LINKS = []

for key, value in os.environ.items():
if key.startswith('WIS2_GREP_GB_LINK'):
# centre_id, url, title = value.rsplit(',', 2)
GB_LINKS.append(value.split(',', 2))


#: Process metadata and description
PROCESS_METADATA = {
'version': '0.1.0',
Expand Down Expand Up @@ -96,9 +106,10 @@
},
'subscriber-id': {
'title': 'Subscriber id',
'description': 'identifier of subscribe, used in response topic',
'description': 'UUID of subscriber, used in response topic',
'schema': {
'type': 'string'
'type': 'string',
'format': 'uuid'
},
'minOccurs': 1,
'maxOccurs': 1,
Expand All @@ -117,32 +128,35 @@
'type': 'string',
'description': 'Result of subscription request'
},
'subscription': {
'type': 'object',
'required': [
'href',
'rel'
],
'properties': {
'href': {
'type': 'string',
'example': 'http://data.example.com/buildings/123' # noqa
},
'rel': {
'type': 'string',
'example': 'alternate'
},
'type': {
'type': 'string',
'example': 'application/geo+json'
},
'title': {
'type': 'string',
'example': 'Trierer Strasse 70, 53115 Bonn'
},
'channel': {
'type': 'string',
'description': 'topic to subscribe to for broker workflow' # noqa
'subscriptions': {
'type': 'array',
'items': {
'type': 'object',
'required': [
'href',
'rel'
],
'properties': {
'href': {
'type': 'string',
'example': 'http://data.example.com/buildings/123' # noqa
},
'rel': {
'type': 'string',
'example': 'alternate'
},
'type': {
'type': 'string',
'example': 'application/geo+json'
},
'title': {
'type': 'string',
'example': 'Trierer Strasse 70, 53115 Bonn'
},
'channel': {
'type': 'string',
'description': 'topic to subscribe to for broker workflow' # noqa
}
}
}
}
Expand All @@ -154,7 +168,7 @@
'inputs': {
'topic': 'origin/a/wis2/fr-meteofrance',
'datetime': '2024-06-10T03:00:00Z/2024-06-10T06:00:00Z',
'subscriber-id': 'foobar123'
'subscriber-id': 'a30c829b-0ee3-4e4f-bc1f-1f784465b20e'
}
}
}
Expand All @@ -180,35 +194,49 @@ def execute(self, data):
topic = data.get('topic')
subscriber_id = data.get('subscriber-id')

LOGGER.debug('Sanitizing topic')
api_topic = topic.replace('/#', '').replace('+', '*')

if None in [datetime_, topic, subscriber_id]:
msg = 'datetime/topic/subscriber-id required'
LOGGER.error(msg)
raise ProcessorExecuteError(msg)

try:
LOGGER.debug('Validating subscriber-id')
uuid.UUID(subscriber_id)
except ValueError:
raise ProcessorExecuteError('Invalid UUID')

outputs = {}
pub_topic = f'replay/a/wis2/{subscriber_id}'
pub_topic = f'replay/a/wis2/{CENTRE_ID}/{subscriber_id}'

api_params = {
# 'datetime': datetime_
'topic': topic,
'topic': api_topic,
}

try:
r = requests.get(API_ENDPOINT, params=api_params).json()
r = requests.get(API_ENDPOINT, params=api_params)
r.raise_for_status()
r = r.json()
except requests.exceptions.HTTPError as err:
LOGGER.error(err)
outputs['status'] = 'failed'
outputs['description'] = err
return 'application/json', outputs

outputs['status'] = 'successful'
outputs['subscription'] = {
'rel': 'items',
'type': 'application/geo+json',
'href': BROKER_URL,
'title': 'User-defined notifications',
'channel': pub_topic
}
outputs['subscriptions'] = []

for gb_link in GB_LINKS:
outputs['subscriptions'].append({
'rel': 'items',
'type': 'application/geo+json',
'href': gb_link[1],
'title': gb_link[2],
'channel': pub_topic
})

for feature in r['features']:
MQTT_CLIENT.pub(pub_topic, json.dumps(feature))
Expand Down
6 changes: 6 additions & 0 deletions wis2-grep.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,9 @@ export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notificat
export WIS2_GREP_BROKER_URL=mqtt://wis2-grep:wis2-grep@wis2-grep-broker:1883
export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay
export WIS2_GREP_GB=mqtts://everyone:[email protected]:8883

# global broker links
export WIS2_GREP_GB_LINK_METEOFRANCE="fr-meteo-france-global-broker,mqtts://everyone:[email protected]:8883,Météo-France, Global Broker Service"
export WIS2_GREP_GB_LINK_CMA="cn-cma-global-broker,mqtts://everyone:[email protected]:8883,China Meteorological Agency, Global Broker Service"
export WIS2_GREP_GB_LINK_NOAA="us-noaa-nws-global-broker,mqtts://everyone:[email protected]:8883,National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service"
export WIS2_GREP_GB_LINK_INMET="br-inmet-global-broker,mqtts://everyone:[email protected]:8883,Instituto Nacional de Meteorologia (Brazil), Global Broker Service"

0 comments on commit d353a08

Please sign in to comment.