Skip to content

Commit

Permalink
Fix to issue 12
Browse files Browse the repository at this point in the history
  • Loading branch information
moullos committed Sep 14, 2024
1 parent 2ad214c commit 1fdd29d
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 22 deletions.
36 changes: 16 additions & 20 deletions sigma/backends/crowdstrike/logscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,34 +256,30 @@ def convert_condition_field_eq_val_str(
) -> Union[str, DeferredQueryExpression]:
"""Conversion of field = string value expressions. In logscale we handle everything as regexs"""
try:
if ( # Check conditions for usage of 'startswith' operator
if ( # contains: string starts and ends with wildcard
self.contains_expression is not None
and cond.value.startswith(SpecialChars.WILDCARD_MULTI)
and cond.value.endswith(SpecialChars.WILDCARD_MULTI)
):
expr = self.contains_expression
value = cond.value[1:-1]
elif ( # Same as above but for 'endswith' operator: string starts with wildcard
self.endswith_expression is not None
and cond.value.startswith(SpecialChars.WILDCARD_MULTI)
):
expr = self.endswith_expression
value = cond.value[1:]

elif ( # Same as above but for 'startswith' operator: string ends with wildcard
self.startswith_expression
is not None # 'startswith' operator is defined in backend
and cond.value.endswith(
SpecialChars.WILDCARD_MULTI
) # String ends with wildcard
and not cond.value[
:-1
].contains_special() # Remainder of string doesn't contains special characters
)
):
expr = self.startswith_expression
# If all conditions are fulfilled, use 'startswith' operator instead of equal token
value = cond.value[:-1]
elif ( # Same as above but for 'endswith' operator: string starts with wildcard and doesn't contains further special characters
self.endswith_expression is not None
and cond.value.startswith(SpecialChars.WILDCARD_MULTI)
and not cond.value[1:].contains_special()
):
expr = self.endswith_expression
value = cond.value[1:]
elif ( # contains: string starts and ends with wildcard
self.contains_expression is not None
and cond.value.startswith(SpecialChars.WILDCARD_MULTI)
and cond.value.endswith(SpecialChars.WILDCARD_MULTI)
and not cond.value[1:-1].contains_special()
):
expr = self.contains_expression
value = cond.value[1:-1]
else:
expr = self.re_exact_match
value = cond.value
Expand Down
14 changes: 13 additions & 1 deletion sigma/pipelines/crowdstrike/crowdstrike.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,26 @@ def common_processing_items():
ProcessingItem(
identifier="cql_imagefilename_replace_disk_name",
transformation=ReplaceStringTransformation(
regex="^[C-Z]:", replacement="\\\\Device\\\\HarddiskVolume?\\\\"
regex="[C-Z]:", replacement="\\\\Device\\\\HarddiskVolume?\\\\"
),
field_name_conditions=[
IncludeFieldCondition(fields=["ImageFileName"]),
IncludeFieldCondition(fields=["TargetImageFileName"]),
],
field_name_condition_linking=any,
),
# ImageFileName full path handling with contains
# ProcessingItem(
# identifier="cql_imagefilename_replace_disk_name_contains",
# transformation=ReplaceStringTransformation(
# regex="^\\*[C-Z]:", replacement="*\\\\Device\\\\HarddiskVolume?\\\\"
# ),
# field_name_conditions=[
# IncludeFieldCondition(fields=["ImageFileName"]),
# IncludeFieldCondition(fields=["TargetImageFileName"]),
# ],
# field_name_condition_linking=any,
# ),
# ImageFileName starting with colon handling
ProcessingItem(
identifier="cql_imagefilename_replace_disk_name",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_backend_logscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_crowdstrikelogscale_special_chars(logscale_backend: LogScaleBackend):
"""
)
)
== ["fieldA=/^valueA.*\\$\\^\\.\\|.\\(\\)\\[\\]\\+\/$/i"]
== ["fieldA=/^valueA.*\\$\\^\\.\\|.\\(\\)\\[\\]\\+\\/$/i"]
)


Expand Down
73 changes: 73 additions & 0 deletions tests/test_processing_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,56 @@ def process_creation_sigma_rule_disk_name_colon():
"""
)

@pytest.fixture
def process_creation_sigma_rule_disk_name_contains():
return SigmaCollection.from_yaml(
"""
title: Process Creation Test
status: test
logsource:
category: process_creation
product: windows
detection:
sel:
CommandLine: "test.exe foo bar"
Image|contains: "C:\\\\Windows\\\\System32\\\\cmd.exe"
condition: sel
"""
)

@pytest.fixture
def process_creation_sigma_rule_disk_name_startswith():
return SigmaCollection.from_yaml(
"""
title: Process Creation Test
status: test
logsource:
category: process_creation
product: windows
detection:
sel:
CommandLine: "test.exe foo bar"
Image|startswith: "C:\\\\Windows\\\\System32\\\\cmd.exe"
condition: sel
"""
)

@pytest.fixture
def process_creation_sigma_rule_disk_name_endswith():
return SigmaCollection.from_yaml(
"""
title: Process Creation Test
status: test
logsource:
category: process_creation
product: windows
detection:
sel:
CommandLine: "test.exe foo bar"
Image|endswith: "C:\\\\Windows\\\\System32\\\\cmd.exe"
condition: sel
"""
)

@pytest.fixture
def process_creation_sigma_rule_unsupported_field(field):
Expand Down Expand Up @@ -506,6 +556,29 @@ def test_crowdstrike_falcon_pipeline_replace_disk_name_colon(
== "event_platform=/^Win$/i #event_simpleName=/^ProcessRollup2$/i or #event_simpleName=/^SyntheticProcessRollup2$/i CommandLine=/^test\\.exe foo bar$/i ImageFileName=/\\\\Windows\\\\System32\\\\cmd\\.exe$/i"
)

def test_crowdstrike_falcon_pipeline_replace_disk_name_contains(
resolver: ProcessingPipelineResolver, process_creation_sigma_rule_disk_name_contains
):
assert (
convert_falcon(process_creation_sigma_rule_disk_name_contains, resolver)
== "event_platform=/^Win$/i #event_simpleName=/^ProcessRollup2$/i or #event_simpleName=/^SyntheticProcessRollup2$/i CommandLine=/^test\\.exe foo bar$/i ImageFileName=/\\\\Device\\\\HarddiskVolume.\\\\Windows\\\\System32\\\\cmd\\.exe/i"
)

def test_crowdstrike_falcon_pipeline_replace_disk_name_startswith(
resolver: ProcessingPipelineResolver, process_creation_sigma_rule_disk_name_startswith
):
assert (
convert_falcon(process_creation_sigma_rule_disk_name_startswith, resolver)
== "event_platform=/^Win$/i #event_simpleName=/^ProcessRollup2$/i or #event_simpleName=/^SyntheticProcessRollup2$/i CommandLine=/^test\\.exe foo bar$/i ImageFileName=/^\\\\Device\\\\HarddiskVolume.\\\\Windows\\\\System32\\\\cmd\\.exe/i"
)

def test_crowdstrike_falcon_pipeline_replace_disk_name_endswith(
resolver: ProcessingPipelineResolver, process_creation_sigma_rule_disk_name_endswith
):
assert (
convert_falcon(process_creation_sigma_rule_disk_name_endswith, resolver)
== "event_platform=/^Win$/i #event_simpleName=/^ProcessRollup2$/i or #event_simpleName=/^SyntheticProcessRollup2$/i CommandLine=/^test\\.exe foo bar$/i ImageFileName=/\\\\Device\\\\HarddiskVolume.\\\\Windows\\\\System32\\\\cmd\\.exe$/i"
)

@pytest.mark.parametrize("field", unsupported_process_creation_fields)
def test_crowdstrike_falcon_pipeline_unsupported_field(
Expand Down

0 comments on commit 1fdd29d

Please sign in to comment.