Skip to content

Commit

Permalink
add message retention workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkralidis committed Jul 29, 2024
1 parent 0eb05e6 commit bf63931
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 51 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,29 @@ wis2-grep register /path/to/wnm-file.json

# loading notification messages manually (directory of .json files)
wis2-grep load /path/to/dir/of/wnm-files

# clean messages from API index
wis2-grep clean --hours 24
```

## API queries

By bounding box (Canada):
http://localhost/collections/wis2-notification-messages/items?bbox=-142,42,-5,84

By publication time (from/to):
http://localhost/collections/wis2-notification-messages/items?datetime=2024-07-24T11:11:11Z/2024-07-25T12:34:21Z

By publication time (from):
http://localhost/collections/wis2-notification-messages/items?datetime=2024-07-24T11:11:11Z/..

By publication time (to):
http://localhost/collections/wis2-notification-messages/items?datetime=../2024-07-24T11:11:11Z

By message identifier
http://localhost/collections/wis2-notification-messages/items/WNM_ID


### Docker

The Docker setup uses Docker and Docker Compose to manage the following services:
Expand Down
2 changes: 1 addition & 1 deletion wis2-grep-management/docker/wis2-grep-management.cron
Original file line number Diff line number Diff line change
@@ -1 +1 @@
19 * * * * su -c "wis2-grep clean" > /proc/1/fd/1 2>/proc/1/fd/2
19 * * * * su -c "wis2-grep clean --hours=$WIS2_GREP_MESSAGE_RETENTION_HOURS" > /proc/1/fd/1 2>/proc/1/fd/2
3 changes: 2 additions & 1 deletion wis2-grep-management/wis2_grep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import click

from wis2_grep.loader import load, setup, teardown
from wis2_grep.loader import clean, load, setup, teardown

__version__ = '0.1.dev0'

Expand All @@ -34,6 +34,7 @@ def cli():
pass


cli.add_command(clean)
cli.add_command(setup)
cli.add_command(teardown)
cli.add_command(load)
24 changes: 18 additions & 6 deletions wis2-grep-management/wis2_grep/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,37 @@ def teardown(self) -> None:
raise NotImplementedError()

@abstractmethod
def save(self, record: dict) -> None:
def save(self, message: dict) -> None:
"""
Upsert a resource to a backend
:param payload: `dict` of resource
:param payload: `dict` of message
:returns: `None`
"""

raise NotImplementedError()

@abstractmethod
def exists(self, identifier: str) -> bool:
def message_exists(self, identifier: str) -> bool:
"""
Querying whether a record exists in a backend
Query whether a message exists in a backend
:param identifier: `str` of record identifier
:param identifier: `str` of message identifier
:returns: `bool` of whether record exists in backend
:returns: `bool` of whether message exists in backend
"""

raise NotImplementedError()

@abstractmethod
def clean(self, hours: int) -> None:
"""
Clean messages older than n hours from backend
:param hours: `int` of hours of message max age
:returns: `None`
"""

raise NotImplementedError()
Expand Down
81 changes: 41 additions & 40 deletions wis2-grep-management/wis2_grep/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
###############################################################################

from datetime import datetime, timedelta
import logging
from urllib.parse import urlparse

Expand Down Expand Up @@ -65,42 +66,6 @@ def __init__(self, defs):
'ignore_malformed': True
}
}
},
'properties': {
'properties': {
'type': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
'title': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
'description': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
'wmo:dataPolicy': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
}
}
}
}
}
Expand Down Expand Up @@ -154,17 +119,53 @@ def teardown(self) -> None:
LOGGER.debug(f'Deleting index {self.index_name}')
self.es.indices.delete(index=self.index_name)

def save(self, record: dict) -> None:
LOGGER.debug(f"Indexing record {record['id']}")
self.es.index(index=self.index_name, id=record['id'], body=record)
def save(self, message: dict) -> None:
LOGGER.debug(f"Indexing message {message['id']}")
self.es.index(index=self.index_name, id=message['id'], body=message)

def exists(self, identifier: str) -> bool:
def message_exists(self, identifier: str) -> bool:
LOGGER.debug(f'Querying Replay API for id {identifier}')
try:
_ = self.es.get(index=self.index_name, id=identifier)
return True
except NotFoundError:
return False

def clean(self, hours: int) -> None:
before = datetime_hours_ago(hours)

query = {
'query': {
'bool': {
'should': [{
'range': {
'properties.pubtime': {
'lte': before
}
}
}]
}
}
}

LOGGER.debug(f'deleting documents older than {hours} hours ({before})') # noqa
self.es.delete_by_query(index=self.index_name, **query)

return

def __repr__(self):
return '<ElasticsearchBackend>'


def datetime_hours_ago(hours: int) -> datetime:
"""
Calculate datetime given n hours ago
:param hours: `int` of number of hours
:returns: `datetime.date` object of date n hours ago
"""

today = datetime.utcnow()

return today - timedelta(hours=hours)
2 changes: 1 addition & 1 deletion wis2-grep-management/wis2_grep/backend/ogcapi_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def save(self):
LOGGER.debug('Updating existing notification in collection')
_ = self.conn.get_collection_update(self.collection, payload)

def exists(self, identifier: str) -> bool:
def message_exists(self, identifier: str) -> bool:
LOGGER.debug(f'Querying Replay API for id {identifier}')
try:
_ = self.conn.collection_item(self.collection, identifier)
Expand Down
6 changes: 5 additions & 1 deletion wis2-grep-management/wis2_grep/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ def str2bool(value: Any) -> bool:
BACKEND_CONNECTION = os.environ.get('WIS2_GREP_BACKEND_CONNECTION')
CENTRE_ID = os.environ.get('WIS2_GREP_CENTRE_ID')
GB = os.environ.get('WIS2_GREP_GB')
MESSAGE_RETENTION_HOURS = os.environ.get('WIS2_GREP_MESSAGE_RETENTION_HOURS',
3)

MESSAGE_RETENTION_HOURS = int(MESSAGE_RETENTION_HOURS)

if None in [API_URL, API_URL_DOCKER, BACKEND_TYPE, BACKEND_CONNECTION,
CENTRE_ID, GB]:
CENTRE_ID, GB, MESSAGE_RETENTION_HOURS]:
raise EnvironmentError('Environment variables not set!')
23 changes: 22 additions & 1 deletion wis2-grep-management/wis2_grep/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from pywis_pubsub import cli_options

from wis2_grep.backend import BACKENDS
from wis2_grep.env import BACKEND_TYPE, BACKEND_CONNECTION
from wis2_grep.env import (BACKEND_TYPE, BACKEND_CONNECTION,
MESSAGE_RETENTION_HOURS)

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -143,3 +144,23 @@ def load(ctx, path, verbosity='NOTSET'):
with w2p.open() as fh:
r = Loader()
r.load(fh.read())


@click.command()
@click.pass_context
@click.option('--hours', type=int, default=MESSAGE_RETENTION_HOURS,
help='Number of hours of data to keep')
@cli_options.OPTION_VERBOSITY
def clean(ctx, hours, verbosity):
"""Clean messages on API indexes"""

hours_ = hours or MESSAGE_RETENTION_HOURS

if hours_ is None or hours_ < 0:
click.echo('No data retention set. Skipping')
else:
backend = BACKENDS[BACKEND_TYPE]({'connection': BACKEND_CONNECTION})
LOGGER.debug(f'Backend: {backend}')
backend.clean(hours_)

click.echo(f'Deleting messages > {hours_} hour(s) old from {backend}')
1 change: 1 addition & 0 deletions wis2-grep.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notificat
export WIS2_GREP_BROKER_URL=mqtt://wis2-grep:wis2-grep@wis2-grep-broker:1883
export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay
export WIS2_GREP_GB=mqtts://everyone:[email protected]:8883
export WIS2_GREP_MESSAGE_RETENTION_HOURS=3

# global broker links
export WIS2_GREP_GB_LINK_METEOFRANCE="fr-meteo-france-global-broker,mqtts://everyone:[email protected]:8883,Météo-France, Global Broker Service"
Expand Down

0 comments on commit bf63931

Please sign in to comment.