Skip to content

Commit

Permalink
[AWS] Bug fix global resources not synced for all accounts (#845)
Browse files Browse the repository at this point in the history
# Description

What - 
Fixed an issue in the AWS integration where S3 buckets were not synced
for all accounts.

Why - 
S3 buckets are a global resource type, meaning we only search for the
first region showing these buckets and then stop searching to avoid rate
limiting with AWS and wasting time. This caused users with multi-account
setups to only list their first account's buckets.

How - 
Added an iteration of accounts on top of the existing process to ensure
that S3 buckets are synced across all accounts.

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Shalev Avhar <[email protected]>
  • Loading branch information
shalev007 and Shalev Avhar authored Aug 5, 2024
1 parent 6e04361 commit 77e18ce
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 84 deletions.
7 changes: 7 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

# Port_Ocean 0.2.24 (2024-08-05)

### Improvements

- Fix global resources not reading through all accounts


## 0.2.23 (2024-08-05)


Expand Down
6 changes: 3 additions & 3 deletions integrations/aws/aws/aws_credentials.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, AsyncIterator, Coroutine, Optional
from typing import AsyncIterator, Optional
import aioboto3


Expand Down Expand Up @@ -50,6 +50,6 @@ async def create_session(self, region: Optional[str] = None) -> aioboto3.Session

async def create_session_for_each_region(
self,
) -> AsyncIterator[Coroutine[Any, Any, aioboto3.Session]]:
) -> AsyncIterator[aioboto3.Session]:
for region in self.enabled_regions:
yield self.create_session(region)
yield await self.create_session(region)
49 changes: 41 additions & 8 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from starlette import responses
from pydantic import BaseModel

from aws.aws_credentials import AwsCredentials
from port_ocean.core.models import Entity

from utils.resources import (
Expand All @@ -18,6 +19,8 @@

from utils.aws import (
describe_accessible_accounts,
get_accounts,
get_default_region_from_credentials,
get_sessions,
update_available_access_credentials,
validate_request,
Expand All @@ -33,21 +36,51 @@
)


async def _handle_global_resource_resync(
kind: str,
credentials: AwsCredentials,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
denied_access_to_default_region = False
default_region = get_default_region_from_credentials(credentials)
default_session = await credentials.create_session(default_region)
try:
async for batch in resync_cloudcontrol(kind, default_session):
yield batch
except Exception as e:
if is_access_denied_exception(e):
denied_access_to_default_region = True
else:
raise e

if denied_access_to_default_region:
logger.info(f"Trying to resync {kind} in all regions until success")
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
yield batch
break
except Exception as e:
if not is_access_denied_exception(e):
raise e


@ocean.on_resync()
async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
if kind in iter(ResourceKindsWithSpecialHandling):
return
await update_available_access_credentials()
is_global = is_global_resource(kind)
try:
async for batch in resync_cloudcontrol(kind, is_global):
yield batch
except Exception as e:
if is_access_denied_exception(e):
async for batch in resync_cloudcontrol(
kind, is_global=False, stop_on_first_region=True
):
async for credentials in get_accounts():
if is_global:
async for batch in _handle_global_resource_resync(kind, credentials):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
yield batch
except Exception:
continue


@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ACCOUNT)
Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.23"
version = "0.2.24"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <[email protected]>", "Erik Zaadi <[email protected]>"]

Expand Down
15 changes: 12 additions & 3 deletions integrations/aws/utils/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ def get_default_region_from_credentials(
return credentials.default_regions[0] if credentials.default_regions else None


async def get_accounts() -> AsyncIterator[AwsCredentials]:
"""
Gets the AWS account IDs that the current IAM role can access.
"""
await update_available_access_credentials()
for credentials in _session_manager._aws_credentials:
yield credentials


async def get_sessions(
custom_account_id: Optional[str] = None,
custom_region: Optional[str] = None,
Expand All @@ -59,18 +68,18 @@ async def get_sessions(
yield await credentials.create_session(custom_region)
else:
async for session in credentials.create_session_for_each_region():
yield await session
yield session
return

for credentials in _session_manager._aws_credentials:
async for credentials in get_accounts():
if use_default_region:
default_region = get_default_region_from_credentials(credentials)
yield await credentials.create_session(default_region)
elif custom_region:
yield await credentials.create_session(custom_region)
else:
async for session in credentials.create_session_for_each_region():
yield await session
yield session


def validate_request(request: Request) -> tuple[bool, str]:
Expand Down
133 changes: 64 additions & 69 deletions integrations/aws/utils/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,80 +163,75 @@ async def resync_custom_kind(


async def resync_cloudcontrol(
kind: str, is_global: bool = False, stop_on_first_region: bool = False
kind: str, session: aioboto3.Session
) -> ASYNC_GENERATOR_RESYNC_TYPE:
use_get_resource_api = typing.cast(
AWSResourceConfig, event.resource_config
).selector.use_get_resource_api
found_data = False
async for session in get_sessions(None, None, is_global):
region = session.region_name
logger.info(f"Resyncing {kind} in region {region}")
account_id = await _session_manager.find_account_id_by_session(session)
next_token = None
while True:
async with session.client("cloudcontrol") as cloudcontrol:
try:
params = {
"TypeName": kind,
}
if next_token:
params["NextToken"] = next_token

response = await cloudcontrol.list_resources(**params)
next_token = response.get("NextToken")
resources = response.get("ResourceDescriptions", [])
if not resources:
break
found_data = True
page_resources = []
if use_get_resource_api:
resources = await asyncio.gather(
*(
describe_single_resource(
kind,
instance.get("Identifier"),
account_id=account_id,
region=region,
)
for instance in resources
region = session.region_name
account_id = await _session_manager.find_account_id_by_session(session)
logger.info(f"Resyncing {kind} in account {account_id} in region {region}")
next_token = None
while True:
async with session.client("cloudcontrol") as cloudcontrol:
try:
params = {
"TypeName": kind,
}
if next_token:
params["NextToken"] = next_token

response = await cloudcontrol.list_resources(**params)
next_token = response.get("NextToken")
resources = response.get("ResourceDescriptions", [])
if not resources:
break
page_resources = []
if use_get_resource_api:
resources = await asyncio.gather(
*(
describe_single_resource(
kind,
instance.get("Identifier"),
account_id=account_id,
region=region,
)
)
else:
resources = [
{
"Identifier": instance.get("Identifier"),
"Properties": json.loads(instance.get("Properties")),
}
for instance in resources
]

for instance in resources:
serialized = instance.copy()
serialized.update(
{
CustomProperties.KIND: kind,
CustomProperties.ACCOUNT_ID: account_id,
CustomProperties.REGION: region,
}
)
page_resources.append(
fix_unserializable_date_properties(serialized)
)
logger.info(
f"Fetched batch of {len(page_resources)} from {kind} in region {region}"
)
yield page_resources

if not next_token:
break
except cloudcontrol.exceptions.ClientError as e:
if is_access_denied_exception(e):
if not is_global:
logger.warning(
f"Skipping resyncing {kind} in region {region} due to missing access permissions"
)
break # no need to continue querying on the same region since we don't have access
raise e
if found_data and stop_on_first_region:
return
else:
resources = [
{
"Identifier": instance.get("Identifier"),
"Properties": json.loads(instance.get("Properties")),
}
for instance in resources
]

for instance in resources:
serialized = instance.copy()
serialized.update(
{
CustomProperties.KIND: kind,
CustomProperties.ACCOUNT_ID: account_id,
CustomProperties.REGION: region,
}
)
page_resources.append(
fix_unserializable_date_properties(serialized)
)
logger.info(
f"Fetched batch of {len(page_resources)} from {kind} in region {region}"
)
yield page_resources

if not next_token:
break
except Exception as e:
if is_access_denied_exception(e):
logger.warning(
f"Skipping resyncing {kind} in region {region} in account {account_id} due to missing access permissions"
)
else:
logger.warning(f"Error resyncing {kind} in region {region}, {e}")
raise e

0 comments on commit 77e18ce

Please sign in to comment.