From da6c1b53d262b5417f68f8502c37f2661e670c6b Mon Sep 17 00:00:00 2001 From: Evgeny Kolesnikov Date: Mon, 11 Dec 2023 10:35:41 +0100 Subject: [PATCH] WIP --- .../package_389-ds-base_removed/rule.yml | 2 +- .../sysctl_kernel_panic_on_oops/rule.yml | 1 - .../{test.profile => test.profile.bak} | 0 ssg/environment.py | 9 +- utils/convert_prodtype_ds_check.py | 82 ++++ ...ert_prodtype_into_default_profile_entry.py | 356 ++++++++++++++++++ 6 files changed, 446 insertions(+), 4 deletions(-) rename products/fedora/profiles/{test.profile => test.profile.bak} (100%) create mode 100755 utils/convert_prodtype_ds_check.py create mode 100755 utils/convert_prodtype_into_default_profile_entry.py diff --git a/linux_os/guide/services/ldap/389_ds/package_389-ds-base_removed/rule.yml b/linux_os/guide/services/ldap/389_ds/package_389-ds-base_removed/rule.yml index a4bd1fc3c5d5..5fc2199de1e0 100644 --- a/linux_os/guide/services/ldap/389_ds/package_389-ds-base_removed/rule.yml +++ b/linux_os/guide/services/ldap/389_ds/package_389-ds-base_removed/rule.yml @@ -1,5 +1,5 @@ documentation_complete: true - + prodtype: rhcos4,rhel7,rhel8,rhel9 title: 'Uninstall 389-ds-base Package' diff --git a/linux_os/guide/system/permissions/restrictions/sysctl_kernel_panic_on_oops/rule.yml b/linux_os/guide/system/permissions/restrictions/sysctl_kernel_panic_on_oops/rule.yml index 7a7d1792705e..0dc1a763c84e 100644 --- a/linux_os/guide/system/permissions/restrictions/sysctl_kernel_panic_on_oops/rule.yml +++ b/linux_os/guide/system/permissions/restrictions/sysctl_kernel_panic_on_oops/rule.yml @@ -1,4 +1,3 @@ -prodtype: fedora documentation_complete: true title: 'Kernel panic on oops' diff --git a/products/fedora/profiles/test.profile b/products/fedora/profiles/test.profile.bak similarity index 100% rename from products/fedora/profiles/test.profile rename to products/fedora/profiles/test.profile.bak diff --git a/ssg/environment.py b/ssg/environment.py index 52a29e03465d..ab588380fcff 100644 --- a/ssg/environment.py +++ b/ssg/environment.py @@ -8,8 +8,13 @@ def open_environment(build_config_yaml_path, product_yaml_path, product_properties_path=None): contents = open_raw(build_config_yaml_path) + product = open_product_environment(product_yaml_path, product_properties_path) + contents.update(product) + return contents + + +def open_product_environment(product_yaml_path, product_properties_path=None): product = load_product_yaml(product_yaml_path) if product_properties_path: product.read_properties_from_directory(product_properties_path) - contents.update(product) - return contents + return product diff --git a/utils/convert_prodtype_ds_check.py b/utils/convert_prodtype_ds_check.py new file mode 100755 index 000000000000..009ab62337b9 --- /dev/null +++ b/utils/convert_prodtype_ds_check.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python + +from __future__ import print_function + +import argparse as ap +from xml.etree import ElementTree + +NAMESPACES = dict( + xccdf_ns="http://scap.nist.gov/schema/scap/source/1.2", + profile_ns="http://checklists.nist.gov/xccdf/1.2", +) + + +def fname_to_etree(fname): + input_tree = ElementTree.parse(fname) + return input_tree + + +def get_rule_results_from_etree(tree): + xpath_expr = ".//{%s}Rule" % NAMESPACES["profile_ns"] + xccdfs = tree.findall(xpath_expr) + return xccdfs + + +def get_profiles_from_etree(tree): + xpath_expr = ".//{%s}Profile" % NAMESPACES["profile_ns"] + xccdfs = tree.findall(xpath_expr) + return xccdfs + + +def get_selections_from_etree(tree): + xpath_expr = ".//{%s}select" % NAMESPACES["profile_ns"] + xccdfs = tree.findall(xpath_expr) + return xccdfs + + +def get_rules_from_etree(tree): + xpath_expr = ".//{%s}Rule" % NAMESPACES["profile_ns"] + xccdfs = tree.findall(xpath_expr) + return xccdfs + + +def extract_tree_from_file(fname): + return fname_to_etree(fname) + + +def make_parser(): + parser = ap.ArgumentParser() + parser.add_argument("first") + return parser.parse_args() + + +if __name__ == "__main__": + #args = make_parser() + f = "../build/ssg-rhel8-ds.xml" #args.first + #first_results = extract_results_from_file(f) + tree = extract_tree_from_file(f) + + profiles = sorted(get_profiles_from_etree(tree), key=lambda x: x.attrib["id"]) + + rules = sorted(get_rules_from_etree(tree), key=lambda x: x.attrib["id"]) + + print(f"Found {len(profiles)} profilies, {len(rules)} rules") + + rules_selections = {} + for p in profiles: + p_id = p.attrib["id"].removeprefix("xccdf_org.ssgproject.content_") + selections = sorted(get_selections_from_etree(p), key=lambda x: x.attrib["idref"]) + print(f"{p_id} (selections: {len(selections)})") + for sel in selections: + r_id = sel.attrib["idref"].removeprefix("xccdf_org.ssgproject.content_") + r_selected = sel.attrib["selected"].lower() == "true" + print(f" {'+' if r_selected else '-'}{r_id}") + r_stats = rules_selections.get(r_id, {"selected": 0, "unselected": 0}) + r_stats["selected" if r_selected else "unselected"] += 1 + rules_selections[r_id] = r_stats + + for r in rules: + r_id = r.attrib["id"].removeprefix("xccdf_org.ssgproject.content_") + r_selected = r.attrib["selected"].lower() == "true" + in_profiles = f"selected: {rules_selections[r_id]['selected']}, unselected: {rules_selections[r_id]['unselected']}" if r_id in rules_selections else "absent" + print(f"{'+' if r_selected else '-'}{r_id} (profiles: {in_profiles})") diff --git a/utils/convert_prodtype_into_default_profile_entry.py b/utils/convert_prodtype_into_default_profile_entry.py new file mode 100755 index 000000000000..27a1a40fa1a5 --- /dev/null +++ b/utils/convert_prodtype_into_default_profile_entry.py @@ -0,0 +1,356 @@ +#!/usr/bin/python3 + +import sys +import os +import argparse +import json + +import ssg.rules +import ssg.utils +import ssg.products +import ssg.rule_yaml +import ssg.build_yaml +import ssg.build_profile +import ssg.entities.profile_base +import ssg.controls +import ssg.build_cpe +import ssg.yaml + +SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +GUIDE_RULES = {} + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--product", type=str, action="store", default="rhel8", + help="Product (defaults to all if not set)") + return parser.parse_args() + + +def list_products(rule_obj): + yaml_file, yaml_contents = ssg.rule_yaml.get_yaml_contents(rule_obj) + prodtype_section = ssg.rule_yaml.get_section_lines(yaml_file, yaml_contents, 'prodtype') + + print("Computed products:") + for product in sorted(rule_obj.get('products', [])): + print(" - %s" % product) + print("") + + if prodtype_section: + prodtype_contents = ssg.rule_yaml.parse_from_yaml(yaml_contents, prodtype_section) + prodtype = ssg.rule_yaml.parse_prodtype(prodtype_contents['prodtype']) + print("Listed products:") + for product in prodtype: + print(" - %s" % product) + else: + print("Empty listed prodtype in the file") + + +def add_products(rule_obj, products, silent=False): + yaml_file, yaml_contents = ssg.rule_yaml.get_yaml_contents(rule_obj) + prodtype_section = ssg.rule_yaml.get_section_lines(yaml_file, yaml_contents, 'prodtype') + + if not prodtype_section: + new_prodtype = sorted(set(products)) + new_prodtype_str = ','.join(new_prodtype) + + doc_complete_section = ssg.rule_yaml.get_section_lines(yaml_file, yaml_contents, + 'documentation_complete') + if not doc_complete_section: + print("Cannot modify empty prodtype with missing documentation_complete... " + "Are you sure this is a rule file? %s" % yaml_file, file=sys.stderr) + sys.exit(1) + + start_line = doc_complete_section[1]+1 + + if not silent: + print("Current prodtype is empty, not adding the new prodtype.") + else: + prodtype_contents = ssg.rule_yaml.parse_from_yaml(yaml_contents, prodtype_section) + prodtype = prodtype_contents['prodtype'] + + new_prodtype = ssg.rule_yaml.parse_prodtype(prodtype) + new_prodtype.update(products) + new_prodtype_str = ','.join(sorted(new_prodtype)) + + print("Modifying %s:" % yaml_file) + print(" Current prodtype: %s" % prodtype) + print(" New prodtype: %s" % new_prodtype_str) + + yaml_contents = ssg.rule_yaml.update_key_value(yaml_contents, 'prodtype', + prodtype, new_prodtype_str) + + ssg.utils.write_list_file(yaml_file, yaml_contents) + + +def remove_products(rule_obj, products): + yaml_file, yaml_contents = ssg.rule_yaml.get_yaml_contents(rule_obj) + prodtype_section = ssg.rule_yaml.get_section_lines(yaml_file, yaml_contents, 'prodtype') + + if not prodtype_section: + print("Cannot modify empty prodtype to remove products from %s" % + yaml_file, file=sys.stderr) + sys.exit(1) + + prodtype_contents = ssg.rule_yaml.parse_from_yaml(yaml_contents, prodtype_section) + prodtype = prodtype_contents['prodtype'] + + new_prodtype = ssg.rule_yaml.parse_prodtype(prodtype) + new_prodtype = new_prodtype.difference(products) + new_prodtype_str = ','.join(sorted(new_prodtype)) + + print("Current prodtype: %s" % prodtype) + + if new_prodtype: + print("New prodtype: %s" % new_prodtype_str) + yaml_contents = ssg.rule_yaml.update_key_value(yaml_contents, 'prodtype', + prodtype, new_prodtype_str) + else: + print("New prodtype is empty") + yaml_contents = ssg.rule_yaml.remove_lines(yaml_contents, prodtype_section) + + ssg.utils.write_list_file(yaml_file, yaml_contents) + + +def replace_products(rule_obj, products): + yaml_file, yaml_contents = ssg.rule_yaml.get_yaml_contents(rule_obj) + prodtype_section = ssg.rule_yaml.get_section_lines(yaml_file, yaml_contents, 'prodtype') + + if not prodtype_section: + print("Cannot modify empty prodtype to replace products from %s" % yaml_file, + file=sys.stderr) + sys.exit(1) + + parsed_changes = [] + for product in products: + parsed_product = product.split('~') + if not len(parsed_product) == 2: + print("Invalid product replacement description: %s" % product, + file=sys.stderr) + sys.exit(1) + + change = { + 'match': ssg.rule_yaml.parse_prodtype(parsed_product[0]), + 'replacement': ssg.rule_yaml.parse_prodtype(parsed_product[1]), + } + parsed_changes.append(change) + + prodtype_contents = ssg.rule_yaml.parse_from_yaml(yaml_contents, prodtype_section) + prodtype = prodtype_contents['prodtype'] + + current_prodtypes = ssg.rule_yaml.parse_prodtype(prodtype) + new_prodtypes = set(current_prodtypes) + + for change in parsed_changes: + if change['match'].issubset(current_prodtypes): + new_prodtypes.difference_update(change['match']) + new_prodtypes.update(change['replacement']) + + new_prodtype_str = ','.join(sorted(new_prodtypes)) + + print("Current prodtype: %s" % prodtype) + print("New prodtype: %s" % new_prodtype_str) + + yaml_contents = ssg.rule_yaml.update_key_value(yaml_contents, 'prodtype', + prodtype, new_prodtype_str) + + ssg.utils.write_list_file(yaml_file, yaml_contents) + + +def main1(): + args = parse_args() + + json_file = open(args.json, 'r') + known_rules = json.load(json_file) + + if args.rule_id not in known_rules: + print("Error: rule_id:%s is not known!" % args.rule_id, file=sys.stderr) + print("If you think this is an error, try regenerating the JSON.", file=sys.stderr) + sys.exit(1) + + if args.action != "list" and not args.products: + print("Error: expected a list of products or replace transformations but " + "none given.", file=sys.stderr) + sys.exit(1) + + rule_obj = known_rules[args.rule_id] + print("rule_id:%s\n" % args.rule_id) + + if args.action == "list": + list_products(rule_obj) + elif args.action == "add": + add_products(rule_obj, args.products) + elif args.action == "remove": + remove_products(rule_obj, args.products) + elif args.action == "replace": + replace_products(rule_obj, args.products) + else: + print("Unknown option: %s" % args.action) + + +def collect_rule_ids_and_dirs(rules_dir): + for rule_dir in sorted(ssg.rules.find_rule_dirs(rules_dir)): + yield ssg.rules.get_rule_dir_id(rule_dir), rule_dir + + +def handle_rule_yaml(env, rule_dir): + rule_file = ssg.rules.get_rule_dir_yaml(rule_dir) + rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env) + return rule_file, rule_yaml + + +def get_rules(product, product_path, env): + guide_path = os.path.abspath(os.path.join(product_path, product['benchmark_root'])) + if guide_path not in GUIDE_RULES: + print(f"Loading rules from '{guide_path}'...", end=""), + GUIDE_RULES[guide_path] = {} + for rule_id, rule_dir in collect_rule_ids_and_dirs(guide_path): + try: + rule_file, rule_yaml = handle_rule_yaml(env, rule_dir) + GUIDE_RULES[guide_path][rule_yaml.id_] = rule_yaml + except ssg.yaml.DocumentationNotComplete: + # Happens on non-debug build when a rule is "documentation-incomplete" + continue + print(len(GUIDE_RULES[guide_path])) + return GUIDE_RULES[guide_path] + + +def get_products_and_rules(root_path): + linux_products, other_products = ssg.products.get_all(root_path) + all_products = linux_products.union(other_products) + for product_id in all_products: + product_path = ssg.products.product_yaml_path(root_path, product_id) + product_props_path = os.path.join(root_path, "product_properties") + product = ssg.products.load_product_yaml(product_path) + product.read_properties_from_directory(product_props_path) + env = dict(product) + rules = get_rules(product, os.path.join(root_path, "products", product_id), env) + yield product_id, product, env, rules + + +def unselect_rules_in_profile(product, profile, profile_path, prof_unsel): + if len(prof_unsel) == 0: + return + + comment = f"# Following rules once had a prodtype incompatible with the {product['product']} product" + with open(profile_path, 'r') as f: + lines = f.readlines() + comment_line = 0 + indent = 0 + sel_start = 0 + sel_end = 0 + already_unselected = [] + for i, line in enumerate(lines): + strip_line = line.strip() + if strip_line.startswith("- '!") or strip_line.startswith('- "!'): + already_unselected.append(strip_line[4:-1]) + continue + if comment in line: + comment_line = i + continue + if strip_line.startswith("#") or not strip_line.strip(): + continue + if line.startswith("selections:"): + sel_start = i + sel_end = len(lines) - 1 + continue + if sel_start != 0: + if not strip_line.startswith("-"): + sel_end = i - 1 + elif indent == 0: + indent = line.find("-") + + #print(f"XXXX: {comment_line}, {sel_start}, {sel_end}, {indent}") + #print(repr(already_unselected)) + + for unsel in prof_unsel: + if unsel in already_unselected: + #print(f"Already unselected {unsel}") + continue + lines.insert(sel_end+1, f"{indent * ' '}- '!{unsel}'\n") + if comment_line == 0: + lines.insert(sel_end+1, f"{indent * ' '}{comment}\n") + + with open(profile_path, 'w') as f: + f.writelines(lines) + + +def select_rules_in_default_profile(product, profiles_root, prof_def_sel): + prefix = (f"documentation_complete: true\n" + f"\n" + f"hidden: true\n" + f"\n" + f"title: Default Profile for {product['full_name']}\n" + f"\n" + f"description: |-\n" + f" This profile contains all the rules that once belonged to the\n" + f" {product['product']} product via 'prodtype'. This profile won't\n" + f" be rendered into an XCCDF Profile entity, nor it will select any\n" + f" of these rules by default. The only purpose of this profile\n" + f" is to keep a rule in the product's XCCDF Benchmark.\n" + f"\n" + f"selections:\n") + + path = os.path.join(SSG_ROOT, "products", product['product'], profiles_root) + with open(f"{path}/default.profile", "w") as f: + f.write(prefix) + for sel in prof_def_sel: + f.write(f" - {sel}\n") + + +def main(): + args = parse_args() + for product_id, product, env, all_rules in get_products_and_rules(SSG_ROOT): + if args.product: + if product_id != args.product: + continue + print(f"Product '{product_id}', profiles:") + + product_cpes = ssg.build_cpe.ProductCPEs() + product_cpes.load_product_cpes(env) + + controls_path = os.path.join(SSG_ROOT, "controls") + controls_manager = ssg.controls.ControlsManager(controls_path, env) + controls_manager.load() + + profile_paths = ssg.products.get_profile_files_from_root(env, product) + all_sels = set() + for profile_path in profile_paths: + profile = ssg.build_yaml.ProfileWithInlinePolicies.from_yaml(profile_path, env, product_cpes) + if profile.id_ == 'default': + # Whatever is there — we are going to overwrite it + continue + profile.resolve_controls(controls_manager) + sels = sorted(profile.selected) + unsels = sorted(profile.unselected) + extends = profile.extends if profile.extends else "no" + print(f" {profile.id_} (extends: {extends}, selections: {len(sels)}/{len(unsels)})", end="") + prof_unsel = set() + for sel in sels: + if sel in all_rules: + if all_rules[sel].prodtype == '' or all_rules[sel].prodtype is None: + raise Exception("Should not happen!") + if product_id not in all_rules[sel].prodtype and all_rules[sel].prodtype != 'all': + if sel not in unsels: + #print(f' - {sel}') + prof_unsel.add(sel) + print(f' -{len(prof_unsel)}') + all_sels.update(sels) + all_sels.difference_update(unsels) + + unselect_rules_in_profile(product, profile, profile_path, prof_unsel) + + prof_def_sel = set() + print(' default (hidden)', end="") + for rul_id, rul in all_rules.items(): + if product_id in rul.prodtype or rul.prodtype == 'all': + if rul_id not in all_sels: + #print(f' + {rul_id}') + prof_def_sel.add(rul_id) + print(f' +{len(prof_def_sel)}') + + select_rules_in_default_profile(product, env['profiles_root'], prof_def_sel) + + +if __name__ == "__main__": + main()