From d5aa6fb4fd5c14708fa42579fa4c75603cea878b Mon Sep 17 00:00:00 2001 From: Jonathan Poelen Date: Fri, 24 Nov 2023 18:34:09 +0100 Subject: [PATCH] [tool] log_siem_extractor.py: fix cert id extractor and add -m and -q for disabled check on missing/unknown log siem or bad quote --- .../lua-checker/checkers/log6.lua | 4 + tools/log_siem_extractor.py | 82 +++++++++++++------ 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/tools/c++-analyzer/lua-checker/checkers/log6.lua b/tools/c++-analyzer/lua-checker/checkers/log6.lua index 8cf4845cc6..01698caedd 100755 --- a/tools/c++-analyzer/lua-checker/checkers/log6.lua +++ b/tools/c++-analyzer/lua-checker/checkers/log6.lua @@ -134,13 +134,17 @@ function terminate() end -- add not extracted id + -- NOTE synchronize with tools/log_siem_extractor.py for _,id in ipairs({ 'PROBE_STATUS', + -- special case in tools/log_siem_extractor.py (server_cert_regex) + -- @{ 'SERVER_CERTIFICATE_ERROR', 'SERVER_CERTIFICATE_MATCH_SUCCESS', 'CERTIFICATE_CHECK_SUCCESS', 'SERVER_CERTIFICATE_NEW', 'SERVER_CERTIFICATE_MATCH_FAILURE', + -- @} }) do if ids[id] ~= 0 then utils.print_error(id .. ' is already used, please update script\n') diff --git a/tools/log_siem_extractor.py b/tools/log_siem_extractor.py index 27262e5720..c7e107b7d2 100755 --- a/tools/log_siem_extractor.py +++ b/tools/log_siem_extractor.py @@ -77,6 +77,8 @@ def to_process(log_extractor, kv_extractor): ), kv_log6_regex, colored) + server_cert_regex = re.compile(r'\{\n\s*LogId::(\w+),\s*("[^"]+")') + log_id_sesprobe_regex = re.compile(r'EXECUTABLE_LOG6_ID_AND_NAME\(\s*([A-Z0-9_]+)\s*\)([^)]*)') kv_sesprobe_regex = re.compile(r'"([^"]+)"()') # capture an empty value sesprob_process = to_process(log_id_sesprobe_regex, kv_sesprobe_regex) @@ -101,13 +103,19 @@ def update(d, process, filenames): for text in map(cppfile_from_dirpath, filenames): process(d, text) + cat = colored('type') + for dirpath, _, filenames_ in os.walk(src_path): filenames = filter(filter_valid_cpp, filenames_) if dirpath.startswith(f'{src_path}/mod/rdp'): - for text in map(cppfile_from_dirpath, filenames): + for filename in filenames: + text = cppfile_from_dirpath(filename) log6_process(rdp_logs, text) sesprob_process(rdp_logs, text) + if filename == 'rdp_negociation.cpp' and dirpath == f'{src_path}/mod/rdp': + for (logid, desc) in server_cert_regex.findall(text): + rdp_logs.setdefault(logid, set()).add(f'{cat}="{logid}" description={desc}') elif dirpath.startswith(f'{src_path}/mod/vnc'): update(vnc_logs, log6_process, filenames) @@ -223,11 +231,25 @@ def __init__(self, self.colored = color_builder('31;1') if color else identity self.colored2 = color_builder('34;3;1') if color else identity - def check(self, - doc_proxy_logs: LogFormatType, - doc_rdp_logs: LogFormatType, - doc_vnc_logs: LogFormatType, - ) -> bool: + def check_bad_quote(self, + doc_proxy_logs: LogFormatType, + doc_rdp_logs: LogFormatType, + doc_vnc_logs: LogFormatType, + ) -> str: + it = chain(doc_proxy_logs.values(), + doc_rdp_logs.values(), + doc_vnc_logs.values()) + bad_logs = set(log for log in chain.from_iterable(it) if '”' in log) + if bad_logs: + l = '\n - '.join(sorted(bad_logs)) + return f'Log with ” instead of ":\n - {l}' + return '' + + def check_missing_or_unknown(self, + doc_proxy_logs: LogFormatType, + doc_rdp_logs: LogFormatType, + doc_vnc_logs: LogFormatType, + ) -> bool: doc_proxy_ids = doc_proxy_logs.keys() doc_rdp_ids = doc_rdp_logs.keys() doc_vnc_ids = doc_vnc_logs.keys() @@ -243,32 +265,28 @@ def check(self, colored = self.colored colored2 = self.colored2 - def show_missing(msg, l, d): + msgs = [] + + def append_missing(msg, l, d): if l: list_format = lambda l: "\n ".join(sorted(l)) formats = ''.join(f'\n - {colored(k)}:\n {list_format(d[k])}' for k in sorted(l)) - print(colored2(msg), ':', formats, '\n', sep='') + msgs.append(f'{colored2(msg)}:{formats}') - show_missing('Missing log SIEM for [rdpproxy]', proxy_missing, self.proxy_logs) - show_missing('Missing log SIEM for [RDP Session]', rdp_missing, self.rdp_logs) - show_missing('Missing log SIEM for [VNC Session]', vnc_missing, self.vnc_logs) + append_missing('Missing log SIEM for [rdpproxy]', proxy_missing, self.proxy_logs) + append_missing('Missing log SIEM for [RDP Session]', rdp_missing, self.rdp_logs) + append_missing('Missing log SIEM for [VNC Session]', vnc_missing, self.vnc_logs) - def show_unknown(msg, l): + def append_unknown(msg, l): if l: - print(colored2(msg), ':\n - ', '\n - '.join(map(colored, sorted(l))), '\n', sep='') + elems = '\n - '.join(map(colored, sorted(l))) + msgs.append(f'{colored2(msg)}:\n - {elems}') - show_unknown('Unknown log SIEM for [rdpproxy]', proxy_unknown) - show_unknown('Unknown log SIEM for [RDP Session]', rdp_unknown) - show_unknown('Unknown log SIEM for [VNC Session]', vnc_unknown) - - bad_logs = set(log for log in chain.from_iterable(chain(doc_proxy_logs.values(), - doc_rdp_logs.values(), - doc_vnc_logs.values())) if '”' in log) - if bad_logs: - print('Log with ” instead of ":\n - ', end='') - print('\n - '.join(sorted(bad_logs))) + append_unknown('Unknown log SIEM for [rdpproxy]', proxy_unknown) + append_unknown('Unknown log SIEM for [RDP Session]', rdp_unknown) + append_unknown('Unknown log SIEM for [VNC Session]', vnc_unknown) - return bool(proxy_missing or rdp_missing or vnc_missing or proxy_unknown or rdp_unknown or vnc_unknown) + return '\n\n'.join(msgs) def print_siem_format(proxy_logs: LogFormatType, @@ -313,6 +331,8 @@ def show(name, logs): parser.add_argument('-c', '--check-doc', action='append') parser.add_argument('-p', '--python-ids', action='store_true') parser.add_argument('-o', '--output-python', default=sys.stdout, type=argparse.FileType('w')) + parser.add_argument('-q', '--no-check-quote', action='store_true') + parser.add_argument('-m', '--no-check-missing-or-unknown', action='store_true') parser.add_argument('--color', action='store_true') parser.add_argument('path', nargs='?', default='src') @@ -328,10 +348,18 @@ def show(name, logs): if args.check_doc: color_file = color_builder('33') if args.color else identity doc_checker = DocSiemChecker(*siems, args.color) + insert_nl = False for docfile in args.check_doc: - print(f'{color_file(docfile)}:\n') - doc_checker.check(*extract_doc_siem(docfile)) - print() + doc_siems = extract_doc_siem(docfile) + err1 = '' if args.no_check_missing_or_unknown \ + else doc_checker.check_missing_or_unknown(*doc_siems) + err2 = '' if args.no_check_quote \ + else doc_checker.check_bad_quote(*doc_siems) + if err1 or err2: + nl = '\n\n' if insert_nl else '' + sep = '\n\n' if err1 and err2 else '' + print(f'{nl}{color_file(docfile)}:\n\n{err1}{sep}{err2}') + insert_nl = True has_action = True if not has_action or args.python_ids: