Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][AWS] | Improved Concurrency Control and Eliminated Likelihood of Thundering Herd #1063

Merged
merged 15 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.48 (2024-10-11)


### Improvements

- Removed the per-iteration cache calls that tracked expiry from the resource pagination loops, reducing the likelihood of a thundering herd problem.
- Enhanced the semaphore implementation so that it limits concurrency across tasks rather than within each task, improving performance and resource utilization.


## 0.2.47 (2024-10-09)


Expand Down
143 changes: 87 additions & 56 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
is_server_error,
semaphore,
)
from port_ocean.utils.async_iterators import stream_async_iterators_tasks
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)
import functools


async def _handle_global_resource_resync(
Expand Down Expand Up @@ -71,25 +75,23 @@ async def resync_resources_for_account(
credentials: AwsCredentials, kind: str
) -> ASYNC_GENERATOR_RESYNC_TYPE:
"""Function to handle fetching resources for a single account."""
errors, regions = [], []

async with semaphore: # limit the number of concurrent tasks
errors, regions = [], []

if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
yield batch
except Exception as exc:
regions.append(session.region_name)
errors.append(exc)
continue
if errors:
message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}"
raise ExceptionGroup(message, errors)
if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
yield batch
except Exception as exc:
regions.append(session.region_name)
errors.append(exc)
continue
if errors:
message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}"
raise ExceptionGroup(message, errors)


@ocean.on_resync()
Expand All @@ -99,11 +101,15 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

await update_available_access_credentials()
tasks = [
resync_resources_for_account(credentials, kind)
semaphore_async_iterator(
semaphore,
functools.partial(resync_resources_for_account, credentials, kind),
)
async for credentials in get_accounts()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand All @@ -119,18 +125,23 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

tasks = [
resync_custom_kind(
kind,
session,
"elasticache",
"describe_cache_clusters",
"CacheClusters",
"Marker",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"elasticache",
"describe_cache_clusters",
"CacheClusters",
"Marker",
),
)
async for session in get_sessions()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand All @@ -139,19 +150,24 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

tasks = [
resync_custom_kind(
kind,
session,
"elbv2",
"describe_load_balancers",
"LoadBalancers",
"Marker",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"elbv2",
"describe_load_balancers",
"LoadBalancers",
"Marker",
),
)
async for session in get_sessions()
]

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand All @@ -160,59 +176,74 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

tasks = [
resync_custom_kind(
kind,
session,
"acm",
"list_certificates",
"CertificateSummaryList",
"NextToken",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"acm",
"list_certificates",
"CertificateSummaryList",
"NextToken",
),
)
async for session in get_sessions()
]

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE)
async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
tasks = [
resync_custom_kind(
kind,
session,
"ec2",
"describe_images",
"Images",
"NextToken",
{"Owners": ["self"]},
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"ec2",
"describe_images",
"Images",
"NextToken",
{"Owners": ["self"]},
),
)
async for session in get_sessions()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK)
async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
tasks = [
resync_custom_kind(
kind,
session,
"cloudformation",
"describe_stacks",
"Stacks",
"NextToken",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"cloudformation",
"describe_stacks",
"Stacks",
"NextToken",
),
)
async for session in get_sessions()
]

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.47"
version = "0.2.48"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <[email protected]>", "Erik Zaadi <[email protected]>"]

Expand Down
32 changes: 14 additions & 18 deletions integrations/aws/utils/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from asyncio import Lock

from port_ocean.utils.async_iterators import stream_async_iterators_tasks
from utils.misc import semaphore

_session_manager: SessionManager = SessionManager()

Expand Down Expand Up @@ -81,23 +80,20 @@ async def get_sessions(
"""
await update_available_access_credentials()

async with semaphore:
if custom_account_id:
credentials = _session_manager.find_credentials_by_account_id(
custom_account_id
)
async for session in session_factory(
credentials, custom_region, use_default_region
):
yield session
else:
tasks = [
session_factory(credentials, custom_region, use_default_region)
async for credentials in get_accounts()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
yield batch
if custom_account_id:
credentials = _session_manager.find_credentials_by_account_id(custom_account_id)
async for session in session_factory(
credentials, custom_region, use_default_region
):
yield session
else:
tasks = [
session_factory(credentials, custom_region, use_default_region)
async for credentials in get_accounts()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
yield batch


def validate_request(request: Request) -> tuple[bool, str]:
Expand Down
6 changes: 3 additions & 3 deletions integrations/aws/utils/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from utils.aws import get_sessions

from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from utils.aws import _session_manager, update_available_access_credentials
from utils.aws import _session_manager
from utils.overrides import AWSResourceConfig
from botocore.config import Config as Boto3Config

Expand Down Expand Up @@ -127,7 +127,7 @@ async def resync_custom_kind(
next_token = None
if not describe_method_params:
describe_method_params = {}
while await update_available_access_credentials():
omby8888 marked this conversation as resolved.
Show resolved Hide resolved
while True:
async with session.client(service_name) as client:
try:
params: dict[str, Any] = describe_method_params
Expand Down Expand Up @@ -172,7 +172,7 @@ async def resync_cloudcontrol(
account_id = await _session_manager.find_account_id_by_session(session)
logger.info(f"Resyncing {kind} in account {account_id} in region {region}")
next_token = None
while await update_available_access_credentials():
omby8888 marked this conversation as resolved.
Show resolved Hide resolved
while True:
async with session.client("cloudcontrol") as cloudcontrol:
try:
params = {
Expand Down
Loading