diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index 70388b1e59..7446787537 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.2.49 (2024-10-14) + + +### Improvements + +- Removed iterative calls to the cache for tracking expiry, reducing the likelihood of a thundering herd problem. +- Enhanced semaphore implementation to properly limit concurrency across tasks, rather than within tasks, improving performance and resource utilization. + + ## 0.2.48 (2024-10-14) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index b3b48249d7..065dd902af 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -36,7 +36,11 @@ is_server_error, semaphore, ) -from port_ocean.utils.async_iterators import stream_async_iterators_tasks +from port_ocean.utils.async_iterators import ( + stream_async_iterators_tasks, + semaphore_async_iterator, +) +import functools async def _handle_global_resource_resync( @@ -71,25 +75,23 @@ async def resync_resources_for_account( credentials: AwsCredentials, kind: str ) -> ASYNC_GENERATOR_RESYNC_TYPE: """Function to handle fetching resources for a single account.""" + errors, regions = [], [] - async with semaphore: # limit the number of concurrent tasks - errors, regions = [], [] - - if is_global_resource(kind): - async for batch in _handle_global_resource_resync(kind, credentials): - yield batch - else: - async for session in credentials.create_session_for_each_region(): - try: - async for batch in resync_cloudcontrol(kind, session): - yield batch - except Exception as exc: - regions.append(session.region_name) - errors.append(exc) - continue - if errors: - message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}" - raise ExceptionGroup(message, errors) + if is_global_resource(kind): + async for batch in _handle_global_resource_resync(kind, credentials): + yield batch + else: + async for session in credentials.create_session_for_each_region(): + try: + async for batch in resync_cloudcontrol(kind, session): + yield batch + except Exception as exc: + regions.append(session.region_name) + errors.append(exc) + continue + if errors: + message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}" + raise ExceptionGroup(message, errors) @ocean.on_resync() @@ -99,11 +101,15 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_resources_for_account(credentials, kind) + semaphore_async_iterator( + semaphore, + functools.partial(resync_resources_for_account, credentials, kind), + ) async for credentials in get_accounts() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -119,18 +125,23 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "elasticache", - "describe_cache_clusters", - "CacheClusters", - "Marker", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "elasticache", + "describe_cache_clusters", + "CacheClusters", + "Marker", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -139,19 +150,24 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "elbv2", - "describe_load_balancers", - "LoadBalancers", - "Marker", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "elbv2", + "describe_load_balancers", + "LoadBalancers", + "Marker", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -160,19 +176,24 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "acm", - "list_certificates", - "CertificateSummaryList", - "NextToken", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "acm", + "list_certificates", + "CertificateSummaryList", + "NextToken", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -180,19 +201,24 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "ec2", - "describe_images", - "Images", - "NextToken", - {"Owners": ["self"]}, + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "ec2", + "describe_images", + "Images", + "NextToken", + {"Owners": ["self"]}, + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -200,19 +226,24 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "cloudformation", - "describe_stacks", - "Stacks", - "NextToken", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "cloudformation", + "describe_stacks", + "Stacks", + "NextToken", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 888ec408c3..2b619b48e3 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.48" +version = "0.2.49" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/utils/aws.py b/integrations/aws/utils/aws.py index 840de59138..4de0a93a2b 100644 --- a/integrations/aws/utils/aws.py +++ b/integrations/aws/utils/aws.py @@ -11,7 +11,6 @@ from asyncio import Lock from port_ocean.utils.async_iterators import stream_async_iterators_tasks -from utils.misc import semaphore _session_manager: SessionManager = SessionManager() @@ -81,23 +80,20 @@ async def get_sessions( """ await update_available_access_credentials() - async with semaphore: - if custom_account_id: - credentials = _session_manager.find_credentials_by_account_id( - custom_account_id - ) - async for session in session_factory( - credentials, custom_region, use_default_region - ): - yield session - else: - tasks = [ - session_factory(credentials, custom_region, use_default_region) - async for credentials in get_accounts() - ] - if tasks: - async for batch in stream_async_iterators_tasks(*tasks): - yield batch + if custom_account_id: + credentials = _session_manager.find_credentials_by_account_id(custom_account_id) + async for session in session_factory( + credentials, custom_region, use_default_region + ): + yield session + else: + tasks = [ + session_factory(credentials, custom_region, use_default_region) + async for credentials in get_accounts() + ] + if tasks: + async for batch in stream_async_iterators_tasks(*tasks): + yield batch def validate_request(request: Request) -> tuple[bool, str]: diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 0b9e26e393..d8f408f314 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -14,7 +14,7 @@ from utils.aws import get_sessions from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE -from utils.aws import _session_manager, update_available_access_credentials +from utils.aws import _session_manager from utils.overrides import AWSResourceConfig from botocore.config import Config as Boto3Config @@ -127,7 +127,7 @@ async def resync_custom_kind( next_token = None if not describe_method_params: describe_method_params = {} - while await update_available_access_credentials(): + while True: async with session.client(service_name) as client: try: params: dict[str, Any] = describe_method_params @@ -172,7 +172,7 @@ async def resync_cloudcontrol( account_id = await _session_manager.find_account_id_by_session(session) logger.info(f"Resyncing {kind} in account {account_id} in region {region}") next_token = None - while await update_available_access_credentials(): + while True: async with session.client("cloudcontrol") as cloudcontrol: try: params = {