diff --git a/README.md b/README.md index 7f669cb..d0af1a1 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,44 @@ wis2-grep register /path/to/wnm-file.json # loading notification messages manually (directory of .json files) wis2-grep load /path/to/dir/of/wnm-files + +# clean messages from API index +wis2-grep clean --hours 24 +``` + +## API queries + +```bash + +# by topic +curl "http://localhost/collections/wis2-notification-messages/items?q=%22cache/b/wis2%22" + +# by bounding box (Canada): +curl "http://localhost/collections/wis2-notification-messages/items?bbox=-142,42,-5,84" + +# by publication time (from/to): +curl "http://localhost/collections/wis2-notification-messages/items?datetime=2024-07-24T11:11:11Z/2024-07-25T12:34:21Z" + +# by publication time (from): +curl "http://localhost/collections/wis2-notification-messages/items?datetime=2024-07-24T11:11:11Z/.." + +# by publication time (to): +curl "http://localhost/collections/wis2-notification-messages/items?datetime=../2024-07-24T11:11:11Z" + +# by message identifier +curl "http://localhost/collections/wis2-notification-messages/items/<message-id>" + +# sort results by oldest messages (pubtime) +curl "http://localhost/collections/wis2-notification-messages/items?sortby=pubtime" + +# sort results by latest messages (pubtime) +curl "http://localhost/collections/wis2-notification-messages/items?sortby=-pubtime" + +# return as GeoJSON +curl "http://localhost/collections/wis2-notification-messages/items?f=json" + +# return as HTML +curl "http://localhost/collections/wis2-notification-messages/items?f=html" ``` ### Docker diff --git a/wis2-grep-management/docker/wis2-grep-management.cron b/wis2-grep-management/docker/wis2-grep-management.cron index 2878605..2ebf473 100644 --- a/wis2-grep-management/docker/wis2-grep-management.cron +++ b/wis2-grep-management/docker/wis2-grep-management.cron @@ -1 +1 @@ -19 * * * * su -c "wis2-grep clean" > /proc/1/fd/1 2>/proc/1/fd/2 +19 * * * * su -c "wis2-grep clean 
--hours=$WIS2_GREP_MESSAGE_RETENTION_HOURS" > /proc/1/fd/1 2>/proc/1/fd/2 diff --git a/wis2-grep-management/wis2_grep/__init__.py b/wis2-grep-management/wis2_grep/__init__.py index 72f2efd..2d45fdd 100644 --- a/wis2-grep-management/wis2_grep/__init__.py +++ b/wis2-grep-management/wis2_grep/__init__.py @@ -21,7 +21,7 @@ import click -from wis2_grep.loader import load, setup, teardown +from wis2_grep.loader import clean, load, setup, teardown __version__ = '0.1.dev0' @@ -34,6 +34,7 @@ def cli(): pass +cli.add_command(clean) cli.add_command(setup) cli.add_command(teardown) cli.add_command(load) diff --git a/wis2-grep-management/wis2_grep/backend/base.py b/wis2-grep-management/wis2_grep/backend/base.py index 26631de..1961ccb 100644 --- a/wis2-grep-management/wis2_grep/backend/base.py +++ b/wis2-grep-management/wis2_grep/backend/base.py @@ -50,11 +50,11 @@ def teardown(self) -> None: raise NotImplementedError() @abstractmethod - def save(self, record: dict) -> None: + def save(self, message: dict) -> None: """ Upsert a resource to a backend - :param payload: `dict` of resource + :param message: `dict` of message :returns: `None` """ @@ -62,13 +62,25 @@ def save(self, record: dict) -> None: raise NotImplementedError() @abstractmethod - def exists(self, identifier: str) -> bool: + def message_exists(self, identifier: str) -> bool: """ - Querying whether a record exists in a backend + Query whether a message exists in a backend - :param identifier: `str` of record identifier + :param identifier: `str` of message identifier - :returns: `bool` of whether record exists in backend + :returns: `bool` of whether message exists in backend + """ + + raise NotImplementedError() + + @abstractmethod + def clean(self, hours: int) -> None: + """ + Clean messages older than n hours from backend + + :param hours: `int` of hours of message max age + + :returns: `None` """ raise NotImplementedError() diff --git a/wis2-grep-management/wis2_grep/backend/elastic.py 
b/wis2-grep-management/wis2_grep/backend/elastic.py index e7d0f0f..878d3d5 100644 --- a/wis2-grep-management/wis2_grep/backend/elastic.py +++ b/wis2-grep-management/wis2_grep/backend/elastic.py @@ -19,6 +19,7 @@ # ############################################################################### +from datetime import datetime, timedelta import logging from urllib.parse import urlparse @@ -65,42 +66,6 @@ def __init__(self, defs): 'ignore_malformed': True } } - }, - 'properties': { - 'properties': { - 'type': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'title': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'description': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'wmo:dataPolicy': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - } - } } } } @@ -154,11 +119,11 @@ def teardown(self) -> None: LOGGER.debug(f'Deleting index {self.index_name}') self.es.indices.delete(index=self.index_name) - def save(self, record: dict) -> None: - LOGGER.debug(f"Indexing record {record['id']}") - self.es.index(index=self.index_name, id=record['id'], body=record) + def save(self, message: dict) -> None: + LOGGER.debug(f"Indexing message {message['id']}") + self.es.index(index=self.index_name, id=message['id'], body=message) - def exists(self, identifier: str) -> bool: + def message_exists(self, identifier: str) -> bool: LOGGER.debug(f'Querying Replay API for id {identifier}') try: _ = self.es.get(index=self.index_name, id=identifier) @@ -166,5 +131,41 @@ def exists(self, identifier: str) -> bool: except NotFoundError: return False + def clean(self, hours: int) -> None: + before = datetime_hours_ago(hours) + + query = { + 'query': { + 'bool': { + 'should': [{ + 'range': { + 'properties.pubtime': { + 'lte': before + } + } + }] + } + } + } + + LOGGER.debug(f'deleting documents older than {hours} hours ({before})') # noqa + 
self.es.delete_by_query(index=self.index_name, **query) + + return + def __repr__(self): return '' + + +def datetime_hours_ago(hours: int) -> datetime: + """ + Calculate the datetime n hours ago + + :param hours: `int` of number of hours + + :returns: `datetime` object of the UTC date and time n hours ago + """ + + today = datetime.utcnow() + + return today - timedelta(hours=hours) diff --git a/wis2-grep-management/wis2_grep/backend/ogcapi_features.py b/wis2-grep-management/wis2_grep/backend/ogcapi_features.py index 8d05c38..6fb53fb 100644 --- a/wis2-grep-management/wis2_grep/backend/ogcapi_features.py +++ b/wis2-grep-management/wis2_grep/backend/ogcapi_features.py @@ -57,7 +57,7 @@ def save(self): LOGGER.debug('Updating existing notification in collection') _ = self.conn.get_collection_update(self.collection, payload) - def exists(self, identifier: str) -> bool: + def message_exists(self, identifier: str) -> bool: LOGGER.debug(f'Querying Replay API for id {identifier}') try: _ = self.conn.collection_item(self.collection, identifier) diff --git a/wis2-grep-management/wis2_grep/env.py b/wis2-grep-management/wis2_grep/env.py index 2ec035a..78cc541 100644 --- a/wis2-grep-management/wis2_grep/env.py +++ b/wis2-grep-management/wis2_grep/env.py @@ -49,7 +49,11 @@ def str2bool(value: Any) -> bool: BACKEND_CONNECTION = os.environ.get('WIS2_GREP_BACKEND_CONNECTION') CENTRE_ID = os.environ.get('WIS2_GREP_CENTRE_ID') GB = os.environ.get('WIS2_GREP_GB') +MESSAGE_RETENTION_HOURS = os.environ.get('WIS2_GREP_MESSAGE_RETENTION_HOURS', + 3) + +MESSAGE_RETENTION_HOURS = int(MESSAGE_RETENTION_HOURS) if None in [API_URL, API_URL_DOCKER, BACKEND_TYPE, BACKEND_CONNECTION, - CENTRE_ID, GB]: + CENTRE_ID, GB, MESSAGE_RETENTION_HOURS]: raise EnvironmentError('Environment variables not set!') diff --git a/wis2-grep-management/wis2_grep/loader.py b/wis2-grep-management/wis2_grep/loader.py index 1032bcd..97a66fd 100644 --- a/wis2-grep-management/wis2_grep/loader.py +++ 
b/wis2-grep-management/wis2_grep/loader.py @@ -29,7 +29,8 @@ from pywis_pubsub import cli_options from wis2_grep.backend import BACKENDS -from wis2_grep.env import BACKEND_TYPE, BACKEND_CONNECTION +from wis2_grep.env import (BACKEND_TYPE, BACKEND_CONNECTION, + MESSAGE_RETENTION_HOURS) LOGGER = logging.getLogger(__name__) @@ -143,3 +144,23 @@ def load(ctx, path, verbosity='NOTSET'): with w2p.open() as fh: r = Loader() r.load(fh.read()) + + +@click.command() +@click.pass_context +@click.option('--hours', type=int, default=MESSAGE_RETENTION_HOURS, + help='Number of hours of messages to keep') +@cli_options.OPTION_VERBOSITY +def clean(ctx, hours, verbosity): + """Clean messages on API indexes""" + + hours_ = hours or MESSAGE_RETENTION_HOURS + + if hours_ is None or hours_ < 0: + click.echo('No data retention set. Skipping') + else: + backend = BACKENDS[BACKEND_TYPE]({'connection': BACKEND_CONNECTION}) + LOGGER.debug(f'Backend: {backend}') + backend.clean(hours_) + + click.echo(f'Deleting messages > {hours_} hour(s) old from {backend}') diff --git a/wis2-grep.env b/wis2-grep.env index 8fc62d5..9b2725f 100644 --- a/wis2-grep.env +++ b/wis2-grep.env @@ -6,6 +6,7 @@ export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notificat export WIS2_GREP_BROKER_URL=mqtt://wis2-grep:wis2-grep@wis2-grep-broker:1883 export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay export WIS2_GREP_GB=mqtts://everyone:everyone@globalbroker.meteo.fr:8883 +export WIS2_GREP_MESSAGE_RETENTION_HOURS=3 # global broker links export WIS2_GREP_GB_LINK_METEOFRANCE="fr-meteo-france-global-broker,mqtts://everyone:everyone@globalbroker.meteo.fr:8883,Météo-France, Global Broker Service"