From bde302fbe9c3ea638c8bf1b6848bce7412c7aaf8 Mon Sep 17 00:00:00 2001 From: Konstantin Krastev Date: Sun, 17 Feb 2019 00:51:36 +0200 Subject: [PATCH 1/7] Apply v1.2 CIS benchmark requirements Py3 compatibility --- .../aws-cis-foundation-benchmark-checklist.py | 444 ++++++++---------- 1 file changed, 201 insertions(+), 243 deletions(-) diff --git a/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py b/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py index 4c28862..5bb2bd5 100644 --- a/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py +++ b/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py @@ -29,7 +29,7 @@ # --- Script controls --- # CIS Benchmark version referenced. Only used in web report. -AWS_CIS_BENCHMARK_VERSION = "1.1" +AWS_CIS_BENCHMARK_VERSION = "1.2" # Would you like a HTML file generated with the result? # This file will be delivered using a signed URL. @@ -135,7 +135,7 @@ def control_1_1_root_use(credreport): pass else: print("Something went wrong") - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except #pylint: disable=broad-except # 1.2 Ensure multi-factor authentication (MFA) is enabled for all IAM users that have a console password (Scored) @@ -162,7 +162,7 @@ def control_1_2_mfa_on_password_enabled_iam(credreport): result = False failReason = "No MFA on users with password. " offenders.append(str(credreport[i]['arn'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.3 Ensure credentials unused for 90 days or greater are disabled (Scored) @@ -218,7 +218,7 @@ def control_1_3_unused_credentials(credreport): except: # Never used pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.4 Ensure access keys are rotated every 90 days or less (Scored) @@ -283,7 +283,7 @@ def control_1_4_rotated_keys(credreport): offenders.append(str(credreport[i]['arn']) + ":unused key2") except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.5 Ensure IAM password policy requires at least one uppercase letter (Scored) @@ -309,7 +309,7 @@ def control_1_5_password_policy_uppercase(passwordpolicy): if passwordpolicy['RequireUppercaseCharacters'] is False: result = False failReason = "Password policy does not require at least one uppercase letter" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.6 Ensure IAM password policy requires at least one lowercase letter (Scored) @@ -335,7 +335,7 @@ def control_1_6_password_policy_lowercase(passwordpolicy): if passwordpolicy['RequireLowercaseCharacters'] is False: result = False failReason = "Password policy does not require at least one uppercase letter" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.7 Ensure IAM password policy requires at least one symbol (Scored) @@ -361,7 +361,7 @@ def control_1_7_password_policy_symbol(passwordpolicy): if passwordpolicy['RequireSymbols'] is False: result = False failReason = "Password policy does not require at least one symbol" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.8 Ensure IAM password policy requires at least one number (Scored) @@ -387,7 +387,7 @@ def control_1_8_password_policy_number(passwordpolicy): if passwordpolicy['RequireNumbers'] is False: result = False failReason = "Password policy does not require at least one number" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.9 Ensure IAM password policy requires minimum length of 14 or greater (Scored) @@ -413,7 +413,7 @@ def control_1_9_password_policy_length(passwordpolicy): if passwordpolicy['MinimumPasswordLength'] < 14: result = False failReason = "Password policy does not require at least 14 characters" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.10 Ensure IAM password policy prevents password reuse (Scored) @@ -445,7 +445,7 @@ def control_1_10_password_policy_reuse(passwordpolicy): except: result = False failReason = "Password policy does not prevent reusing last 24 passwords" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.11 Ensure IAM password policy expires passwords within 90 days or less (Scored) @@ -475,7 +475,7 @@ def control_1_11_password_policy_expire(passwordpolicy): else: result = False failReason = "Password policy does not expire passwords after 90 days or less" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.12 Ensure no root account access key exists (Scored) @@ -497,7 +497,7 @@ def control_1_12_root_key_exists(credreport): if (credreport[0]['access_key_1_active'] == "true") or (credreport[0]['access_key_2_active'] == "true"): result = False failReason = "Root have active access keys" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.13 Ensure MFA is enabled for the "root" account (Scored) @@ -517,7 +517,7 @@ def control_1_13_root_mfa_enabled(): if response['SummaryMap']['AccountMFAEnabled'] != 1: result = False failReason = "Root account not using MFA" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.14 Ensure hardware MFA is enabled for the "root" account (Scored) @@ -550,7 +550,7 @@ def control_1_14_root_hardware_mfa_enabled(): else: result = False failReason = "Root account not using MFA" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.15 Ensure security questions are registered in the AWS account (Not Scored/Manual) @@ -567,7 +567,7 @@ def control_1_15_security_questions_registered(): description = "Ensure security questions are registered in the AWS account, please verify manually" scored = False failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.16 Ensure IAM policies are attached only to groups or roles (Scored) @@ -599,11 +599,10 @@ def control_1_16_no_policies_on_iam_users(): result = False failReason = "IAM user have inline policy attached" offenders.append(str(n['Arn'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except - -# 1.17 Enable detailed billing (Scored) -def control_1_17_detailed_billing_enabled(): +# 1.17 Maintain current contact details (not Scored) +def control_1_17_maintain_current_contact_details(): """Summary Returns: @@ -613,47 +612,14 @@ def control_1_17_detailed_billing_enabled(): failReason = "" offenders = [] control = "1.17" - description = "Enable detailed billing, please verify manually" - scored = True - failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} - - -# 1.18 Ensure IAM Master and IAM Manager roles are active (Scored) -def control_1_18_ensure_iam_master_and_manager_roles(): - """Summary - - Returns: - TYPE: Description - """ - result = "True" - failReason = "No IAM Master or IAM Manager role created" - offenders = [] - control = "1.18" - description = "Ensure IAM Master and IAM Manager roles are active. Control under review/investigation" - scored = True - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} - - -# 1.19 Maintain current contact details (Scored) -def control_1_19_maintain_current_contact_details(): - """Summary - - Returns: - TYPE: Description - """ - result = "Manual" - failReason = "" - offenders = [] - control = "1.19" description = "Maintain current contact details, please verify manually" - scored = True + scored = False failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 1.20 Ensure security contact information is registered (Scored) -def control_1_20_ensure_security_contact_details(): +# 1.18 Ensure security contact information is registered (not Scored) +def control_1_18_ensure_security_contact_details(): """Summary Returns: @@ -662,15 +628,15 @@ def control_1_20_ensure_security_contact_details(): result = "Manual" failReason = "" offenders = [] - control = "1.20" + control = "1.18" description = "Ensure security contact information is registered, please verify manually" - scored = True + scored = False failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 1.21 Ensure IAM instance roles are used for AWS resource access from instances (Scored) -def control_1_21_ensure_iam_instance_roles_used(): +# 1.19 Ensure IAM instance roles are used for AWS resource access from instances (Scored) +def control_1_19_ensure_iam_instance_roles_used(): """Summary Returns: @@ -679,7 +645,7 @@ def control_1_21_ensure_iam_instance_roles_used(): result = True failReason = "" offenders = [] - control = "1.21" + control = "1.19" description = "Ensure IAM instance roles are used for AWS resource access from instances, application code is not audited" scored = True failReason = "Instance not assigned IAM role for EC2" @@ -693,11 +659,11 @@ def control_1_21_ensure_iam_instance_roles_used(): except: result = False offenders.append(str(response['Reservations'][n]['Instances'][0]['InstanceId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 1.22 Ensure a support role has been created to manage incidents with AWS Support (Scored) -def control_1_22_ensure_incident_management_roles(): +# 1.20 Ensure a support role has been created to manage incidents with AWS Support (Scored) +def control_1_20_ensure_incident_management_roles(): """Summary Returns: @@ -706,7 +672,7 @@ def control_1_22_ensure_incident_management_roles(): result = True failReason = "" offenders = [] - control = "1.22" + control = "1.20" description = "Ensure a support role has been created to manage incidents with AWS Support" scored = True offenders = [] @@ -720,11 +686,11 @@ def control_1_22_ensure_incident_management_roles(): except: result = False failReason = "AWSSupportAccess policy not created" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 1.23 Do not setup access keys during initial user setup for all IAM users that have a console password (Not Scored) -def control_1_23_no_active_initial_access_keys_with_iam_user(credreport): +# 1.21 Do not setup access keys during initial user setup for all IAM users that have a console password (Not Scored) +def control_1_21_no_active_initial_access_keys_with_iam_user(credreport): """Summary Returns: @@ -733,7 +699,7 @@ def control_1_23_no_active_initial_access_keys_with_iam_user(credreport): result = True failReason = "" offenders = [] - control = "1.23" + control = "1.21" description = "Do not setup access keys during initial user setup for all IAM users that have a console password" scored = False offenders = [] @@ -747,11 +713,11 @@ def control_1_23_no_active_initial_access_keys_with_iam_user(credreport): result = False failReason = "Users with keys created at user creation time found" offenders.append(str(credreport[n]['arn']) + ":" + str(m['AccessKeyId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 1.24 Ensure IAM policies that allow full "*:*" administrative privileges are not created (Scored) -def control_1_24_no_overly_permissive_policies(): +# 1.22 Ensure IAM policies that allow full "*:*" administrative privileges are not created (Scored) +def control_1_22_no_overly_permissive_policies(): """Summary Returns: @@ -760,7 +726,7 @@ def control_1_24_no_overly_permissive_policies(): result = True failReason = "" offenders = [] - control = "1.24" + control = "1.22" description = "Ensure IAM policies that allow full administrative privileges are not created" scored = True offenders = [] @@ -794,7 +760,7 @@ def control_1_24_no_overly_permissive_policies(): result = False failReason = "Found full administrative policy" offenders.append(str(m['Arn'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- 2 Logging --- @@ -815,7 +781,7 @@ def control_2_1_ensure_cloud_trail_all_regions(cloudtrails): control = "2.1" description = "Ensure CloudTrail is enabled in all regions" scored = True - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: if o['IsMultiRegionTrail']: client = boto3.client('cloudtrail', region_name=m) @@ -827,7 +793,7 @@ def control_2_1_ensure_cloud_trail_all_regions(cloudtrails): break if result is False: failReason = "No enabled multi region trails found" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.2 Ensure CloudTrail log file validation is enabled (Scored) @@ -846,7 +812,7 @@ def control_2_2_ensure_cloudtrail_validation(cloudtrails): control = "2.2" description = "Ensure CloudTrail log file validation is enabled" scored = True - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: if o['LogFileValidationEnabled'] is False: result = False @@ -854,10 +820,10 @@ def control_2_2_ensure_cloudtrail_validation(cloudtrails): offenders.append(str(o['TrailARN'])) offenders = set(offenders) offenders = list(offenders) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 2.3 Ensure the S3 bucket CloudTrail logs to is not publicly accessible (Scored) +# 2.3 Ensure the S3 bucket used to store CloudTrail logs is not publicly accessible (Scored) def control_2_3_ensure_cloudtrail_bucket_not_public(cloudtrails): """Summary @@ -873,7 +839,7 @@ def control_2_3_ensure_cloudtrail_bucket_not_public(cloudtrails): control = "2.3" description = "Ensure the S3 bucket CloudTrail logs to is not publicly accessible" scored = True - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: # We only want to check cases where there is a bucket if "S3BucketName" in str(o): @@ -904,7 +870,7 @@ def control_2_3_ensure_cloudtrail_bucket_not_public(cloudtrails): result = False offenders.append(str(o['TrailARN']) + "NoS3Logging") failReason = "Cloudtrail not configured to log to S3. " + failReason - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.4 Ensure CloudTrail trails are integrated with CloudWatch Logs (Scored) @@ -923,7 +889,7 @@ def control_2_4_ensure_cloudtrail_cloudwatch_logs_integration(cloudtrails): control = "2.4" description = "Ensure CloudTrail trails are integrated with CloudWatch Logs" scored = True - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if "arn:aws:logs" in o['CloudWatchLogsLogGroupArn']: @@ -936,7 +902,7 @@ def control_2_4_ensure_cloudtrail_cloudwatch_logs_integration(cloudtrails): result = False failReason = "CloudTrails without CloudWatch Logs discovered" offenders.append(str(o['TrailARN'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.5 Ensure AWS Config is enabled in all regions (Scored) @@ -1006,7 +972,7 @@ def control_2_5_ensure_config_all_regions(regions): result = False failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append("Global:NotRecording") - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.6 Ensure S3 bucket access logging is enabled on the CloudTrail S3 bucket (Scored) @@ -1025,7 +991,7 @@ def control_2_6_ensure_cloudtrail_bucket_logging(cloudtrails): control = "2.6" description = "Ensure S3 bucket access logging is enabled on the CloudTrail S3 bucket" scored = True - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: # it is possible to have a cloudtrail configured with a nonexistant bucket try: @@ -1041,7 +1007,7 @@ def control_2_6_ensure_cloudtrail_bucket_logging(cloudtrails): result = False failReason = failReason + "CloudTrail S3 bucket without logging discovered" offenders.append("Trail:" + str(o['TrailARN']) + " - S3Bucket:" + str(o['S3BucketName'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.7 Ensure CloudTrail logs are encrypted at rest using KMS CMKs (Scored) @@ -1060,7 +1026,7 @@ def control_2_7_ensure_cloudtrail_encryption_kms(cloudtrails): control = "2.7" description = "Ensure CloudTrail logs are encrypted at rest using KMS CMKs" scored = True - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['KmsKeyId']: @@ -1069,7 +1035,7 @@ def control_2_7_ensure_cloudtrail_encryption_kms(cloudtrails): result = False failReason = "CloudTrail not using KMS CMK for encryption discovered" offenders.append("Trail:" + str(o['TrailARN'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.8 Ensure rotation for customer created CMKs is enabled (Scored) @@ -1101,7 +1067,48 @@ def control_2_8_ensure_kms_cmk_rotation(regions): offenders.append("Key:" + str(keyDescription['KeyMetadata']['Arn'])) except: pass # Ignore keys without permission, for example ACM key - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + + +# moved from 4.3 to 2.9 in v1.2 of CISFB +# 2.9 Ensure VPC flow logging is enabled in all VPCs (Scored) +def control_2_9_ensure_flow_logs_enabled_on_all_vpc(regions): + """Summary + + Returns: + TYPE: Description + """ + result = True + failReason = "" + offenders = [] + control = "2.9" + description = "Ensure VPC flow logging is enabled in all VPCs" + scored = True + for n in regions: + client = boto3.client('ec2', region_name=n) + flowlogs = client.describe_flow_logs( + # No paginator support in boto atm. + ) + activeLogs = [] + for m in flowlogs['FlowLogs']: + if "vpc-" in str(m['ResourceId']): + activeLogs.append(m['ResourceId']) + vpcs = client.describe_vpcs( + Filters=[ + { + 'Name': 'state', + 'Values': [ + 'available', + ] + }, + ] + ) + for m in vpcs['Vpcs']: + if not str(m['VpcId']) in str(activeLogs): + result = False + failReason = "VPC without active VPC Flow Logs found" + offenders.append(str(n) + " : " + str(m['VpcId'])) + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- Monitoring --- @@ -1120,7 +1127,7 @@ def control_3_1_ensure_log_metric_filter_unauthorized_api_calls(cloudtrails): description = "Ensure log metric filter unauthorized api calls" scored = True failReason = "Incorrect log metric alerts for unauthorized_api_calls" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1146,7 +1153,7 @@ def control_3_1_ensure_log_metric_filter_unauthorized_api_calls(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.2 Ensure a log metric filter and alarm exist for Management Console sign-in without MFA (Scored) @@ -1163,7 +1170,7 @@ def control_3_2_ensure_log_metric_filter_console_signin_no_mfa(cloudtrails): description = "Ensure a log metric filter and alarm exist for Management Console sign-in without MFA" scored = True failReason = "Incorrect log metric alerts for management console signin without MFA" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1189,7 +1196,7 @@ def control_3_2_ensure_log_metric_filter_console_signin_no_mfa(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.3 Ensure a log metric filter and alarm exist for usage of "root" account (Scored) @@ -1206,7 +1213,7 @@ def control_3_3_ensure_log_metric_filter_root_usage(cloudtrails): description = "Ensure a log metric filter and alarm exist for root usage" scored = True failReason = "Incorrect log metric alerts for root usage" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1216,7 +1223,8 @@ def control_3_3_ensure_log_metric_filter_root_usage(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.userIdentity\.type\s*=\s*\"?Root", "\$\.userIdentity\.invokedBy\s*NOT\s*EXISTS", "\$\.eventType\s*\!=\s*\"?AwsServiceEvent(\"|\)|\s)"] + patterns = ["\$\.userIdentity\.type\s*=\s*\"?Root", "\$\.userIdentity\.invokedBy\s*NOT\s*EXISTS", + "\$\.eventType\s*\!=\s*\"?AwsServiceEvent(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1232,7 +1240,7 @@ def control_3_3_ensure_log_metric_filter_root_usage(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.4 Ensure a log metric filter and alarm exist for IAM policy changes (Scored) @@ -1249,7 +1257,7 @@ def control_3_4_ensure_log_metric_iam_policy_change(cloudtrails): description = "Ensure a log metric filter and alarm exist for IAM changes" scored = True failReason = "Incorrect log metric alerts for IAM policy changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1259,7 +1267,14 @@ def control_3_4_ensure_log_metric_iam_policy_change(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?DeleteGroupPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteRolePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteUserPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutGroupPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutRolePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutUserPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreatePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeletePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreatePolicyVersion(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeletePolicyVersion(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AttachRolePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachRolePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AttachUserPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachUserPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AttachGroupPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachGroupPolicy(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?DeleteGroupPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteRolePolicy(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteUserPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutGroupPolicy(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?PutRolePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutUserPolicy(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?CreatePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeletePolicy(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?CreatePolicyVersion(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeletePolicyVersion(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?AttachRolePolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachRolePolicy(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?AttachUserPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachUserPolicy(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?AttachGroupPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachGroupPolicy(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1275,7 +1290,7 @@ def control_3_4_ensure_log_metric_iam_policy_change(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.5 Ensure a log metric filter and alarm exist for CloudTrail configuration changes (Scored) @@ -1292,7 +1307,7 @@ def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(cloudtrails): description = "Ensure a log metric filter and alarm exist for CloudTrail configuration changes" scored = True failReason = "Incorrect log metric alerts for CloudTrail configuration changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1302,7 +1317,9 @@ def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?CreateTrail(\"|\)|\s)", "\$\.eventName\s*=\s*\"?UpdateTrail(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteTrail(\"|\)|\s)", "\$\.eventName\s*=\s*\"?StartLogging(\"|\)|\s)", "\$\.eventName\s*=\s*\"?StopLogging(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?CreateTrail(\"|\)|\s)", "\$\.eventName\s*=\s*\"?UpdateTrail(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteTrail(\"|\)|\s)", "\$\.eventName\s*=\s*\"?StartLogging(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?StopLogging(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1318,7 +1335,7 @@ def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.6 Ensure a log metric filter and alarm exist for AWS Management Console authentication failures (Scored) @@ -1335,7 +1352,7 @@ def control_3_6_ensure_log_metric_console_auth_failures(cloudtrails): description = "Ensure a log metric filter and alarm exist for console auth failures" scored = True failReason = "Ensure a log metric filter and alarm exist for console auth failures" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1361,7 +1378,7 @@ def control_3_6_ensure_log_metric_console_auth_failures(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.7 Ensure a log metric filter and alarm exist for disabling or scheduled deletion of customer created CMKs (Scored) @@ -1378,7 +1395,7 @@ def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(cloudtra description = "Ensure a log metric filter and alarm exist for disabling or scheduling deletion of KMS CMK" scored = True failReason = "Ensure a log metric filter and alarm exist for disabling or scheduling deletion of KMS CMK" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1388,7 +1405,8 @@ def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(cloudtra logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventSource\s*=\s*\"?kms\.amazonaws\.com(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DisableKey(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ScheduleKeyDeletion(\"|\)|\s)"] + patterns = ["\$\.eventSource\s*=\s*\"?kms\.amazonaws\.com(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DisableKey(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?ScheduleKeyDeletion(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1404,7 +1422,7 @@ def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(cloudtra result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.8 Ensure a log metric filter and alarm exist for S3 bucket policy changes (Scored) @@ -1421,7 +1439,7 @@ def control_3_8_ensure_log_metric_s3_bucket_policy_changes(cloudtrails): description = "Ensure a log metric filter and alarm exist for S3 bucket policy changes" scored = True failReason = "Ensure a log metric filter and alarm exist for S3 bucket policy changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1431,7 +1449,11 @@ def control_3_8_ensure_log_metric_s3_bucket_policy_changes(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventSource\s*=\s*\"?s3\.amazonaws\.com(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketAcl(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketCors(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketLifecycle(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketReplication(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteBucketPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteBucketCors(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteBucketLifecycle(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteBucketReplication(\"|\)|\s)"] + patterns = ["\$\.eventSource\s*=\s*\"?s3\.amazonaws\.com(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketAcl(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?PutBucketPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketCors(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?PutBucketLifecycle(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutBucketReplication(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteBucketPolicy(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteBucketCors(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteBucketLifecycle(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteBucketReplication(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1447,7 +1469,7 @@ def control_3_8_ensure_log_metric_s3_bucket_policy_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.9 Ensure a log metric filter and alarm exist for AWS Config configuration changes (Scored) @@ -1464,7 +1486,7 @@ def control_3_9_ensure_log_metric_config_configuration_changes(cloudtrails): description = "Ensure a log metric filter and alarm exist for for AWS Config configuration changes" scored = True failReason = "Ensure a log metric filter and alarm exist for for AWS Config configuration changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1474,7 +1496,9 @@ def control_3_9_ensure_log_metric_config_configuration_changes(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventSource\s*=\s*\"?config\.amazonaws\.com(\"|\)|\s)", "\$\.eventName\s*=\s*\"?StopConfigurationRecorder(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteDeliveryChannel(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutDeliveryChannel(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutConfigurationRecorder(\"|\)|\s)"] + patterns = ["\$\.eventSource\s*=\s*\"?config\.amazonaws\.com(\"|\)|\s)", "\$\.eventName\s*=\s*\"?StopConfigurationRecorder(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteDeliveryChannel(\"|\)|\s)", "\$\.eventName\s*=\s*\"?PutDeliveryChannel(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?PutConfigurationRecorder(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1490,7 +1514,7 @@ def control_3_9_ensure_log_metric_config_configuration_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.10 Ensure a log metric filter and alarm exist for security group changes (Scored) @@ -1507,7 +1531,7 @@ def control_3_10_ensure_log_metric_security_group_changes(cloudtrails): description = "Ensure a log metric filter and alarm exist for security group changes" scored = True failReason = "Ensure a log metric filter and alarm exist for security group changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1517,7 +1541,9 @@ def control_3_10_ensure_log_metric_security_group_changes(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?AuthorizeSecurityGroupIngress(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AuthorizeSecurityGroupEgress(\"|\)|\s)", "\$\.eventName\s*=\s*\"?RevokeSecurityGroupIngress(\"|\)|\s)", "\$\.eventName\s*=\s*\"?RevokeSecurityGroupEgress(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateSecurityGroup(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteSecurityGroup(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?AuthorizeSecurityGroupIngress(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AuthorizeSecurityGroupEgress(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?RevokeSecurityGroupIngress(\"|\)|\s)", "\$\.eventName\s*=\s*\"?RevokeSecurityGroupEgress(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?CreateSecurityGroup(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteSecurityGroup(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1533,7 +1559,7 @@ def control_3_10_ensure_log_metric_security_group_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.11 Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL) (Scored) @@ -1550,7 +1576,7 @@ def control_3_11_ensure_log_metric_nacl(cloudtrails): description = "Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL)" scored = True failReason = "Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL)" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1560,7 +1586,10 @@ def control_3_11_ensure_log_metric_nacl(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?CreateNetworkAcl(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateNetworkAclEntry(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteNetworkAcl(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteNetworkAclEntry(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ReplaceNetworkAclEntry(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ReplaceNetworkAclAssociation(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?CreateNetworkAcl(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateNetworkAclEntry(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteNetworkAcl(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteNetworkAclEntry(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?ReplaceNetworkAclEntry(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?ReplaceNetworkAclAssociation(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1576,7 +1605,7 @@ def control_3_11_ensure_log_metric_nacl(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.12 Ensure a log metric filter and alarm exist for changes to network gateways (Scored) @@ -1593,7 +1622,7 @@ def control_3_12_ensure_log_metric_changes_to_network_gateways(cloudtrails): description = "Ensure a log metric filter and alarm exist for changes to network gateways" scored = True failReason = "Ensure a log metric filter and alarm exist for changes to network gateways" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1603,7 +1632,9 @@ def control_3_12_ensure_log_metric_changes_to_network_gateways(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?CreateCustomerGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteCustomerGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AttachInternetGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateInternetGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteInternetGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachInternetGateway(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?CreateCustomerGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteCustomerGateway(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?AttachInternetGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateInternetGateway(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteInternetGateway(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachInternetGateway(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1619,7 +1650,7 @@ def control_3_12_ensure_log_metric_changes_to_network_gateways(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.13 Ensure a log metric filter and alarm exist for route table changes (Scored) @@ -1636,7 +1667,7 @@ def control_3_13_ensure_log_metric_changes_to_route_tables(cloudtrails): description = "Ensure a log metric filter and alarm exist for route table changes" scored = True failReason = "Ensure a log metric filter and alarm exist for route table changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1646,7 +1677,10 @@ def control_3_13_ensure_log_metric_changes_to_route_tables(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?CreateRoute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateRouteTable(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ReplaceRoute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ReplaceRouteTableAssociation(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteRouteTable(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteRoute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DisassociateRouteTable(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?CreateRoute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateRouteTable(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?ReplaceRoute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ReplaceRouteTableAssociation(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DeleteRouteTable(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteRoute(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DisassociateRouteTable(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1662,7 +1696,7 @@ def control_3_13_ensure_log_metric_changes_to_route_tables(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.14 Ensure a log metric filter and alarm exist for VPC changes (Scored) @@ -1679,7 +1713,7 @@ def control_3_14_ensure_log_metric_changes_to_vpc(cloudtrails): description = "Ensure a log metric filter and alarm exist for VPC changes" scored = True failReason = "Ensure a log metric filter and alarm exist for VPC changes" - for m, n in cloudtrails.iteritems(): + for m, n in cloudtrails.items(): for o in n: try: if o['CloudWatchLogsLogGroupArn']: @@ -1689,7 +1723,12 @@ def control_3_14_ensure_log_metric_changes_to_vpc(cloudtrails): logGroupName=group ) for p in filters['metricFilters']: - patterns = ["\$\.eventName\s*=\s*\"?CreateVpc(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteVpc(\"|\)|\s)", "\$\.eventName\s*=\s*\"?ModifyVpcAttribute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AcceptVpcPeeringConnection(\"|\)|\s)", "\$\.eventName\s*=\s*\"?CreateVpcPeeringConnection(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteVpcPeeringConnection(\"|\)|\s)", "\$\.eventName\s*=\s*\"?RejectVpcPeeringConnection(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AttachClassicLinkVpc(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DetachClassicLinkVpc(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DisableVpcClassicLink(\"|\)|\s)", "\$\.eventName\s*=\s*\"?EnableVpcClassicLink(\"|\)|\s)"] + patterns = ["\$\.eventName\s*=\s*\"?CreateVpc(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteVpc(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?ModifyVpcAttribute(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AcceptVpcPeeringConnection(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?CreateVpcPeeringConnection(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DeleteVpcPeeringConnection(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?RejectVpcPeeringConnection(\"|\)|\s)", "\$\.eventName\s*=\s*\"?AttachClassicLinkVpc(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?DetachClassicLinkVpc(\"|\)|\s)", "\$\.eventName\s*=\s*\"?DisableVpcClassicLink(\"|\)|\s)", + "\$\.eventName\s*=\s*\"?EnableVpcClassicLink(\"|\)|\s)"] if find_in_string(patterns, str(p['filterPattern'])): cwclient = boto3.client('cloudwatch', region_name=m) response = cwclient.describe_alarms_for_metric( @@ -1705,24 +1744,7 @@ def control_3_14_ensure_log_metric_changes_to_vpc(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} - - -# 3.15 Ensure appropriate subscribers to each SNS topic (Not Scored) -def control_3_15_verify_sns_subscribers(): - """Summary - - Returns: - TYPE: Description - """ - result = "Manual" - failReason = "" - offenders = [] - control = "3.15" - description = "Ensure appropriate subscribers to each SNS topic, please verify manually" - scored = False - failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- Networking --- @@ -1756,7 +1778,7 @@ def control_4_1_ensure_ssh_not_open_to_world(regions): result = False failReason = "Found Security Group with port 22 open to the world (0.0.0.0/0)" offenders.append(str(n) + " : " + str(m['GroupId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 4.2 Ensure no security groups allow ingress from 0.0.0.0/0 to port 3389 (Scored) @@ -1788,11 +1810,11 @@ def control_4_2_ensure_rdp_not_open_to_world(regions): result = False failReason = "Found Security Group with port 3389 open to the world (0.0.0.0/0)" offenders.append(str(n) + " : " + str(m['GroupId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 4.3 Ensure VPC flow logging is enabled in all VPCs (Scored) -def control_4_3_ensure_flow_logs_enabled_on_all_vpc(regions): +# 4.3 Ensure the default security group of every VPC restricts all traffic (Scored) +def control_4_3_ensure_default_security_groups_restricts_traffic(regions): """Summary Returns: @@ -1802,46 +1824,6 @@ def control_4_3_ensure_flow_logs_enabled_on_all_vpc(regions): failReason = "" offenders = [] control = "4.3" - description = "Ensure VPC flow logging is enabled in all VPCs" - scored = True - for n in regions: - client = boto3.client('ec2', region_name=n) - flowlogs = client.describe_flow_logs( - # No paginator support in boto atm. - ) - activeLogs = [] - for m in flowlogs['FlowLogs']: - if "vpc-" in str(m['ResourceId']): - activeLogs.append(m['ResourceId']) - vpcs = client.describe_vpcs( - Filters=[ - { - 'Name': 'state', - 'Values': [ - 'available', - ] - }, - ] - ) - for m in vpcs['Vpcs']: - if not str(m['VpcId']) in str(activeLogs): - result = False - failReason = "VPC without active VPC Flow Logs found" - offenders.append(str(n) + " : " + str(m['VpcId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} - - -# 4.4 Ensure the default security group of every VPC restricts all traffic (Scored) -def control_4_4_ensure_default_security_groups_restricts_traffic(regions): - """Summary - - Returns: - TYPE: Description - """ - result = True - failReason = "" - offenders = [] - control = "4.4" description = "Ensure the default security group of every VPC restricts all traffic" scored = True for n in regions: @@ -1861,11 +1843,11 @@ def control_4_4_ensure_default_security_groups_restricts_traffic(regions): result = False failReason = "Default security groups with ingress or egress rules discovered" offenders.append(str(n) + " : " + str(m['GroupId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except -# 4.5 Ensure routing tables for VPC peering are "least access" (Not Scored) -def control_4_5_ensure_route_tables_are_least_access(regions): +# 4.4 Ensure routing tables for VPC peering are "least access" (Not Scored) +def control_4_4_ensure_route_tables_are_least_access(regions): """Summary Returns: @@ -1874,36 +1856,7 @@ def control_4_5_ensure_route_tables_are_least_access(regions): result = True failReason = "" offenders = [] - control = "4.5" - description = "Ensure routing tables for VPC peering are least access" - scored = False - for n in regions: - client = boto3.client('ec2', region_name=n) - response = client.describe_route_tables() - for m in response['RouteTables']: - for o in m['Routes']: - try: - if o['VpcPeeringConnectionId']: - if int(str(o['DestinationCidrBlock']).split("/", 1)[1]) < 24: - result = False - failReason = "Large CIDR block routed to peer discovered, please investigate" - offenders.append(str(n) + " : " + str(m['RouteTableId'])) - except: - pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} - - -# 4.5 Ensure routing tables for VPC peering are "least access" (Not Scored) -def control_4_5_ensure_route_tables_are_least_access(regions): - """Summary - - Returns: - TYPE: Description - """ - result = True - failReason = "" - offenders = [] - control = "4.5" + control = "4.4" description = "Ensure routing tables for VPC peering are least access" scored = False for n in regions: @@ -1919,7 +1872,7 @@ def control_4_5_ensure_route_tables_are_least_access(regions): offenders.append(str(n) + " : " + str(m['RouteTableId'])) except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} + return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- Central functions --- @@ -1943,7 +1896,7 @@ def get_cred_report(): return status response = IAM_CLIENT.get_credential_report() report = [] - reader = csv.DictReader(response['Content'].splitlines(), delimiter=',') + reader = csv.DictReader(response['Content'].decode("UTF-8").splitlines(), delimiter=',') for row in reader: report.append(row) @@ -2086,12 +2039,20 @@ def json2html(controlResult, account): """ table = [] shortReport = shortAnnotation(controlResult) - table.append("\n\n\n\n\n

AWS CIS Foundation Framework

\n
") + table.append("\n\n\n\n\n

AWS CIS Foundation Framework

\n\ +
") table.append("") table.append("") table.append("") table.append("") - table.append("") + table.append("") table.append("
Account: " + account + "
Report date: " + time.strftime("%c") + "
Benchmark version: " + AWS_CIS_BENCHMARK_VERSION + "
Whitepaper location: https://d0.awsstatic.com/whitepapers/compliance/AWS_CIS_Foundations_Benchmark.pdf
Whitepaper location: https://d0.awsstatic.com/whitepapers/compliance/AWS_CIS_Foundations_Benchmark.pdf
" + shortReport + "


") tableHeadOuter = "" tableHeadInner = "
" @@ -2133,11 +2094,11 @@ def s3report(htmlReport, account): reportName = "cis_report_" + str(account) + "_" + str(datetime.now().strftime('%Y%m%d_%H%M')) + ".html" else: reportName = "cis_report.html" - with tempfile.NamedTemporaryFile(delete=False) as f: + with tempfile.NamedTemporaryFile(delete=False, mode="w") as f: for item in htmlReport: f.write(item) f.flush() - try: + try: f.close() S3_CLIENT.upload_file(f.name, S3_WEB_REPORT_BUCKET, reportName) os.unlink(f.name) @@ -2281,14 +2242,12 @@ def lambda_handler(event, context): control1.append(control_1_14_root_hardware_mfa_enabled()) control1.append(control_1_15_security_questions_registered()) control1.append(control_1_16_no_policies_on_iam_users()) - control1.append(control_1_17_detailed_billing_enabled()) - control1.append(control_1_18_ensure_iam_master_and_manager_roles()) - control1.append(control_1_19_maintain_current_contact_details()) - control1.append(control_1_20_ensure_security_contact_details()) - control1.append(control_1_21_ensure_iam_instance_roles_used()) - control1.append(control_1_22_ensure_incident_management_roles()) - control1.append(control_1_23_no_active_initial_access_keys_with_iam_user(cred_report)) - control1.append(control_1_24_no_overly_permissive_policies()) + control1.append(control_1_17_maintain_current_contact_details()) + control1.append(control_1_18_ensure_security_contact_details()) + control1.append(control_1_19_ensure_iam_instance_roles_used()) + control1.append(control_1_20_ensure_incident_management_roles()) + control1.append(control_1_21_no_active_initial_access_keys_with_iam_user(cred_report)) + control1.append(control_1_22_no_overly_permissive_policies()) control2 = [] control2.append(control_2_1_ensure_cloud_trail_all_regions(cloud_trails)) @@ -2299,6 +2258,7 @@ def lambda_handler(event, context): control2.append(control_2_6_ensure_cloudtrail_bucket_logging(cloud_trails)) control2.append(control_2_7_ensure_cloudtrail_encryption_kms(cloud_trails)) control2.append(control_2_8_ensure_kms_cmk_rotation(region_list)) + control2.append(control_2_9_ensure_flow_logs_enabled_on_all_vpc(region_list)) control3 = [] control3.append(control_3_1_ensure_log_metric_filter_unauthorized_api_calls(cloud_trails)) @@ -2315,14 +2275,12 @@ def lambda_handler(event, context): control3.append(control_3_12_ensure_log_metric_changes_to_network_gateways(cloud_trails)) control3.append(control_3_13_ensure_log_metric_changes_to_route_tables(cloud_trails)) control3.append(control_3_14_ensure_log_metric_changes_to_vpc(cloud_trails)) - control3.append(control_3_15_verify_sns_subscribers()) control4 = [] control4.append(control_4_1_ensure_ssh_not_open_to_world(region_list)) control4.append(control_4_2_ensure_rdp_not_open_to_world(region_list)) - control4.append(control_4_3_ensure_flow_logs_enabled_on_all_vpc(region_list)) - control4.append(control_4_4_ensure_default_security_groups_restricts_traffic(region_list)) - control4.append(control_4_5_ensure_route_tables_are_least_access(region_list)) + control4.append(control_4_3_ensure_default_security_groups_restricts_traffic(region_list)) + control4.append(control_4_4_ensure_route_tables_are_least_access(region_list)) # Join results controls = [] @@ -2400,4 +2358,4 @@ def lambda_handler(event, context): boto3.setup_default_session(region_name='us-east-1') else: boto3.setup_default_session(profile_name=profile_name, region_name='us-east-1') - lambda_handler("test", "test") + lambda_handler("test", "test") \ No newline at end of file From 5d6d98fdc4c97b7953dc4cd32b037eb65fa53ebe Mon Sep 17 00:00:00 2001 From: Konstantin Krastev Date: Tue, 19 Feb 2019 21:36:05 +0200 Subject: [PATCH 2/7] style optimization logging multiprocessing per control set json report to s3 bucket --- .../aws-cis-foundation-benchmark-checklist.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py b/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py index 5bb2bd5..b041963 100644 --- a/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py +++ b/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py @@ -37,7 +37,7 @@ # Where should the report be delivered to? # Make sure to update permissions for the Lambda role if you change bucket name. -S3_WEB_REPORT_BUCKET = "CHANGE_ME_TO_YOUR_S3_BUCKET" +S3_REPORT_BUCKET = "CHANGE_ME_TO_YOUR_S3_BUCKET" # Create separate report files? # This will add date and account number as prefix. Example: cis_report_111111111111_161220_1213.html @@ -2100,7 +2100,7 @@ def s3report(htmlReport, account): f.flush() try: f.close() - S3_CLIENT.upload_file(f.name, S3_WEB_REPORT_BUCKET, reportName) + S3_CLIENT.upload_file(f.name, S3_REPORT_BUCKET, reportName) os.unlink(f.name) except Exception as e: return "Failed to upload report to S3 because: " + str(e) @@ -2108,7 +2108,7 @@ def s3report(htmlReport, account): signedURL = S3_CLIENT.generate_presigned_url( 'get_object', Params={ - 'Bucket': S3_WEB_REPORT_BUCKET, + 'Bucket': S3_REPORT_BUCKET, 'Key': reportName }, ExpiresIn=ttl) From 646f17c2f931f183159b968335c0ac32107cb791 Mon Sep 17 00:00:00 2001 From: Konstantin Krastev Date: Tue, 19 Feb 2019 21:40:44 +0200 Subject: [PATCH 3/7] style optimization logging multiprocessing per control set json report to s3 bucket --- .../aws-cis-foundation-benchmark-checklist.py | 1086 ++++++++++------- 1 file changed, 631 insertions(+), 455 deletions(-) diff --git a/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py b/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py index b041963..407b952 100644 --- a/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py +++ b/aws_cis_foundation_framework/aws-cis-foundation-benchmark-checklist.py @@ -7,23 +7,25 @@ IAM_CLIENT (TYPE): Description REGIONS (list): Description S3_WEB_REPORT (bool): Description - S3_WEB_REPORT_BUCKET (str): Description + S3_REPORT_BUCKET (str): Description S3_WEB_REPORT_EXPIRE (str): Description S3_WEB_REPORT_OBFUSCATE_ACCOUNT (bool): Description SCRIPT_OUTPUT_JSON (bool): Description """ -from __future__ import print_function import json import csv import time import sys import re -import tempfile import getopt +from io import BytesIO, StringIO import os from datetime import datetime import boto3 +from botocore.client import Config +import logging +from multiprocessing.pool import ThreadPool # --- Script controls --- @@ -31,18 +33,23 @@ # CIS Benchmark version referenced. Only used in web report. AWS_CIS_BENCHMARK_VERSION = "1.2" -# Would you like a HTML file generated with the result? -# This file will be delivered using a signed URL. +# Would you like to upload reports to S3 bucket ? +# Files will be delivered using a signed URL. S3_WEB_REPORT = True # Where should the report be delivered to? # Make sure to update permissions for the Lambda role if you change bucket name. -S3_REPORT_BUCKET = "CHANGE_ME_TO_YOUR_S3_BUCKET" +S3_REPORT_BUCKET = None # Create separate report files? # This will add date and account number as prefix. Example: cis_report_111111111111_161220_1213.html S3_WEB_REPORT_NAME_DETAILS = True +# Create separate report files? +# This will add date and account number as prefix. Example: cis_report_111111111111_161220_1213.json +S3_JSON_NAME_DETAILS = True + + # How many hours should the report be available? Default = 168h/7days S3_WEB_REPORT_EXPIRE = "168" @@ -75,25 +82,48 @@ CONTROL_1_1_DAYS = 0 +# Log to stdout +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +streamformater = logging.Formatter("[%(levelname)s] %(message)s") + +logstreamhandler = logging.StreamHandler() +logstreamhandler.setLevel(logging.INFO) +logstreamhandler.setFormatter(streamformater) +logger.addHandler(logstreamhandler) + + # --- Global --- IAM_CLIENT = boto3.client('iam') -S3_CLIENT = boto3.client('s3') +S3_CLIENT = boto3.client('s3', config=Config(s3={'addressing_style': 'path'}, signature_version='s3v4')) +def time_decorator(original_func): + + def wrapper(*args, **kwargs): + start = time.time() + result = original_func(*args, **kwargs) + end = time.time() + logger.info('{} is executed in {:.2f} seconds.'.format(original_func.__name__, end-start)) + return result + return wrapper # --- 1 Identity and Access Management --- # 1.1 Avoid the use of the "root" account (Scored) -def control_1_1_root_use(credreport): +@time_decorator +def control_1_1_root_use(resource): """Summary Args: - credreport (TYPE): Description + resource (TYPE): Description Returns: TYPE: Description """ + + credreport = resource['credreport'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.1" description = "Avoid the use of the root account" @@ -105,41 +135,43 @@ def control_1_1_root_use(credreport): frm = "%Y-%m-%dT%H:%M:%S+00:00" try: - pwdDelta = (datetime.strptime(now, frm) - datetime.strptime(credreport[0]['password_last_used'], frm)) - if (pwdDelta.days == CONTROL_1_1_DAYS) & (pwdDelta.seconds > 0): # Used within last 24h - failReason = "Used within 24h" + pwd_delta = (datetime.strptime(now, frm) - datetime.strptime(credreport[0]['password_last_used'], frm)) + if (pwd_delta.days == CONTROL_1_1_DAYS) & (pwd_delta.seconds > 0): # Used within last 24h + fail_reason = "Used within 24h" result = False except: if credreport[0]['password_last_used'] == "N/A" or "no_information": pass else: - print("Something went wrong") + logger.error("Something went wrong") try: - key1Delta = (datetime.strptime(now, frm) - datetime.strptime(credreport[0]['access_key_1_last_used_date'], frm)) - if (key1Delta.days == CONTROL_1_1_DAYS) & (key1Delta.seconds > 0): # Used within last 24h - failReason = "Used within 24h" + key1_delta = (datetime.strptime(now, frm) - datetime.strptime(credreport[0]['access_key_1_last_used_date'], frm)) + if (key1_delta.days == CONTROL_1_1_DAYS) & (key1_delta.seconds > 0): # Used within last 24h + fail_reason = "Used within 24h" result = False except: if credreport[0]['access_key_1_last_used_date'] == "N/A" or "no_information": pass else: - print("Something went wrong") + logger.error("Something went wrong") try: - key2Delta = datetime.strptime(now, frm) - datetime.strptime(credreport[0]['access_key_2_last_used_date'], frm) - if (key2Delta.days == CONTROL_1_1_DAYS) & (key2Delta.seconds > 0): # Used within last 24h - failReason = "Used within 24h" + key2_delta = datetime.strptime(now, frm) - datetime.strptime(credreport[0]['access_key_2_last_used_date'], frm) + if (key2_delta.days == CONTROL_1_1_DAYS) & (key2_delta.seconds > 0): # Used within last 24h + fail_reason = "Used within 24h" result = False except: if credreport[0]['access_key_2_last_used_date'] == "N/A" or "no_information": pass else: - print("Something went wrong") - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except #pylint: disable=broad-except + logger.error("Something went wrong") + + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except #pylint: disable=broad-except # 1.2 Ensure multi-factor authentication (MFA) is enabled for all IAM users that have a console password (Scored) -def control_1_2_mfa_on_password_enabled_iam(credreport): +@time_decorator +def control_1_2_mfa_on_password_enabled_iam(resource): """Summary Args: @@ -148,8 +180,10 @@ def control_1_2_mfa_on_password_enabled_iam(credreport): Returns: TYPE: Description """ + + credreport = resource['credreport'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.2" description = "Ensure multi-factor authentication (MFA) is enabled for all IAM users that have a console password" @@ -160,13 +194,14 @@ def control_1_2_mfa_on_password_enabled_iam(credreport): # Verify if password users have MFA assigned if credreport[i]['mfa_active'] == "false": result = False - failReason = "No MFA on users with password. " + fail_reason = "No MFA on users with password. " offenders.append(str(credreport[i]['arn'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.3 Ensure credentials unused for 90 days or greater are disabled (Scored) -def control_1_3_unused_credentials(credreport): +@time_decorator +def control_1_3_unused_credentials(resource): """Summary Args: @@ -175,8 +210,9 @@ def control_1_3_unused_credentials(credreport): Returns: TYPE: Description """ + credreport = resource['credreport'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.3" description = "Ensure credentials unused for 90 days or greater are disabled" @@ -193,7 +229,7 @@ def control_1_3_unused_credentials(credreport): # Verify password have been used in the last 90 days if delta.days > 90: result = False - failReason = "Credentials unused > 90 days detected. " + fail_reason = "Credentials unused > 90 days detected. " offenders.append(str(credreport[i]['arn']) + ":password") except: pass # Never used @@ -203,7 +239,7 @@ def control_1_3_unused_credentials(credreport): # Verify password have been used in the last 90 days if delta.days > 90: result = False - failReason = "Credentials unused > 90 days detected. " + fail_reason = "Credentials unused > 90 days detected. " offenders.append(str(credreport[i]['arn']) + ":key1") except: pass @@ -213,16 +249,17 @@ def control_1_3_unused_credentials(credreport): # Verify password have been used in the last 90 days if delta.days > 90: result = False - failReason = "Credentials unused > 90 days detected. " + fail_reason = "Credentials unused > 90 days detected. " offenders.append(str(credreport[i]['arn']) + ":key2") except: # Never used pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.4 Ensure access keys are rotated every 90 days or less (Scored) -def control_1_4_rotated_keys(credreport): +@time_decorator +def control_1_4_rotated_keys(resource): """Summary Args: @@ -231,8 +268,9 @@ def control_1_4_rotated_keys(credreport): Returns: TYPE: Description """ + credreport = resource['credreport'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.4" description = "Ensure access keys are rotated every 90 days or less" @@ -249,7 +287,7 @@ def control_1_4_rotated_keys(credreport): # Verify keys have rotated in the last 90 days if delta.days > 90: result = False - failReason = "Key rotation >90 days or not used since rotation" + fail_reason = "Key rotation >90 days or not used since rotation" offenders.append(str(credreport[i]['arn']) + ":unrotated key1") except: pass @@ -259,7 +297,7 @@ def control_1_4_rotated_keys(credreport): # Verify keys have been used since rotation. if last_used_datetime < last_rotated_datetime: result = False - failReason = "Key rotation >90 days or not used since rotation" + fail_reason = "Key rotation >90 days or not used since rotation" offenders.append(str(credreport[i]['arn']) + ":unused key1") except: pass @@ -269,7 +307,7 @@ def control_1_4_rotated_keys(credreport): # Verify keys have rotated in the last 90 days if delta.days > 90: result = False - failReason = "Key rotation >90 days or not used since rotation" + fail_reason = "Key rotation >90 days or not used since rotation" offenders.append(str(credreport[i]['arn']) + ":unrotated key2") except: pass @@ -279,15 +317,16 @@ def control_1_4_rotated_keys(credreport): # Verify keys have been used since rotation. if last_used_datetime < last_rotated_datetime: result = False - failReason = "Key rotation >90 days or not used since rotation" + fail_reason = "Key rotation >90 days or not used since rotation" offenders.append(str(credreport[i]['arn']) + ":unused key2") except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.5 Ensure IAM password policy requires at least one uppercase letter (Scored) -def control_1_5_password_policy_uppercase(passwordpolicy): +@time_decorator +def control_1_5_password_policy_uppercase(resource): """Summary Args: @@ -296,24 +335,26 @@ def control_1_5_password_policy_uppercase(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.5" description = "Ensure IAM password policy requires at least one uppercase letter" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: if passwordpolicy['RequireUppercaseCharacters'] is False: result = False - failReason = "Password policy does not require at least one uppercase letter" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not require at least one uppercase letter" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.6 Ensure IAM password policy requires at least one lowercase letter (Scored) -def control_1_6_password_policy_lowercase(passwordpolicy): +@time_decorator +def control_1_6_password_policy_lowercase(resource): """Summary Args: @@ -322,24 +363,26 @@ def control_1_6_password_policy_lowercase(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.6" description = "Ensure IAM password policy requires at least one lowercase letter" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: if passwordpolicy['RequireLowercaseCharacters'] is False: result = False - failReason = "Password policy does not require at least one uppercase letter" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not require at least one uppercase letter" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.7 Ensure IAM password policy requires at least one symbol (Scored) -def control_1_7_password_policy_symbol(passwordpolicy): +@time_decorator +def control_1_7_password_policy_symbol(resource): """Summary Args: @@ -348,24 +391,26 @@ def control_1_7_password_policy_symbol(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.7" description = "Ensure IAM password policy requires at least one symbol" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: if passwordpolicy['RequireSymbols'] is False: result = False - failReason = "Password policy does not require at least one symbol" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not require at least one symbol" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.8 Ensure IAM password policy requires at least one number (Scored) -def control_1_8_password_policy_number(passwordpolicy): +@time_decorator +def control_1_8_password_policy_number(resource): """Summary Args: @@ -374,24 +419,26 @@ def control_1_8_password_policy_number(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.8" description = "Ensure IAM password policy requires at least one number" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: if passwordpolicy['RequireNumbers'] is False: result = False - failReason = "Password policy does not require at least one number" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not require at least one number" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.9 Ensure IAM password policy requires minimum length of 14 or greater (Scored) -def control_1_9_password_policy_length(passwordpolicy): +@time_decorator +def control_1_9_password_policy_length(resource): """Summary Args: @@ -400,24 +447,26 @@ def control_1_9_password_policy_length(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.9" description = "Ensure IAM password policy requires minimum length of 14 or greater" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: if passwordpolicy['MinimumPasswordLength'] < 14: result = False - failReason = "Password policy does not require at least 14 characters" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not require at least 14 characters" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.10 Ensure IAM password policy prevents password reuse (Scored) -def control_1_10_password_policy_reuse(passwordpolicy): +@time_decorator +def control_1_10_password_policy_reuse(resource): """Summary Args: @@ -426,30 +475,32 @@ def control_1_10_password_policy_reuse(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.10" description = "Ensure IAM password policy prevents password reuse" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: try: if passwordpolicy['PasswordReusePrevention'] == 24: pass else: result = False - failReason = "Password policy does not prevent reusing last 24 passwords" + fail_reason = "Password policy does not prevent reusing last 24 passwords" except: result = False - failReason = "Password policy does not prevent reusing last 24 passwords" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not prevent reusing last 24 passwords" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.11 Ensure IAM password policy expires passwords within 90 days or less (Scored) -def control_1_11_password_policy_expire(passwordpolicy): +@time_decorator +def control_1_11_password_policy_expire(resource): """Summary Args: @@ -458,28 +509,30 @@ def control_1_11_password_policy_expire(passwordpolicy): Returns: TYPE: Description """ + passwordpolicy = resource['passwordpolicy'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.11" description = "Ensure IAM password policy expires passwords within 90 days or less" scored = True if passwordpolicy is False: result = False - failReason = "Account does not have a IAM password policy." + fail_reason = "Account does not have a IAM password policy." else: if passwordpolicy['ExpirePasswords'] is True: if 0 < passwordpolicy['MaxPasswordAge'] > 90: result = False - failReason = "Password policy does not expire passwords after 90 days or less" + fail_reason = "Password policy does not expire passwords after 90 days or less" else: result = False - failReason = "Password policy does not expire passwords after 90 days or less" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Password policy does not expire passwords after 90 days or less" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.12 Ensure no root account access key exists (Scored) -def control_1_12_root_key_exists(credreport): +@time_decorator +def control_1_12_root_key_exists(resource): """Summary Args: @@ -488,27 +541,30 @@ def control_1_12_root_key_exists(credreport): Returns: TYPE: Description """ + credreport = resource['credreport'] + result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.12" description = "Ensure no root account access key exists" scored = True if (credreport[0]['access_key_1_active'] == "true") or (credreport[0]['access_key_2_active'] == "true"): result = False - failReason = "Root have active access keys" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Root have active access keys" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.13 Ensure MFA is enabled for the "root" account (Scored) -def control_1_13_root_mfa_enabled(): +@time_decorator +def control_1_13_root_mfa_enabled(resource): """Summary Returns: TYPE: Description """ result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.13" description = "Ensure MFA is enabled for the root account" @@ -516,19 +572,20 @@ def control_1_13_root_mfa_enabled(): response = IAM_CLIENT.get_account_summary() if response['SummaryMap']['AccountMFAEnabled'] != 1: result = False - failReason = "Root account not using MFA" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Root account not using MFA" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.14 Ensure hardware MFA is enabled for the "root" account (Scored) -def control_1_14_root_hardware_mfa_enabled(): +@time_decorator +def control_1_14_root_hardware_mfa_enabled(resource): """Summary Returns: TYPE: Description """ result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.14" description = "Ensure hardware MFA is enabled for the root account" @@ -545,40 +602,42 @@ def control_1_14_root_hardware_mfa_enabled(): for n in page['VirtualMFADevices']: pagedResult.append(n) if "mfa/root-account-mfa-device" in str(pagedResult): - failReason = "Root account not using hardware MFA" + fail_reason = "Root account not using hardware MFA" result = False else: result = False - failReason = "Root account not using MFA" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Root account not using MFA" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.15 Ensure security questions are registered in the AWS account (Not Scored/Manual) -def control_1_15_security_questions_registered(): +@time_decorator +def control_1_15_security_questions_registered(resource): """Summary Returns: TYPE: Description """ result = "Manual" - failReason = "" + fail_reason = "" offenders = [] control = "1.15" description = "Ensure security questions are registered in the AWS account, please verify manually" scored = False - failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Control not implemented using API, please verify manually" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.16 Ensure IAM policies are attached only to groups or roles (Scored) -def control_1_16_no_policies_on_iam_users(): +@time_decorator +def control_1_16_no_policies_on_iam_users(resource): """Summary Returns: TYPE: Description """ result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.16" description = "Ensure IAM policies are attached only to groups or roles" @@ -597,58 +656,60 @@ def control_1_16_no_policies_on_iam_users(): ) if policies['PolicyNames'] != []: result = False - failReason = "IAM user have inline policy attached" + fail_reason = "IAM user have inline policy attached" offenders.append(str(n['Arn'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.17 Maintain current contact details (not Scored) -def control_1_17_maintain_current_contact_details(): +@time_decorator +def control_1_17_maintain_current_contact_details(resource): """Summary Returns: TYPE: Description """ result = "Manual" - failReason = "" + fail_reason = "" offenders = [] control = "1.17" description = "Maintain current contact details, please verify manually" scored = False - failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Control not implemented using API, please verify manually" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.18 Ensure security contact information is registered (not Scored) -def control_1_18_ensure_security_contact_details(): +@time_decorator +def control_1_18_ensure_security_contact_details(resource): """Summary Returns: TYPE: Description """ result = "Manual" - failReason = "" + fail_reason = "" offenders = [] control = "1.18" description = "Ensure security contact information is registered, please verify manually" scored = False - failReason = "Control not implemented using API, please verify manually" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Control not implemented using API, please verify manually" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.19 Ensure IAM instance roles are used for AWS resource access from instances (Scored) -def control_1_19_ensure_iam_instance_roles_used(): +@time_decorator +def control_1_19_ensure_iam_instance_roles_used(resource): """Summary Returns: TYPE: Description """ result = True - failReason = "" offenders = [] control = "1.19" description = "Ensure IAM instance roles are used for AWS resource access from instances, application code is not audited" scored = True - failReason = "Instance not assigned IAM role for EC2" + fail_reason = "Instance not assigned IAM role for EC2" client = boto3.client('ec2') response = client.describe_instances() offenders = [] @@ -659,18 +720,19 @@ def control_1_19_ensure_iam_instance_roles_used(): except: result = False offenders.append(str(response['Reservations'][n]['Instances'][0]['InstanceId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.20 Ensure a support role has been created to manage incidents with AWS Support (Scored) -def control_1_20_ensure_incident_management_roles(): +@time_decorator +def control_1_20_ensure_incident_management_roles(resource): """Summary Returns: TYPE: Description """ result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.20" description = "Ensure a support role has been created to manage incidents with AWS Support" @@ -682,22 +744,24 @@ def control_1_20_ensure_incident_management_roles(): ) if (len(response['PolicyGroups']) + len(response['PolicyUsers']) + len(response['PolicyRoles'])) == 0: result = False - failReason = "No user, group or role assigned AWSSupportAccess" + fail_reason = "No user, group or role assigned AWSSupportAccess" except: result = False - failReason = "AWSSupportAccess policy not created" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "AWSSupportAccess policy not created" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.21 Do not setup access keys during initial user setup for all IAM users that have a console password (Not Scored) -def control_1_21_no_active_initial_access_keys_with_iam_user(credreport): +@time_decorator +def control_1_21_no_active_initial_access_keys_with_iam_user(resource): """Summary Returns: TYPE: Description """ + credreport = resource['credreport'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.21" description = "Do not setup access keys during initial user setup for all IAM users that have a console password" @@ -711,20 +775,21 @@ def control_1_21_no_active_initial_access_keys_with_iam_user(credreport): for m in response['AccessKeyMetadata']: if re.sub(r"\s", "T", str(m['CreateDate'])) == credreport[n]['user_creation_time']: result = False - failReason = "Users with keys created at user creation time found" + fail_reason = "Users with keys created at user creation time found" offenders.append(str(credreport[n]['arn']) + ":" + str(m['AccessKeyId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 1.22 Ensure IAM policies that allow full "*:*" administrative privileges are not created (Scored) -def control_1_22_no_overly_permissive_policies(): +@time_decorator +def control_1_22_no_overly_permissive_policies(resource): """Summary Returns: TYPE: Description """ result = True - failReason = "" + fail_reason = "" offenders = [] control = "1.22" description = "Ensure IAM policies that allow full administrative privileges are not created" @@ -735,11 +800,11 @@ def control_1_22_no_overly_permissive_policies(): Scope='Local', OnlyAttached=False, ) - pagedResult = [] + paged_result = [] for page in response_iterator: for n in page['Policies']: - pagedResult.append(n) - for m in pagedResult: + paged_result.append(n) + for m in paged_result: policy = IAM_CLIENT.get_policy_version( PolicyArn=m['Arn'], VersionId=m['DefaultVersionId'] @@ -758,15 +823,16 @@ def control_1_22_no_overly_permissive_policies(): if 'Action' in n.keys() and n['Effect'] == 'Allow': if ("'*'" in str(n['Action']) or str(n['Action']) == "*") and ("'*'" in str(n['Resource']) or str(n['Resource']) == "*"): result = False - failReason = "Found full administrative policy" + fail_reason = "Found full administrative policy" offenders.append(str(m['Arn'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- 2 Logging --- # 2.1 Ensure CloudTrail is enabled in all regions (Scored) -def control_2_1_ensure_cloud_trail_all_regions(cloudtrails): +@time_decorator +def control_2_1_ensure_cloud_trail_all_regions(resource): """Summary Args: @@ -775,8 +841,9 @@ def control_2_1_ensure_cloud_trail_all_regions(cloudtrails): Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "2.1" description = "Ensure CloudTrail is enabled in all regions" @@ -792,12 +859,13 @@ def control_2_1_ensure_cloud_trail_all_regions(cloudtrails): result = True break if result is False: - failReason = "No enabled multi region trails found" - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "No enabled multi region trails found" + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.2 Ensure CloudTrail log file validation is enabled (Scored) -def control_2_2_ensure_cloudtrail_validation(cloudtrails): +@time_decorator +def control_2_2_ensure_cloudtrail_validation(resource): """Summary Args: @@ -806,8 +874,9 @@ def control_2_2_ensure_cloudtrail_validation(cloudtrails): Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.2" description = "Ensure CloudTrail log file validation is enabled" @@ -816,15 +885,16 @@ def control_2_2_ensure_cloudtrail_validation(cloudtrails): for o in n: if o['LogFileValidationEnabled'] is False: result = False - failReason = "CloudTrails without log file validation discovered" + fail_reason = "CloudTrails without log file validation discovered" offenders.append(str(o['TrailARN'])) offenders = set(offenders) offenders = list(offenders) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.3 Ensure the S3 bucket used to store CloudTrail logs is not publicly accessible (Scored) -def control_2_3_ensure_cloudtrail_bucket_not_public(cloudtrails): +@time_decorator +def control_2_3_ensure_cloudtrail_bucket_not_public(resource): """Summary Args: @@ -833,8 +903,9 @@ def control_2_3_ensure_cloudtrail_bucket_not_public(cloudtrails): Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.3" description = "Ensure the S3 bucket CloudTrail logs to is not publicly accessible" @@ -850,31 +921,32 @@ def control_2_3_ensure_cloudtrail_bucket_not_public(cloudtrails): if re.search(r'(global/AllUsers|global/AuthenticatedUsers)', str(p['Grantee'])): result = False offenders.append(str(o['TrailARN']) + ":PublicBucket") - if "Publically" not in failReason: - failReason = failReason + "Publically accessible CloudTrail bucket discovered." + if "Publically" not in fail_reason: + fail_reason = fail_reason + "Publically accessible CloudTrail bucket discovered." except Exception as e: result = False if "AccessDenied" in str(e): offenders.append(str(o['TrailARN']) + ":AccessDenied") - if "Missing" not in failReason: - failReason = "Missing permissions to verify bucket ACL. " + failReason + if "Missing" not in fail_reason: + fail_reason = "Missing permissions to verify bucket ACL. " + fail_reason elif "NoSuchBucket" in str(e): offenders.append(str(o['TrailARN']) + ":NoBucket") - if "Trailbucket" not in failReason: - failReason = "Trailbucket doesn't exist. " + failReason + if "Trailbucket" not in fail_reason: + fail_reason = "Trailbucket doesn't exist. " + fail_reason else: offenders.append(str(o['TrailARN']) + ":CannotVerify") - if "Cannot" not in failReason: - failReason = "Cannot verify bucket ACL. " + failReason + if "Cannot" not in fail_reason: + fail_reason = "Cannot verify bucket ACL. " + fail_reason else: result = False offenders.append(str(o['TrailARN']) + "NoS3Logging") - failReason = "Cloudtrail not configured to log to S3. " + failReason - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + fail_reason = "Cloudtrail not configured to log to S3. " + fail_reason + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.4 Ensure CloudTrail trails are integrated with CloudWatch Logs (Scored) -def control_2_4_ensure_cloudtrail_cloudwatch_logs_integration(cloudtrails): +@time_decorator +def control_2_4_ensure_cloudtrail_cloudwatch_logs_integration(resource): """Summary Args: @@ -883,8 +955,9 @@ def control_2_4_ensure_cloudtrail_cloudwatch_logs_integration(cloudtrails): Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.4" description = "Ensure CloudTrail trails are integrated with CloudWatch Logs" @@ -896,49 +969,51 @@ def control_2_4_ensure_cloudtrail_cloudwatch_logs_integration(cloudtrails): pass else: result = False - failReason = "CloudTrails without CloudWatch Logs discovered" + fail_reason = "CloudTrails without CloudWatch Logs discovered" offenders.append(str(o['TrailARN'])) except: result = False - failReason = "CloudTrails without CloudWatch Logs discovered" + fail_reason = "CloudTrails without CloudWatch Logs discovered" offenders.append(str(o['TrailARN'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.5 Ensure AWS Config is enabled in all regions (Scored) -def control_2_5_ensure_config_all_regions(regions): +@time_decorator +def control_2_5_ensure_config_all_regions(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.5" description = "Ensure AWS Config is enabled in all regions" scored = True - globalConfigCapture = False # Only one region needs to capture global events + global_config_capture = False # Only one region needs to capture global events for n in regions: - configClient = boto3.client('config', region_name=n) - response = configClient.describe_configuration_recorder_status() + config_client = boto3.client('config', region_name=n) + response = config_client.describe_configuration_recorder_status() # Get recording status try: if not response['ConfigurationRecordersStatus'][0]['recording'] is True: result = False - failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" + fail_reason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append(str(n) + ":NotRecording") except: result = False - failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" + fail_reason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append(str(n) + ":NotRecording") # Verify that each region is capturing all events - response = configClient.describe_configuration_recorders() + response = config_client.describe_configuration_recorders() try: if not response['ConfigurationRecorders'][0]['recordingGroup']['allSupported'] is True: result = False - failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" + fail_reason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append(str(n) + ":NotAllEvents") except: pass # This indicates that Config is disabled in the region and will be captured above. @@ -946,37 +1021,38 @@ def control_2_5_ensure_config_all_regions(regions): # Check if region is capturing global events. Fail is verified later since only one region needs to capture them. try: if response['ConfigurationRecorders'][0]['recordingGroup']['includeGlobalResourceTypes'] is True: - globalConfigCapture = True + global_config_capture = True except: pass # Verify the delivery channels - response = configClient.describe_delivery_channel_status() + response = config_client.describe_delivery_channel_status() try: if response['DeliveryChannelsStatus'][0]['configHistoryDeliveryInfo']['lastStatus'] != "SUCCESS": result = False - failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" + fail_reason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append(str(n) + ":S3orSNSDelivery") except: pass # Will be captured by earlier rule try: if response['DeliveryChannelsStatus'][0]['configStreamDeliveryInfo']['lastStatus'] != "SUCCESS": result = False - failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" + fail_reason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append(str(n) + ":SNSDelivery") except: pass # Will be captured by earlier rule # Verify that global events is captured by any region - if globalConfigCapture is False: + if global_config_capture is False: result = False - failReason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" + fail_reason = "Config not enabled in all regions, not capturing all/global events or delivery channel errors" offenders.append("Global:NotRecording") - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.6 Ensure S3 bucket access logging is enabled on the CloudTrail S3 bucket (Scored) -def control_2_6_ensure_cloudtrail_bucket_logging(cloudtrails): +@time_decorator +def control_2_6_ensure_cloudtrail_bucket_logging(resource): """Summary Args: @@ -985,8 +1061,9 @@ def control_2_6_ensure_cloudtrail_bucket_logging(cloudtrails): Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.6" description = "Ensure S3 bucket access logging is enabled on the CloudTrail S3 bucket" @@ -998,20 +1075,21 @@ def control_2_6_ensure_cloudtrail_bucket_logging(cloudtrails): response = S3_CLIENT.get_bucket_logging(Bucket=o['S3BucketName']) except: result = False - failReason = "Cloudtrail not configured to log to S3. " + fail_reason = "Cloudtrail not configured to log to S3. " offenders.append(str(o['TrailARN'])) try: if response['LoggingEnabled']: pass except: result = False - failReason = failReason + "CloudTrail S3 bucket without logging discovered" + fail_reason = fail_reason + "CloudTrail S3 bucket without logging discovered" offenders.append("Trail:" + str(o['TrailARN']) + " - S3Bucket:" + str(o['S3BucketName'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.7 Ensure CloudTrail logs are encrypted at rest using KMS CMKs (Scored) -def control_2_7_ensure_cloudtrail_encryption_kms(cloudtrails): +@time_decorator +def control_2_7_ensure_cloudtrail_encryption_kms(resource): """Summary Args: @@ -1020,8 +1098,9 @@ def control_2_7_ensure_cloudtrail_encryption_kms(cloudtrails): Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.7" description = "Ensure CloudTrail logs are encrypted at rest using KMS CMKs" @@ -1033,20 +1112,22 @@ def control_2_7_ensure_cloudtrail_encryption_kms(cloudtrails): pass except: result = False - failReason = "CloudTrail not using KMS CMK for encryption discovered" + fail_reason = "CloudTrail not using KMS CMK for encryption discovered" offenders.append("Trail:" + str(o['TrailARN'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 2.8 Ensure rotation for customer created CMKs is enabled (Scored) -def control_2_8_ensure_kms_cmk_rotation(regions): +@time_decorator +def control_2_8_ensure_kms_cmk_rotation(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.8" description = "Ensure rotation for customer created CMKs is enabled" @@ -1063,23 +1144,25 @@ def control_2_8_ensure_kms_cmk_rotation(regions): keyDescription = kms_client.describe_key(KeyId=n['KeyId']) if "Default master key that protects my" not in str(keyDescription['KeyMetadata']['Description']): # Ignore service keys result = False - failReason = "KMS CMK rotation not enabled" + fail_reason = "KMS CMK rotation not enabled" offenders.append("Key:" + str(keyDescription['KeyMetadata']['Arn'])) except: pass # Ignore keys without permission, for example ACM key - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # moved from 4.3 to 2.9 in v1.2 of CISFB # 2.9 Ensure VPC flow logging is enabled in all VPCs (Scored) -def control_2_9_ensure_flow_logs_enabled_on_all_vpc(regions): +@time_decorator +def control_2_9_ensure_flow_logs_enabled_on_all_vpc(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "2.9" description = "Ensure VPC flow logging is enabled in all VPCs" @@ -1106,27 +1189,29 @@ def control_2_9_ensure_flow_logs_enabled_on_all_vpc(regions): for m in vpcs['Vpcs']: if not str(m['VpcId']) in str(activeLogs): result = False - failReason = "VPC without active VPC Flow Logs found" + fail_reason = "VPC without active VPC Flow Logs found" offenders.append(str(n) + " : " + str(m['VpcId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- Monitoring --- # 3.1 Ensure a log metric filter and alarm exist for unauthorized API calls (Scored) -def control_3_1_ensure_log_metric_filter_unauthorized_api_calls(cloudtrails): +@time_decorator +def control_3_1_ensure_log_metric_filter_unauthorized_api_calls(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.1" description = "Ensure log metric filter unauthorized api calls" scored = True - failReason = "Incorrect log metric alerts for unauthorized_api_calls" + fail_reason = "Incorrect log metric alerts for unauthorized_api_calls" for m, n in cloudtrails.items(): for o in n: try: @@ -1144,8 +1229,8 @@ def control_3_1_ensure_log_metric_filter_unauthorized_api_calls(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1153,23 +1238,25 @@ def control_3_1_ensure_log_metric_filter_unauthorized_api_calls(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.2 Ensure a log metric filter and alarm exist for Management Console sign-in without MFA (Scored) -def control_3_2_ensure_log_metric_filter_console_signin_no_mfa(cloudtrails): +@time_decorator +def control_3_2_ensure_log_metric_filter_console_signin_no_mfa(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.2" description = "Ensure a log metric filter and alarm exist for Management Console sign-in without MFA" scored = True - failReason = "Incorrect log metric alerts for management console signin without MFA" + fail_reason = "Incorrect log metric alerts for management console signin without MFA" for m, n in cloudtrails.items(): for o in n: try: @@ -1187,8 +1274,8 @@ def control_3_2_ensure_log_metric_filter_console_signin_no_mfa(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1196,23 +1283,25 @@ def control_3_2_ensure_log_metric_filter_console_signin_no_mfa(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.3 Ensure a log metric filter and alarm exist for usage of "root" account (Scored) -def control_3_3_ensure_log_metric_filter_root_usage(cloudtrails): +@time_decorator +def control_3_3_ensure_log_metric_filter_root_usage(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.3" description = "Ensure a log metric filter and alarm exist for root usage" scored = True - failReason = "Incorrect log metric alerts for root usage" + fail_reason = "Incorrect log metric alerts for root usage" for m, n in cloudtrails.items(): for o in n: try: @@ -1231,8 +1320,8 @@ def control_3_3_ensure_log_metric_filter_root_usage(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1240,23 +1329,25 @@ def control_3_3_ensure_log_metric_filter_root_usage(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.4 Ensure a log metric filter and alarm exist for IAM policy changes (Scored) -def control_3_4_ensure_log_metric_iam_policy_change(cloudtrails): +@time_decorator +def control_3_4_ensure_log_metric_iam_policy_change(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.4" description = "Ensure a log metric filter and alarm exist for IAM changes" scored = True - failReason = "Incorrect log metric alerts for IAM policy changes" + fail_reason = "Incorrect log metric alerts for IAM policy changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1281,8 +1372,8 @@ def control_3_4_ensure_log_metric_iam_policy_change(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1290,23 +1381,25 @@ def control_3_4_ensure_log_metric_iam_policy_change(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.5 Ensure a log metric filter and alarm exist for CloudTrail configuration changes (Scored) -def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(cloudtrails): +@time_decorator +def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.5" description = "Ensure a log metric filter and alarm exist for CloudTrail configuration changes" scored = True - failReason = "Incorrect log metric alerts for CloudTrail configuration changes" + fail_reason = "Incorrect log metric alerts for CloudTrail configuration changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1326,8 +1419,8 @@ def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1335,23 +1428,24 @@ def control_3_5_ensure_log_metric_cloudtrail_configuration_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.6 Ensure a log metric filter and alarm exist for AWS Management Console authentication failures (Scored) -def control_3_6_ensure_log_metric_console_auth_failures(cloudtrails): +@time_decorator +def control_3_6_ensure_log_metric_console_auth_failures(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" offenders = [] control = "3.6" description = "Ensure a log metric filter and alarm exist for console auth failures" scored = True - failReason = "Ensure a log metric filter and alarm exist for console auth failures" + fail_reason = "Ensure a log metric filter and alarm exist for console auth failures" for m, n in cloudtrails.items(): for o in n: try: @@ -1369,8 +1463,8 @@ def control_3_6_ensure_log_metric_console_auth_failures(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1378,23 +1472,24 @@ def control_3_6_ensure_log_metric_console_auth_failures(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.7 Ensure a log metric filter and alarm exist for disabling or scheduled deletion of customer created CMKs (Scored) -def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(cloudtrails): +@time_decorator +def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" offenders = [] control = "3.7" description = "Ensure a log metric filter and alarm exist for disabling or scheduling deletion of KMS CMK" scored = True - failReason = "Ensure a log metric filter and alarm exist for disabling or scheduling deletion of KMS CMK" + fail_reason = "Ensure a log metric filter and alarm exist for disabling or scheduling deletion of KMS CMK" for m, n in cloudtrails.items(): for o in n: try: @@ -1413,8 +1508,8 @@ def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(cloudtra MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1422,23 +1517,25 @@ def control_3_7_ensure_log_metric_disabling_scheduled_delete_of_kms_cmk(cloudtra result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.8 Ensure a log metric filter and alarm exist for S3 bucket policy changes (Scored) -def control_3_8_ensure_log_metric_s3_bucket_policy_changes(cloudtrails): +@time_decorator +def control_3_8_ensure_log_metric_s3_bucket_policy_changes(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.8" description = "Ensure a log metric filter and alarm exist for S3 bucket policy changes" scored = True - failReason = "Ensure a log metric filter and alarm exist for S3 bucket policy changes" + fail_reason = "Ensure a log metric filter and alarm exist for S3 bucket policy changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1460,8 +1557,8 @@ def control_3_8_ensure_log_metric_s3_bucket_policy_changes(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1469,23 +1566,25 @@ def control_3_8_ensure_log_metric_s3_bucket_policy_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.9 Ensure a log metric filter and alarm exist for AWS Config configuration changes (Scored) -def control_3_9_ensure_log_metric_config_configuration_changes(cloudtrails): +@time_decorator +def control_3_9_ensure_log_metric_config_configuration_changes(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.9" description = "Ensure a log metric filter and alarm exist for for AWS Config configuration changes" scored = True - failReason = "Ensure a log metric filter and alarm exist for for AWS Config configuration changes" + fail_reason = "Ensure a log metric filter and alarm exist for for AWS Config configuration changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1505,8 +1604,8 @@ def control_3_9_ensure_log_metric_config_configuration_changes(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1514,23 +1613,25 @@ def control_3_9_ensure_log_metric_config_configuration_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.10 Ensure a log metric filter and alarm exist for security group changes (Scored) -def control_3_10_ensure_log_metric_security_group_changes(cloudtrails): +@time_decorator +def control_3_10_ensure_log_metric_security_group_changes(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.10" description = "Ensure a log metric filter and alarm exist for security group changes" scored = True - failReason = "Ensure a log metric filter and alarm exist for security group changes" + fail_reason = "Ensure a log metric filter and alarm exist for security group changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1550,8 +1651,8 @@ def control_3_10_ensure_log_metric_security_group_changes(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1559,23 +1660,25 @@ def control_3_10_ensure_log_metric_security_group_changes(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.11 Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL) (Scored) -def control_3_11_ensure_log_metric_nacl(cloudtrails): +@time_decorator +def control_3_11_ensure_log_metric_nacl(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.11" description = "Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL)" scored = True - failReason = "Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL)" + fail_reason = "Ensure a log metric filter and alarm exist for changes to Network Access Control Lists (NACL)" for m, n in cloudtrails.items(): for o in n: try: @@ -1596,8 +1699,8 @@ def control_3_11_ensure_log_metric_nacl(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1605,23 +1708,25 @@ def control_3_11_ensure_log_metric_nacl(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.12 Ensure a log metric filter and alarm exist for changes to network gateways (Scored) -def control_3_12_ensure_log_metric_changes_to_network_gateways(cloudtrails): +@time_decorator +def control_3_12_ensure_log_metric_changes_to_network_gateways(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.12" description = "Ensure a log metric filter and alarm exist for changes to network gateways" scored = True - failReason = "Ensure a log metric filter and alarm exist for changes to network gateways" + fail_reason = "Ensure a log metric filter and alarm exist for changes to network gateways" for m, n in cloudtrails.items(): for o in n: try: @@ -1641,8 +1746,8 @@ def control_3_12_ensure_log_metric_changes_to_network_gateways(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1650,23 +1755,25 @@ def control_3_12_ensure_log_metric_changes_to_network_gateways(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.13 Ensure a log metric filter and alarm exist for route table changes (Scored) -def control_3_13_ensure_log_metric_changes_to_route_tables(cloudtrails): +@time_decorator +def control_3_13_ensure_log_metric_changes_to_route_tables(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.13" description = "Ensure a log metric filter and alarm exist for route table changes" scored = True - failReason = "Ensure a log metric filter and alarm exist for route table changes" + fail_reason = "Ensure a log metric filter and alarm exist for route table changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1687,8 +1794,8 @@ def control_3_13_ensure_log_metric_changes_to_route_tables(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1696,23 +1803,25 @@ def control_3_13_ensure_log_metric_changes_to_route_tables(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 3.14 Ensure a log metric filter and alarm exist for VPC changes (Scored) -def control_3_14_ensure_log_metric_changes_to_vpc(cloudtrails): +@time_decorator +def control_3_14_ensure_log_metric_changes_to_vpc(resource): """Summary Returns: TYPE: Description """ + cloudtrails = resource['cloudtrails'] result = False - failReason = "" + fail_reason = "" offenders = [] control = "3.14" description = "Ensure a log metric filter and alarm exist for VPC changes" scored = True - failReason = "Ensure a log metric filter and alarm exist for VPC changes" + fail_reason = "Ensure a log metric filter and alarm exist for VPC changes" for m, n in cloudtrails.items(): for o in n: try: @@ -1735,8 +1844,8 @@ def control_3_14_ensure_log_metric_changes_to_vpc(cloudtrails): MetricName=p['metricTransformations'][0]['metricName'], Namespace=p['metricTransformations'][0]['metricNamespace'] ) - snsClient = boto3.client('sns', region_name=m) - subscribers = snsClient.list_subscriptions_by_topic( + sns_client = boto3.client('sns', region_name=m) + subscribers = sns_client.list_subscriptions_by_topic( TopicArn=response['MetricAlarms'][0]['AlarmActions'][0] # Pagination not used since only 1 subscriber required ) @@ -1744,20 +1853,22 @@ def control_3_14_ensure_log_metric_changes_to_vpc(cloudtrails): result = True except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- Networking --- # 4.1 Ensure no security groups allow ingress from 0.0.0.0/0 to port 22 (Scored) -def control_4_1_ensure_ssh_not_open_to_world(regions): +@time_decorator +def control_4_1_ensure_ssh_not_open_to_world(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "4.1" description = "Ensure no security groups allow ingress from 0.0.0.0/0 to port 22" @@ -1771,25 +1882,27 @@ def control_4_1_ensure_ssh_not_open_to_world(regions): try: if int(o['FromPort']) <= 22 <= int(o['ToPort']) and '0.0.0.0/0' in str(o['IpRanges']): result = False - failReason = "Found Security Group with port 22 open to the world (0.0.0.0/0)" + fail_reason = "Found Security Group with port 22 open to the world (0.0.0.0/0)" offenders.append(str(m['GroupId'])) except: if str(o['IpProtocol']) == "-1" and '0.0.0.0/0' in str(o['IpRanges']): result = False - failReason = "Found Security Group with port 22 open to the world (0.0.0.0/0)" + fail_reason = "Found Security Group with port 22 open to the world (0.0.0.0/0)" offenders.append(str(n) + " : " + str(m['GroupId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 4.2 Ensure no security groups allow ingress from 0.0.0.0/0 to port 3389 (Scored) -def control_4_2_ensure_rdp_not_open_to_world(regions): +@time_decorator +def control_4_2_ensure_rdp_not_open_to_world(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "4.2" description = "Ensure no security groups allow ingress from 0.0.0.0/0 to port 3389" @@ -1803,25 +1916,27 @@ def control_4_2_ensure_rdp_not_open_to_world(regions): try: if int(o['FromPort']) <= 3389 <= int(o['ToPort']) and '0.0.0.0/0' in str(o['IpRanges']): result = False - failReason = "Found Security Group with port 3389 open to the world (0.0.0.0/0)" + fail_reason = "Found Security Group with port 3389 open to the world (0.0.0.0/0)" offenders.append(str(m['GroupId'])) except: if str(o['IpProtocol']) == "-1" and '0.0.0.0/0' in str(o['IpRanges']): result = False - failReason = "Found Security Group with port 3389 open to the world (0.0.0.0/0)" + fail_reason = "Found Security Group with port 3389 open to the world (0.0.0.0/0)" offenders.append(str(n) + " : " + str(m['GroupId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 4.3 Ensure the default security group of every VPC restricts all traffic (Scored) -def control_4_3_ensure_default_security_groups_restricts_traffic(regions): +@time_decorator +def control_4_3_ensure_default_security_groups_restricts_traffic(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "4.3" description = "Ensure the default security group of every VPC restricts all traffic" @@ -1841,20 +1956,22 @@ def control_4_3_ensure_default_security_groups_restricts_traffic(regions): for m in response['SecurityGroups']: if not (len(m['IpPermissions']) + len(m['IpPermissionsEgress'])) == 0: result = False - failReason = "Default security groups with ingress or egress rules discovered" + fail_reason = "Default security groups with ingress or egress rules discovered" offenders.append(str(n) + " : " + str(m['GroupId'])) - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # 4.4 Ensure routing tables for VPC peering are "least access" (Not Scored) -def control_4_4_ensure_route_tables_are_least_access(regions): +@time_decorator +def control_4_4_ensure_route_tables_are_least_access(resource): """Summary Returns: TYPE: Description """ + regions = resource['regions'] result = True - failReason = "" + fail_reason = "" offenders = [] control = "4.4" description = "Ensure routing tables for VPC peering are least access" @@ -1868,11 +1985,11 @@ def control_4_4_ensure_route_tables_are_least_access(regions): if o['VpcPeeringConnectionId']: if int(str(o['DestinationCidrBlock']).split("/", 1)[1]) < 24: result = False - failReason = "Large CIDR block routed to peer discovered, please investigate" + fail_reason = "Large CIDR block routed to peer discovered, please investigate" offenders.append(str(n) + " : " + str(m['RouteTableId'])) except: pass - return {'Result': result, 'failReason': failReason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except + return {'Result': result, 'failReason': fail_reason, 'Offenders': offenders, 'ScoredControl': scored, 'Description': description, 'ControlId': control} #pylint: disable=broad-except # --- Central functions --- @@ -1990,55 +2107,55 @@ def get_account_number(): return account -def set_evaluation(invokeEvent, mainEvent, annotation): +def set_evaluation(invokeEvent, main_event, annotation): """Summary Args: - event (TYPE): Description + main_event (TYPE): Description annotation (TYPE): Description Returns: TYPE: Description """ - configClient = boto3.client('config') + config_client = boto3.client('config') if len(annotation) > 0: - configClient.put_evaluations( + config_client.put_evaluations( Evaluations=[ { 'ComplianceResourceType': 'AWS::::Account', - 'ComplianceResourceId': mainEvent['accountId'], + 'ComplianceResourceId': main_event['accountId'], 'ComplianceType': 'NON_COMPLIANT', 'Annotation': str(annotation), 'OrderingTimestamp': invokeEvent['notificationCreationTime'] }, ], - ResultToken=mainEvent['resultToken'] + ResultToken=main_event['resultToken'] ) else: - configClient.put_evaluations( + config_client.put_evaluations( Evaluations=[ { 'ComplianceResourceType': 'AWS::::Account', - 'ComplianceResourceId': mainEvent['accountId'], + 'ComplianceResourceId': main_event['accountId'], 'ComplianceType': 'COMPLIANT', 'OrderingTimestamp': invokeEvent['notificationCreationTime'] }, ], - ResultToken=mainEvent['resultToken'] + ResultToken=main_event['resultToken'] ) -def json2html(controlResult, account): +def json2html(control_result, account): """Summary Args: - controlResult (TYPE): Description + control_result (TYPE): Description Returns: TYPE: Description """ table = [] - shortReport = shortAnnotation(controlResult) + short_report = short_annotation(control_result) table.append("\n\n