diff --git a/tools/log_siem_extractor.py b/tools/log_siem_extractor.py index 266574358e..d8ed9e9024 100755 --- a/tools/log_siem_extractor.py +++ b/tools/log_siem_extractor.py @@ -2,6 +2,7 @@ import os import re import sys +import glob from typing import Iterable, TextIO, Callable, Any, NewType, TypeVar, NamedTuple from itertools import chain from os.path import join as path_join @@ -26,7 +27,7 @@ def identity(x: T) -> T: class DataLog(NamedTuple): formated_logs: set[str] - used_params: list[list[str]] + used_params: set[tuple[str, ...]] optional_params: set[str] LogFormatType = NewType('LogFormatType', dict[str, DataLog]) @@ -34,7 +35,7 @@ class DataLog(NamedTuple): def create_data_log(): return DataLog(formated_logs=set(), - used_params=[], + used_params=set(), optional_params=set()) @@ -67,7 +68,7 @@ def print_alert_on_list(msg: str, texts: Iterable[str], color: bool) -> None: def inject_optional_param_from_used_params(d: LogFormatType): - for _, datalog in d.items(): + for datalog in d.values(): if not datalog.used_params: continue counter_param = Counter(p for params in datalog.used_params for p in params) @@ -90,7 +91,7 @@ def update_dict(d: LogFormatType, data = ''.join(f' {colored(k)}={v}' for k, v in kvs_list) data = f'{cat}="{logid}"{data}' d[logid].formated_logs.add(data) - d[logid].used_params.append([k for k, v in kvs_list]) + d[logid].used_params.add(tuple(k for k, v in kvs_list)) proxy_logs = log_format_builder() vnc_logs = log_format_builder() @@ -166,7 +167,7 @@ def update(d, process, filenames): # noqa: ANN001, ANN202 for (logid, desc) in server_cert_regex.findall(text): log = rdp_logs[logid] log.formated_logs.add(f'{cat}="{logid}" {colored("description")}={desc}') - log.used_params.append(['description']) + log.used_params.add(('description',)) elif dirpath.startswith(f'{src_path}/mod/vnc'): update(vnc_logs, log6_process, filenames) @@ -214,8 +215,8 @@ def copy_in_rdp_and_vnc(logs): for k, d in logs.items(): rdp_logs[k].formated_logs.update(d.formated_logs) vnc_logs[k].formated_logs.update(d.formated_logs) - rdp_logs[k].used_params.extend(d.used_params) - vnc_logs[k].used_params.extend(d.used_params) + rdp_logs[k].used_params.update(d.used_params) + vnc_logs[k].used_params.update(d.used_params) copy_in_rdp_and_vnc(rdp_and_vnc_logs) @@ -247,31 +248,32 @@ def copy_in_rdp_and_vnc(logs): return proxy_logs, rdp_logs, vnc_logs -def extract_doc_siem(docfile: str) -> tuple[LogFormatType, # proxy - LogFormatType, # rdp - LogFormatType]: # vnc +def extract_doc_siem(docdir: str) -> tuple[LogFormatType, # proxy + LogFormatType, # rdp + LogFormatType]: # vnc # Format: #
# ( - # ( ... )+ # contains log - # ( ... )* # contains optional value (...) + # ( ... )+ # contains log + # ( ... )* # contains optional value (...) # )+ #
- reg_base = ( - r'(rdpproxy: \[rdpproxy\]|(?:rdpproxy: )?\[(?:RDP|VNC) Session\])' - r' (?:(?!type=|<).)*type=["”]([^"”]+)["”][^<]*' - ) - block_regex = re.compile(reg_base + r'((?:(?!).)*)', re.DOTALL) - log_regex = re.compile(reg_base + r'|((?:(?!).)*)', re.DOTALL) - optional_values_regex = re.compile(r'(\w+)') - + cat = r'rdpproxy: \[rdpproxy\]|(?:rdpproxy: )?\[(?:RDP|VNC) Session\]' + block_regex = re.compile(fr']*>(?:{cat}) [^<]*((?:(?!).)*)', re.DOTALL) + # split by rdpproxy: [rdpproxy] / wabengine: [wabauth] / others... / + split_part_regex = re.compile(r'(?=(?:(?:^|\n|]*>)(?:\w+: \[\w+\]|(?:rdpproxy: )?\[(?:RDP|VNC) Session\]) |]))', re.DOTALL | re.MULTILINE) + siem_log_regex = re.compile(fr'^(?:\n|]*>)?(({cat}) (?:(?!type=|<).)*type=["”]([^"”]+)["”][^<]*)', re.DOTALL | re.MULTILINE) + + # accept h, h and h is optional + # reject 'h :' and 'h.' + optional_values_regex = re.compile(r'(\w+)(?:,|[^[:symbol:] ]|\s+\w)') kv_siem_cpp_regex = re.compile(r'(\w+)=["”](?:[^"”\\]|\\.)+["”]') proxy_logs = log_format_builder() rdp_logs = log_format_builder() vnc_logs = log_format_builder() - d = { + dmap = { 'rdpproxy: [rdpproxy]': proxy_logs, 'rdpproxy: [RDP Session]': rdp_logs, 'rdpproxy: [VNC Session]': vnc_logs, @@ -279,26 +281,25 @@ def extract_doc_siem(docfile: str) -> tuple[LogFormatType, # proxy '[VNC Session]': vnc_logs, } - for m in block_regex.finditer(read_xmlfile(docfile)): - previous_logkey = None - for m in log_regex.finditer(m.group(0)): - note = m.group(3) - if not note: - data = m.group(0) - cat = m.group(1) - t = m.group(2) - previous_logkey = (cat, t) - kvs = kv_siem_cpp_regex.findall(data.replace('”', '"')) - datalog = d[cat][t] - datalog.formated_logs.add(data) - datalog.used_params.append(kvs) - else: - optional_values = optional_values_regex.findall(note) - if optional_values: - datalog = d[previous_logkey[0]][previous_logkey[1]] - datalog.optional_params.update(optional_values) - - for logs in d.values(): + for docfile in glob.glob('**/*.dita', root_dir=docdir, recursive=True): + for m in block_regex.finditer(read_xmlfile(f'{docdir}/{docfile}')): + previous_datalog = None + for part in split_part_regex.split(m.group(0)): + if m := siem_log_regex.match(part): + data = m.group(1) + cat = m.group(2) + ty = m.group(3) + kvs = kv_siem_cpp_regex.findall(data.replace('”', '"')) + datalog = dmap[cat][ty] + datalog.formated_logs.add(data) + datalog.used_params.add(tuple(kvs)) + previous_datalog = datalog + elif part.startswith('<'): # is + optional_values = optional_values_regex.findall(part) + if optional_values: + previous_datalog.optional_params.update(optional_values) + + for logs in dmap.values(): inject_optional_param_from_used_params(logs) return proxy_logs, rdp_logs, vnc_logs