feat: granule links via event subscription #44

Merged
43 commits merged on Dec 17, 2024
Commits (43)
871bdc8
WIP
ceholden Oct 30, 2024
9bdbe8d
Deploy event subscription push handler
ceholden Nov 12, 2024
3b7dfd2
Remove scihub credentials permission from downloader
ceholden Nov 12, 2024
1395f89
Add logging for development
ceholden Nov 12, 2024
456d63e
WIP
ceholden Nov 12, 2024
7087bc7
s/IDENTIFIER/STAGE/g
ceholden Nov 15, 2024
3bd446f
Exclude granules acquired more than 30 days ago
ceholden Nov 15, 2024
fa6c53f
Setup logger for 'app' namespace
ceholden Nov 15, 2024
7c6cb6d
Add tests
ceholden Nov 18, 2024
dfaff5b
Add copyright info for example code from ESA
ceholden Nov 18, 2024
ff0a685
Remove manual test code
ceholden Nov 18, 2024
2cd8301
Update link_fetcher README with subscription handler details
ceholden Nov 18, 2024
b943300
Add missing diagram
ceholden Nov 18, 2024
8bcf664
fix lint
ceholden Nov 18, 2024
4382b2c
Oops
ceholden Nov 19, 2024
f215d0f
Point to ESA's documentation for push subscriptions
ceholden Nov 19, 2024
03e7113
Add user/pass credentials for API handler to integration stack
ceholden Nov 19, 2024
46e2bc4
Fix integration test name/password keys in secret
ceholden Nov 19, 2024
3daa8fd
Try to fix endpoint url in integration test
ceholden Nov 19, 2024
19dc21f
debug & add polling for sqs message length
ceholden Nov 19, 2024
0316886
Cleanup print statements
ceholden Nov 19, 2024
fa7551f
Add CLI to subscription manager script
ceholden Nov 19, 2024
43cfcab
Load acceptable_tile_ids on startup
ceholden Nov 19, 2024
a3effef
Test process_notification
ceholden Nov 19, 2024
77f03a0
lint
ceholden Nov 19, 2024
d9c09ad
Split tests for common.py code (DB/SQS)
ceholden Nov 19, 2024
6ddcb32
comments
ceholden Nov 19, 2024
b716f17
lint
ceholden Nov 19, 2024
7a7de5a
lint
ceholden Nov 19, 2024
08be0db
Apply suggestions from code review
ceholden Nov 21, 2024
51b3059
reduce monkeying around
ceholden Nov 21, 2024
05a7a71
lint
ceholden Nov 21, 2024
ea30140
Rename string param & update to APIGateway v2
ceholden Dec 6, 2024
c718a27
lint
ceholden Dec 6, 2024
88f909e
Update how we get subscription handler endpoint in integration test
ceholden Dec 6, 2024
f63edf1
fix bad merge
ceholden Dec 12, 2024
36c9bad
Use psycopg2-binary instead of psycopg2 for dev deps
ceholden Dec 16, 2024
3fdc7c3
Remove deprecated 'version' from docker-compose.yml
ceholden Dec 16, 2024
0b375d7
Fix type hint for fixture that yields
ceholden Dec 16, 2024
6ca8545
Update lambdas/link_fetcher/tests/test_subscription_endpoint.py
ceholden Dec 16, 2024
3068724
Use urljoin to join urls
ceholden Dec 16, 2024
94bf593
Fix import
ceholden Dec 16, 2024
23ed64f
Inject now_utc() into app builder to reduce monkeypatching in tests
ceholden Dec 16, 2024
2 changes: 0 additions & 2 deletions README.md
@@ -40,8 +40,6 @@ $ npm install # This installs any node packages that are within package.json (CD
$ make install # This calls `pipenv install --dev` on the repo root and any of the directories that contain a Makefile with `install`
```

_**Note** you might have an issue installing `psycopg2` - I found [this](https://github.com/pypa/pipenv/issues/3991#issuecomment-564645309) helpful_

A file named `.env` is expected in the root of the repository, the expected values are:

```bash
2 changes: 1 addition & 1 deletion alembic_migration/Pipfile
@@ -14,7 +14,7 @@ sqlalchemy = "==1.4.0"
db = {editable = true, path = "./../layers/db"}
pytest = "==7.4.3"
moto = "==5.0.17"
psycopg2 = "==2.9.10"
psycopg2-binary = "==2.9.10"
pytest-docker = "==2.0.1"
assertpy = "==1.1"
pytest-cov = "==4.1.0"
334 changes: 196 additions & 138 deletions alembic_migration/Pipfile.lock

Large diffs are not rendered by default.

77 changes: 66 additions & 11 deletions cdk/downloader_stack.py
@@ -6,16 +6,17 @@
Duration,
RemovalPolicy,
Stack,
aws_apigatewayv2,
aws_apigatewayv2_integrations,
aws_cloudwatch,
aws_ec2,
aws_events,
aws_events_targets,
aws_iam,
aws_lambda,
aws_s3,
)
from aws_cdk import aws_lambda_python_alpha as aws_lambda_python
from aws_cdk import aws_logs, aws_rds, aws_secretsmanager, aws_sqs, aws_ssm
from aws_cdk import aws_logs, aws_rds, aws_s3, aws_secretsmanager, aws_sqs, aws_ssm
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks
from constructs import Construct
@@ -255,7 +256,7 @@ def __init__(
self,
id=f"{identifier}-link-fetcher",
entry="lambdas/link_fetcher",
index="handler.py",
index="app/search_handler.py",
handler="handler",
layers=[
db_layer,
@@ -286,6 +287,58 @@
threshold=1,
)

link_subscription = aws_lambda_python.PythonFunction(
self,
id=f"{identifier}-link-subscription",
entry="lambdas/link_fetcher",
index="app/subscription_handler.py",
handler="handler",
layers=[
db_layer,
],
memory_size=200,
timeout=Duration.minutes(15),
runtime=aws_lambda.Runtime.PYTHON_3_11,
environment=link_fetcher_environment_vars,
)

aws_logs.LogGroup(
self,
id=f"{identifier}-link-subscription-log-group",
log_group_name=f"/aws/lambda/{link_subscription.function_name}",
removal_policy=RemovalPolicy.DESTROY
if removal_policy_destroy
else RemovalPolicy.RETAIN,
retention=aws_logs.RetentionDays.ONE_DAY
if removal_policy_destroy
else aws_logs.RetentionDays.TWO_WEEKS,
)

aws_cloudwatch.Alarm(
self,
id=f"{identifier}-link-subscription-errors-alarm",
metric=link_subscription.metric_errors(),
evaluation_periods=3,
threshold=1,
)

forwarder_api = aws_apigatewayv2.HttpApi(
self,
"EsaPushSubscriptionHandlerApi",
api_name="EsaPushSubscriptionHandlerApi",
default_integration=aws_apigatewayv2_integrations.HttpLambdaIntegration(
"EsaPushSubscriptionHandlerApi-Integration",
handler=link_subscription,
),
)

aws_ssm.StringParameter(
self,
id=f"{identifier}-link-subscription-endpoint-url",
string_value=forwarder_api.url,
parameter_name=f"/hls-s2-downloader-serverless/{identifier}/link_subscription_endpoint_url",
)

downloader_environment_vars = {
"STAGE": identifier,
"DB_CONNECTION_SECRET_ARN": downloader_rds_secret.secret_arn,
Expand Down Expand Up @@ -338,7 +391,7 @@ def __init__(
self,
id=f"{identifier}-downloader-role-arn",
string_value=self.downloader.role.role_arn,
parameter_name=(f"/integration_tests/{identifier}/downloader_role_arn"),
parameter_name=f"/integration_tests/{identifier}/downloader_role_arn",
)

self.downloader.role.add_managed_policy(lambda_insights_policy)
@@ -351,15 +404,9 @@ def __init__(
downloader_bucket.grant_write(self.downloader)

downloader_rds_secret.grant_read(link_fetcher)
downloader_rds_secret.grant_read(link_subscription)
downloader_rds_secret.grant_read(self.downloader)

scihub_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
id=f"{identifier}-scihub-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/scihub-credentials",
)
scihub_credentials.grant_read(self.downloader)

copernicus_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
id=f"{identifier}-copernicus-credentials",
Expand All @@ -368,9 +415,17 @@ def __init__(
copernicus_credentials.grant_read(self.downloader)
copernicus_credentials.grant_read(self.token_rotator)

esa_subscription_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
id=f"{identifier}-esa-subscription-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/esa-subscription-credentials",
)
esa_subscription_credentials.grant_read(link_subscription)

token_parameter.grant_read(self.downloader)

to_download_queue.grant_send_messages(link_fetcher)
to_download_queue.grant_send_messages(link_subscription)
to_download_queue.grant_consume_messages(self.downloader)

# We must resort to using CfnEventSourceMapping to set the maximum concurrency
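The collapsed lines above attach the SQS queue to the downloader with a capped concurrency. For reference, a hedged sketch of what the CfnEventSourceMapping escape hatch typically looks like in CDK; the construct id, batch size, and concurrency value are illustrative rather than this stack's actual settings, and the snippet assumes the stack's `__init__` scope:

```python
from aws_cdk import aws_lambda

# Sketch only: the scope is assumed to be the stack's __init__, so `self`,
# `identifier`, and `to_download_queue` come from the surrounding code.
aws_lambda.CfnEventSourceMapping(
    self,
    id=f"{identifier}-downloader-event-source-mapping",  # illustrative id
    function_name=self.downloader.function_name,
    event_source_arn=to_download_queue.queue_arn,
    batch_size=1,
    scaling_config=aws_lambda.CfnEventSourceMapping.ScalingConfigProperty(
        maximum_concurrency=10  # illustrative cap, not necessarily the stack's value
    ),
)
```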
18 changes: 9 additions & 9 deletions cdk/integration_stack.py
@@ -1,9 +1,9 @@
import json
from typing import Optional

from aws_cdk import Duration, RemovalPolicy, Stack, aws_apigateway, aws_lambda
from aws_cdk import Duration, RemovalPolicy, Stack, aws_apigateway, aws_iam, aws_lambda
from aws_cdk import aws_lambda_python_alpha as aws_lambda_python
from aws_cdk import aws_iam, aws_logs, aws_s3, aws_secretsmanager, aws_ssm
from aws_cdk import aws_logs, aws_s3, aws_secretsmanager, aws_ssm
from constructs import Construct


@@ -27,16 +27,16 @@ def __init__(
)
)

# TODO remove this, along with other references to it, but leaving for
# now, just in case removing it would break the downloader lambda
aws_secretsmanager.Secret(
self,
id=f"{identifier}-integration-scihub-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/scihub-credentials",
description="Dummy values for the Mock SciHub API credentials",
id=f"{identifier}-integration-esa-subscription-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/esa-subscription-credentials",
description="Dummy values for the ESA 'push' subscription authentication",
generate_secret_string=aws_secretsmanager.SecretStringGenerator(
secret_string_template=json.dumps({"username": "test-user"}),
generate_string_key="password",
secret_string_template=json.dumps(
{"notification_username": "test-user"}
),
generate_string_key="notification_password",
),
)

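The subscription handler authenticates incoming notifications against this secret using HTTP Basic auth. As a rough illustration only (the helper names, boto3 lookup, and STAGE environment variable are assumptions, not the PR's actual subscription_handler code), such a check could look like:

```python
import base64
import json
import os
import secrets

import boto3


def load_subscription_credentials(stage: str) -> tuple[str, str]:
    """Read the username/password pair from the subscription credentials secret."""
    secret = json.loads(
        boto3.client("secretsmanager").get_secret_value(
            SecretId=f"hls-s2-downloader-serverless/{stage}/esa-subscription-credentials"
        )["SecretString"]
    )
    return secret["notification_username"], secret["notification_password"]


def is_authorized(authorization_header: str) -> bool:
    """Return True if an HTTP Basic auth header matches the stored credentials."""
    if not authorization_header.startswith("Basic "):
        return False
    try:
        decoded = base64.b64decode(authorization_header.removeprefix("Basic ")).decode()
    except Exception:
        return False
    username, _, password = decoded.partition(":")
    # Assumes a STAGE environment variable identifying the deployment
    expected_user, expected_pass = load_subscription_credentials(os.environ["STAGE"])
    # Constant-time comparison to avoid leaking credential content via timing
    return secrets.compare_digest(username, expected_user) and secrets.compare_digest(
        password, expected_pass
    )
```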
Binary file added images/hls-s2-downloader-link-subscription.png
9 changes: 9 additions & 0 deletions integration_tests/conftest.py
@@ -77,6 +77,15 @@ def db_session(monkeypatch, ssm_parameter: Callable[[str], str]) -> Iterable[Ses
session.commit()


@pytest.fixture
def link_subscription_endpoint_url(ssm_client: SSMClient, identifier: str):
qname = f"/hls-s2-downloader-serverless/{identifier}/link_subscription_endpoint_url"
result = ssm_client.get_parameter(Name=qname)
value = result["Parameter"].get("Value")
assert value is not None, f"No such SSM parameter: {qname}"
return value


@pytest.fixture
def step_function_arn(ssm_parameter: Callable[[str], str]):
return ssm_parameter("link_fetcher_step_function_arn")
149 changes: 149 additions & 0 deletions integration_tests/test_link_push_subscription.py
@@ -0,0 +1,149 @@
import datetime as dt
import json
from pathlib import Path
from typing import Callable
from uuid import uuid4

import boto3
import polling2
import pytest
import requests
from db.models.granule import Granule
from mypy_boto3_sqs import SQSClient
from sqlalchemy.orm import Session


def check_sqs_message_count(sqs_client, queue_url, count):
queue_attributes = sqs_client.get_queue_attributes(
QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
)
return int(queue_attributes["Attributes"]["ApproximateNumberOfMessages"]) == count


def _format_dt(datetime: dt.datetime) -> str:
"""Format datetime into string used by ESA's payload"""
return datetime.isoformat().replace("+00:00", "Z")


@pytest.fixture
def recent_event_s2_created() -> dict:
"""Create a recent Sentinel-2 "Created" event from ESA's push subscription

This message contains two types of fields,
* Message metadata (event type, subscription ID, ack ID, notification date, etc)
* Message "body" - `(.value)`
"""
# Reusing example from ESA as a template
data = (
Path(__file__).parents[1]
/ "lambdas"
/ "link_fetcher"
/ "tests"
/ "data"
/ "push-granule-created-s2-n1.json"
)
payload = json.loads(data.read_text())

# Update relevant parts of message payload to be "recent"
# where recent is <30 days from today as we're not currently
# reprocessing historical scenes that ESA has reprocessed
now = dt.datetime.now(tz=dt.timezone.utc)

payload["NotificationDate"] = _format_dt(now)
payload["value"]["OriginDate"] = _format_dt(now - dt.timedelta(seconds=7))
payload["value"]["PublicationDate"] = _format_dt(now - dt.timedelta(seconds=37))
payload["value"]["ModificationDate"] = _format_dt(now - dt.timedelta(seconds=1))
payload["value"]["ContentDate"] = {
"Start": _format_dt(now - dt.timedelta(hours=3, seconds=3)),
"End": _format_dt(now - dt.timedelta(hours=3)),
}
# We're not using fields in `payload["value"]["Attributes"]` but there's duplicate
# datetime information in there following OData conventions

# Randomize ID of message to ensure each fixture's return is unique according
# to our DB (which uses granule ID as primary key)
payload["value"]["Id"] = str(uuid4())

return payload


@pytest.fixture
def link_subscription_credentials(
identifier: str, ssm_parameter: Callable[[str], str]
) -> tuple[str, str]:
"""Return user/pass credentials for subscription endpoint"""
secrets_manager_client = boto3.client("secretsmanager")
secret = json.loads(
secrets_manager_client.get_secret_value(
SecretId=(
f"hls-s2-downloader-serverless/{identifier}/esa-subscription-credentials"
)
)["SecretString"]
)

return (
secret["notification_username"],
secret["notification_password"],
)


@pytest.mark.parametrize("notification_count", [1, 2])
def test_link_push_subscription_handles_event(
recent_event_s2_created: dict,
link_subscription_endpoint_url: str,
link_subscription_credentials: tuple[str, str],
db_session: Session,
sqs_client: SQSClient,
queue_url: str,
notification_count: int,
):
"""Test that we handle a new granule created notification

We have occasionally observed duplicate granule IDs being
sent to our API endpoint and we want to only process one,
so this test includes a parametrized "notification_count"
to replicate this reality.
"""
for _ in range(notification_count):
resp = requests.post(
f"{link_subscription_endpoint_url}events",
auth=link_subscription_credentials,
json=recent_event_s2_created,
)

# ensure correct response (204)
assert resp.status_code == 204

# ensure we have SQS message
polling2.poll(
check_sqs_message_count,
args=(sqs_client, queue_url, 1),
step=5,
timeout=120,
)

# ensure we have 1 granule for this ID
granules = (
db_session.query(Granule).filter(
Granule.id == recent_event_s2_created["value"]["Id"]
)
).all()
assert len(granules) == 1


def test_link_push_subscription_user_auth_rejects_incorrect(
link_subscription_endpoint_url: str,
):
"""Test that we reject incorrect authentication"""
url = f"{link_subscription_endpoint_url}events"
resp = requests.post(
url,
auth=(
"foo",
"bar",
),
json={},
)

# ensure correct response (401 Unauthorized)
assert resp.status_code == 401
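The duplicate-notification case exercised by the parametrized test above relies on the granule ID being the table's primary key. A minimal sketch of one way an insert-or-ignore helper could be written, assuming SQLAlchemy and the db layer's Granule model; this is illustrative, not the PR's actual common.py code:

```python
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from db.models.granule import Granule


def add_granule_if_new(session: Session, granule: Granule) -> bool:
    """Insert the granule; return False if its ID (the primary key) already exists."""
    try:
        session.add(granule)
        session.commit()
        return True
    except IntegrityError:
        # A duplicate notification for the same granule ID hits the primary-key
        # constraint; roll back and treat it as already processed.
        session.rollback()
        return False
```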
2 changes: 1 addition & 1 deletion lambdas/downloader/Pipfile
@@ -15,7 +15,7 @@ db = {editable = true, path = "./../../layers/db"}
pytest-docker = "==2.0.1"
alembic = "==1.12.1"
moto = "==5.0.17"
psycopg2 = "==2.9.10"
psycopg2-binary = "==2.9.10"
assertpy = "==1.1"
responses = "==0.23.1"
freezegun = "==1.0.0"