diff --git a/backend/importer/loop.py b/backend/importer/loop.py
index ffe14ad6..af061445 100644
--- a/backend/importer/loop.py
+++ b/backend/importer/loop.py
@@ -31,41 +31,49 @@ def __init__(self, queue_name: str, region: str = "us-east-1"):
         self.loader_map: dict[str, Loader] = {}

     def run(self):
+        self.logger.info("Starting import loop")
         while True:
-            resp = self.sqs_client.receive_message(
-                QueueUrl=self.sqs_queue_url,
-                # retrieve one message at a time - we could up this
-                # and parallelize but no point until way more files.
-                MaxNumberOfMessages=1,
-                # 10 minutes to process message before it becomes
-                # visible for another consumer.
-                VisibilityTimeout=600,
-            )
-            # if no messages found, wait 5m for next poll
-            if len(resp["Messages"]) == 0:
-                sleep(600)
-                continue
+            try:
+                self.logger.info("Polling for scraper event messages")
+                resp = self.sqs_client.receive_message(
+                    QueueUrl=self.sqs_queue_url,
+                    # retrieve one message at a time - we could up this
+                    # and parallelize but no point until way more files.
+                    MaxNumberOfMessages=1,
+                    # 10 minutes to process message before it becomes
+                    # visible for another consumer.
+                    VisibilityTimeout=600,
+                )
+                # if no messages found, wait 5m for next poll
+                if len(resp["Messages"]) == 0:
+                    sleep(600)
+                    continue

-            for message in resp["Messages"]:
-                sqs_body = ujson.loads(message["Body"])
-                # this comes through as a list, but we expect one object
-                for record in sqs_body["Records"]:
-                    bucket_name = record["s3"]["bucket"]["name"]
-                    key = record["s3"]["object"]["key"]
-                    with BytesIO() as fileobj:
-                        self.s3_client.download_fileobj(
-                            bucket_name, key, fileobj)
-                        fileobj.seek(0)
-                        content = fileobj.read()
-                        _ = content  # for linting.
+                for message in resp["Messages"]:
+                    sqs_body = ujson.loads(message["Body"])
+                    # this comes through as a list, but we expect one object
+                    for record in sqs_body["Records"]:
+                        bucket_name = record["s3"]["bucket"]["name"]
+                        key = record["s3"]["object"]["key"]
+                        with BytesIO() as fileobj:
+                            self.s3_client.download_fileobj(
+                                bucket_name, key, fileobj)
+                            fileobj.seek(0)
+                            content = fileobj.read()
+                            _ = content  # for linting.

-                    # TODO: we now have an in-memory copy of s3 file content
-                    # This is where we would run the importer.
-                    # we want a standardized importer class; use like:
-                    # loader = self.get_loader_for_content_type(key)
-                    # loader(content).load()
+                        # TODO: we now have an in-memory copy of s3 file content
+                        # This is where we would run the importer.
+                        # we want a standardized importer class; use like:
+                        # loader = self.get_loader_for_content_type(key)
+                        # loader(content).load()

-                    self.logger.info(f"Imported s3://{bucket_name}/{key}")
+                        self.logger.info(f"Imported s3://{bucket_name}/{key}")
+            except Exception as e:
+                self.logger.error(
+                    f"Failed to process scraper events sqs queue: {e}")
+                sleep(600)
+                pass

     def get_loader_for_content_type(self, s3_key: str) -> Loader:
         # s3 keys should be of format /subject/source/time.jsonl
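
Note on the polling branch: boto3's receive_message omits the "Messages" key entirely when the queue returns nothing, so resp["Messages"] raises KeyError on an empty poll and the loop only recovers through the new except handler, which then sleeps 600 seconds (ten minutes, despite the "5m" comment). A minimal sketch of a more defensive poll is below; it is meant to slot into the class above, and the poll_once name is illustrative, not part of this change.

    # Sketch only: defensive handling of an empty receive_message response.
    # Assumes self.sqs_client and self.sqs_queue_url as set up in __init__.
    def poll_once(self) -> list[dict]:
        resp = self.sqs_client.receive_message(
            QueueUrl=self.sqs_queue_url,
            MaxNumberOfMessages=1,
            VisibilityTimeout=600,
            # Long polling: let SQS hold the request for up to 20s before
            # returning an empty response, instead of sleeping client-side.
            WaitTimeSeconds=20,
        )
        # "Messages" is absent (not an empty list) when nothing was received.
        return resp.get("Messages", [])

With long polling in place, the fixed client-side sleep between empty polls could likely be shortened.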
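
The hunk ends at get_loader_for_content_type, whose body is not shown; the TODO block above intends to call it to pick a Loader per file. A rough sketch of the key-based dispatch that comment describes, assuming keys follow /subject/source/time.jsonl and that self.loader_map is keyed by the subject segment (both assumptions, not confirmed by this diff):

    # Sketch only: dispatch on the key layout described in the comment.
    # Assumes keys look like "/subject/source/time.jsonl" and that
    # self.loader_map maps the subject segment to a Loader.
    def get_loader_for_content_type(self, s3_key: str) -> Loader:
        subject = s3_key.strip("/").split("/")[0]
        try:
            return self.loader_map[subject]
        except KeyError:
            raise ValueError(f"no loader registered for subject {subject!r}")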