From 491a4cbffd0a832cf13eef7c59095329767d5d51 Mon Sep 17 00:00:00 2001 From: Nicolas Frisoni Date: Wed, 14 Feb 2024 08:45:59 +0100 Subject: [PATCH] suricatals: rules info api impl --- suricatals/langserver.py | 3 +++ suricatals/tests_rules.py | 52 ++++++++++++++++++++++++++++++++------- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/suricatals/langserver.py b/suricatals/langserver.py index 9c8927b..32321f4 100644 --- a/suricatals/langserver.py +++ b/suricatals/langserver.py @@ -428,6 +428,9 @@ def analyse_file(self, filepath, engine_analysis=True, **kwargs): file_obj.load_from_disk() return file_obj.check_file(engine_analysis=engine_analysis, **kwargs) + def rules_infos(self, rule_buffer, **kwargs): + return self.rules_tester.rules_infos(rule_buffer, **kwargs) + class JSONRPC2Error(Exception): def __init__(self, code, message, data=None): diff --git a/suricatals/tests_rules.py b/suricatals/tests_rules.py index 164c658..388f6e4 100644 --- a/suricatals/tests_rules.py +++ b/suricatals/tests_rules.py @@ -422,24 +422,58 @@ def generate_config(self, tmpdir, config_buffer=None, related_files=None, return config_file - def rule_buffer(self, rule_buffer, engine_analysis=True, config_buffer=None, related_files=None, - reference_config=None, classification_config=None, extra_buffers=None): - # create temp directory - tmpdir = tempfile.mkdtemp() + def _prepare_conf(self, rule_buffer, tmpdir, **kwargs): # write the rule file in temp dir rule_file = os.path.join(tmpdir, "file.rules") with open(rule_file, 'w', encoding='utf-8') as rf: rf.write(rule_buffer) - if extra_buffers: - for filename, content in extra_buffers.items(): + if kwargs.get('extra_buffers'): + for filename, content in kwargs['extra_buffers'].items(): full_path = os.path.join(tmpdir, filename) with open(full_path, 'w', encoding="utf-8") as f: f.write(content) - config_file = self.generate_config(tmpdir, config_buffer=config_buffer, - related_files=related_files, reference_config=reference_config, - classification_config=classification_config) + return self.generate_config( + tmpdir, + config_buffer=kwargs.get('config_buffer'), + related_files=kwargs.get('related_files'), + reference_config=kwargs.get('reference_config'), + classification_config=kwargs.get('classification_config') + ) + + def rules_infos(self, rule_buffer, **kwargs): + tmpdir = tempfile.mkdtemp() + config_file = self._prepare_conf(rule_buffer, tmpdir, **kwargs) + rule_file = os.path.join(tmpdir, "file.rules") + + suri_cmd = [self.suricata_binary, '--engine-analysis', '-l', tmpdir, '-S', rule_file, '-c', config_file] + suriprocess = subprocess.Popen(suri_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + suriprocess.communicate() + + res = {} + json_path = os.path.join(tmpdir, 'rules.json') + with open(json_path, 'r', encoding='utf-8') as f: + for line in f.readlines(): + content = json.loads(line) + res[content['id']] = content + + return res + + def rule_buffer(self, rule_buffer, engine_analysis=True, config_buffer=None, related_files=None, + reference_config=None, classification_config=None, extra_buffers=None): + tmpdir = tempfile.mkdtemp() + config_file = self._prepare_conf( + rule_buffer, + tmpdir, + config_buffer=config_buffer, + related_files=related_files, + reference_config=reference_config, + classification_config=classification_config, + extra_buffers=extra_buffers + ) + + rule_file = os.path.join(tmpdir, "file.rules") suri_cmd = [self.suricata_binary, '-T', '-l', tmpdir, '-S', rule_file, '-c', config_file] # start suricata in test mode