feat: granule links via event subscription #44
Note - migrated without changes from original test_link_fetcher_handler.py
lambdas/link_fetcher/app/common.py
Note - migrated without changes from original handler.py ~> search_handler.py code. These functions are useful for the search or event based link fetchers because they include functionality for,
- SearchResult model
- parsing tile id, getting acceptable tile id list, and filtering SearchResult
- publishing SQS message as part of DB transaction to prevent sending a message if there's some sort of DB integrity error (e.g., dupes)
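As a rough illustration of the last two items, the sketch below filters a granule by tile ID and only sends the queue message while the DB transaction is still open, so a duplicate row aborts before anything is published. This is not the code in common.py: it uses sqlite3 from the standard library instead of the project's database layer, a plain callable (`send_message`) in place of an SQS client, and hypothetical names (`parse_tile_id`, `add_granule_and_notify`, the `granule` table). It assumes the standard Sentinel-2 product naming scheme, where the tile ID is the sixth underscore-separated field, and the product name used is made up for the example.

```python
import sqlite3
from typing import Callable


def parse_tile_id(product_name: str) -> str:
    """Return the tile ID (without the leading "T") from a product name."""
    tile_field = product_name.split("_")[5]
    return tile_field.removeprefix("T")


def add_granule_and_notify(
    conn: sqlite3.Connection,
    product_name: str,
    accepted_tile_ids: set[str],
    send_message: Callable[[str], None],
) -> bool:
    """Record a granule and publish it; return True if a message was sent."""
    if parse_tile_id(product_name) not in accepted_tile_ids:
        return False
    try:
        # `with conn` commits on success and rolls back if anything raises.
        with conn:
            # A duplicate violates the PRIMARY KEY and raises here,
            # before the message is sent.
            conn.execute("INSERT INTO granule (id) VALUES (?)", (product_name,))
            # If sending raises, the INSERT above is rolled back as well.
            send_message(product_name)
    except sqlite3.IntegrityError:
        return False
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE granule (id TEXT PRIMARY KEY)")
sent: list[str] = []
name = "S2B_MSIL1C_20240902T101559_N0511_R065_T32TQM_20240902T122123.SAFE"

first = add_granule_and_notify(conn, name, {"32TQM"}, sent.append)
duplicate = add_granule_and_notify(conn, name, {"32TQM"}, sent.append)
other_tile = add_granule_and_notify(conn, name, {"10SEG"}, sent.append)
```

Only the first call sends a message. The second is stopped by the integrity error and the third by the tile filter, so `sent` ends up with a single entry.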
If we can fully adopt the event based approach it'd be possible to store the checksum, which I don't think is available from the search query. We might be able to avoid a call to the OData API in the downloader. The event subscription looks like a part of the OData services so there's a relatively high amount of metadata.
This is modified from the example ESA linked so I've included attribution in the header. The code example covered getting an auth token and basic create/list/delete subscription operations. Happy to change anything about this.
I figured we could use this to create the subscription manually for now, but it's not strictly needed (not used in CI or deployment). The subscriptions API can also update existing subscriptions (i.e., to rotate credentials, to pause/resume, or to point to another API endpoint) but I haven't added that yet since I don't have a use in mind.
@ceholden, fantastic work on this PR!
I didn't find anything to block approval. I just found minor things, and had some questions for clarification, and some "food for thought."
Thanks @chuckwondo! I'll get to your feedback later today or first thing tomorrow but it looks like great input, really appreciate your close review!
Co-authored-by: Chuck Daniels <[email protected]> Co-authored-by: Ciaran Sweet <[email protected]>
Hey @chuckwondo, thanks for your reviews! I think I addressed your feedback and this PR is now up to date. I won't turn on the pushes to this endpoint until after some more acceptance testing in another stage deployment that writes to a non-production bucket, but it'd be nice to get it in the main Git branch in case there are other changes that come up.
Great work, as always. Just a few "food for thought" comments you can choose to ignore. However, if you do use my comments regarding freeze_time, and they work, you should then be able to drop the freezegun dependency.
# we can't inject an older "utc_now" into `process_notification` so we have to
# patch `datetime.now()` with freezegun
with freeze_time(event_s2_created["value"]["PublicationDate"]):
    resp = test_client.post(
        "/events",
        json=event_s2_created,
        auth=(
            self.endpoint_config.notification_username,
            self.endpoint_config.notification_password,
        ),
    )
Actually, I believe we can inject an older "utc_now" by doing the following, along with the suggestion I made for subscription_endpoint.py.
However, feel free to skip this (along with the corresponding suggestion on subscription_endpoint.py), although I'm curious to see if this would work.
- # we can't inject an older "utc_now" into `process_notification` so we have to
- # patch `datetime.now()` with freezegun
- with freeze_time(event_s2_created["value"]["PublicationDate"]):
-     resp = test_client.post(
-         "/events",
-         json=event_s2_created,
-         auth=(
-             self.endpoint_config.notification_username,
-             self.endpoint_config.notification_password,
-         ),
-     )
+ setattr(
+     test_client.app,
+     "now_utc",
+     lambda: datetime.fromisoformat(
+         event_s2_created["value"]["PublicationDate"]
+     ),
+ )
+ resp = test_client.post(
+     "/events",
+     json=event_s2_created,
+     auth=(
+         self.endpoint_config.notification_username,
+         self.endpoint_config.notification_password,
+     ),
+ )
process_notification(
    notification=notification,
    accepted_tile_ids=accepted_tile_ids,
    session_maker=session_maker,
)
- process_notification(
-     notification=notification,
-     accepted_tile_ids=accepted_tile_ids,
-     session_maker=session_maker,
- )
+ process_notification(
+     notification=notification,
+     accepted_tile_ids=accepted_tile_ids,
+     session_maker=session_maker,
+     now_utc=getattr(
+         request.app, "now_utc", lambda: datetime.now(tz=timezone.utc)
+     ),
+ )
Nice idea! Appreciate you pushing for this sort of thing. I ended up doing this a little differently, but was still able to inject the "what time is it?" dependency.
Instead of storing the lookup function on the FastAPI application I added now_utc as a parameter to the build_app function. To me this felt a little bit more direct than setting/getting an attribute off of the .app itself, and I think it mirrors how the config is injected into the FastAPI app builder.
I could not remove freezegun as a dependency because a handful of tests related to the scheduled search-based link fetcher code still use it. This was the commit with the changes, 23ed64f
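A framework-free sketch of that idea follows. It is not the code from the PR: `build_handler`, `MAX_AGE`, and the age check are hypothetical stand-ins for `build_app` and for whatever `process_notification` does with the current time. Only the `now_utc` parameter name and the `["value"]["PublicationDate"]` lookup come from the discussion above, and the timestamp is invented for the example.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable

# Hypothetical cutoff: treat notifications older than this as stale.
MAX_AGE = timedelta(days=30)


def default_now_utc() -> datetime:
    """Real clock used when the caller does not supply one."""
    return datetime.now(tz=timezone.utc)


def build_handler(
    now_utc: Callable[[], datetime] = default_now_utc,
) -> Callable[[dict], bool]:
    """Build a notification handler whose clock is supplied by the caller."""

    def handle(event: dict) -> bool:
        published = datetime.fromisoformat(event["value"]["PublicationDate"])
        # True when the granule is recent enough to process.
        return now_utc() - published <= MAX_AGE

    return handle


event = {"value": {"PublicationDate": "2024-09-02T12:00:00+00:00"}}
published = datetime.fromisoformat(event["value"]["PublicationDate"])

# In a test, pin "now" to the publication date instead of patching datetime.
pinned = build_handler(now_utc=lambda: published)
# A year later, the same event is considered stale.
later = build_handler(now_utc=lambda: published + timedelta(days=365))
```

Because the clock is an argument of the builder, a test can construct its own handler with a fixed time and leave production code on the default.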
Nice!
Co-authored-by: Chuck Daniels <[email protected]>
Co-authored-by: Chuck Daniels <[email protected]>
Great! I like the adjustment you made to build_app.
What I am changing
This PR adds an API endpoint to handle ESA's "push" subscription notification for new data. This endpoint is intended to handle new granule "created" event notifications for Sentinel-2. This PR also adds a CLI script to list, create, and terminate subscriptions. This script defines the filters that we wish to apply for new granules, which at the present is limited to filtering to only Sentinel-2 imagery.
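The tests in this PR call the endpoint with a username/password pair, which suggests HTTP Basic authentication on the notification route. As a minimal sketch of what such a check could look like (an assumption, not the handler's actual code; `is_authorized` and the example credentials are hypothetical), using only the standard library:

```python
import base64
import binascii
import secrets


def is_authorized(header: str, username: str, password: str) -> bool:
    """Check an HTTP Basic Authorization header against expected credentials."""
    scheme, _, encoded = header.partition(" ")
    if scheme.lower() != "basic":
        return False
    try:
        decoded = base64.b64decode(encoded, validate=True).decode("utf-8")
    except (binascii.Error, UnicodeDecodeError):
        return False
    got_user, sep, got_password = decoded.partition(":")
    if not sep:
        return False
    # Constant-time comparisons; both are evaluated so that a wrong
    # username does not skip the password comparison.
    user_ok = secrets.compare_digest(got_user.encode(), username.encode())
    password_ok = secrets.compare_digest(got_password.encode(), password.encode())
    return user_ok and password_ok


header = "Basic " + base64.b64encode(b"esa:s3cret").decode("ascii")
ok = is_authorized(header, "esa", "s3cret")
bad = is_authorized(header, "esa", "wrong")
```

In a real deployment the expected username and password would come from configuration rather than literals.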
For more background on ESA's subscription system, the design of this handler, and preliminary results, see this ticket, https://github.com/NASA-IMPACT/hls_development/issues/300
This subscription handling endpoint is intended to be run alongside the existing scheduled "link fetcher" that runs once a day. Once we have more evidence that this is working as expected we can remove the old "link fetcher" and associated machinery (date generator Lambda, StepFunction, and the GranuleCount tracking table).
How I did it
At a high level one of the implementation choices was "where should this live?", especially given that the event and polling based link fetchers were to live side-by-side for a time. In other words they will share some code related to the message payload, state tracking, and sending messages to the queue. I had thought of two possible alternatives,
lambdas/
and either,lambdas/link_fetcher
,common.py
Either way we still have 2x sets of Lambda functions, log groups, and alerts! I picked option 2 because sharing the same directory didn't cause any blockers (i.e., Lambda zip getting too large), was relatively less complex than migrating to another directory, avoided setting up yet another workspace in CI and Makefiles, and eventually we'll be deleting the old link fetcher so this shared living arrangement will only be temporary.
With that high level decision made, there are a few other steps I want to call out for review,
- hls-s2-downloader-serverless/{stage}/esa-subscription-credentials secret in AWS SecretsManager and populated it with the user/password that I used when creating the 'push' subscription on ESA's side.
- manage_subscription.py script to create the "push" subscription to our endpoint.
- 05.10 software (see announcement, https://dataspace.copernicus.eu/news/2024-9-2-sentinel-2-collection-1-products-availability).
How you can test it
There are a few ways to test this,
- make unit-tests from root, or make test from the lambdas/link_fetcher module
- IDENTIFIER=event-subs for the past ~2 weeks. I've purged the queue a few times as I've made changes, for example to ignore reprocessed granules that were acquired in ~2022. Still, this has been useful to check in bulk how many granule messages we're getting. Below is a screenshot of the queue depth for the last 2 days that shows we're getting approximately the 5-7k messages per day that we usually get from the schedule based link fetcher.