Skip to content

Commit

Permalink
Add sample: Spotlight Quick Report
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Oct 14, 2022
1 parent 0fe3ebc commit 548bacd
Showing 1 changed file with 269 additions and 0 deletions.
269 changes: 269 additions & 0 deletions samples/spotlight/spotlight_quick_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
"""Spotlight results quick report generator.
_______ __ _______ __ __ __
| _ .----.-----.--.--.--.--| | _ | |_.----|__| |--.-----.
|. 1___| _| _ | | | | _ | 1___| _| _| | <| -__|
|. |___|__| |_____|________|_____|____ |____|__| |__|__|__|_____|
|: 1 | |: 1 |
|::.. . | |::.. . | FalconPy SDK
`-------' `-------'
____ __ ___ __ __
/ __/__ ___ / /_/ (_)__ _/ / / /_
_\ \/ _ \/ _ \/ __/ / / _ `/ _ \/ __/
/___/ .__/\___/\__/_/_/\_, /_//_/\__/
/_/ /___/
____ _ __ ___ __
/ __ \__ __(_)___/ /__ / _ \___ ___ ___ ____/ /_
/ /_/ / // / / __/ '_/ / , _/ -_) _ \/ _ \/ __/ __/
\___\_\_,_/_/\__/_/\_\ /_/|_|\__/ .__/\___/_/ \__/
/_/
This example requires crowdstrike-falconpy v1.2.2 or greater.
Easy Object Authentication is also demonstrated in this sample.
"""
import json
import time
from datetime import datetime
from argparse import ArgumentParser, RawTextHelpFormatter, Namespace
from falconpy import Hosts, SpotlightVulnerabilities, _VERSION


SEVERITIES = ["unknown", "none", "low", "medium", "high", "critical"]
CVE_BANNER = r"""
______ _______
/ ___\ \ / / ____|___
| | \ \ / /| _| / __|
| |___ \ V / | |___\__ \
\____| \_/ |_____|___/
"""
HOSTS_BANNER = r"""
_ _ _
| | | | ___ ___| |_ ___
| |_| |/ _ \/ __| __/ __|
| _ | (_) \__ \ |_\__ \
|_| |_|\___/|___/\__|___/
"""
RESULTS_BANNER = r"""
____ _ _
| _ \ ___ ___ _ _| | |_ ___
| |_) / _ \/ __| | | | | __/ __|
| _ < __/\__ \ |_| | | |_\__ \
|_| \_\___||___/\__,_|_|\__|___/
"""
HOST_AUTH = None


def consume_arguments() -> Namespace:
"""Consume any provided command line arguments."""
parser = ArgumentParser(description=__doc__, formatter_class=RawTextHelpFormatter)
req = parser.add_argument_group("required arguments")
req.add_argument("-k", "--client_id", help="CrowdStrike Falcon API Client ID.", required=True)
req.add_argument("-s", "--client_secret",
help="CrowdStrike Falcon API Client Secret.",
required=True
)
parser.add_argument("-d", "--days",
help="Include days from X days backwards (3-45).",
default=0
)
parser.add_argument("-f", "--file",
help="File to import data from.\n"
"Data is queried from the API if this argument is not provided.",
default=None
)
parser.add_argument("-o", "--output",
help="File to output results to.\n"
"Output is not performed if this argument is not provided.",
default=None
)
parser.add_argument("-a", "--allow_dupes",
help="Allow duplicates.",
default=False,
action="store_true"
)

return parser.parse_args()


def query_spotlight(key: str, secret: str, days: str, aft: str = None):
"""Retrieve a batch of Spotlight Vulnerability matches."""

def do_query(qfilter: str):
returned = spotlight.query_vulnerabilities_combined(
filter=qfilter,
after=aft,
sort="updated_timestamp|asc",
limit=400,
facet="cve"
)

return returned["status_code"], returned

spotlight = SpotlightVulnerabilities(client_id=key, client_secret=secret)

global HOST_AUTH # pylint: disable=W0603
HOST_AUTH = spotlight # Save this here so we can use it to auth to hosts

query_filter = "cve.id:!['']+status:!'closed'"
if int(days) >= 3:
query_filter = f"{query_filter}+last_seen_within:'{days}'"
stat, all_results = do_query(query_filter)
while stat == 429:
print("Rate limit met, waiting 0.5 seconds to retry.")
time.sleep(0.5)
stat, all_results = do_query(query_filter)
if stat != 200:
raise SystemExit("Unable to retrieve Spotlight Vulnerability matches.")

return all_results["body"]["meta"]["pagination"]["total"], \
all_results["body"]["meta"]["pagination"]["after"], \
len(all_results["body"]["resources"]), \
all_results["body"]["resources"]


def get_total_sensor_count():
"""Retrieve the total number of available sensors within the tenant."""
hosts = Hosts(auth_object=HOST_AUTH)
returned = hosts.query_devices()
if returned["status_code"] != 200:
returned = "Unknown"
else:
returned = returned["body"]["meta"]["pagination"]["total"]

return returned


def get_worst_hostname(host_id: str):
"""Retrieve the hostname for the host with the most Spotlight matches."""
hosts = Hosts(auth_object=HOST_AUTH)
returned = hosts.get_device_details(host_id)
if returned["status_code"] != 200:
returned = host_id
else:
returned = f"{returned['body']['resources'][0]['hostname']} ({host_id})"

return returned


def inform(msg: str):
"""Send dynamic command line updates."""
print(f"{msg}", end="\r", flush=True)


def process_matches(arg: Namespace):
"""Process Spotlight Vulnerability matches."""
retrieved = 0
if arg.file:
with open(arg.file, "r", encoding="utf-8") as loader:
matches = json.load(loader)
for match_list in matches["sensor"].values():
retrieved += sum(len(x) for x in match_list.values())
else:
matches = {}
matches["sensor"] = {}
matches["cve"] = {}
for sev in SEVERITIES:
matches["cve"][sev] = {}
after = None
total = 1
while retrieved <= total:
total, after, returned, result = query_spotlight(key=arg.client_id,
secret=arg.client_secret,
days=arg.days,
aft=after
)
retrieved += returned
for match in result:
aid = match.get("aid")
cve = match.get("cve", {})
cve_id = cve["id"]
severity_match = cve.get("severity", "unknown").lower()
if aid not in matches["sensor"]:
matches["sensor"][aid] = {}
for sev in SEVERITIES:
matches["sensor"][aid][sev] = []
if cve_id not in matches["sensor"][aid][severity_match] or arg.allow_dupes:
matches["sensor"][aid][severity_match].append(cve_id)
if cve_id not in matches["cve"][severity_match].keys():
matches["cve"][severity_match][cve_id] = []
if aid not in matches["cve"][severity_match][cve_id] or arg.allow_dupes:
matches["cve"][severity_match][cve_id].append(aid)
inform(f" {formatted(retrieved)} of {formatted(total)}")
inform(f"{formatted(retrieved)} total matches retrieved.")
return matches, retrieved


def formatted(num_to_format: int):
"""Format integers for terminal output."""
return f"{num_to_format:,}"


def process_results(output_file: str, matches: dict, total_matched: int): # pylint: disable=R0914
"""Write the output file and display the results."""
total_hosts = get_total_sensor_count()
total_cve_matches = sum(len(m) for m in matches["cve"].values())
worst_aid = max(matches["sensor"],
key=lambda x: sum(len(matches["sensor"][x][sev]) for sev in SEVERITIES)
)
worst_host = get_worst_hostname(worst_aid)
worst_aid_count = sum(len(matches["sensor"][worst_aid][sev]) for sev in SEVERITIES)
total_host_pct = f"{100 * len(matches['sensor']) / total_hosts:0.2f}"
total_host_match_pct = f"{100 * worst_aid_count / total_cve_matches:0.2f}"
worst_critical = max(matches["cve"]["critical"],
key=lambda x: len(matches["cve"]["critical"][x])
)
worst_critical_count = formatted(len(matches['cve']['critical'][worst_critical]))
worst_high = max(matches["cve"]["high"], key=lambda x: len(matches["cve"]["high"][x]))
worst_high_count = formatted(len(matches['cve']['high'][worst_high]))
worst_medium = max(matches["cve"]["medium"], key=lambda x: len(matches["cve"]["medium"][x]))
worst_medium_count = formatted(len(matches['cve']['medium'][worst_medium]))
worst_low = max(matches["cve"]["low"], key=lambda x: len(matches["cve"]["low"][x]))
worst_low_count = formatted(len(matches['cve']['low'][worst_low]))

print(RESULTS_BANNER)
print(f"{formatted(total_cve_matches)} CVEs produced "
f"{formatted(total_matched)} Spotlight Vulnerability matches across "
f"{formatted(total_hosts)} sensors"
)
print(HOSTS_BANNER)
print(f"{formatted(len(matches['sensor']))} hosts "
"were identified with Spotlight Vulnerability matches "
f"({total_host_pct}% of total sensors)"
)
print(f"Worst host: {worst_host} "
f"({formatted(worst_aid_count)} matches for "
f"{total_host_match_pct}% of total CVEs matched)"
)
print(CVE_BANNER)
print(f"{formatted(total_cve_matches)} matched CVEs ("
f"{len(matches['cve']['critical'])} critical, "
f"{len(matches['cve']['high'])} high, "
f"{len(matches['cve']['medium'])} medium, "
f"{len(matches['cve']['low'])} low)"
)
print(f"Critical CVE with the most matches: {worst_critical} "
f"({worst_critical_count} matched hosts)"
)
print(f"High CVE with the most matches: {worst_high} ({worst_high_count} matched hosts)")
print(f"Medium CVE with the most matches: {worst_medium} ({worst_medium_count} matched hosts)")
print(f"Low CVE with the most matches: {worst_low} ({worst_low_count} matched hosts)")
if output_file:
with open(args.output, "w", encoding="utf-8") as save_file:
json.dump(matches, save_file, indent=4)
print(f"The data used in thsi report has been saved to {output_file}")

if __name__ == "__main__":
vers = _VERSION.split(".")
main_minor = float(f"{vers[0]}.{vers[1]}")
patch = int(vers[2])
if main_minor < 1.2 or (main_minor == 1.2 and patch < 2):
raise SystemExit("This sample requires crowdstrike-falconpy v1.2.2 or greater.")
start_time = datetime.now().timestamp()
args = consume_arguments()
if args.file:
HOST_AUTH = Hosts(client_id=args.client_id, client_secret=args.client_secret)
process_results(args.output, *process_matches(args))
total_run_time = datetime.now().timestamp() - start_time
print(f"\nReport generated in {total_run_time:,.2f} seconds.")

0 comments on commit 548bacd

Please sign in to comment.