From 77e18ce03f0e5f54acdfaa0ff60a818180b520f9 Mon Sep 17 00:00:00 2001 From: Shalev Avhar <51760613+shalev007@users.noreply.github.com> Date: Mon, 5 Aug 2024 14:32:54 +0300 Subject: [PATCH] [AWS] Bug fix global resources not synced for all accounts (#845) # Description What - Fixed an issue in the AWS integration where S3 buckets were not synced for all accounts. Why - S3 buckets are a global resource type, meaning we only search for the first region showing these buckets and then stop searching to avoid rate limiting with AWS and wasting time. This caused users with multi-account setups to only list their first account's buckets. How - Added an iteration of accounts on top of the existing process to ensure that S3 buckets are synced across all accounts. ## Type of change Please leave one option from the following and delete the rest: - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. --------- Co-authored-by: Shalev Avhar --- integrations/aws/CHANGELOG.md | 7 ++ integrations/aws/aws/aws_credentials.py | 6 +- integrations/aws/main.py | 49 +++++++-- integrations/aws/pyproject.toml | 2 +- integrations/aws/utils/aws.py | 15 ++- integrations/aws/utils/resources.py | 133 ++++++++++++------------ 6 files changed, 128 insertions(+), 84 deletions(-) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index f11bc5559f..aa61478dd7 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +# Port_Ocean 0.2.24 (2024-08-05) + +### Improvements + +- Fix global resources not reading through all accounts + + ## 0.2.23 (2024-08-05) diff --git a/integrations/aws/aws/aws_credentials.py b/integrations/aws/aws/aws_credentials.py index 8b80d7172b..fa30d9d0f5 100644 --- a/integrations/aws/aws/aws_credentials.py +++ b/integrations/aws/aws/aws_credentials.py @@ -1,4 +1,4 @@ -from typing import Any, AsyncIterator, Coroutine, Optional +from typing import AsyncIterator, Optional import aioboto3 @@ -50,6 +50,6 @@ async def create_session(self, region: Optional[str] = None) -> aioboto3.Session async def create_session_for_each_region( self, - ) -> AsyncIterator[Coroutine[Any, Any, aioboto3.Session]]: + ) -> AsyncIterator[aioboto3.Session]: for region in self.enabled_regions: - yield self.create_session(region) + yield await self.create_session(region) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 8ee88b9597..d0d7673514 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -6,6 +6,7 @@ from starlette import responses from pydantic import BaseModel +from aws.aws_credentials import AwsCredentials from port_ocean.core.models import Entity from utils.resources import ( @@ -18,6 +19,8 @@ from utils.aws import ( describe_accessible_accounts, + get_accounts, + get_default_region_from_credentials, get_sessions, update_available_access_credentials, validate_request, @@ -33,21 +36,51 @@ ) +async def _handle_global_resource_resync( + kind: str, + credentials: AwsCredentials, +) -> ASYNC_GENERATOR_RESYNC_TYPE: + denied_access_to_default_region = False + default_region = get_default_region_from_credentials(credentials) + default_session = await credentials.create_session(default_region) + try: + async for batch in resync_cloudcontrol(kind, default_session): + yield batch + except Exception as e: + if is_access_denied_exception(e): + denied_access_to_default_region = True + else: + raise e + + if denied_access_to_default_region: + logger.info(f"Trying to resync {kind} in all regions until success") + async for session in credentials.create_session_for_each_region(): + try: + async for batch in resync_cloudcontrol(kind, session): + yield batch + break + except Exception as e: + if not is_access_denied_exception(e): + raise e + + @ocean.on_resync() async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: if kind in iter(ResourceKindsWithSpecialHandling): return await update_available_access_credentials() is_global = is_global_resource(kind) - try: - async for batch in resync_cloudcontrol(kind, is_global): - yield batch - except Exception as e: - if is_access_denied_exception(e): - async for batch in resync_cloudcontrol( - kind, is_global=False, stop_on_first_region=True - ): + async for credentials in get_accounts(): + if is_global: + async for batch in _handle_global_resource_resync(kind, credentials): yield batch + else: + async for session in credentials.create_session_for_each_region(): + try: + async for batch in resync_cloudcontrol(kind, session): + yield batch + except Exception: + continue @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ACCOUNT) diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index ecef84a796..7acb996ec8 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.23" +version = "0.2.24" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/utils/aws.py b/integrations/aws/utils/aws.py index 895ecbf8fc..93fa3e5aa9 100644 --- a/integrations/aws/utils/aws.py +++ b/integrations/aws/utils/aws.py @@ -40,6 +40,15 @@ def get_default_region_from_credentials( return credentials.default_regions[0] if credentials.default_regions else None +async def get_accounts() -> AsyncIterator[AwsCredentials]: + """ + Gets the AWS account IDs that the current IAM role can access. + """ + await update_available_access_credentials() + for credentials in _session_manager._aws_credentials: + yield credentials + + async def get_sessions( custom_account_id: Optional[str] = None, custom_region: Optional[str] = None, @@ -59,10 +68,10 @@ async def get_sessions( yield await credentials.create_session(custom_region) else: async for session in credentials.create_session_for_each_region(): - yield await session + yield session return - for credentials in _session_manager._aws_credentials: + async for credentials in get_accounts(): if use_default_region: default_region = get_default_region_from_credentials(credentials) yield await credentials.create_session(default_region) @@ -70,7 +79,7 @@ async def get_sessions( yield await credentials.create_session(custom_region) else: async for session in credentials.create_session_for_each_region(): - yield await session + yield session def validate_request(request: Request) -> tuple[bool, str]: diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 7878d462a3..d8f408f314 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -163,80 +163,75 @@ async def resync_custom_kind( async def resync_cloudcontrol( - kind: str, is_global: bool = False, stop_on_first_region: bool = False + kind: str, session: aioboto3.Session ) -> ASYNC_GENERATOR_RESYNC_TYPE: use_get_resource_api = typing.cast( AWSResourceConfig, event.resource_config ).selector.use_get_resource_api - found_data = False - async for session in get_sessions(None, None, is_global): - region = session.region_name - logger.info(f"Resyncing {kind} in region {region}") - account_id = await _session_manager.find_account_id_by_session(session) - next_token = None - while True: - async with session.client("cloudcontrol") as cloudcontrol: - try: - params = { - "TypeName": kind, - } - if next_token: - params["NextToken"] = next_token - - response = await cloudcontrol.list_resources(**params) - next_token = response.get("NextToken") - resources = response.get("ResourceDescriptions", []) - if not resources: - break - found_data = True - page_resources = [] - if use_get_resource_api: - resources = await asyncio.gather( - *( - describe_single_resource( - kind, - instance.get("Identifier"), - account_id=account_id, - region=region, - ) - for instance in resources + region = session.region_name + account_id = await _session_manager.find_account_id_by_session(session) + logger.info(f"Resyncing {kind} in account {account_id} in region {region}") + next_token = None + while True: + async with session.client("cloudcontrol") as cloudcontrol: + try: + params = { + "TypeName": kind, + } + if next_token: + params["NextToken"] = next_token + + response = await cloudcontrol.list_resources(**params) + next_token = response.get("NextToken") + resources = response.get("ResourceDescriptions", []) + if not resources: + break + page_resources = [] + if use_get_resource_api: + resources = await asyncio.gather( + *( + describe_single_resource( + kind, + instance.get("Identifier"), + account_id=account_id, + region=region, ) - ) - else: - resources = [ - { - "Identifier": instance.get("Identifier"), - "Properties": json.loads(instance.get("Properties")), - } for instance in resources - ] - - for instance in resources: - serialized = instance.copy() - serialized.update( - { - CustomProperties.KIND: kind, - CustomProperties.ACCOUNT_ID: account_id, - CustomProperties.REGION: region, - } - ) - page_resources.append( - fix_unserializable_date_properties(serialized) ) - logger.info( - f"Fetched batch of {len(page_resources)} from {kind} in region {region}" ) - yield page_resources - - if not next_token: - break - except cloudcontrol.exceptions.ClientError as e: - if is_access_denied_exception(e): - if not is_global: - logger.warning( - f"Skipping resyncing {kind} in region {region} due to missing access permissions" - ) - break # no need to continue querying on the same region since we don't have access - raise e - if found_data and stop_on_first_region: - return + else: + resources = [ + { + "Identifier": instance.get("Identifier"), + "Properties": json.loads(instance.get("Properties")), + } + for instance in resources + ] + + for instance in resources: + serialized = instance.copy() + serialized.update( + { + CustomProperties.KIND: kind, + CustomProperties.ACCOUNT_ID: account_id, + CustomProperties.REGION: region, + } + ) + page_resources.append( + fix_unserializable_date_properties(serialized) + ) + logger.info( + f"Fetched batch of {len(page_resources)} from {kind} in region {region}" + ) + yield page_resources + + if not next_token: + break + except Exception as e: + if is_access_denied_exception(e): + logger.warning( + f"Skipping resyncing {kind} in region {region} in account {account_id} due to missing access permissions" + ) + else: + logger.warning(f"Error resyncing {kind} in region {region}, {e}") + raise e