Skip to content

Commit

Permalink
[tool] log_siem_extractor.py: use new dita doc for check
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanpoelen committed Sep 5, 2024
1 parent 0aa5c58 commit 42bd174
Showing 1 changed file with 42 additions and 41 deletions.
83 changes: 42 additions & 41 deletions tools/log_siem_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import re
import sys
import glob
from typing import Iterable, TextIO, Callable, Any, NewType, TypeVar, NamedTuple
from itertools import chain
from os.path import join as path_join
Expand All @@ -26,15 +27,15 @@ def identity(x: T) -> T:

class DataLog(NamedTuple):
formated_logs: set[str]
used_params: list[list[str]]
used_params: set[tuple[str, ...]]
optional_params: set[str]

LogFormatType = NewType('LogFormatType', dict[str, DataLog])


def create_data_log():
return DataLog(formated_logs=set(),
used_params=[],
used_params=set(),
optional_params=set())


Expand Down Expand Up @@ -67,7 +68,7 @@ def print_alert_on_list(msg: str, texts: Iterable[str], color: bool) -> None:


def inject_optional_param_from_used_params(d: LogFormatType):
for _, datalog in d.items():
for datalog in d.values():
if not datalog.used_params:
continue
counter_param = Counter(p for params in datalog.used_params for p in params)
Expand All @@ -90,7 +91,7 @@ def update_dict(d: LogFormatType,
data = ''.join(f' {colored(k)}={v}' for k, v in kvs_list)
data = f'{cat}="{logid}"{data}'
d[logid].formated_logs.add(data)
d[logid].used_params.append([k for k, v in kvs_list])
d[logid].used_params.add(tuple(k for k, v in kvs_list))

proxy_logs = log_format_builder()
vnc_logs = log_format_builder()
Expand Down Expand Up @@ -166,7 +167,7 @@ def update(d, process, filenames): # noqa: ANN001, ANN202
for (logid, desc) in server_cert_regex.findall(text):
log = rdp_logs[logid]
log.formated_logs.add(f'{cat}="{logid}" {colored("description")}={desc}')
log.used_params.append(['description'])
log.used_params.add(('description',))

elif dirpath.startswith(f'{src_path}/mod/vnc'):
update(vnc_logs, log6_process, filenames)
Expand Down Expand Up @@ -214,8 +215,8 @@ def copy_in_rdp_and_vnc(logs):
for k, d in logs.items():
rdp_logs[k].formated_logs.update(d.formated_logs)
vnc_logs[k].formated_logs.update(d.formated_logs)
rdp_logs[k].used_params.extend(d.used_params)
vnc_logs[k].used_params.extend(d.used_params)
rdp_logs[k].used_params.update(d.used_params)
vnc_logs[k].used_params.update(d.used_params)

copy_in_rdp_and_vnc(rdp_and_vnc_logs)

Expand Down Expand Up @@ -247,58 +248,58 @@ def copy_in_rdp_and_vnc(logs):
return proxy_logs, rdp_logs, vnc_logs


def extract_doc_siem(docfile: str) -> tuple[LogFormatType, # proxy
LogFormatType, # rdp
LogFormatType]: # vnc
def extract_doc_siem(docdir: str) -> tuple[LogFormatType, # proxy
LogFormatType, # rdp
LogFormatType]: # vnc
# Format:
# <section>
# (
# ( <para>...</para> )+ # contains log
# ( <note>...</note> )* # contains optional value (<literal>...</literal>)
# ( <codeblock>...</codeblock> )+ # contains log
# ( <note type="note">...</note> )* # contains optional value (<codeph>...</codeph>)
# )+
# </section>
reg_base = (
r'<para(?: role="[^"]*")?><literal>(rdpproxy: \[rdpproxy\]|(?:rdpproxy: )?\[(?:RDP|VNC) Session\])'
r' (?:(?!type=|<).)*type=["”]([^"”]+)["”][^<]*'
)
block_regex = re.compile(reg_base + r'((?:(?!</section>).)*)', re.DOTALL)
log_regex = re.compile(reg_base + r'|<note>((?:(?!</note>).)*)', re.DOTALL)
optional_values_regex = re.compile(r'<literal>(\w+)</literal>')

cat = r'rdpproxy: \[rdpproxy\]|(?:rdpproxy: )?\[(?:RDP|VNC) Session\]'
block_regex = re.compile(fr'<codeblock[^>]*>(?:{cat}) [^<]*((?:(?!</section>).)*)', re.DOTALL)
# split by rdpproxy: [rdpproxy] / wabengine: [wabauth] / others... / <note>
split_part_regex = re.compile(r'(?=(?:(?:^|\n|<codeblock[^>]*>)(?:\w+: \[\w+\]|(?:rdpproxy: )?\[(?:RDP|VNC) Session\]) |<note[ >]))', re.DOTALL | re.MULTILINE)
siem_log_regex = re.compile(fr'^(?:\n|<codeblock[^>]*>)?(({cat}) (?:(?!type=|<).)*type=["”]([^"”]+)["”][^<]*)', re.DOTALL | re.MULTILINE)

# accept <codeph>h</codeph>, <codeph>h</codeph> and <codeph>h</codeph> is optional
# reject '<codeph>h</codeph> :' and '<codeph>h</codeph>.'
optional_values_regex = re.compile(r'<codeph>(\w+)</codeph>(?:,|[^[:symbol:] ]|\s+\w)')
kv_siem_cpp_regex = re.compile(r'(\w+)=["”](?:[^"”\\]|\\.)+["”]')

proxy_logs = log_format_builder()
rdp_logs = log_format_builder()
vnc_logs = log_format_builder()

d = {
dmap = {
'rdpproxy: [rdpproxy]': proxy_logs,
'rdpproxy: [RDP Session]': rdp_logs,
'rdpproxy: [VNC Session]': vnc_logs,
'[RDP Session]': rdp_logs,
'[VNC Session]': vnc_logs,
}

for m in block_regex.finditer(read_xmlfile(docfile)):
previous_logkey = None
for m in log_regex.finditer(m.group(0)):
note = m.group(3)
if not note:
data = m.group(0)
cat = m.group(1)
t = m.group(2)
previous_logkey = (cat, t)
kvs = kv_siem_cpp_regex.findall(data.replace('”', '"'))
datalog = d[cat][t]
datalog.formated_logs.add(data)
datalog.used_params.append(kvs)
else:
optional_values = optional_values_regex.findall(note)
if optional_values:
datalog = d[previous_logkey[0]][previous_logkey[1]]
datalog.optional_params.update(optional_values)

for logs in d.values():
for docfile in glob.glob('**/*.dita', root_dir=docdir, recursive=True):
for m in block_regex.finditer(read_xmlfile(f'{docdir}/{docfile}')):
previous_datalog = None
for part in split_part_regex.split(m.group(0)):
if m := siem_log_regex.match(part):
data = m.group(1)
cat = m.group(2)
ty = m.group(3)
kvs = kv_siem_cpp_regex.findall(data.replace('”', '"'))
datalog = dmap[cat][ty]
datalog.formated_logs.add(data)
datalog.used_params.add(tuple(kvs))
previous_datalog = datalog
elif part.startswith('<'): # is <note>
optional_values = optional_values_regex.findall(part)
if optional_values:
previous_datalog.optional_params.update(optional_values)

for logs in dmap.values():
inject_optional_param_from_used_params(logs)

return proxy_logs, rdp_logs, vnc_logs
Expand Down

0 comments on commit 42bd174

Please sign in to comment.