From 419d6b341eb66e3d376a79f3f2a6fb534265d66b Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Mon, 18 Nov 2024 00:04:21 +0100 Subject: [PATCH] Integration of fieldref into base backend --- sigma/backends/test/backend.py | 5 + sigma/conversion/base.py | 50 ++++++-- sigma/modifiers.py | 8 +- sigma/processing/transformations/base.py | 2 +- sigma/types.py | 4 +- tests/test_conversion_base.py | 144 +++++++++++++++++++++++ tests/test_modifiers.py | 6 +- 7 files changed, 201 insertions(+), 18 deletions(-) diff --git a/sigma/backends/test/backend.py b/sigma/backends/test/backend.py index 09ae8d0c..e1768660 100644 --- a/sigma/backends/test/backend.py +++ b/sigma/backends/test/backend.py @@ -70,6 +70,11 @@ class TextQueryTestBackend(TextQueryBackend): } field_equals_field_expression: ClassVar[str] = "{field1}=fieldref({field2})" + field_equals_field_startswith_expression: ClassVar[str] = ( + "{field1}=fieldref_startswith({field2})" + ) + field_equals_field_endswith_expression: ClassVar[str] = "{field1}=fieldref_endswith({field2})" + field_equals_field_contains_expression: ClassVar[str] = "{field1}=fieldref_contains({field2})" field_null_expression: ClassVar[str] = "{field} is null" diff --git a/sigma/conversion/base.py b/sigma/conversion/base.py index 4920b52c..c48b182a 100644 --- a/sigma/conversion/base.py +++ b/sigma/conversion/base.py @@ -889,9 +889,12 @@ class variables. If this is not sufficient, the respective methods can be implem ) # Expression for comparing two event fields - field_equals_field_expression: ClassVar[Optional[str]] = ( - None # Field comparison expression with the placeholders {field1} and {field2} corresponding to left field and right value side of Sigma detection item - ) + # Field comparison expression with the placeholders {field1} and {field2} corresponding to left field and right value side of Sigma detection item + field_equals_field_expression: ClassVar[Optional[str]] = None + field_equals_field_startswith_expression: ClassVar[Optional[str]] = None + field_equals_field_endswith_expression: ClassVar[Optional[str]] = None + field_equals_field_contains_expression: ClassVar[Optional[str]] = None + field_equals_field_escaping_quoting: Tuple[bool, bool] = ( True, True, @@ -1650,16 +1653,47 @@ def convert_condition_field_eq_field_escape_and_quote( ) def convert_condition_field_eq_field( - self, cond: SigmaFieldReference, state: ConversionState + self, cond: ConditionFieldEqualsValueExpression, state: ConversionState ) -> Union[str, DeferredQueryExpression]: """Conversion of comparision of two fields.""" field1, field2 = self.convert_condition_field_eq_field_escape_and_quote( cond.field, cond.value.field ) - return self.field_equals_field_expression.format( - field1=field1, - field2=field2, - ) + value = cond.value + if value.starts_with and value.ends_with: + if self.field_equals_field_contains_expression is None: + raise NotImplementedError( + "Field reference contains expression is not supported by backend." + ) + return self.field_equals_field_contains_expression.format( + field1=field1, + field2=field2, + ) + elif value.starts_with: + if self.field_equals_field_startswith_expression is None: + raise NotImplementedError( + "Field reference startswith expression is not supported by backend." + ) + return self.field_equals_field_startswith_expression.format( + field1=field1, + field2=field2, + ) + elif value.ends_with: + if self.field_equals_field_endswith_expression is None: + raise NotImplementedError( + "Field reference endswith expression is not supported by backend." + ) + return self.field_equals_field_endswith_expression.format( + field1=field1, + field2=field2, + ) + else: + if self.field_equals_field_expression is None: + raise NotImplementedError("Field reference expression is not supported by backend.") + return self.field_equals_field_expression.format( + field1=field1, + field2=field2, + ) def convert_condition_field_eq_val_null( self, cond: ConditionFieldEqualsValueExpression, state: ConversionState diff --git a/sigma/modifiers.py b/sigma/modifiers.py index 3109318b..841fd7b7 100644 --- a/sigma/modifiers.py +++ b/sigma/modifiers.py @@ -132,8 +132,8 @@ def modify( val.regexp = val.regexp + ".*" val.compile() elif isinstance(val, SigmaFieldReference): - val.wildcard_start = SpecialChars.WILDCARD_MULTI - val.wildcard_end = SpecialChars.WILDCARD_MULTI + val.starts_with = True + val.ends_with = True return val @@ -151,7 +151,7 @@ def modify( val.regexp = val.regexp + ".*" val.compile() elif isinstance(val, SigmaFieldReference): - val.wildcard_end = SpecialChars.WILDCARD_MULTI + val.starts_with = True return val @@ -169,7 +169,7 @@ def modify( val.regexp = ".*" + val.regexp val.compile() elif isinstance(val, SigmaFieldReference): - val.wildcard_start = SpecialChars.WILDCARD_MULTI + val.ends_with = True return val diff --git a/sigma/processing/transformations/base.py b/sigma/processing/transformations/base.py index 5de785b3..adb0eebc 100644 --- a/sigma/processing/transformations/base.py +++ b/sigma/processing/transformations/base.py @@ -207,7 +207,7 @@ def apply_detection_item( ): new_values.extend( ( - SigmaFieldReference(mapped_field) + SigmaFieldReference(mapped_field, value.starts_with, value.ends_with) for mapped_field in self._apply_field_name(value.field) ) ) diff --git a/sigma/types.py b/sigma/types.py index c125783a..812a0e2e 100644 --- a/sigma/types.py +++ b/sigma/types.py @@ -828,8 +828,8 @@ class SigmaFieldReference(NoPlainConversionMixin, SigmaType): """Type for referencing to other fields for comparison between them.""" field: str - wildcard_start: Union[SpecialChars, None] = None - wildcard_end: Union[SpecialChars, None] = None + starts_with: bool = False + ends_with: bool = False @dataclass diff --git a/tests/test_conversion_base.py b/tests/test_conversion_base.py index 79cbab90..985c1c87 100644 --- a/tests/test_conversion_base.py +++ b/tests/test_conversion_base.py @@ -1600,6 +1600,27 @@ def test_convert_compare_fields(test_backend): ) +def test_convert_compare_fields_unsupported(test_backend, monkeypatch): + monkeypatch.setattr(test_backend, "field_equals_field_expression", None) + with pytest.raises(NotImplementedError): + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref: "field B" + field A|fieldref: fieldB + condition: sel + """ + ) + ) + + def test_convert_compare_fields_noquote(test_backend: TextQueryTestBackend): test_backend.field_equals_field_expression = "`{field1}`=`{field2}`" test_backend.field_equals_field_escaping_quoting = (False, False) @@ -1668,6 +1689,129 @@ def test_convert_compare_fields_differentiation_prefix(test_backend): ) +def test_convert_compare_fields_startswith(test_backend): + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref|startswith: fieldB + condition: sel + """ + ) + ) + == ["mappedA=fieldref_startswith(mappedB)"] + ) + + +def test_convert_compare_fields_startswith_unsupported(test_backend, monkeypatch): + monkeypatch.setattr(test_backend, "field_equals_field_startswith_expression", None) + with pytest.raises(NotImplementedError): + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref|startswith: fieldB + condition: sel + """ + ) + ) + + +def test_convert_compare_fields_endswith(test_backend): + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref|endswith: fieldB + condition: sel + """ + ) + ) + == ["mappedA=fieldref_endswith(mappedB)"] + ) + + +def test_convert_compare_fields_endswith_unsupported(test_backend, monkeypatch): + monkeypatch.setattr(test_backend, "field_equals_field_endswith_expression", None) + with pytest.raises(NotImplementedError): + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref|endswith: fieldB + condition: sel + """ + ) + ) + + +def test_convert_compare_fields_contains(test_backend): + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref|contains: fieldB + condition: sel + """ + ) + ) + == ["mappedA=fieldref_contains(mappedB)"] + ) + + +def test_convert_compare_fields_contains_unsupported(test_backend, monkeypatch): + monkeypatch.setattr(test_backend, "field_equals_field_contains_expression", None) + with pytest.raises(NotImplementedError): + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|fieldref|contains: fieldB + condition: sel + """ + ) + ) + + def test_convert_compare_fields_wrong_type(test_backend): with pytest.raises(SigmaTypeError): assert test_backend.convert( diff --git a/tests/test_modifiers.py b/tests/test_modifiers.py index 06742f19..09e22496 100644 --- a/tests/test_modifiers.py +++ b/tests/test_modifiers.py @@ -421,7 +421,7 @@ def test_fieldref_contains(dummy_detection_item): fieldref = SigmaFieldReferenceModifier(dummy_detection_item, []).modify(SigmaString("field")) assert ( SigmaContainsModifier(dummy_detection_item, [SigmaFieldReferenceModifier]).modify(fieldref) - ) == SigmaFieldReference("field", SpecialChars.WILDCARD_MULTI, SpecialChars.WILDCARD_MULTI) + ) == SigmaFieldReference("field", True, True) def test_fieldref_startswith(dummy_detection_item): @@ -430,14 +430,14 @@ def test_fieldref_startswith(dummy_detection_item): SigmaStartswithModifier(dummy_detection_item, [SigmaFieldReferenceModifier]).modify( fieldref ) - ) == SigmaFieldReference("field", None, SpecialChars.WILDCARD_MULTI) + ) == SigmaFieldReference("field", True, False) def test_fieldref_endswith(dummy_detection_item): fieldref = SigmaFieldReferenceModifier(dummy_detection_item, []).modify(SigmaString("field")) assert ( SigmaEndswithModifier(dummy_detection_item, [SigmaFieldReferenceModifier]).modify(fieldref) - ) == SigmaFieldReference("field", SpecialChars.WILDCARD_MULTI, None) + ) == SigmaFieldReference("field", False, True) def test_fieldref_wildcard(dummy_detection_item):