diff --git a/README.md b/README.md index f54cec6..c9eefde 100644 --- a/README.md +++ b/README.md @@ -93,9 +93,11 @@ make force-build # start all containers make up +# API is up at http://localhos # start all containers in dev mode make dev +# API is up at http://localhost # view all container logs in realtime make logs diff --git a/docker-compose.yml b/docker-compose.yml index 34e186f..af878aa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -62,7 +62,7 @@ services: wis2-grep-backend: condition: service_healthy healthcheck: - test: ["CMD", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages"] + test: ["CMD", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages-origin", "&&", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages-cache" ] interval: 5s retries: 3 volumes: diff --git a/wis2-grep-api/docker/wis2-grep-api.yml b/wis2-grep-api/docker/wis2-grep-api.yml index 33d00a0..668a37e 100644 --- a/wis2-grep-api/docker/wis2-grep-api.yml +++ b/wis2-grep-api/docker/wis2-grep-api.yml @@ -53,10 +53,10 @@ metadata: role: pointOfContact resources: - wis2-notification-messages: + wis2-notification-messages-origin: type: collection - title: WIS2 notification messages - description: WIS2 notification messages + title: WIS2 notification messages (origin) + description: WIS2 notification messages from origin/a/wis2 keywords: [wmo, wis2, notifications] crs: - CRS84 @@ -96,6 +96,52 @@ resources: providers: - type: feature name: ${WIS2_GREP_BACKEND_TYPE} - data: ${WIS2_GREP_BACKEND_CONNECTION} + data: ${WIS2_GREP_BACKEND_CONNECTION}/wis2-notification-messages-origin + id_field: id + time_field: pubtime + wis2-notification-messages-cache: + type: collection + title: WIS2 notification messages (cache) + description: WIS2 notification messages from cache/a/wis2 + keywords: [wmo, wis2, notifications] + crs: + - CRS84 + links: + - type: application/geo+json + rel: items + title: Notifications from Météo-France, Global Broker Service + href: mqtts://everyone:everyone@globalbroker.meteo.fr:8883 + channel: 'cache/a/wis2/#' + length: -1 + - type: application/geo+json + rel: items + title: Notifications from China Meteorological Agency, Global Broker Service + href: mqtts://everyone:everyone@gb.wis.cma.cn:8883 + channel: 'cache/a/wis2/#' + length: -1 + - type: application/geo+json + rel: items + title: Notifications from National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service + href: mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883 + channel: 'cache/a/wis2/#' + length: -1 + - type: application/geo+json + rel: items + title: Notifications from Instituto Nacional de Meteorologia (Brazil), Global Broker Service + href: mqtts://everyone:everyone@globalbroker.inmet.gov.br:8883 + channel: 'cache/a/wis2/#' + length: -1 + - type: text/html + rel: canonical + title: WMO Information System (WIS) | World Meteorological Organization + href: https://community.wmo.int/en/activity-areas/wis + extents: + spatial: + bbox: [-180, -90, 180, 90] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: ${WIS2_GREP_BACKEND_TYPE} + data: ${WIS2_GREP_BACKEND_CONNECTION}/wis2-notification-messages-cache id_field: id time_field: pubtime diff --git a/wis2-grep-management/wis2_grep/backend/base.py b/wis2-grep-management/wis2_grep/backend/base.py index 26631de..19da60f 100644 --- a/wis2-grep-management/wis2_grep/backend/base.py +++ b/wis2-grep-management/wis2_grep/backend/base.py @@ -50,10 +50,11 @@ def teardown(self) -> None: raise NotImplementedError() @abstractmethod - def save(self, record: dict) -> None: + def save(self, channel: str, record: dict) -> None: """ Upsert a resource to a backend + :param channel: `str` of channel (`origin` or `cache`) :param payload: `dict` of resource :returns: `None` @@ -62,10 +63,11 @@ def save(self, record: dict) -> None: raise NotImplementedError() @abstractmethod - def exists(self, identifier: str) -> bool: + def exists(self, channel: str, identifier: str) -> bool: """ Querying whether a record exists in a backend + :param channel: `str` of channel (`origin` or `cache`) :param identifier: `str` of record identifier :returns: `bool` of whether record exists in backend diff --git a/wis2-grep-management/wis2_grep/backend/elastic.py b/wis2-grep-management/wis2_grep/backend/elastic.py index e7d0f0f..d61e0bb 100644 --- a/wis2-grep-management/wis2_grep/backend/elastic.py +++ b/wis2-grep-management/wis2_grep/backend/elastic.py @@ -107,7 +107,6 @@ def __init__(self, defs): } self.url_parsed = urlparse(self.defs.get('connection')) - self.index_name = self.url_parsed.path.lstrip('/') url2 = f'{self.url_parsed.scheme}://{self.url_parsed.netloc}' @@ -124,11 +123,9 @@ def __init__(self, defs): if self.url_parsed.path.count('/') > 1: LOGGER.debug('ES URL has a basepath') basepath = self.url_parsed.path.split('/')[1] - self.index_name = self.url_parsed.path.split('/')[-1] url2 = f'{url2}/{basepath}/' LOGGER.debug(f'ES URL: {url2}') - LOGGER.debug(f'ES index: {self.index_name}') settings = { 'hosts': [url2], @@ -146,22 +143,29 @@ def __init__(self, defs): def setup(self) -> None: self.teardown() - LOGGER.debug(f'Creating index {self.index_name}') - self.es.indices.create(index=self.index_name, body=self.ES_SETTINGS) + LOGGER.debug('Creating indexes') + + for channel in ['origin', 'cache']: + index_name = f'wis2-notification-messages-{channel}' + self.es.indices.create(index=index_name, body=self.ES_SETTINGS) def teardown(self) -> None: - if self.es.indices.exists(index=self.index_name): - LOGGER.debug(f'Deleting index {self.index_name}') - self.es.indices.delete(index=self.index_name) + for channel in ['origin', 'cache']: + index_name = f'wis2-notification-messages-{channel}' + if self.es.indices.exists(index=index_name): + LOGGER.debug(f'Deleting index {index_name}') + self.es.indices.delete(index=index_name) - def save(self, record: dict) -> None: + def save(self, channel: str, record: dict) -> None: LOGGER.debug(f"Indexing record {record['id']}") - self.es.index(index=self.index_name, id=record['id'], body=record) + index_name = f'wis2-notification-messages-{channel}' + self.es.index(index=index_name, id=record['id'], body=record) - def exists(self, identifier: str) -> bool: + def exists(self, channel: str, identifier: str) -> bool: LOGGER.debug(f'Querying Replay API for id {identifier}') + index_name = f'wis2-notification-messages-{channel}' try: - _ = self.es.get(index=self.index_name, id=identifier) + _ = self.es.get(index=index_name, id=identifier) return True except NotFoundError: return False diff --git a/wis2-grep-management/wis2_grep/backend/ogcapi_features.py b/wis2-grep-management/wis2_grep/backend/ogcapi_features.py index 8d05c38..a71aefc 100644 --- a/wis2-grep-management/wis2_grep/backend/ogcapi_features.py +++ b/wis2-grep-management/wis2_grep/backend/ogcapi_features.py @@ -36,10 +36,10 @@ def __init__(self, defs): super().__init__(defs) self.conn = Features(env.API_URL) - self.collection = 'notification-messsages' - def save(self): + def save(self, channel: str): + collection = f'wis2-notification-messages-{channel}' ttype = 'create' try: @@ -52,15 +52,16 @@ def save(self): if ttype == 'create': LOGGER.debug('Adding new notification to collection') - _ = self.conn.get_collection_create(self.collection, payload) + _ = self.conn.get_collection_create(collection, payload) elif ttype == 'update': LOGGER.debug('Updating existing notification in collection') - _ = self.conn.get_collection_update(self.collection, payload) + _ = self.conn.get_collection_update(collection, payload) - def exists(self, identifier: str) -> bool: + def exists(self, channel: str, identifier: str) -> bool: + collection = f'wis2-notification-messages-{channel}' LOGGER.debug(f'Querying Replay API for id {identifier}') try: - _ = self.conn.collection_item(self.collection, identifier) + _ = self.conn.collection_item(collection, identifier) return True except RuntimeError: return False diff --git a/wis2-grep-management/wis2_grep/loader.py b/wis2-grep-management/wis2_grep/loader.py index 1032bcd..3c7aff1 100644 --- a/wis2-grep-management/wis2_grep/loader.py +++ b/wis2-grep-management/wis2_grep/loader.py @@ -42,6 +42,7 @@ def __init__(self): :returns: `wis2_grep.loader.Loader` """ + self.channel = None self.backend = BACKENDS[BACKEND_TYPE]( {'connection': BACKEND_CONNECTION}) @@ -55,6 +56,11 @@ def load(self, message: Union[dict, str], topic: str = None) -> None: :returns: `None` """ + if topic.startswith('origin'): + self.channel = 'origin' + elif topic.startswith('cache'): + self.channel = 'cache' + if isinstance(message, dict): LOGGER.debug('Notification message is already a dict') self.message = message @@ -83,7 +89,7 @@ def _publish(self): """ LOGGER.info(f'Saving to {BACKEND_TYPE} ({BACKEND_CONNECTION})') - self.backend.save(self.message) + self.backend.save(self.channel, self.message) def __repr__(self): return '' diff --git a/wis2-grep.env b/wis2-grep.env index 54fe992..d6edd4e 100644 --- a/wis2-grep.env +++ b/wis2-grep.env @@ -2,7 +2,7 @@ export WIS2_GREP_LOGGING_LEVEL=DEBUG export WIS2_GREP_API_URL=http://localhost export WIS2_GREP_API_URL_DOCKER=http://wis2-grep-api export WIS2_GREP_BACKEND_TYPE=Elasticsearch -export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notification-messages +export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/ export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay #export WIS2_GREP_GB=mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883 export WIS2_GREP_GB=mqtts://everyone:everyone@globalbroker.meteo.fr:8883