From 697c9560fa570d1ba0d32bfa16c18a03c9702206 Mon Sep 17 00:00:00 2001
From: Tom Kralidis <tomkralidis@gmail.com>
Date: Mon, 3 Jun 2024 19:24:57 -0400
Subject: [PATCH] split cache / origin into separate collections

---
 README.md                                     |  2 +
 docker-compose.yml                            |  2 +-
 wis2-grep-api/docker/wis2-grep-api.yml        | 54 +++++++++++++++++--
 .../wis2_grep/backend/base.py                 |  6 ++-
 .../wis2_grep/backend/elastic.py              | 28 +++++-----
 .../wis2_grep/backend/ogcapi_features.py      | 13 ++---
 wis2-grep-management/wis2_grep/loader.py      |  8 ++-
 wis2-grep.env                                 |  2 +-
 8 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/README.md b/README.md
index f54cec6..c9eefde 100644
--- a/README.md
+++ b/README.md
@@ -93,9 +93,11 @@ make force-build
 
 # start all containers
 make up
+# API is up at http://localhos
 
 # start all containers in dev mode
 make dev
+# API is up at http://localhost
 
 # view all container logs in realtime
 make logs
diff --git a/docker-compose.yml b/docker-compose.yml
index 34e186f..af878aa 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -62,7 +62,7 @@ services:
       wis2-grep-backend:
         condition: service_healthy
     healthcheck:
-      test: ["CMD", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages"]
+      test: ["CMD", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages-origin", "&&", "curl", "-f", "http://wis2-grep-backend:9200/wis2-notification-messages-cache" ]
       interval: 5s
       retries: 3
     volumes:
diff --git a/wis2-grep-api/docker/wis2-grep-api.yml b/wis2-grep-api/docker/wis2-grep-api.yml
index 33d00a0..668a37e 100644
--- a/wis2-grep-api/docker/wis2-grep-api.yml
+++ b/wis2-grep-api/docker/wis2-grep-api.yml
@@ -53,10 +53,10 @@ metadata:
         role: pointOfContact
 
 resources:
-    wis2-notification-messages:
+    wis2-notification-messages-origin:
         type: collection
-        title: WIS2 notification messages
-        description: WIS2 notification messages
+        title: WIS2 notification messages (origin)
+        description: WIS2 notification messages from origin/a/wis2
         keywords: [wmo, wis2, notifications]
         crs:
             - CRS84
@@ -96,6 +96,52 @@ resources:
         providers:
             - type: feature
               name: ${WIS2_GREP_BACKEND_TYPE}
-              data: ${WIS2_GREP_BACKEND_CONNECTION}
+              data: ${WIS2_GREP_BACKEND_CONNECTION}/wis2-notification-messages-origin
+              id_field: id
+              time_field: pubtime
+    wis2-notification-messages-cache:
+        type: collection
+        title: WIS2 notification messages (cache)
+        description: WIS2 notification messages from cache/a/wis2
+        keywords: [wmo, wis2, notifications]
+        crs:
+            - CRS84
+        links:
+            - type: application/geo+json
+              rel: items
+              title: Notifications from Météo-France, Global Broker Service
+              href: mqtts://everyone:everyone@globalbroker.meteo.fr:8883
+              channel: 'cache/a/wis2/#'
+              length: -1
+            - type: application/geo+json
+              rel: items
+              title: Notifications from China Meteorological Agency, Global Broker Service
+              href: mqtts://everyone:everyone@gb.wis.cma.cn:8883
+              channel: 'cache/a/wis2/#'
+              length: -1
+            - type: application/geo+json
+              rel: items
+              title: Notifications from National Oceanic and Atmospheric Administration, National Weather Service, Global Broker Service
+              href: mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883
+              channel: 'cache/a/wis2/#'
+              length: -1
+            - type: application/geo+json
+              rel: items
+              title: Notifications from Instituto Nacional de Meteorologia (Brazil), Global Broker Service
+              href: mqtts://everyone:everyone@globalbroker.inmet.gov.br:8883
+              channel: 'cache/a/wis2/#'
+              length: -1
+            - type: text/html
+              rel: canonical
+              title: WMO Information System (WIS) | World Meteorological Organization
+              href: https://community.wmo.int/en/activity-areas/wis
+        extents:
+            spatial:
+                bbox: [-180, -90, 180, 90]
+                crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
+        providers:
+            - type: feature
+              name: ${WIS2_GREP_BACKEND_TYPE}
+              data: ${WIS2_GREP_BACKEND_CONNECTION}/wis2-notification-messages-cache
               id_field: id
               time_field: pubtime
diff --git a/wis2-grep-management/wis2_grep/backend/base.py b/wis2-grep-management/wis2_grep/backend/base.py
index 26631de..19da60f 100644
--- a/wis2-grep-management/wis2_grep/backend/base.py
+++ b/wis2-grep-management/wis2_grep/backend/base.py
@@ -50,10 +50,11 @@ def teardown(self) -> None:
         raise NotImplementedError()
 
     @abstractmethod
-    def save(self, record: dict) -> None:
+    def save(self, channel: str, record: dict) -> None:
         """
         Upsert a resource to a backend
 
+        :param channel: `str` of channel (`origin` or `cache`)
         :param payload: `dict` of resource
 
         :returns: `None`
@@ -62,10 +63,11 @@ def save(self, record: dict) -> None:
         raise NotImplementedError()
 
     @abstractmethod
-    def exists(self, identifier: str) -> bool:
+    def exists(self, channel: str, identifier: str) -> bool:
         """
         Querying whether a record exists in a backend
 
+        :param channel: `str` of channel (`origin` or `cache`)
         :param identifier: `str` of record identifier
 
         :returns: `bool` of whether record exists in backend
diff --git a/wis2-grep-management/wis2_grep/backend/elastic.py b/wis2-grep-management/wis2_grep/backend/elastic.py
index e7d0f0f..d61e0bb 100644
--- a/wis2-grep-management/wis2_grep/backend/elastic.py
+++ b/wis2-grep-management/wis2_grep/backend/elastic.py
@@ -107,7 +107,6 @@ def __init__(self, defs):
         }
 
         self.url_parsed = urlparse(self.defs.get('connection'))
-        self.index_name = self.url_parsed.path.lstrip('/')
 
         url2 = f'{self.url_parsed.scheme}://{self.url_parsed.netloc}'
 
@@ -124,11 +123,9 @@ def __init__(self, defs):
         if self.url_parsed.path.count('/') > 1:
             LOGGER.debug('ES URL has a basepath')
             basepath = self.url_parsed.path.split('/')[1]
-            self.index_name = self.url_parsed.path.split('/')[-1]
             url2 = f'{url2}/{basepath}/'
 
         LOGGER.debug(f'ES URL: {url2}')
-        LOGGER.debug(f'ES index: {self.index_name}')
 
         settings = {
             'hosts': [url2],
@@ -146,22 +143,29 @@ def __init__(self, defs):
 
     def setup(self) -> None:
         self.teardown()
-        LOGGER.debug(f'Creating index {self.index_name}')
-        self.es.indices.create(index=self.index_name, body=self.ES_SETTINGS)
+        LOGGER.debug('Creating indexes')
+
+        for channel in ['origin', 'cache']:
+            index_name = f'wis2-notification-messages-{channel}'
+            self.es.indices.create(index=index_name, body=self.ES_SETTINGS)
 
     def teardown(self) -> None:
-        if self.es.indices.exists(index=self.index_name):
-            LOGGER.debug(f'Deleting index {self.index_name}')
-            self.es.indices.delete(index=self.index_name)
+        for channel in ['origin', 'cache']:
+            index_name = f'wis2-notification-messages-{channel}'
+            if self.es.indices.exists(index=index_name):
+                LOGGER.debug(f'Deleting index {index_name}')
+                self.es.indices.delete(index=index_name)
 
-    def save(self, record: dict) -> None:
+    def save(self, channel: str, record: dict) -> None:
         LOGGER.debug(f"Indexing record {record['id']}")
-        self.es.index(index=self.index_name, id=record['id'], body=record)
+        index_name = f'wis2-notification-messages-{channel}'
+        self.es.index(index=index_name, id=record['id'], body=record)
 
-    def exists(self, identifier: str) -> bool:
+    def exists(self, channel: str, identifier: str) -> bool:
         LOGGER.debug(f'Querying Replay API for id {identifier}')
+        index_name = f'wis2-notification-messages-{channel}'
         try:
-            _ = self.es.get(index=self.index_name, id=identifier)
+            _ = self.es.get(index=index_name, id=identifier)
             return True
         except NotFoundError:
             return False
diff --git a/wis2-grep-management/wis2_grep/backend/ogcapi_features.py b/wis2-grep-management/wis2_grep/backend/ogcapi_features.py
index 8d05c38..a71aefc 100644
--- a/wis2-grep-management/wis2_grep/backend/ogcapi_features.py
+++ b/wis2-grep-management/wis2_grep/backend/ogcapi_features.py
@@ -36,10 +36,10 @@ def __init__(self, defs):
         super().__init__(defs)
 
         self.conn = Features(env.API_URL)
-        self.collection = 'notification-messsages'
 
-    def save(self):
+    def save(self, channel: str):
 
+        collection = f'wis2-notification-messages-{channel}'
         ttype = 'create'
 
         try:
@@ -52,15 +52,16 @@ def save(self):
 
         if ttype == 'create':
             LOGGER.debug('Adding new notification to collection')
-            _ = self.conn.get_collection_create(self.collection, payload)
+            _ = self.conn.get_collection_create(collection, payload)
         elif ttype == 'update':
             LOGGER.debug('Updating existing notification in collection')
-            _ = self.conn.get_collection_update(self.collection, payload)
+            _ = self.conn.get_collection_update(collection, payload)
 
-    def exists(self, identifier: str) -> bool:
+    def exists(self, channel: str, identifier: str) -> bool:
+        collection = f'wis2-notification-messages-{channel}'
         LOGGER.debug(f'Querying Replay API for id {identifier}')
         try:
-            _ = self.conn.collection_item(self.collection, identifier)
+            _ = self.conn.collection_item(collection, identifier)
             return True
         except RuntimeError:
             return False
diff --git a/wis2-grep-management/wis2_grep/loader.py b/wis2-grep-management/wis2_grep/loader.py
index 1032bcd..3c7aff1 100644
--- a/wis2-grep-management/wis2_grep/loader.py
+++ b/wis2-grep-management/wis2_grep/loader.py
@@ -42,6 +42,7 @@ def __init__(self):
         :returns: `wis2_grep.loader.Loader`
         """
 
+        self.channel = None
         self.backend = BACKENDS[BACKEND_TYPE](
                     {'connection': BACKEND_CONNECTION})
 
@@ -55,6 +56,11 @@ def load(self, message: Union[dict, str], topic: str = None) -> None:
         :returns: `None`
         """
 
+        if topic.startswith('origin'):
+            self.channel = 'origin'
+        elif topic.startswith('cache'):
+            self.channel = 'cache'
+
         if isinstance(message, dict):
             LOGGER.debug('Notification message is already a dict')
             self.message = message
@@ -83,7 +89,7 @@ def _publish(self):
         """
 
         LOGGER.info(f'Saving to {BACKEND_TYPE} ({BACKEND_CONNECTION})')
-        self.backend.save(self.message)
+        self.backend.save(self.channel, self.message)
 
     def __repr__(self):
         return '<Loader>'
diff --git a/wis2-grep.env b/wis2-grep.env
index 54fe992..d6edd4e 100644
--- a/wis2-grep.env
+++ b/wis2-grep.env
@@ -2,7 +2,7 @@ export WIS2_GREP_LOGGING_LEVEL=DEBUG
 export WIS2_GREP_API_URL=http://localhost
 export WIS2_GREP_API_URL_DOCKER=http://wis2-grep-api
 export WIS2_GREP_BACKEND_TYPE=Elasticsearch
-export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/wis2-notification-messages
+export WIS2_GREP_BACKEND_CONNECTION=http://wis2-grep-backend:9200/
 export WIS2_GREP_CENTRE_ID=ca-eccc-msc-global-replay
 #export WIS2_GREP_GB=mqtts://everyone:everyone@wis2globalbroker.nws.noaa.gov:8883
 export WIS2_GREP_GB=mqtts://everyone:everyone@globalbroker.meteo.fr:8883