generated from cds-snc/project-template
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat/migrate aws health functions (#601)
* Refactor SRE command function for better readability and maintainability * fix: add missing test * feat: update module to use organizations list accounts fn() * fix: patch the constant in the test * feat: move security hub functions into integrations * feat: move security hub functions into integrations * feat: move guard duty functions to integrations * feat: move config functions to integrations * feat: move cost explorer to integrations * chore: fmt * feat: remove unused assume function * chore: fmt * fix: make kwargs case conversion optional
- Loading branch information
Showing
14 changed files
with
558 additions
and
208 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import os | ||
from integrations.aws.client import execute_aws_api_call, handle_aws_api_errors | ||
|
||
AUDIT_ROLE_ARN = os.environ.get("AWS_AUDIT_ACCOUNT_ROLE_ARN") | ||
|
||
|
||
@handle_aws_api_errors | ||
def describe_aggregate_compliance_by_config_rules(config_aggregator_name, filters): | ||
"""Retrieves the aggregate compliance of AWS Config rules for an account. | ||
Args: | ||
config_aggregator_name (str): The name of the AWS Config aggregator. | ||
filters (dict): Filters to apply to the compliance results. | ||
Returns: | ||
list: A list of compliance objects | ||
""" | ||
params = { | ||
"ConfigurationAggregatorName": config_aggregator_name, | ||
"Filters": filters, | ||
} | ||
response = execute_aws_api_call( | ||
"config", | ||
"describe_aggregate_compliance_by_config_rules", | ||
paginated=True, | ||
keys=["AggregateComplianceByConfigRules"], | ||
role_arn=AUDIT_ROLE_ARN, | ||
convert_kwargs=False, | ||
**params, | ||
) | ||
return response if response else [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from logging import getLogger | ||
import os | ||
from .client import execute_aws_api_call, handle_aws_api_errors | ||
|
||
logger = getLogger(__name__) | ||
ORG_ROLE_ARN = os.environ.get("AWS_ORG_ACCOUNT_ROLE_ARN") | ||
|
||
|
||
@handle_aws_api_errors | ||
def get_cost_and_usage(time_period, granularity, metrics, filter=None, group_by=None): | ||
params = { | ||
"TimePeriod": time_period, | ||
"Granularity": granularity, | ||
"Metrics": metrics, | ||
} | ||
if filter: | ||
params["Filter"] = filter | ||
if group_by: | ||
params["GroupBy"] = group_by | ||
|
||
return execute_aws_api_call( | ||
"ce", | ||
"get_cost_and_usage", | ||
role_arn=ORG_ROLE_ARN, | ||
convert_kwargs=False, | ||
**params, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import os | ||
from integrations.aws.client import execute_aws_api_call, handle_aws_api_errors | ||
|
||
LOGGING_ROLE_ARN = os.environ.get("AWS_LOGGING_ACCOUNT_ROLE_ARN") | ||
|
||
|
||
@handle_aws_api_errors | ||
def list_detectors(): | ||
"""Retrieves all detectors from AWS GuardDuty | ||
Returns: | ||
list: A list of detector objects. | ||
""" | ||
response = execute_aws_api_call( | ||
"guardduty", | ||
"list_detectors", | ||
paginated=True, | ||
keys=["DetectorIds"], | ||
role_arn=LOGGING_ROLE_ARN, | ||
) | ||
return response if response else [] | ||
|
||
|
||
@handle_aws_api_errors | ||
def get_findings_statistics(detector_id, finding_criteria=None): | ||
"""Retrieves the findings statistics for a given detector | ||
Args: | ||
detector_id (str): The ID of the detector. | ||
finding_criteria (dict, optional): The criteria to use to filter the findings | ||
Returns: | ||
dict: The findings statistics. | ||
""" | ||
|
||
params = { | ||
"DetectorId": detector_id, | ||
"FindingStatisticTypes": ["COUNT_BY_SEVERITY"], | ||
} | ||
if finding_criteria: | ||
params["FindingCriteria"] = finding_criteria | ||
|
||
response = execute_aws_api_call( | ||
"guardduty", | ||
"get_findings_statistics", | ||
role_arn=LOGGING_ROLE_ARN, | ||
convert_kwargs=False, | ||
**params, | ||
) | ||
|
||
return response if response else {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import os | ||
from integrations.aws.client import execute_aws_api_call, handle_aws_api_errors | ||
|
||
LOGGING_ROLE_ARN = os.environ.get("AWS_LOGGING_ACCOUNT_ROLE_ARN") | ||
|
||
|
||
@handle_aws_api_errors | ||
def get_findings(filters): | ||
"""Retrieves all findings from AWS Security Hub | ||
Args: | ||
filters (dict): Filters to apply to the findings. | ||
Returns: | ||
list: A list of finding objects. | ||
""" | ||
response = execute_aws_api_call( | ||
"securityhub", | ||
"get_findings", | ||
paginated=True, | ||
role_arn=LOGGING_ROLE_ARN, | ||
filters=filters, | ||
) | ||
return response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.