diff --git a/README.md b/README.md index b38a3e2..d4ef60a 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ wis2-grep is a Reference Implementation of a WIS2 Global Replay Service. - publish to a WIS2 Global Replay Service (OGC API - Features) using one of the supported transaction backends: - [OGC API - Features - Part 4: Create, Replace, Update and Delete](https://docs.ogc.org/DRAFTS/20-002.html) - Elasticsearch direct (default) +- user-defined subscriptions + - users can execute a process to subscribe to notification messages based on topic and/or datetime ## Installation @@ -74,6 +76,7 @@ wis2-grep load /path/to/dir/of/wnm-files The Docker setup uses Docker and Docker Compose to manage the following services: - **wis2-grep-api**: API powered by [pygeoapi](https://pygeoapi.io) +- **wis2-gdc-broker**: MQTT broker - **wis2-grep-management**: management service to publish notification messages published from a WIS2 Global Broker instance - the default Global Broker connection is to NOAA. This can be modified in `wis2-grep.env` to point to a different Global Broker - **wis2-grep-backend**: API search engine backend (default Elasticsearch) diff --git a/docker-compose.override.yml b/docker-compose.override.yml index 95a4594..0c57da9 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -20,6 +20,10 @@ ############################################################################### services: + wis2-grep-broker: + ports: + - 1883:1883 # default + - 1884:1884 # websockets wis2-grep-api: ports: - 80:80 diff --git a/docker-compose.yml b/docker-compose.yml index 34e186f..43c5243 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,6 +50,16 @@ services: networks: - wis2-grep-net + wis2-grep-broker: + container_name: wis2-grep-broker + restart: always + build: + context: ./wis2-grep-broker/ + env_file: + - wis2-grep.env + networks: + - wis2-grep-net + wis2-grep-management: container_name: wis2-grep-management build: diff --git a/docs/architecture/c4.container.png b/docs/architecture/c4.container.png index 2713a31..8fd98b3 100644 Binary files a/docs/architecture/c4.container.png and b/docs/architecture/c4.container.png differ diff --git a/wis2-grep-api/Dockerfile b/wis2-grep-api/Dockerfile index 0ec43e0..28946ec 100644 --- a/wis2-grep-api/Dockerfile +++ b/wis2-grep-api/Dockerfile @@ -27,9 +27,11 @@ ENV PYGEOAPI_CONFIG=/pygeoapi/local.config.yml ENV PYGEOAPI_OPENAPI=/pygeoapi/local.openapi.yml RUN apt-get update && \ - apt-get install -y curl + apt-get install -y curl && \ + pip3 install pywis-pubsub COPY ./app.py /pygeoapi/pygeoapi/app.py +COPY ./wis2_grep.py /pygeoapi/pygeoapi/process/wis2_grep.py COPY ./docker/wis2-grep-api.yml /pygeoapi/local.config.yml COPY ./docker/entrypoint.sh /app/docker/wis2-grep-api/entrypoint.sh diff --git a/wis2-grep-api/docker/wis2-grep-api.yml b/wis2-grep-api/docker/wis2-grep-api.yml index 0d6ac4d..2b6de46 100644 --- a/wis2-grep-api/docker/wis2-grep-api.yml +++ b/wis2-grep-api/docker/wis2-grep-api.yml @@ -65,25 +65,25 @@ resources: rel: items title: Notifications from Météo-France, Global Broker Service href: mqtts://everyone:everyone@globalbroker.meteo.fr:8883 - channel: 'origin/a/wis2/#' + channel: '+/a/wis2/#' length: -1 - type: application/geo+json rel: items title: Notifications from China Meteorological Agency, Global Broker Service href: mqtts://everyone:everyone@gb.wis.cma.cn:8883 - channel: 'origin/a/wis2/#' + channel: '+/a/wis2/#' length: -1 - type: application/geo+json rel: items title: Notifications from National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service href: mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883 - channel: 'origin/a/wis2/#' + channel: '+/a/wis2/#' length: -1 - type: application/geo+json rel: items title: Notifications from Instituto Nacional de Meteorologia (Brazil), Global Broker Service href: mqtts://everyone:everyone@globalbroker.inmet.gov.br:8883 - channel: 'origin/a/wis2/#' + channel: '+/a/wis2/#' length: -1 - type: text/html rel: canonical @@ -99,3 +99,8 @@ resources: data: ${WIS2_GREP_BACKEND_CONNECTION} id_field: id time_field: pubtime + + wis2-grep-subscriber: + type: process + processor: + name: pygeoapi.process.wis2_grep.WIS2GrepSubscriberProcessor diff --git a/wis2-grep-api/wis2_grep.py b/wis2-grep-api/wis2_grep.py new file mode 100644 index 0000000..005aa34 --- /dev/null +++ b/wis2-grep-api/wis2_grep.py @@ -0,0 +1,219 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2024 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import json +import logging +import os +import requests + +from pywis_pubsub.mqtt import MQTTPubSubClient + +from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError + +API_ENDPOINT = 'http://localhost/collections/wis2-notification-messages/items' + +BROKER_URL = os.environ['WIS2_GREP_BROKER_URL'] + +MQTT_CLIENT = MQTTPubSubClient(BROKER_URL) + +LOGGER = logging.getLogger(__name__) + +#: Process metadata and description +PROCESS_METADATA = { + 'version': '0.1.0', + 'id': 'wis2-grep-subscriber', + 'title': { + 'en': 'Global Replay subscriber', + 'fr': 'Global Replay subscriber' + }, + 'description': { + 'en': 'A process that allows for user-defined subscription to ' + 'replay WIS2 notification messages', + 'fr': 'A process that allows for user-defined subscription to ' + 'replay WIS2 notification messages' + }, + 'jobControlOptions': ['sync-execute', 'async-execute'], + 'keywords': ['wis2-grep', 'subscribe', 'notifications'], + 'links': [{ + 'type': 'text/html', + 'rel': 'canonical', + 'title': 'wis2grep information', + 'href': 'https://github.com/wmo-im/wis2-grep', + 'hreflang': 'en-US' + }, { + 'type': 'text/html', + 'rel': 'related', + 'title': 'WIS2 information', + 'href': 'https://wmo-im.github.io/wis2-guide', + 'hreflang': 'en-US' + }], + 'inputs': { + 'topic': { + 'title': 'Topic', + 'description': 'The topic to subscribe to', + 'schema': { + 'type': 'string' + }, + 'minOccurs': 1, + 'maxOccurs': 1, + 'keywords': ['topic', 'mqtt'] + }, + 'datetime': { + 'title': 'Datetime', + 'description': 'Datetime (RFC3339) instant or envelope', + 'schema': { + 'type': 'string' + }, + 'minOccurs': 1, + 'maxOccurs': 1, + 'keywords': ['datetime', 'rfc3339'] + }, + 'subscriber-id': { + 'title': 'Subscriber id', + 'description': 'identifier of subscribe, used in response topic', + 'schema': { + 'type': 'string' + }, + 'minOccurs': 1, + 'maxOccurs': 1, + 'keywords': ['subscriber'] + } + }, + 'outputs': { + 'subscription': { + 'title': 'Subscription reponse', + 'description': 'Response of subscription result', + 'schema': { + 'type': 'object', + 'contentMediaType': 'application/json', + 'properties': { + 'status': { + 'type': 'string', + 'description': 'Result of subscription request' + }, + 'subscription': { + 'type': 'object', + 'required': [ + 'href', + 'rel' + ], + 'properties': { + 'href': { + 'type': 'string', + 'example': 'http://data.example.com/buildings/123' # noqa + }, + 'rel': { + 'type': 'string', + 'example': 'alternate' + }, + 'type': { + 'type': 'string', + 'example': 'application/geo+json' + }, + 'title': { + 'type': 'string', + 'example': 'Trierer Strasse 70, 53115 Bonn' + }, + 'channel': { + 'type': 'string', + 'description': 'topic to subscribe to for broker workflow' # noqa + } + } + } + } + } + } + }, + 'example': { + 'inputs': { + 'topic': 'origin/a/wis2/fr-meteofrance', + 'datetime': '2024-06-10T03:00:00Z/2024-06-10T06:00:00Z', + 'subscriber-id': 'foobar123' + } + } +} + + +class WIS2GrepSubscriberProcessor(BaseProcessor): + """wis2-grep Subscriber""" + + def __init__(self, processor_def): + """ + Initialize object + + :param processor_def: provider definition + + :returns: pygeoapi.process.wis2_grep.WIS2GrepSubscriberProcessor + """ + + super().__init__(processor_def, PROCESS_METADATA) + self.supports_outputs = True + + def execute(self, data): + datetime_ = data.get('datetime') + topic = data.get('topic') + subscriber_id = data.get('subscriber-id') + + if None in [datetime_, topic, subscriber_id]: + msg = 'datetime/topic/subscriber-id required' + raise ProcessorExecuteError(msg) + + outputs = {} + pub_topic = f'replay/a/wis2/{subscriber_id}' + + api_params = { + # 'datetime': datetime_ + 'topic': topic, + } + + try: + r = requests.get(API_ENDPOINT, params=api_params).json() + r.raise_for_status() + except requests.exceptions.HTTPError as err: + LOGGER.error(err) + outputs['status'] = 'failed' + outputs['description'] = err + return 'application/json', outputs + + outputs['status'] = 'successful' + outputs['subscription'] = { + 'rel': 'items', + 'type': 'application/geo+json', + 'href': BROKER_URL, + 'title': 'User-defined notifications', + 'channel': pub_topic + } + + for feature in r['features']: + MQTT_CLIENT.pub(pub_topic, json.dumps(feature)) + + return 'application/json', outputs + + def __repr__(self): + return f' {self.name}' diff --git a/wis2-grep-broker/Dockerfile b/wis2-grep-broker/Dockerfile new file mode 100644 index 0000000..ed353c7 --- /dev/null +++ b/wis2-grep-broker/Dockerfile @@ -0,0 +1,28 @@ +############################################################################### +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +############################################################################### + +FROM eclipse-mosquitto:latest + +COPY ./docker/mosquitto.conf /mosquitto/config/mosquitto.conf +COPY ./docker/acl.conf /mosquitto/config/acl.conf +COPY ./docker/entrypoint.sh /docker-entrypoint.sh + +RUN chmod +x /docker-entrypoint.sh diff --git a/wis2-grep-broker/docker/acl.conf b/wis2-grep-broker/docker/acl.conf new file mode 100644 index 0000000..00f076c --- /dev/null +++ b/wis2-grep-broker/docker/acl.conf @@ -0,0 +1,6 @@ +user everyone +topic read replay/a/wis2/# + +user _USERNAME +topic readwrite # +topic read $SYS/# diff --git a/wis2-grep-broker/docker/entrypoint.sh b/wis2-grep-broker/docker/entrypoint.sh new file mode 100755 index 0000000..185967e --- /dev/null +++ b/wis2-grep-broker/docker/entrypoint.sh @@ -0,0 +1,23 @@ +#!/bin/sh + +USERNAME=`echo $WIS2_GREP_BROKER_URL |awk -F/ '{print $3}' | awk -F@ '{print $1}' | awk -F: '{print $1}'` +PASSWORD=`echo $WIS2_GREP_BROKER_URL |awk -F/ '{print $3}' | awk -F@ '{print $1}' | awk -F: '{print $2}'` + +echo "Setting mosquitto authentication" + +echo "USERNAME: $USERNAME" +echo "PASSWORD: $PASSWORD" + +if [ ! -e "/mosquitto/config/password.txt" ]; then + echo "Adding wis2-gc users to mosquitto password file" + mosquitto_passwd -b -c /mosquitto/config/password.txt $USERNAME $PASSWORD + mosquitto_passwd -b /mosquitto/config/password.txt everyone everyone + chmod 644 /mosquitto/config/password.txt +else + echo "Mosquitto password file already exists. Skipping wis2box user addition." +fi + + +sed -i "s#_USERNAME#$USERNAME#" /mosquitto/config/acl.conf + +/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf diff --git a/wis2-grep-broker/docker/mosquitto.conf b/wis2-grep-broker/docker/mosquitto.conf new file mode 100644 index 0000000..347fb61 --- /dev/null +++ b/wis2-grep-broker/docker/mosquitto.conf @@ -0,0 +1,17 @@ +persistence true +persistence_location /mosquitto/data/ +log_dest file /mosquitto/log/mosquitto.log +log_dest stdout +log_timestamp_format %Y-%m-%dT%H:%M:%S +password_file /mosquitto/config/password.txt +max_queued_messages 1000 + +# ACLs +acl_file /mosquitto/config/acl.conf + +## MQTT Listener +listener 1883 +protocol mqtt + +listener 1884 +protocol websockets diff --git a/wis2-grep.env b/wis2-grep.env index 54fe992..9c2cb40 100644 --- a/wis2-grep.env +++ b/wis2-grep.env @@ -3,6 +3,6 @@ export WIS2_GREP_API_URL=http://localhost export WIS2_GREP_API_URL_DOCKER=http://wis2-grep-api export WIS2_GREP_BACKEND_TYPE=Elasticsearch export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notification-messages +export WIS2_GREP_BROKER_URL=mqtt://wis2-grep:wis2-grep@wis2-grep-broker:1883 export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay -#export WIS2_GREP_GB=mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883 export WIS2_GREP_GB=mqtts://everyone:everyone@globalbroker.meteo.fr:8883