Skip to content

Commit

Permalink
split cache / origin into separate collections
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkralidis committed Jun 3, 2024
1 parent e51f4de commit 697c956
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 27 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ make force-build

# start all containers
make up
# API is up at http://localhos

# start all containers in dev mode
make dev
# API is up at http://localhost

# view all container logs in realtime
make logs
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ services:
wis2-grep-backend:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages"]
test: ["CMD", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages-origin", "&&", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages-cache" ]
interval: 5s
retries: 3
volumes:
Expand Down
54 changes: 50 additions & 4 deletions wis2-grep-api/docker/wis2-grep-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ metadata:
role: pointOfContact

resources:
wis2-notification-messages:
wis2-notification-messages-origin:
type: collection
title: WIS2 notification messages
description: WIS2 notification messages
title: WIS2 notification messages (origin)
description: WIS2 notification messages from origin/a/wis2
keywords: [wmo, wis2, notifications]
crs:
- CRS84
Expand Down Expand Up @@ -96,6 +96,52 @@ resources:
providers:
- type: feature
name: ${WIS2_GREP_BACKEND_TYPE}
data: ${WIS2_GREP_BACKEND_CONNECTION}
data: ${WIS2_GREP_BACKEND_CONNECTION}/wis2-notification-messages-origin
id_field: id
time_field: pubtime
wis2-notification-messages-cache:
type: collection
title: WIS2 notification messages (cache)
description: WIS2 notification messages from cache/a/wis2
keywords: [wmo, wis2, notifications]
crs:
- CRS84
links:
- type: application/geo+json
rel: items
title: Notifications from Météo-France, Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'cache/a/wis2/#'
length: -1
- type: application/geo+json
rel: items
title: Notifications from China Meteorological Agency, Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'cache/a/wis2/#'
length: -1
- type: application/geo+json
rel: items
title: Notifications from National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'cache/a/wis2/#'
length: -1
- type: application/geo+json
rel: items
title: Notifications from Instituto Nacional de Meteorologia (Brazil), Global Broker Service
href: mqtts://everyone:[email protected]:8883
channel: 'cache/a/wis2/#'
length: -1
- type: text/html
rel: canonical
title: WMO Information System (WIS) | World Meteorological Organization
href: https://community.wmo.int/en/activity-areas/wis
extents:
spatial:
bbox: [-180, -90, 180, 90]
crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
providers:
- type: feature
name: ${WIS2_GREP_BACKEND_TYPE}
data: ${WIS2_GREP_BACKEND_CONNECTION}/wis2-notification-messages-cache
id_field: id
time_field: pubtime
6 changes: 4 additions & 2 deletions wis2-grep-management/wis2_grep/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ def teardown(self) -> None:
raise NotImplementedError()

@abstractmethod
def save(self, record: dict) -> None:
def save(self, channel: str, record: dict) -> None:
"""
Upsert a resource to a backend
:param channel: `str` of channel (`origin` or `cache`)
:param payload: `dict` of resource
:returns: `None`
Expand All @@ -62,10 +63,11 @@ def save(self, record: dict) -> None:
raise NotImplementedError()

@abstractmethod
def exists(self, identifier: str) -> bool:
def exists(self, channel: str, identifier: str) -> bool:
"""
Querying whether a record exists in a backend
:param channel: `str` of channel (`origin` or `cache`)
:param identifier: `str` of record identifier
:returns: `bool` of whether record exists in backend
Expand Down
28 changes: 16 additions & 12 deletions wis2-grep-management/wis2_grep/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def __init__(self, defs):
}

self.url_parsed = urlparse(self.defs.get('connection'))
self.index_name = self.url_parsed.path.lstrip('/')

url2 = f'{self.url_parsed.scheme}://{self.url_parsed.netloc}'

Expand All @@ -124,11 +123,9 @@ def __init__(self, defs):
if self.url_parsed.path.count('/') > 1:
LOGGER.debug('ES URL has a basepath')
basepath = self.url_parsed.path.split('/')[1]
self.index_name = self.url_parsed.path.split('/')[-1]
url2 = f'{url2}/{basepath}/'

LOGGER.debug(f'ES URL: {url2}')
LOGGER.debug(f'ES index: {self.index_name}')

settings = {
'hosts': [url2],
Expand All @@ -146,22 +143,29 @@ def __init__(self, defs):

def setup(self) -> None:
self.teardown()
LOGGER.debug(f'Creating index {self.index_name}')
self.es.indices.create(index=self.index_name, body=self.ES_SETTINGS)
LOGGER.debug('Creating indexes')

for channel in ['origin', 'cache']:
index_name = f'wis2-notification-messages-{channel}'
self.es.indices.create(index=index_name, body=self.ES_SETTINGS)

def teardown(self) -> None:
if self.es.indices.exists(index=self.index_name):
LOGGER.debug(f'Deleting index {self.index_name}')
self.es.indices.delete(index=self.index_name)
for channel in ['origin', 'cache']:
index_name = f'wis2-notification-messages-{channel}'
if self.es.indices.exists(index=index_name):
LOGGER.debug(f'Deleting index {index_name}')
self.es.indices.delete(index=index_name)

def save(self, record: dict) -> None:
def save(self, channel: str, record: dict) -> None:
LOGGER.debug(f"Indexing record {record['id']}")
self.es.index(index=self.index_name, id=record['id'], body=record)
index_name = f'wis2-notification-messages-{channel}'
self.es.index(index=index_name, id=record['id'], body=record)

def exists(self, identifier: str) -> bool:
def exists(self, channel: str, identifier: str) -> bool:
LOGGER.debug(f'Querying Replay API for id {identifier}')
index_name = f'wis2-notification-messages-{channel}'
try:
_ = self.es.get(index=self.index_name, id=identifier)
_ = self.es.get(index=index_name, id=identifier)
return True
except NotFoundError:
return False
Expand Down
13 changes: 7 additions & 6 deletions wis2-grep-management/wis2_grep/backend/ogcapi_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def __init__(self, defs):
super().__init__(defs)

self.conn = Features(env.API_URL)
self.collection = 'notification-messsages'

def save(self):
def save(self, channel: str):

collection = f'wis2-notification-messages-{channel}'
ttype = 'create'

try:
Expand All @@ -52,15 +52,16 @@ def save(self):

if ttype == 'create':
LOGGER.debug('Adding new notification to collection')
_ = self.conn.get_collection_create(self.collection, payload)
_ = self.conn.get_collection_create(collection, payload)
elif ttype == 'update':
LOGGER.debug('Updating existing notification in collection')
_ = self.conn.get_collection_update(self.collection, payload)
_ = self.conn.get_collection_update(collection, payload)

def exists(self, identifier: str) -> bool:
def exists(self, channel: str, identifier: str) -> bool:
collection = f'wis2-notification-messages-{channel}'
LOGGER.debug(f'Querying Replay API for id {identifier}')
try:
_ = self.conn.collection_item(self.collection, identifier)
_ = self.conn.collection_item(collection, identifier)
return True
except RuntimeError:
return False
Expand Down
8 changes: 7 additions & 1 deletion wis2-grep-management/wis2_grep/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self):
:returns: `wis2_grep.loader.Loader`
"""

self.channel = None
self.backend = BACKENDS[BACKEND_TYPE](
{'connection': BACKEND_CONNECTION})

Expand All @@ -55,6 +56,11 @@ def load(self, message: Union[dict, str], topic: str = None) -> None:
:returns: `None`
"""

if topic.startswith('origin'):
self.channel = 'origin'
elif topic.startswith('cache'):
self.channel = 'cache'

if isinstance(message, dict):
LOGGER.debug('Notification message is already a dict')
self.message = message
Expand Down Expand Up @@ -83,7 +89,7 @@ def _publish(self):
"""

LOGGER.info(f'Saving to {BACKEND_TYPE} ({BACKEND_CONNECTION})')
self.backend.save(self.message)
self.backend.save(self.channel, self.message)

def __repr__(self):
return '<Loader>'
Expand Down
2 changes: 1 addition & 1 deletion wis2-grep.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export WIS2_GREP_LOGGING_LEVEL=DEBUG
export WIS2_GREP_API_URL=http://localhost
export WIS2_GREP_API_URL_DOCKER=http://wis2-grep-api
export WIS2_GREP_BACKEND_TYPE=Elasticsearch
export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notification-messages
export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/
export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay
#export WIS2_GREP_GB=mqtts://everyone:[email protected]:8883
export WIS2_GREP_GB=mqtts://everyone:[email protected]:8883

0 comments on commit 697c956

Please sign in to comment.