Skip to content

Commit

Permalink
add OAProc process for user-defined subscriptions and notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkralidis committed Jun 10, 2024
1 parent 5b27347 commit dab60a4
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 6 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ wis2-grep is a Reference Implementation of a WIS2 Global Replay Service.
- publish to a WIS2 Global Replay Service (OGC API - Features) using one of the supported transaction backends:
- [OGC API - Features - Part 4: Create, Replace, Update and Delete](https://docs.ogc.org/DRAFTS/20-002.html)
- Elasticsearch direct (default)
- user-defined subscriptions
- users can execute a process to subscribe to notification messages based on topic and/or datetime

## Installation

Expand Down Expand Up @@ -74,6 +76,7 @@ wis2-grep load /path/to/dir/of/wnm-files
The Docker setup uses Docker and Docker Compose to manage the following services:

- **wis2-grep-api**: API powered by [pygeoapi](https://pygeoapi.io)
- **wis2-gdc-broker**: MQTT broker
- **wis2-grep-management**: management service to publish notification messages published from a WIS2 Global Broker instance
- the default Global Broker connection is to NOAA. This can be modified in `wis2-grep.env` to point to a different Global Broker
- **wis2-grep-backend**: API search engine backend (default Elasticsearch)
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
###############################################################################

services:
wis2-grep-broker:
ports:
- 1883:1883 # default
- 1884:1884 # websockets
wis2-grep-api:
ports:
- 80:80
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ services:
networks:
- wis2-grep-net

wis2-grep-broker:
container_name: wis2-grep-broker
restart: always
build:
context: ./wis2-grep-broker/
env_file:
- wis2-grep.env
networks:
- wis2-grep-net

wis2-grep-management:
container_name: wis2-grep-management
build:
Expand Down
Binary file modified docs/architecture/c4.container.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion wis2-grep-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ ENV PYGEOAPI_CONFIG=/pygeoapi/local.config.yml
ENV PYGEOAPI_OPENAPI=/pygeoapi/local.openapi.yml

RUN apt-get update && \
apt-get install -y curl
apt-get install -y curl && \
pip3 install pywis-pubsub

COPY ./app.py /pygeoapi/pygeoapi/app.py
COPY ./wis2_grep.py /pygeoapi/pygeoapi/process/wis2_grep.py
COPY ./docker/wis2-grep-api.yml /pygeoapi/local.config.yml
COPY ./docker/entrypoint.sh /app/docker/wis2-grep-api/entrypoint.sh

Expand Down
13 changes: 9 additions & 4 deletions wis2-grep-api/docker/wis2-grep-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ resources:
rel: items
title: Notifications from Météo-France, Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'origin/a/wis2/#'
channel: '+/a/wis2/#'
length: -1
- type: application/geo+json
rel: items
title: Notifications from China Meteorological Agency, Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'origin/a/wis2/#'
channel: '+/a/wis2/#'
length: -1
- type: application/geo+json
rel: items
title: Notifications from National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'origin/a/wis2/#'
channel: '+/a/wis2/#'
length: -1
- type: application/geo+json
rel: items
title: Notifications from Instituto Nacional de Meteorologia (Brazil), Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'origin/a/wis2/#'
channel: '+/a/wis2/#'
length: -1
- type: text/html
rel: canonical
Expand All @@ -99,3 +99,8 @@ resources:
data: ${WIS2_GREP_BACKEND_CONNECTION}
id_field: id
time_field: pubtime

wis2-grep-subscriber:
type: process
processor:
name: pygeoapi.process.wis2_grep.WIS2GrepSubscriberProcessor
219 changes: 219 additions & 0 deletions wis2-grep-api/wis2_grep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# =================================================================
#
# Authors: Tom Kralidis <[email protected]>
#
# Copyright (c) 2024 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

import json
import logging
import os
import requests

from pywis_pubsub.mqtt import MQTTPubSubClient

from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError

API_ENDPOINT = 'http://localhost/collections/wis2-notification-messages/items'

BROKER_URL = os.environ['WIS2_GREP_BROKER_URL']

MQTT_CLIENT = MQTTPubSubClient(BROKER_URL)

LOGGER = logging.getLogger(__name__)

#: Process metadata and description
PROCESS_METADATA = {
'version': '0.1.0',
'id': 'wis2-grep-subscriber',
'title': {
'en': 'Global Replay subscriber',
'fr': 'Global Replay subscriber'
},
'description': {
'en': 'A process that allows for user-defined subscription to '
'replay WIS2 notification messages',
'fr': 'A process that allows for user-defined subscription to '
'replay WIS2 notification messages'
},
'jobControlOptions': ['sync-execute', 'async-execute'],
'keywords': ['wis2-grep', 'subscribe', 'notifications'],
'links': [{
'type': 'text/html',
'rel': 'canonical',
'title': 'wis2grep information',
'href': 'https://github.com/wmo-im/wis2-grep',
'hreflang': 'en-US'
}, {
'type': 'text/html',
'rel': 'related',
'title': 'WIS2 information',
'href': 'https://wmo-im.github.io/wis2-guide',
'hreflang': 'en-US'
}],
'inputs': {
'topic': {
'title': 'Topic',
'description': 'The topic to subscribe to',
'schema': {
'type': 'string'
},
'minOccurs': 1,
'maxOccurs': 1,
'keywords': ['topic', 'mqtt']
},
'datetime': {
'title': 'Datetime',
'description': 'Datetime (RFC3339) instant or envelope',
'schema': {
'type': 'string'
},
'minOccurs': 1,
'maxOccurs': 1,
'keywords': ['datetime', 'rfc3339']
},
'subscriber-id': {
'title': 'Subscriber id',
'description': 'identifier of subscribe, used in response topic',
'schema': {
'type': 'string'
},
'minOccurs': 1,
'maxOccurs': 1,
'keywords': ['subscriber']
}
},
'outputs': {
'subscription': {
'title': 'Subscription reponse',
'description': 'Response of subscription result',
'schema': {
'type': 'object',
'contentMediaType': 'application/json',
'properties': {
'status': {
'type': 'string',
'description': 'Result of subscription request'
},
'subscription': {
'type': 'object',
'required': [
'href',
'rel'
],
'properties': {
'href': {
'type': 'string',
'example': 'http://data.example.com/buildings/123' # noqa
},
'rel': {
'type': 'string',
'example': 'alternate'
},
'type': {
'type': 'string',
'example': 'application/geo+json'
},
'title': {
'type': 'string',
'example': 'Trierer Strasse 70, 53115 Bonn'
},
'channel': {
'type': 'string',
'description': 'topic to subscribe to for broker workflow' # noqa
}
}
}
}
}
}
},
'example': {
'inputs': {
'topic': 'origin/a/wis2/fr-meteofrance',
'datetime': '2024-06-10T03:00:00Z/2024-06-10T06:00:00Z',
'subscriber-id': 'foobar123'
}
}
}


class WIS2GrepSubscriberProcessor(BaseProcessor):
"""wis2-grep Subscriber"""

def __init__(self, processor_def):
"""
Initialize object
:param processor_def: provider definition
:returns: pygeoapi.process.wis2_grep.WIS2GrepSubscriberProcessor
"""

super().__init__(processor_def, PROCESS_METADATA)
self.supports_outputs = True

def execute(self, data):
datetime_ = data.get('datetime')
topic = data.get('topic')
subscriber_id = data.get('subscriber-id')

if None in [datetime_, topic, subscriber_id]:
msg = 'datetime/topic/subscriber-id required'
raise ProcessorExecuteError(msg)

outputs = {}
pub_topic = f'replay/a/wis2/{subscriber_id}'

api_params = {
# 'datetime': datetime_
'topic': topic,
}

try:
r = requests.get(API_ENDPOINT, params=api_params).json()
r.raise_for_status()
except requests.exceptions.HTTPError as err:
LOGGER.error(err)
outputs['status'] = 'failed'
outputs['description'] = err
return 'application/json', outputs

outputs['status'] = 'successful'
outputs['subscription'] = {
'rel': 'items',
'type': 'application/geo+json',
'href': BROKER_URL,
'title': 'User-defined notifications',
'channel': pub_topic
}

for feature in r['features']:
MQTT_CLIENT.pub(pub_topic, json.dumps(feature))

return 'application/json', outputs

def __repr__(self):
return f'<WIS2GrepSubscriberProcessor> {self.name}'
28 changes: 28 additions & 0 deletions wis2-grep-broker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
###############################################################################
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
###############################################################################

FROM eclipse-mosquitto:latest

COPY ./docker/mosquitto.conf /mosquitto/config/mosquitto.conf
COPY ./docker/acl.conf /mosquitto/config/acl.conf
COPY ./docker/entrypoint.sh /docker-entrypoint.sh

RUN chmod +x /docker-entrypoint.sh
6 changes: 6 additions & 0 deletions wis2-grep-broker/docker/acl.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
user everyone
topic read replay/a/wis2/#

user _USERNAME
topic readwrite #
topic read $SYS/#
23 changes: 23 additions & 0 deletions wis2-grep-broker/docker/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/sh

USERNAME=`echo $WIS2_GREP_BROKER_URL |awk -F/ '{print $3}' | awk -F@ '{print $1}' | awk -F: '{print $1}'`
PASSWORD=`echo $WIS2_GREP_BROKER_URL |awk -F/ '{print $3}' | awk -F@ '{print $1}' | awk -F: '{print $2}'`

echo "Setting mosquitto authentication"

echo "USERNAME: $USERNAME"
echo "PASSWORD: $PASSWORD"

if [ ! -e "/mosquitto/config/password.txt" ]; then
echo "Adding wis2-gc users to mosquitto password file"
mosquitto_passwd -b -c /mosquitto/config/password.txt $USERNAME $PASSWORD
mosquitto_passwd -b /mosquitto/config/password.txt everyone everyone
chmod 644 /mosquitto/config/password.txt
else
echo "Mosquitto password file already exists. Skipping wis2box user addition."
fi


sed -i "s#_USERNAME#$USERNAME#" /mosquitto/config/acl.conf

/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
17 changes: 17 additions & 0 deletions wis2-grep-broker/docker/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
log_timestamp_format %Y-%m-%dT%H:%M:%S
password_file /mosquitto/config/password.txt
max_queued_messages 1000

# ACLs
acl_file /mosquitto/config/acl.conf

## MQTT Listener
listener 1883
protocol mqtt

listener 1884
protocol websockets
Loading

0 comments on commit dab60a4

Please sign in to comment.