From be857835ce2d99e232eeea92075c8966d13b3c9c Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sun, 6 Oct 2024 16:36:48 +0000 Subject: [PATCH 1/7] improvement --- integrations/aws/CHANGELOG.md | 9 ++ integrations/aws/main.py | 133 +++++++++++++--------- integrations/aws/pyproject.toml | 2 +- integrations/aws/tests/utils/test_misc.py | 46 +++++++- integrations/aws/utils/aws.py | 32 +++--- integrations/aws/utils/misc.py | 57 ++++++++++ integrations/aws/utils/resources.py | 6 +- 7 files changed, 207 insertions(+), 78 deletions(-) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index 84979f202a..f664a3a3e5 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.2.46 (2024-10-06) + + +### Improvements + +- Removed iterative calls to the cache for tracking expiry, reducing the likelihood of a thundering herd problem. +- Enhanced semaphore implementation to properly limit concurrency across tasks, rather than within tasks, improving performance and resource utilization. + + ## 0.2.45 (2024-10-01) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index b3b48249d7..29915b73ee 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -34,9 +34,11 @@ ResourceKindsWithSpecialHandling, is_access_denied_exception, is_server_error, + semaphore_async_iterator, semaphore, ) from port_ocean.utils.async_iterators import stream_async_iterators_tasks +import functools async def _handle_global_resource_resync( @@ -71,25 +73,23 @@ async def resync_resources_for_account( credentials: AwsCredentials, kind: str ) -> ASYNC_GENERATOR_RESYNC_TYPE: """Function to handle fetching resources for a single account.""" + errors, regions = [], [] - async with semaphore: # limit the number of concurrent tasks - errors, regions = [], [] - - if is_global_resource(kind): - async for batch in _handle_global_resource_resync(kind, credentials): - yield batch - else: - async for session in credentials.create_session_for_each_region(): - try: - async for batch in resync_cloudcontrol(kind, session): - yield batch - except Exception as exc: - regions.append(session.region_name) - errors.append(exc) - continue - if errors: - message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}" - raise ExceptionGroup(message, errors) + if is_global_resource(kind): + async for batch in _handle_global_resource_resync(kind, credentials): + yield batch + else: + async for session in credentials.create_session_for_each_region(): + try: + async for batch in resync_cloudcontrol(kind, session): + yield batch + except Exception as exc: + regions.append(session.region_name) + errors.append(exc) + continue + if errors: + message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}" + raise ExceptionGroup(message, errors) @ocean.on_resync() @@ -99,7 +99,10 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_resources_for_account(credentials, kind) + semaphore_async_iterator( + semaphore, + functools.partial(resync_resources_for_account, credentials, kind), + ) async for credentials in get_accounts() ] if tasks: @@ -119,13 +122,17 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "elasticache", - "describe_cache_clusters", - "CacheClusters", - "Marker", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "elasticache", + "describe_cache_clusters", + "CacheClusters", + "Marker", + ), ) async for session in get_sessions() ] @@ -139,13 +146,17 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "elbv2", - "describe_load_balancers", - "LoadBalancers", - "Marker", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "elbv2", + "describe_load_balancers", + "LoadBalancers", + "Marker", + ), ) async for session in get_sessions() ] @@ -160,13 +171,17 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "acm", - "list_certificates", - "CertificateSummaryList", - "NextToken", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "acm", + "list_certificates", + "CertificateSummaryList", + "NextToken", + ), ) async for session in get_sessions() ] @@ -180,14 +195,18 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "ec2", - "describe_images", - "Images", - "NextToken", - {"Owners": ["self"]}, + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "ec2", + "describe_images", + "Images", + "NextToken", + {"Owners": ["self"]}, + ), ) async for session in get_sessions() ] @@ -200,13 +219,17 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "cloudformation", - "describe_stacks", - "Stacks", - "NextToken", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "cloudformation", + "describe_stacks", + "Stacks", + "NextToken", + ), ) async for session in get_sessions() ] diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 771bfcdcbf..8994b28f36 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.45" +version = "0.2.46" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/tests/utils/test_misc.py b/integrations/aws/tests/utils/test_misc.py index 45104a90ad..c5cf6a4ea6 100644 --- a/integrations/aws/tests/utils/test_misc.py +++ b/integrations/aws/tests/utils/test_misc.py @@ -1,5 +1,8 @@ from utils.misc import is_access_denied_exception -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, AsyncGenerator +import asyncio +from utils.misc import semaphore_async_iterator +import pytest class MockException(Exception): @@ -25,3 +28,44 @@ def test_access_denied_exception_with_other_error() -> None: def test_access_denied_exception_no_response_attribute() -> None: e = Exception("Test exception") assert not is_access_denied_exception(e) + + +@pytest.mark.asyncio +async def test_semaphore_async_iterator() -> None: + max_concurrency = 5 + semaphore = asyncio.BoundedSemaphore(max_concurrency) + + concurrent_tasks = 0 + max_concurrent_tasks = 0 + lock = asyncio.Lock() # Protect shared variables + + num_tasks = 20 + + async def mock_function() -> AsyncGenerator[str, None]: + nonlocal concurrent_tasks, max_concurrent_tasks + + async with lock: + concurrent_tasks += 1 + if concurrent_tasks > max_concurrent_tasks: + max_concurrent_tasks = concurrent_tasks + + await asyncio.sleep(0.1) + yield "result" + + async with lock: + concurrent_tasks -= 1 + + async def consume_iterator(async_iterator) -> None: + async for _ in async_iterator: + pass + + tasks = [ + consume_iterator(semaphore_async_iterator(semaphore, mock_function)) + for _ in range(num_tasks) + ] + await asyncio.gather(*tasks) + + assert ( + max_concurrent_tasks <= max_concurrency + ), f"Max concurrent tasks {max_concurrent_tasks} exceeded semaphore limit {max_concurrency}" + assert concurrent_tasks == 0, "Not all tasks have completed" diff --git a/integrations/aws/utils/aws.py b/integrations/aws/utils/aws.py index 840de59138..4de0a93a2b 100644 --- a/integrations/aws/utils/aws.py +++ b/integrations/aws/utils/aws.py @@ -11,7 +11,6 @@ from asyncio import Lock from port_ocean.utils.async_iterators import stream_async_iterators_tasks -from utils.misc import semaphore _session_manager: SessionManager = SessionManager() @@ -81,23 +80,20 @@ async def get_sessions( """ await update_available_access_credentials() - async with semaphore: - if custom_account_id: - credentials = _session_manager.find_credentials_by_account_id( - custom_account_id - ) - async for session in session_factory( - credentials, custom_region, use_default_region - ): - yield session - else: - tasks = [ - session_factory(credentials, custom_region, use_default_region) - async for credentials in get_accounts() - ] - if tasks: - async for batch in stream_async_iterators_tasks(*tasks): - yield batch + if custom_account_id: + credentials = _session_manager.find_credentials_by_account_id(custom_account_id) + async for session in session_factory( + credentials, custom_region, use_default_region + ): + yield session + else: + tasks = [ + session_factory(credentials, custom_region, use_default_region) + async for credentials in get_accounts() + ] + if tasks: + async for batch in stream_async_iterators_tasks(*tasks): + yield batch def validate_request(request: Request) -> tuple[bool, str]: diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index ca416373f5..97234fb12e 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -1,4 +1,5 @@ import enum +from typing import Any, AsyncIterator, Callable from port_ocean.context.event import event import asyncio @@ -59,3 +60,59 @@ def get_matching_kinds_and_blueprints_from_config( kinds[resource.kind] = [blueprint] return kinds + + +async def semaphore_async_iterator( + semaphore: asyncio.Semaphore, function: Callable[[], AsyncIterator[Any]] +) -> AsyncIterator[Any]: + """ + Executes an asynchronous iterator function under a semaphore to limit concurrency. + + This function ensures that the provided asynchronous iterator function is executed + while respecting the concurrency limit imposed by the semaphore. It acquires the + semaphore before executing the function and releases it after the function completes, + thus controlling the number of concurrent executions. + + Parameters: + semaphore (asyncio.Semaphore | asyncio.BoundedSemaphore): The semaphore used to limit concurrency. + function (Callable[[], AsyncIterator[Any]]): A nullary asynchronous function, - apply arguments with `functools.partial` or an anonymous function (lambda) + that returns an asynchronous iterator. This function is executed under the semaphore. + + Yields: + Any: The items yielded by the asynchronous iterator function. + + Usage: + ```python + import asyncio + + async def async_iterator_function(param1, param2): + # Your async code here + yield ... + + async def async_generator_function(): + # Your async code to retrieve items + param1 = "your_param1" + yield param1 + + async def main(): + semaphore = asyncio.BoundedSemaphore(50) + param2 = "your_param2" + + tasks = [ + semaphore_async_iterator( + semaphore, + lambda: async_iterator_function(param1, param2) # functools.partial(async_iterator_function, param1, param2) + ) + async for param1 in async_generator_function() + ] + + async for batch in stream_async_iterators_tasks(*tasks): + # Process each batch + pass + + asyncio.run(main()) + ``` + """ + async with semaphore: + async for result in function(): + yield result diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 0b9e26e393..d8f408f314 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -14,7 +14,7 @@ from utils.aws import get_sessions from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE -from utils.aws import _session_manager, update_available_access_credentials +from utils.aws import _session_manager from utils.overrides import AWSResourceConfig from botocore.config import Config as Boto3Config @@ -127,7 +127,7 @@ async def resync_custom_kind( next_token = None if not describe_method_params: describe_method_params = {} - while await update_available_access_credentials(): + while True: async with session.client(service_name) as client: try: params: dict[str, Any] = describe_method_params @@ -172,7 +172,7 @@ async def resync_cloudcontrol( account_id = await _session_manager.find_account_id_by_session(session) logger.info(f"Resyncing {kind} in account {account_id} in region {region}") next_token = None - while await update_available_access_credentials(): + while True: async with session.client("cloudcontrol") as cloudcontrol: try: params = { From 470613dc839b6d0f26ba50249ca83bc00848fd02 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sun, 6 Oct 2024 17:18:04 +0000 Subject: [PATCH 2/7] lint --- integrations/aws/tests/utils/test_misc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/tests/utils/test_misc.py b/integrations/aws/tests/utils/test_misc.py index c5cf6a4ea6..c9d9c8fea0 100644 --- a/integrations/aws/tests/utils/test_misc.py +++ b/integrations/aws/tests/utils/test_misc.py @@ -55,7 +55,7 @@ async def mock_function() -> AsyncGenerator[str, None]: async with lock: concurrent_tasks -= 1 - async def consume_iterator(async_iterator) -> None: + async def consume_iterator(async_iterator: Any) -> None: async for _ in async_iterator: pass From 383dd0ffc833c17ed4b1c3bfdda80631604e26e9 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Tue, 8 Oct 2024 08:09:29 +0000 Subject: [PATCH 3/7] now importing semaphore_async_iterator from port_ocean --- integrations/aws/main.py | 6 ++- integrations/aws/tests/utils/test_misc.py | 46 +----------------- integrations/aws/utils/misc.py | 57 ----------------------- 3 files changed, 5 insertions(+), 104 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 29915b73ee..1a9ab07d9b 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -34,10 +34,12 @@ ResourceKindsWithSpecialHandling, is_access_denied_exception, is_server_error, - semaphore_async_iterator, semaphore, ) -from port_ocean.utils.async_iterators import stream_async_iterators_tasks +from port_ocean.utils.async_iterators import ( + stream_async_iterators_tasks, + semaphore_async_iterator, +) import functools diff --git a/integrations/aws/tests/utils/test_misc.py b/integrations/aws/tests/utils/test_misc.py index c9d9c8fea0..45104a90ad 100644 --- a/integrations/aws/tests/utils/test_misc.py +++ b/integrations/aws/tests/utils/test_misc.py @@ -1,8 +1,5 @@ from utils.misc import is_access_denied_exception -from typing import Optional, Dict, Any, AsyncGenerator -import asyncio -from utils.misc import semaphore_async_iterator -import pytest +from typing import Optional, Dict, Any class MockException(Exception): @@ -28,44 +25,3 @@ def test_access_denied_exception_with_other_error() -> None: def test_access_denied_exception_no_response_attribute() -> None: e = Exception("Test exception") assert not is_access_denied_exception(e) - - -@pytest.mark.asyncio -async def test_semaphore_async_iterator() -> None: - max_concurrency = 5 - semaphore = asyncio.BoundedSemaphore(max_concurrency) - - concurrent_tasks = 0 - max_concurrent_tasks = 0 - lock = asyncio.Lock() # Protect shared variables - - num_tasks = 20 - - async def mock_function() -> AsyncGenerator[str, None]: - nonlocal concurrent_tasks, max_concurrent_tasks - - async with lock: - concurrent_tasks += 1 - if concurrent_tasks > max_concurrent_tasks: - max_concurrent_tasks = concurrent_tasks - - await asyncio.sleep(0.1) - yield "result" - - async with lock: - concurrent_tasks -= 1 - - async def consume_iterator(async_iterator: Any) -> None: - async for _ in async_iterator: - pass - - tasks = [ - consume_iterator(semaphore_async_iterator(semaphore, mock_function)) - for _ in range(num_tasks) - ] - await asyncio.gather(*tasks) - - assert ( - max_concurrent_tasks <= max_concurrency - ), f"Max concurrent tasks {max_concurrent_tasks} exceeded semaphore limit {max_concurrency}" - assert concurrent_tasks == 0, "Not all tasks have completed" diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index 97234fb12e..ca416373f5 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -1,5 +1,4 @@ import enum -from typing import Any, AsyncIterator, Callable from port_ocean.context.event import event import asyncio @@ -60,59 +59,3 @@ def get_matching_kinds_and_blueprints_from_config( kinds[resource.kind] = [blueprint] return kinds - - -async def semaphore_async_iterator( - semaphore: asyncio.Semaphore, function: Callable[[], AsyncIterator[Any]] -) -> AsyncIterator[Any]: - """ - Executes an asynchronous iterator function under a semaphore to limit concurrency. - - This function ensures that the provided asynchronous iterator function is executed - while respecting the concurrency limit imposed by the semaphore. It acquires the - semaphore before executing the function and releases it after the function completes, - thus controlling the number of concurrent executions. - - Parameters: - semaphore (asyncio.Semaphore | asyncio.BoundedSemaphore): The semaphore used to limit concurrency. - function (Callable[[], AsyncIterator[Any]]): A nullary asynchronous function, - apply arguments with `functools.partial` or an anonymous function (lambda) - that returns an asynchronous iterator. This function is executed under the semaphore. - - Yields: - Any: The items yielded by the asynchronous iterator function. - - Usage: - ```python - import asyncio - - async def async_iterator_function(param1, param2): - # Your async code here - yield ... - - async def async_generator_function(): - # Your async code to retrieve items - param1 = "your_param1" - yield param1 - - async def main(): - semaphore = asyncio.BoundedSemaphore(50) - param2 = "your_param2" - - tasks = [ - semaphore_async_iterator( - semaphore, - lambda: async_iterator_function(param1, param2) # functools.partial(async_iterator_function, param1, param2) - ) - async for param1 in async_generator_function() - ] - - async for batch in stream_async_iterators_tasks(*tasks): - # Process each batch - pass - - asyncio.run(main()) - ``` - """ - async with semaphore: - async for result in function(): - yield result From 4f91a63ea5b86ba2bf39abf85a3cda1990b34c07 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Tue, 8 Oct 2024 08:24:46 +0000 Subject: [PATCH 4/7] bumped integration version --- integrations/aws/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 426e634f1e..c6b6a7f66d 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.46" +version = "0.2.47" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] From 19ffa28832040bc7610e4ff4c216d6fb0a09c562 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Fri, 11 Oct 2024 09:01:05 +0000 Subject: [PATCH 5/7] check credentials validity after every batch ingestion --- integrations/aws/main.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 1a9ab07d9b..065dd902af 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -109,6 +109,7 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -140,6 +141,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -165,6 +167,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -190,6 +193,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -214,6 +218,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -238,6 +243,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch From 3464ddaebc728e500b7d7d78a83d293ece3313d0 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Fri, 11 Oct 2024 09:01:53 +0000 Subject: [PATCH 6/7] bumped integration version --- integrations/aws/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 73a7825a3a..d9ecfd509b 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.47" +version = "0.2.48" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] From 81c3da94552e8ba9dfe72870634591fc735fe395 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Mon, 14 Oct 2024 20:28:32 +0000 Subject: [PATCH 7/7] bumped integration version --- integrations/aws/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 888ec408c3..2b619b48e3 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.48" +version = "0.2.49" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "]