Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][AWS] | Fix InvalidToken Exceptions Due to Improper Token Refresh Calls #1190

Open
wants to merge 29 commits into
base: main
Choose a base branch
from

Conversation

mk-armah
Copy link
Member

@mk-armah mk-armah commented Nov 28, 2024

Description

What:

We've encountered timeouts and InvalidToken exceptions despite implementing a time-based token refresh strategy designed to refresh the token when it's 80% close to expiry, this occurs arbitrarily.

Why:

The token refresh may not be occurring at the appropriate points in the application.
It appears that the refresh is triggered while an active session is still utilizing the old token.
When the token is replaced during an ongoing session, the active session becomes obsolete, leading to InvalidToken exceptions.

The session refresh call is placed incorrectly within the application flow.
Active sessions are not updated with the new token after a refresh, causing them to fail when attempting to access AWS resources.

How (Proposed Solution):

Review and adjust the placement of the token refresh logic to ensure it does not interfere with active sessions.
Implement a check to verify if a session's token is valid or has a significant amount of time remaining before each resource (kind) retrieval operation.
Ensure that new tokens are only used for new sessions, or gracefully handle the transition for active sessions.

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the port production environment(using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.

@mk-armah mk-armah requested a review from a team as a code owner November 28, 2024 09:50
@shalev007
Copy link
Contributor

Can you explain how will this solve the expired tokens issue?
I only saw you added cache layer to the tokens

Comment on lines +92 to +94
logger.info(
f"Handling global resource {kind} for account {credentials.account_id}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

)
async with session.client("sts") as sts_client:
initial_credentials = await self._refresh_credentials(sts_client)
refresh_credentials = partial(self._refresh_credentials, sts_client)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is using the same sts_client here thread safe?
I mean since we are running in parallel, can sharing the sts_client result in unexpected errors?

@@ -121,6 +123,10 @@ async def _get_organization_session(self) -> aioboto3.Session | None:
def _get_account_read_role_name(self) -> str:
return ocean.integration_config.get("account_read_role_name", "")

@staticmethod
def _assume_role_duration_seconds() -> int:
return int(ocean.integration_config.get("assume_role_duration", 900))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this default to ASSUME_ROLE_DURATION_SECONDS?

Comment on lines 80 to 97
async with session.client("sts") as sts_client:
initial_credentials = await self._refresh_credentials(sts_client)
refresh_credentials = partial(self._refresh_credentials, sts_client)
refreshable_credentials = (
AioRefreshableCredentials.create_from_metadata(
metadata=initial_credentials,
refresh_using=refresh_credentials,
method="sts-assume-role",
)
)
botocore_session = get_session()
setattr(botocore_session, "_credentials", refreshable_credentials)
if region:
botocore_session.set_config_variable("region", region)
autorefresh_session = aioboto3.Session(
botocore_session=botocore_session
)
return autorefresh_session
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty complex part of the code what do you think about adding maybe some comments about why this code exists and what is happening baisically?

)

async def create_session_for_each_region(
async def create_refreshable_session_for_each_region(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why change the name pf this function, from an API standpoint the person using this function it will not matter if it's refreshable or not, WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

Copy link
Contributor

@shalev007 shalev007 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants