Skip to content

Commit

Permalink
[Integration][AWS] | Improved Concurrency Control and Eliminated Like…
Browse files Browse the repository at this point in the history
…lihood of Thundering Herd (#1063)

# Description

**What:**

- Refactored semaphore implementation to effectively limit concurrency
across tasks.
- Added a util `semaphore_async_iterator` to enable seamless control
over concurrent executions per kind (can be re-used in other
integrations).
- Removed iterative calls to the cache for tracking token expiry,
reducing the likelihood of a thundering herd problem.

**Why:**

- The previous implementation limited concurrency within tasks
(accounts), ideally, this use case requires concurrent limits is to be
global, thus across accounts.
- Iterative cache calls could potentially cause a thundering herd
problem when cache expires and token needed to be refreshed.

**How:**

- Applied the semaphore correctly by wrapping task creation with the
semaphore, ensuring proper concurrency control.
- Implemented unit tests using `pytest` to verify semaphore
functionality and concurrency limits.
- Optimized cache usage by eliminating unnecessary iterative calls for
tracking expiry.

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [x] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

#### All tests should be run against the port production environment
(using a testing org).

### Core testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync finishes successfully
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Scheduled resync able to abort existing resync and start a new one
- [x] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [x] Tested deletion of entities that don't pass the selector

### Integration testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Resync finishes successfully
- [x] If new resource kind is added or updated in the integration, add
example raw data, mapping, and expected result to the `examples` folder
in the integration directory.
- [x] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [x] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [x] Handled rate limiting
- [x] Handled pagination
- [x] Implemented the code in async
- [x] Support Multi account


## Screenshots

NB: Warning log in the screenshot below is just for demonstration
purposes.
<img width="1380" alt="image"
src="https://github.com/user-attachments/assets/16c636ef-e4b1-4912-84f5-24e3035e0fbc">

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
mk-armah authored Oct 15, 2024
1 parent 8d4c934 commit 7a06b33
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 78 deletions.
9 changes: 9 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.49 (2024-10-14)


### Improvements

- Removed iterative calls to the cache for tracking expiry, reducing the likelihood of a thundering herd problem.
- Enhanced semaphore implementation to properly limit concurrency across tasks, rather than within tasks, improving performance and resource utilization.


## 0.2.48 (2024-10-14)


Expand Down
143 changes: 87 additions & 56 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
is_server_error,
semaphore,
)
from port_ocean.utils.async_iterators import stream_async_iterators_tasks
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)
import functools


async def _handle_global_resource_resync(
Expand Down Expand Up @@ -71,25 +75,23 @@ async def resync_resources_for_account(
credentials: AwsCredentials, kind: str
) -> ASYNC_GENERATOR_RESYNC_TYPE:
"""Function to handle fetching resources for a single account."""
errors, regions = [], []

async with semaphore: # limit the number of concurrent tasks
errors, regions = [], []

if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
yield batch
except Exception as exc:
regions.append(session.region_name)
errors.append(exc)
continue
if errors:
message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}"
raise ExceptionGroup(message, errors)
if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
yield batch
except Exception as exc:
regions.append(session.region_name)
errors.append(exc)
continue
if errors:
message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}"
raise ExceptionGroup(message, errors)


@ocean.on_resync()
Expand All @@ -99,11 +101,15 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

await update_available_access_credentials()
tasks = [
resync_resources_for_account(credentials, kind)
semaphore_async_iterator(
semaphore,
functools.partial(resync_resources_for_account, credentials, kind),
)
async for credentials in get_accounts()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand All @@ -119,18 +125,23 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

tasks = [
resync_custom_kind(
kind,
session,
"elasticache",
"describe_cache_clusters",
"CacheClusters",
"Marker",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"elasticache",
"describe_cache_clusters",
"CacheClusters",
"Marker",
),
)
async for session in get_sessions()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand All @@ -139,19 +150,24 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

tasks = [
resync_custom_kind(
kind,
session,
"elbv2",
"describe_load_balancers",
"LoadBalancers",
"Marker",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"elbv2",
"describe_load_balancers",
"LoadBalancers",
"Marker",
),
)
async for session in get_sessions()
]

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand All @@ -160,59 +176,74 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

tasks = [
resync_custom_kind(
kind,
session,
"acm",
"list_certificates",
"CertificateSummaryList",
"NextToken",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"acm",
"list_certificates",
"CertificateSummaryList",
"NextToken",
),
)
async for session in get_sessions()
]

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE)
async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
tasks = [
resync_custom_kind(
kind,
session,
"ec2",
"describe_images",
"Images",
"NextToken",
{"Owners": ["self"]},
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"ec2",
"describe_images",
"Images",
"NextToken",
{"Owners": ["self"]},
),
)
async for session in get_sessions()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK)
async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
tasks = [
resync_custom_kind(
kind,
session,
"cloudformation",
"describe_stacks",
"Stacks",
"NextToken",
semaphore_async_iterator(
semaphore,
functools.partial(
resync_custom_kind,
kind,
session,
"cloudformation",
"describe_stacks",
"Stacks",
"NextToken",
),
)
async for session in get_sessions()
]

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.48"
version = "0.2.49"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <[email protected]>", "Erik Zaadi <[email protected]>"]

Expand Down
32 changes: 14 additions & 18 deletions integrations/aws/utils/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from asyncio import Lock

from port_ocean.utils.async_iterators import stream_async_iterators_tasks
from utils.misc import semaphore

_session_manager: SessionManager = SessionManager()

Expand Down Expand Up @@ -81,23 +80,20 @@ async def get_sessions(
"""
await update_available_access_credentials()

async with semaphore:
if custom_account_id:
credentials = _session_manager.find_credentials_by_account_id(
custom_account_id
)
async for session in session_factory(
credentials, custom_region, use_default_region
):
yield session
else:
tasks = [
session_factory(credentials, custom_region, use_default_region)
async for credentials in get_accounts()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
yield batch
if custom_account_id:
credentials = _session_manager.find_credentials_by_account_id(custom_account_id)
async for session in session_factory(
credentials, custom_region, use_default_region
):
yield session
else:
tasks = [
session_factory(credentials, custom_region, use_default_region)
async for credentials in get_accounts()
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
yield batch


def validate_request(request: Request) -> tuple[bool, str]:
Expand Down
6 changes: 3 additions & 3 deletions integrations/aws/utils/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from utils.aws import get_sessions

from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from utils.aws import _session_manager, update_available_access_credentials
from utils.aws import _session_manager
from utils.overrides import AWSResourceConfig
from botocore.config import Config as Boto3Config

Expand Down Expand Up @@ -127,7 +127,7 @@ async def resync_custom_kind(
next_token = None
if not describe_method_params:
describe_method_params = {}
while await update_available_access_credentials():
while True:
async with session.client(service_name) as client:
try:
params: dict[str, Any] = describe_method_params
Expand Down Expand Up @@ -172,7 +172,7 @@ async def resync_cloudcontrol(
account_id = await _session_manager.find_account_id_by_session(session)
logger.info(f"Resyncing {kind} in account {account_id} in region {region}")
next_token = None
while await update_available_access_credentials():
while True:
async with session.client("cloudcontrol") as cloudcontrol:
try:
params = {
Expand Down

0 comments on commit 7a06b33

Please sign in to comment.