diff --git a/backend/api.py b/backend/api.py index fa0ef971..bf8ef958 100644 --- a/backend/api.py +++ b/backend/api.py @@ -16,6 +16,7 @@ from backend.routes.auth import bp as auth_bp from backend.routes.healthcheck import bp as healthcheck_bp from backend.utils import dev_only +from backend.importer.loop import Importer def create_app(config: Optional[str] = None): @@ -34,7 +35,8 @@ def create_app(config: Optional[str] = None): # db.create_all() # start background processor for SQS imports - + importer = Importer(queue_name=config_obj.SCRAPER_SQS_QUEUE_NAME) + importer.start() return app diff --git a/backend/config.py b/backend/config.py index 294f8602..e2205470 100644 --- a/backend/config.py +++ b/backend/config.py @@ -65,6 +65,8 @@ class Config(object): "http://localhost:" + FRONTEND_PORT ) + SCRAPER_SQS_QUEUE_NAME = os.environ.get("SCRAPER_SQS_QUEUE_NAME") + @property def SQLALCHEMY_DATABASE_URI(self): return "postgresql://%s:%s@%s:%s/%s" % ( diff --git a/backend/database/core.py b/backend/database/core.py index 3a49e4c5..7304c1df 100644 --- a/backend/database/core.py +++ b/backend/database/core.py @@ -1,7 +1,7 @@ """This file defines the database connection, plus some terminal commands for setting up and tearing down the database. -Do not import anything directly from `backend.database._core`. Instead, import +Do not importer anything directly from `backend.database._core`. Instead, importer from `backend.database`. """ import os diff --git a/backend/import/__init__.py b/backend/importer/__init__.py similarity index 100% rename from backend/import/__init__.py rename to backend/importer/__init__.py diff --git a/backend/import/loaders/__init__.py b/backend/importer/loaders/__init__.py similarity index 100% rename from backend/import/loaders/__init__.py rename to backend/importer/loaders/__init__.py diff --git a/backend/import/loop.py b/backend/importer/loop.py similarity index 72% rename from backend/import/loop.py rename to backend/importer/loop.py index 68b73cd8..f5f73b2d 100644 --- a/backend/import/loop.py +++ b/backend/importer/loop.py @@ -1,12 +1,23 @@ from io import BytesIO from logging import getLogger +from threading import Thread from time import sleep import boto3 import ujson -class Importer: + +class Loader: + def __init__(self, content: bytes): + self.content = content + + def load(self): + raise Exception("unimplemented; extend this class to write a load migration.") + + +class Importer(Thread): def __init__(self, queue_name: str, region: str = "us-east-1"): + super().__init__(daemon=True) # TODO: ideally we would have a function on the app to catch shutdown events and close gracefully, but until then daemon it is. self.queue_name = queue_name self.session = boto3.Session(region_name=region) self.sqs_client = self.session.client("sqs") @@ -14,6 +25,10 @@ def __init__(self, queue_name: str, region: str = "us-east-1"): self.sqs_queue_url = self.sqs_client.get_queue_url(QueueName=self.queue_name) self.logger = getLogger(self.__class__.__name__) + self.loader_map: dict[str, Loader] = { + # this should be a mapping of s3 key prefix : loader class for that file type + } + def run(self): while True: resp = self.sqs_client.receive_message( @@ -36,15 +51,14 @@ def run(self): fileobj.seek(0) content = fileobj.read() - # TODO: we now have an in-memory copy of the s3 file content. This is where we would run the import. + # TODO: we now have an in-memory copy of the s3 file content. This is where we would run the importer. # we want a standardized importer class; we would call something like below: - # loader = Loader(content).load() + # loader = self.get_loader_for_content_type(key) + # loader(content).load() self.logger.info(f"Imported s3://{bucket_name}/{key}") -class Loader: - def __init__(self, content: bytes): - self.content = content - - def load(self): - raise Exception("unimplemented; extend this class to write a load migration.") + def get_loader_for_content_type(self, s3_key: str) -> Loader: + # s3 keys should be of format /subject/source/time.jsonl + prefix = "/".join(s3_key.split("/")[:-1]) + return self.loader_map[prefix] diff --git a/frontend/babel.config.js b/frontend/babel.config.js index d3a6c601..772bf826 100644 --- a/frontend/babel.config.js +++ b/frontend/babel.config.js @@ -3,7 +3,7 @@ module.exports = { plugins: ["inline-react-svg"], env: { test: { - plugins: ["transform-dynamic-import"] + plugins: ["transform-dynamic-importer"] } } } diff --git a/frontend/helpers/api/mocks/browser.ts b/frontend/helpers/api/mocks/browser.ts index 5e73c934..ca6ace3e 100644 --- a/frontend/helpers/api/mocks/browser.ts +++ b/frontend/helpers/api/mocks/browser.ts @@ -3,7 +3,7 @@ import { handlers, rejectUnhandledApiRequests } from "./handlers" export const worker = setupWorker(...handlers) -/** Starts worker, convenience for conditional import */ +/** Starts worker, convenience for conditional importer */ export const startWorker = () => { worker.start({ onUnhandledRequest: rejectUnhandledApiRequests diff --git a/frontend/tests/helpers/api.e2e.test.ts b/frontend/tests/helpers/api.e2e.test.ts index 61ee78d4..8c38238f 100644 --- a/frontend/tests/helpers/api.e2e.test.ts +++ b/frontend/tests/helpers/api.e2e.test.ts @@ -3,5 +3,5 @@ import { server } from "../test-utils" /** Turn off API mocking for the test so we use the real API */ beforeAll(() => server.close()) -/** Re-import the main test file to pick up the tests */ +/** Re-importer the main test file to pick up the tests */ require("./api.test") diff --git a/requirements/docs.txt b/requirements/docs.txt index cdaf7f87..88aef5fb 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -67,7 +67,7 @@ pymdown-extensions==10.8.1 # mkdocs-material python-dateutil==2.9.0.post0 # via - # ghp-import + # ghp-importer # mkdocs-macros-plugin pyyaml==6.0.1 # via