pip install https://github.com/wagov/nbdev-squ/archive/refs/heads/main.tar.gz
Before using, config needs to be loaded into squ.core.cache
, which can
be done automatically from json in a keyvault by setting the env var
SQU_CONFIG
to "keyvault/tenantid"
.
export SQU_CONFIG="{{ keyvault }}/{{ tenantid }}"
from nbdev_squ import api, clients
import io, pandas
# Load workspace info from datalake blob storage
df = api.list_workspaces(fmt="df"); print(df.shape)
# Load workspace info from introspection of azure graph
df = api.list_securityinsights(); print(df.shape)
# Kusto query to Sentinel workspaces via Azure Lighthouse
df = api.query_all("SecurityIncident | take 20", fmt="df"); print(df.shape)
# Kusto queries to Sentinel workspaces via Azure Lighthouse (batches up to 100 queries at a time)
df = api.query_all(["SecurityAlert | take 20" for a in range(10)]); print(df.shape)
# Kusto query to ADX
#df = api.adxtable2df(api.adx_query("kusto query | take 20"))
# General azure cli cmd
api.azcli(["config", "set", "extension.use_dynamic_install=yes_without_prompt"])
print(len(api.azcli(["account", "list"])))
# Various pre-configured api clients
# RunZero
#response = clients.runzero.get("/export/org/assets.csv", params={"search": "has_public:t AND alive:t AND (protocol:rdp OR protocol:vnc OR protocol:teamviewer OR protocol:telnet OR protocol:ftp)"})
#runzero_assets = pandas.read_csv(io.StringIO(response.text))
# Jira
#issues = clients.jira.jql("updated > -1d")["issues"]
# AbuseIPDB
#clients.abuseipdb.check_ip("1.1.1.1")
badips_df = api.query_all("""
SecurityIncident
| where Classification == "TruePositive"
| mv-expand AlertIds
| project tostring(AlertIds)
| join SecurityAlert on $left.AlertIds == $right.SystemAlertId
| mv-expand todynamic(Entities)
| project Entities.Address
| where isnotempty(Entities_Address)
| distinct tostring(Entities_Address)
""", timespan=pandas.Timedelta("45d"))
df = api.query_all("find where ClientIP startswith '172.16.' | evaluate bag_unpack(pack_) | take 40000")
df = api.query_all("""union withsource="_table" *
| extend _ingestion_time_bin = bin(ingestion_time(), 1h)
| summarize take_any(*) by _table, _ingestion_time_bin
| project pack=pack_all(true)""")
import json
pandas.DataFrame(list(df["pack"].apply(json.loads)))