Skip to content

Commit

Permalink
#382 import loop
Browse files Browse the repository at this point in the history
  • Loading branch information
zganger committed Jun 26, 2024
1 parent 6f7e084 commit 5f26647
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 15 deletions.
4 changes: 3 additions & 1 deletion backend/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from backend.routes.auth import bp as auth_bp
from backend.routes.healthcheck import bp as healthcheck_bp
from backend.utils import dev_only
from backend.importer.loop import Importer


def create_app(config: Optional[str] = None):
Expand All @@ -34,7 +35,8 @@ def create_app(config: Optional[str] = None):
# db.create_all()

# start background processor for SQS imports

importer = Importer(queue_name=config_obj.SCRAPER_SQS_QUEUE_NAME)
importer.start()

return app

Expand Down
2 changes: 2 additions & 0 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class Config(object):
"http://localhost:" + FRONTEND_PORT
)

SCRAPER_SQS_QUEUE_NAME = os.environ.get("SCRAPER_SQS_QUEUE_NAME")

@property
def SQLALCHEMY_DATABASE_URI(self):
return "postgresql://%s:%s@%s:%s/%s" % (
Expand Down
2 changes: 1 addition & 1 deletion backend/database/core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""This file defines the database connection, plus some terminal commands for
setting up and tearing down the database.
Do not import anything directly from `backend.database._core`. Instead, import
Do not import anything directly from `backend.database._core`. Instead, import
from `backend.database`.
"""
import os
Expand Down
File renamed without changes.
File renamed without changes.
32 changes: 23 additions & 9 deletions backend/import/loop.py → backend/importer/loop.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
from io import BytesIO
from logging import getLogger
from threading import Thread
from time import sleep

import boto3
import ujson

class Importer:

class Loader:
    """Base class for content loaders.

    Stores the raw bytes passed to the constructor. Subclasses override
    ``load`` to implement the actual load migration.
    """

    def __init__(self, content: bytes):
        # Raw content exactly as handed in by the caller.
        self.content = content

    def load(self):
        """Run the load for ``self.content``.

        Raises:
            NotImplementedError: always on this base class; subclasses
                must provide their own implementation.
        """
        # NotImplementedError is the conventional marker for an abstract
        # method. It is still an Exception subclass, so callers that catch
        # Exception behave as before.
        raise NotImplementedError(
            "unimplemented; extend this class to write a load migration."
        )


class Importer(Thread):
def __init__(self, queue_name: str, region: str = "us-east-1"):
# Builds the thread object plus the boto3 SQS and S3 clients it uses.
# queue_name: name of the SQS queue, resolved through get_queue_url below.
# region: region name passed to the boto3 session.
super().__init__(daemon=True) # TODO: ideally we would have a function on the app to catch shutdown events and close gracefully, but until then daemon it is.
self.queue_name = queue_name
# One shared session so both clients use the same region.
self.session = boto3.Session(region_name=region)
self.sqs_client = self.session.client("sqs")
self.s3_client = self.session.client("s3")
# NOTE(review): boto3's get_queue_url is documented to return a response
# dict with the URL under the "QueueUrl" key, so this attribute presumably
# holds the whole response rather than the URL string - confirm against
# how run() passes it to receive_message.
self.sqs_queue_url = self.sqs_client.get_queue_url(QueueName=self.queue_name)
# Logger named after the concrete class.
self.logger = getLogger(self.__class__.__name__)

# NOTE(review): the comment below describes the values as loader classes,
# so the annotation probably should be dict[str, type[Loader]] - confirm.
self.loader_map: dict[str, Loader] = {
# this should be a mapping of s3 key prefix : loader class for that file type
}

def run(self):
while True:
resp = self.sqs_client.receive_message(
Expand All @@ -36,15 +51,14 @@ def run(self):
fileobj.seek(0)
content = fileobj.read()

# TODO: we now have an in-memory copy of the s3 file content. This is where we would run the import.
# TODO: we now have an in-memory copy of the s3 file content. This is where we would run the importer.
# we want a standardized importer class; we would call something like below:
# loader = Loader(content).load()
# loader = self.get_loader_for_content_type(key)
# loader(content).load()

self.logger.info(f"Imported s3://{bucket_name}/{key}")

class Loader:
    """Base class for content loaders.

    Stores the raw bytes passed to the constructor. Subclasses override
    ``load`` to implement the actual load migration.
    """

    def __init__(self, content: bytes):
        # Raw content exactly as handed in by the caller.
        self.content = content

    def load(self):
        """Run the load for ``self.content``.

        Raises:
            NotImplementedError: always on this base class; subclasses
                must provide their own implementation.
        """
        # NotImplementedError is the conventional marker for an abstract
        # method. It is still an Exception subclass, so callers that catch
        # Exception behave as before.
        raise NotImplementedError(
            "unimplemented; extend this class to write a load migration."
        )
def get_loader_for_content_type(self, s3_key: str) -> "type[Loader]":
    """Return the loader class registered for the prefix of ``s3_key``.

    The prefix is everything before the final ``/`` of the key, e.g.
    ``subject/source`` for ``subject/source/time.jsonl``. A key with no
    ``/`` has the empty string as its prefix.

    Args:
        s3_key: S3 object key of the file to be loaded.

    Returns:
        The loader class stored in ``self.loader_map`` for that prefix
        (a class to instantiate with the content, not an instance).

    Raises:
        KeyError: if no loader is registered for the key's prefix.
    """
    # s3 keys should be of format /subject/source/time.jsonl
    prefix, _, _ = s3_key.rpartition("/")
    try:
        return self.loader_map[prefix]
    except KeyError:
        # Same exception type as a plain dict lookup, but the message says
        # which key and prefix had no registered loader.
        raise KeyError(
            f"no loader registered for s3 key prefix {prefix!r} (key: {s3_key!r})"
        ) from None
2 changes: 1 addition & 1 deletion frontend/babel.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module.exports = {
plugins: ["inline-react-svg"],
env: {
test: {
plugins: ["transform-dynamic-import"]
plugins: ["transform-dynamic-importer"]
}
}
}
2 changes: 1 addition & 1 deletion frontend/helpers/api/mocks/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { handlers, rejectUnhandledApiRequests } from "./handlers"

export const worker = setupWorker(...handlers)

/** Starts worker, convenience for conditional import */
/** Starts worker, convenience for conditional import */
export const startWorker = () => {
worker.start({
onUnhandledRequest: rejectUnhandledApiRequests
Expand Down
2 changes: 1 addition & 1 deletion frontend/tests/helpers/api.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import { server } from "../test-utils"
/** Turn off API mocking for the test so we use the real API */
beforeAll(() => server.close())

/** Re-import the main test file to pick up the tests */
/** Re-import the main test file to pick up the tests */
require("./api.test")
2 changes: 1 addition & 1 deletion requirements/docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pymdown-extensions==10.8.1
# mkdocs-material
python-dateutil==2.9.0.post0
# via
# ghp-import
# ghp-import
# mkdocs-macros-plugin
pyyaml==6.0.1
# via
Expand Down

0 comments on commit 5f26647

Please sign in to comment.