Skip to content

Commit

Permalink
[tool] log_siem_extractor.py: fix cert id extractor and add -m and -q…
Browse files Browse the repository at this point in the history
… for disabled check on missing/unknown log siem or bad quote
  • Loading branch information
jonathanpoelen committed Nov 24, 2023
1 parent 96ce259 commit d5aa6fb
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 27 deletions.
4 changes: 4 additions & 0 deletions tools/c++-analyzer/lua-checker/checkers/log6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,17 @@ function terminate()
end

-- add not extracted id
-- NOTE synchronize with tools/log_siem_extractor.py
for _,id in ipairs({
'PROBE_STATUS',
-- special case in tools/log_siem_extractor.py (server_cert_regex)
-- @{
'SERVER_CERTIFICATE_ERROR',
'SERVER_CERTIFICATE_MATCH_SUCCESS',
'CERTIFICATE_CHECK_SUCCESS',
'SERVER_CERTIFICATE_NEW',
'SERVER_CERTIFICATE_MATCH_FAILURE',
-- @}
}) do
if ids[id] ~= 0 then
utils.print_error(id .. ' is already used, please update script\n')
Expand Down
82 changes: 55 additions & 27 deletions tools/log_siem_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def to_process(log_extractor, kv_extractor):
),
kv_log6_regex, colored)

server_cert_regex = re.compile(r'\{\n\s*LogId::(\w+),\s*("[^"]+")')

log_id_sesprobe_regex = re.compile(r'EXECUTABLE_LOG6_ID_AND_NAME\(\s*([A-Z0-9_]+)\s*\)([^)]*)')
kv_sesprobe_regex = re.compile(r'"([^"]+)"()') # capture an empty value
sesprob_process = to_process(log_id_sesprobe_regex, kv_sesprobe_regex)
Expand All @@ -101,13 +103,19 @@ def update(d, process, filenames):
for text in map(cppfile_from_dirpath, filenames):
process(d, text)

cat = colored('type')

for dirpath, _, filenames_ in os.walk(src_path):
filenames = filter(filter_valid_cpp, filenames_)

if dirpath.startswith(f'{src_path}/mod/rdp'):
for text in map(cppfile_from_dirpath, filenames):
for filename in filenames:
text = cppfile_from_dirpath(filename)
log6_process(rdp_logs, text)
sesprob_process(rdp_logs, text)
if filename == 'rdp_negociation.cpp' and dirpath == f'{src_path}/mod/rdp':
for (logid, desc) in server_cert_regex.findall(text):
rdp_logs.setdefault(logid, set()).add(f'{cat}="{logid}" description={desc}')

elif dirpath.startswith(f'{src_path}/mod/vnc'):
update(vnc_logs, log6_process, filenames)
Expand Down Expand Up @@ -223,11 +231,25 @@ def __init__(self,
self.colored = color_builder('31;1') if color else identity
self.colored2 = color_builder('34;3;1') if color else identity

def check(self,
doc_proxy_logs: LogFormatType,
doc_rdp_logs: LogFormatType,
doc_vnc_logs: LogFormatType,
) -> bool:
def check_bad_quote(self,
doc_proxy_logs: LogFormatType,
doc_rdp_logs: LogFormatType,
doc_vnc_logs: LogFormatType,
) -> str:
it = chain(doc_proxy_logs.values(),
doc_rdp_logs.values(),
doc_vnc_logs.values())
bad_logs = set(log for log in chain.from_iterable(it) if '”' in log)
if bad_logs:
l = '\n - '.join(sorted(bad_logs))
return f'Log with ” instead of ":\n - {l}'
return ''

def check_missing_or_unknown(self,
doc_proxy_logs: LogFormatType,
doc_rdp_logs: LogFormatType,
doc_vnc_logs: LogFormatType,
) -> bool:
doc_proxy_ids = doc_proxy_logs.keys()
doc_rdp_ids = doc_rdp_logs.keys()
doc_vnc_ids = doc_vnc_logs.keys()
Expand All @@ -243,32 +265,28 @@ def check(self,
colored = self.colored
colored2 = self.colored2

def show_missing(msg, l, d):
msgs = []

def append_missing(msg, l, d):
if l:
list_format = lambda l: "\n ".join(sorted(l))
formats = ''.join(f'\n - {colored(k)}:\n {list_format(d[k])}' for k in sorted(l))
print(colored2(msg), ':', formats, '\n', sep='')
msgs.append(f'{colored2(msg)}:{formats}')

show_missing('Missing log SIEM for [rdpproxy]', proxy_missing, self.proxy_logs)
show_missing('Missing log SIEM for [RDP Session]', rdp_missing, self.rdp_logs)
show_missing('Missing log SIEM for [VNC Session]', vnc_missing, self.vnc_logs)
append_missing('Missing log SIEM for [rdpproxy]', proxy_missing, self.proxy_logs)
append_missing('Missing log SIEM for [RDP Session]', rdp_missing, self.rdp_logs)
append_missing('Missing log SIEM for [VNC Session]', vnc_missing, self.vnc_logs)

def show_unknown(msg, l):
def append_unknown(msg, l):
if l:
print(colored2(msg), ':\n - ', '\n - '.join(map(colored, sorted(l))), '\n', sep='')
elems = '\n - '.join(map(colored, sorted(l)))
msgs.append(f'{colored2(msg)}:\n - {elems}')

show_unknown('Unknown log SIEM for [rdpproxy]', proxy_unknown)
show_unknown('Unknown log SIEM for [RDP Session]', rdp_unknown)
show_unknown('Unknown log SIEM for [VNC Session]', vnc_unknown)

bad_logs = set(log for log in chain.from_iterable(chain(doc_proxy_logs.values(),
doc_rdp_logs.values(),
doc_vnc_logs.values())) if '”' in log)
if bad_logs:
print('Log with ” instead of ":\n - ', end='')
print('\n - '.join(sorted(bad_logs)))
append_unknown('Unknown log SIEM for [rdpproxy]', proxy_unknown)
append_unknown('Unknown log SIEM for [RDP Session]', rdp_unknown)
append_unknown('Unknown log SIEM for [VNC Session]', vnc_unknown)

return bool(proxy_missing or rdp_missing or vnc_missing or proxy_unknown or rdp_unknown or vnc_unknown)
return '\n\n'.join(msgs)


def print_siem_format(proxy_logs: LogFormatType,
Expand Down Expand Up @@ -313,6 +331,8 @@ def show(name, logs):
parser.add_argument('-c', '--check-doc', action='append')
parser.add_argument('-p', '--python-ids', action='store_true')
parser.add_argument('-o', '--output-python', default=sys.stdout, type=argparse.FileType('w'))
parser.add_argument('-q', '--no-check-quote', action='store_true')
parser.add_argument('-m', '--no-check-missing-or-unknown', action='store_true')
parser.add_argument('--color', action='store_true')
parser.add_argument('path', nargs='?', default='src')

Expand All @@ -328,10 +348,18 @@ def show(name, logs):
if args.check_doc:
color_file = color_builder('33') if args.color else identity
doc_checker = DocSiemChecker(*siems, args.color)
insert_nl = False
for docfile in args.check_doc:
print(f'{color_file(docfile)}:\n')
doc_checker.check(*extract_doc_siem(docfile))
print()
doc_siems = extract_doc_siem(docfile)
err1 = '' if args.no_check_missing_or_unknown \
else doc_checker.check_missing_or_unknown(*doc_siems)
err2 = '' if args.no_check_quote \
else doc_checker.check_bad_quote(*doc_siems)
if err1 or err2:
nl = '\n\n' if insert_nl else ''
sep = '\n\n' if err1 and err2 else ''
print(f'{nl}{color_file(docfile)}:\n\n{err1}{sep}{err2}')
insert_nl = True
has_action = True

if not has_action or args.python_ids:
Expand Down

0 comments on commit d5aa6fb

Please sign in to comment.