update add_field_to signature
- Merges the target and content arguments into a single field argument.
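
The same change, sketched as a before/after at a call site (the field names and values here are made up for illustration; only the call shapes are taken from the diff below):

```python
from logprep.util.helper import add_batch_to, add_field_to

event = {}

# Old signature: separate target and content arguments.
# add_field_to(event, "user.name", "alice")

# New signature: a single `field` dict mapping the dotted target to its content.
add_field_to(event, field={"user.name": "alice"})

# Batch variant: many target/content pairs in one dict.
add_batch_to(event, {"user.id": 42, "user.roles": ["analyst"]})
```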
dtrai2 committed Nov 12, 2024
1 parent f4648c8 commit 905e53e
Showing 24 changed files with 165 additions and 156 deletions.
31 changes: 21 additions & 10 deletions logprep/abc/input.py
@@ -18,7 +18,7 @@
from logprep.abc.exceptions import LogprepException
from logprep.metrics.metrics import Metric
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.helper import add_batch_to, add_field_to, get_dotted_field_value
from logprep.util.time import UTC, TimeParser
from logprep.util.validators import dict_structure_validator

@@ -304,13 +304,19 @@ def _add_env_enrichment_to_event(self, event: dict):
enrichments = self._config.preprocessing.get("enrich_by_env_variables")
if not enrichments:
return
for target_field, variable_name in enrichments.items():
add_field_to(event, target_field, os.environ.get(variable_name, ""))
fields = {
target: os.environ.get(variable_name, "")
for target, variable_name in enrichments.items()
}
add_batch_to(event, fields)

def _add_arrival_time_information_to_event(self, event: dict):
now = TimeParser.now()
target_field = self._config.preprocessing.get("log_arrival_time_target_field")
add_field_to(event, target_field, now.isoformat())
new_field = {
self._config.preprocessing.get(
"log_arrival_time_target_field"
): TimeParser.now().isoformat()
}
add_field_to(event, new_field)

def _add_arrival_timedelta_information_to_event(self, event: dict):
log_arrival_timedelta_config = self._config.preprocessing.get("log_arrival_timedelta")
@@ -326,13 +332,13 @@ def _add_arrival_timedelta_information_to_event(self, event: dict):
TimeParser.from_string(log_arrival_time).astimezone(UTC)
- TimeParser.from_string(time_reference).astimezone(UTC)
).total_seconds()
add_field_to(event, target_field, delta_time_sec)
add_field_to(event, field={target_field: delta_time_sec})

def _add_version_information_to_event(self, event: dict):
"""Add the version information to the event"""
target_field = self._config.preprocessing.get("version_info_target_field")
# pylint: disable=protected-access
add_field_to(event, target_field, self._config._version_information)
add_field_to(event, field={target_field: self._config._version_information})
# pylint: enable=protected-access

def _add_hmac_to(self, event_dict, raw_event) -> dict:
@@ -385,6 +391,11 @@ def _add_hmac_to(self, event_dict, raw_event) -> dict:
digestmod=hashlib.sha256,
).hexdigest()
compressed = zlib.compress(received_orig_message, level=-1)
hmac_output = {"hmac": hmac, "compressed_base64": base64.b64encode(compressed).decode()}
add_field_to(event_dict, hmac_options.get("output_field"), hmac_output)
new_field = {
hmac_options.get("output_field"): {
"hmac": hmac,
"compressed_base64": base64.b64encode(compressed).decode(),
}
}
add_field_to(event_dict, new_field)
return event_dict
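
Before moving on: the env-variable enrichment hunk at the top of this file now builds one dict and writes it in a single batch call. A small sketch of the resulting behavior, assuming dotted target fields expand into nested keys (the config values here are hypothetical):

```python
import os

from logprep.util.helper import add_batch_to

# Hypothetical preprocessing config: target field -> environment variable.
enrichments = {"host.stage": "STAGE", "host.region": "REGION"}

event = {"message": "test"}
fields = {target: os.environ.get(var, "") for target, var in enrichments.items()}
add_batch_to(event, fields)
# With STAGE=prod and REGION=eu set, the event should now resemble:
# {"message": "test", "host": {"stage": "prod", "region": "eu"}}
```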
11 changes: 6 additions & 5 deletions logprep/abc/processor.py
@@ -356,13 +356,15 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
if failure_tags is None:
failure_tags = rule.failure_tags
if tags is None:
add_and_overwrite(event, "tags", sorted(list({*failure_tags})))
new_field = {"tags": sorted(list({*failure_tags}))}
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
new_field = {"tags": sorted(list({*tags, *failure_tags}))}
add_and_overwrite(event, new_field)
if isinstance(error, ProcessingWarning):
if error.tags:
tags = tags if tags else []
add_and_overwrite(event, "tags", sorted(list({*error.tags, *tags, *failure_tags})))
new_field = {"tags": sorted(list({*error.tags, *tags, *failure_tags}))}
add_and_overwrite(event, new_field)
self.result.warnings.append(error)
else:
self.result.warnings.append(ProcessingWarning(str(error), event, rule))
@@ -382,8 +384,7 @@ def _has_missing_values(self, event, rule, source_field_dict):
def _write_target_field(self, event: dict, rule: "Rule", result: any) -> None:
add_field_to(
event,
target_field=rule.target_field,
content=result,
field={rule.target_field: result},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
6 changes: 3 additions & 3 deletions logprep/metrics/metrics.py
@@ -222,12 +222,12 @@ def inner(self, *args, **kwargs): # nosemgrep
if hasattr(self, "rule_type"):
event = args[0]
if event:
add_field_to(event, f"processing_times.{self.rule_type}", duration)
add_field_to(event, field={f"processing_times.{self.rule_type}": duration})
if hasattr(self, "_logprep_config"): # attribute of the Pipeline class
event = args[0]
if event:
add_field_to(event, "processing_times.pipeline", duration)
add_field_to(event, "processing_times.hostname", gethostname())
add_field_to(event, field={"processing_times.pipeline": duration})
add_field_to(event, field={"processing_times.hostname": gethostname()})
return result

return inner
3 changes: 1 addition & 2 deletions logprep/processor/clusterer/processor.py
@@ -140,8 +140,7 @@ def _cluster(self, event: dict, rule: ClustererRule):
cluster_signature = cluster_signature_based_on_message
add_field_to(
event,
self._config.output_field_name,
cluster_signature,
field={self._config.output_field_name: cluster_signature},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
9 changes: 5 additions & 4 deletions logprep/processor/dissector/processor.py
@@ -46,12 +46,12 @@ def _apply_rules(self, event, rule):

def _apply_mapping(self, event, rule):
action_mappings_sorted_by_position = sorted(
self._get_mappings(event, rule), key=lambda x: x[5]
self._get_mappings(event, rule), key=lambda x: x[-1]
)
for action, *args, _ in action_mappings_sorted_by_position:
action(*args)

def _get_mappings(self, event, rule) -> List[Tuple[Callable, dict, str, str, str, int]]:
def _get_mappings(self, event, rule) -> List[Tuple[Callable, dict, dict, str, int]]:
current_field = None
target_field_mapping = {}
for rule_action in rule.actions:
@@ -84,12 +84,13 @@ def _get_mappings(self, event, rule) -> List[Tuple[Callable, dict, str, str, str, int]]:
target_field = target_field_mapping.get(target_field.lstrip("&"))
if strip_char:
content = content.strip(strip_char)
yield rule_action, event, target_field, content, separator, position
field = {target_field: content}
yield rule_action, event, field, separator, position

def _apply_convert_datatype(self, event, rule):
for target_field, converter in rule.convert_actions:
try:
target_value = converter(get_dotted_field_value(event, target_field))
add_field_to(event, target_field, target_value, overwrite_target_field=True)
add_field_to(event, {target_field: target_value}, overwrite_target_field=True)
except ValueError as error:
self._handle_warning_error(event, rule, error)
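
Worth noting in the hunks above: merging target and content into one dict shrinks the mapping tuples from six elements to five, so the sort key for the position element moves from x[5] to the more robust x[-1]. A tiny illustration with hypothetical tuples:

```python
# New-style mapping tuples: (action, event, field, separator, position).
mappings = [
    ("to_a", {}, {"a": "1"}, "-", 2),
    ("to_b", {}, {"b": "2"}, "-", 1),
]
# Sorting by the trailing element keeps working even if the tuple shape changes again.
ordered = sorted(mappings, key=lambda x: x[-1])
assert [m[-1] for m in ordered] == [1, 2]
```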
17 changes: 8 additions & 9 deletions logprep/processor/domain_label_extractor/processor.py
@@ -130,21 +130,20 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):

if self._is_valid_ip(domain):
tagging_field.append(f"ip_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(event, self._config.tagging_field_name, tagging_field)
add_and_overwrite(event, field={self._config.tagging_field_name: tagging_field})
return

labels = self._tld_extractor(domain)
if labels.suffix != "":
targets = [
f"{rule.target_field}.registered_domain",
f"{rule.target_field}.top_level_domain",
f"{rule.target_field}.subdomain",
]
contents = [f"{labels.domain}.{labels.suffix}", labels.suffix, labels.subdomain]
add_batch_to(event, targets, contents, overwrite_target_field=rule.overwrite_target)
fields = {
f"{rule.target_field}.registered_domain": f"{labels.domain}.{labels.suffix}",
f"{rule.target_field}.top_level_domain": labels.suffix,
f"{rule.target_field}.subdomain": labels.subdomain,
}
add_batch_to(event, fields, overwrite_target_field=rule.overwrite_target)
else:
tagging_field.append(f"invalid_domain_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(event, self._config.tagging_field_name, tagging_field)
add_and_overwrite(event, field={self._config.tagging_field_name: tagging_field})

@staticmethod
def _is_valid_ip(domain):
8 changes: 5 additions & 3 deletions logprep/processor/domain_resolver/processor.py
@@ -222,7 +222,9 @@ def _resolve_ip(self, domain, hash_string=None):

def _store_debug_infos(self, event, requires_storing):
event_dbg = {
"obtained_from_cache": not requires_storing,
"cache_size": len(self._domain_ip_map.keys()),
"resolved_ip_debug": {
"obtained_from_cache": not requires_storing,
"cache_size": len(self._domain_ip_map.keys()),
}
}
add_field_to(event, "resolved_ip_debug", event_dbg, overwrite_target_field=True)
add_field_to(event, event_dbg, overwrite_target_field=True)
21 changes: 11 additions & 10 deletions logprep/processor/field_manager/processor.py
@@ -78,7 +78,9 @@ def _apply_mapping(self, event, rule, rule_args):
if not any(source_field_values):
return
source_field_values, targets = self._filter_missing_fields(source_field_values, targets)
add_batch_to(event, targets, source_field_values, extend_target_list, overwrite_target)
add_batch_to(
event, dict(zip(targets, source_field_values)), extend_target_list, overwrite_target
)
if rule.delete_source_fields:
for dotted_field in source_fields:
pop_dotted_field_value(event, dotted_field)
@@ -104,7 +106,7 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, rule):
case State(
extend=True, overwrite=True, single_source_element=False, target_is_list=False
):
add_and_overwrite(event, target_field, source_fields_values)
add_and_overwrite(event, field={target_field: source_fields_values})
return

case State(
@@ -116,36 +118,35 @@
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, target_field, source_fields_values)
add_and_overwrite(event, field={target_field: source_fields_values})
return

case State(extend=True, overwrite=False, target_is_list=False, target_is_none=True):
add_and_overwrite(event, target_field, source_fields_values)
add_and_overwrite(event, field={target_field: source_fields_values})
return

case State(extend=True, overwrite=False, target_is_list=False):
source_fields_values = [target_field_value, *source_fields_values]
add_and_overwrite(event, target_field, source_fields_values)
add_and_overwrite(event, field={target_field: source_fields_values})
return

case State(
extend=True, overwrite=False, single_source_element=False, target_is_list=True
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*target_field_value, *flattened_source_fields]
add_and_overwrite(event, target_field, source_fields_values)
add_and_overwrite(event, field={target_field: source_fields_values})
return

case State(overwrite=True, extend=True):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, target_field, source_fields_values)
add_and_overwrite(event, field={target_field: source_fields_values})
return

case _:
add_field_to(
event, target_field, source_fields_values, state.extend, state.overwrite
)
field = {target_field: source_fields_values}
add_field_to(event, field, state.extend, state.overwrite)

def _overwrite_from_source_values(self, source_fields_values):
duplicates = []
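
All branches of the match statement above now funnel into the same dict-based calls. A brief sketch of the two write modes (the extend semantics are inferred from the surrounding code, not spelled out in this commit):

```python
from logprep.util.helper import add_and_overwrite, add_field_to

event = {"tags": ["a"]}

# Unconditional overwrite, as used for the merged-list cases above.
add_and_overwrite(event, field={"tags": ["a", "b"]})
assert event["tags"] == ["a", "b"]

# The generic fallback passes the flags positionally: extends_lists,
# then overwrite_target_field.
add_field_to(event, {"tags": ["c"]}, True, False)  # assumed to extend the list
```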
33 changes: 15 additions & 18 deletions logprep/processor/generic_adder/processor.py
@@ -48,7 +48,7 @@
from logprep.factory_error import InvalidConfigurationError
from logprep.processor.generic_adder.mysql_connector import MySQLConnector
from logprep.processor.generic_adder.rule import GenericAdderRule
from logprep.util.helper import get_dotted_field_value, add_batch_to
from logprep.util.helper import add_batch_to, get_dotted_field_value


def sql_config_validator(_, attribute, value):
@@ -224,35 +224,32 @@ def _apply_rules(self, event: dict, rule: GenericAdderRule):
FieldExistsWarning
Raises if an addition would overwrite an existing field or value.
"""
items_to_add = rule.add
use_db = rule.db_target and self._db_table
if use_db:
self._update_db_table()
items_to_add = self._get_items_to_add_from_db(event, rule)
else:
items_to_add = rule.add.items()

if items_to_add:
targets, contents = zip(*items_to_add)
add_batch_to(event, targets, contents, rule.extend_target_list, rule.overwrite_target)
add_batch_to(event, items_to_add, rule.extend_target_list, rule.overwrite_target)

def _get_items_to_add_from_db(self, event: dict, rule: GenericAdderRule) -> list:
def _get_items_to_add_from_db(self, event: dict, rule: GenericAdderRule) -> dict | None:
"""Get the sub part of the value from the event using a regex pattern"""
items_to_add = []
if not rule.db_pattern:
return items_to_add

return
value_to_check_in_db = get_dotted_field_value(event, rule.db_target)
match_with_value_in_db = rule.db_pattern.match(value_to_check_in_db)
if match_with_value_in_db:
# Get values to add from db table using the sub part
value_to_map = match_with_value_in_db.group(1).upper()
add_from_db = self._db_table.get(value_to_map, [])

if rule.db_destination_prefix:
for idx, _ in enumerate(add_from_db):
if not add_from_db[idx][0].startswith(rule.db_destination_prefix):
add_from_db[idx][0] = f"{rule.db_destination_prefix}.{add_from_db[idx][0]}"

for item in add_from_db:
items_to_add.append(item)
return items_to_add
add_from_db = [
(self._add_prefix_if_not_present(key, rule), value)
for key, value in add_from_db
]
return dict(add_from_db)

def _add_prefix_if_not_present(self, key: str, rule: "GenericAdderRule") -> str:
if not key.startswith(rule.db_destination_prefix):
return f"{rule.db_destination_prefix}.{key}"
return key
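
With the return type switched to a dict, the DB lookup now feeds add_batch_to directly. A standalone sketch of the prefixing step, using hypothetical rows and prefix:

```python
# Hypothetical (column, value) pairs returned from the SQL table.
add_from_db = [("owner", "it-ops"), ("enriched.site", "berlin")]
db_destination_prefix = "enriched"  # stands in for rule.db_destination_prefix

# Mirrors _add_prefix_if_not_present: prefix only keys that lack it.
items_to_add = {
    key if key.startswith(db_destination_prefix) else f"{db_destination_prefix}.{key}": value
    for key, value in add_from_db
}
assert items_to_add == {"enriched.owner": "it-ops", "enriched.site": "berlin"}
```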
3 changes: 1 addition & 2 deletions logprep/processor/generic_resolver/processor.py
@@ -61,8 +61,7 @@ def _apply_rules(self, event, rule):
try:
add_field_to(
event,
target_field,
content,
field={target_field: content},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
13 changes: 5 additions & 8 deletions logprep/processor/geoip_enricher/processor.py
@@ -128,16 +128,13 @@ def _apply_rules(self, event, rule):
geoip_data = self._try_getting_geoip_data(ip_string)
if not geoip_data:
return
filtered_geoip_data = {k: v for k, v in geoip_data.items() if v is not None}
targets, contents = zip(*filtered_geoip_data.items())
targets = [
rule.customize_target_subfields.get(target, f"{rule.target_field}.{target}")
for target in targets
]
fields = {
rule.customize_target_subfields.get(target, f"{rule.target_field}.{target}"): value
for target, value in geoip_data.items()
}
add_batch_to(
event,
targets,
contents,
fields,
extends_lists=False,
overwrite_target_field=rule.overwrite_target,
)
5 changes: 1 addition & 4 deletions logprep/processor/grokker/processor.py
@@ -85,12 +85,9 @@ def _apply_rules(self, event: dict, rule: GrokkerRule):
if result is None or result == {}:
continue
matches.append(True)
filtered_items = {k: v for k, v in result.items() if v is not None}
targets, contents = zip(*filtered_items.items())
add_batch_to(
event,
targets,
contents,
result,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
3 changes: 1 addition & 2 deletions logprep/processor/hyperscan_resolver/processor.py
@@ -121,8 +121,7 @@ def _apply_rules(self, event: dict, rule: HyperscanResolverRule):
try:
add_field_to(
event,
resolve_target,
dest_val,
field={resolve_target: dest_val},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
12 changes: 7 additions & 5 deletions logprep/processor/labeler/processor.py
@@ -73,9 +73,11 @@ def setup(self):

def _apply_rules(self, event, rule):
"""Applies the rule to the current event"""
targets = [f"label.{key}" for key in rule.label.keys()]
contents = rule.label.values()
add_batch_to(event, targets, contents, extends_lists=True)
fields = {key: value for key, value in rule.prefixed_label.items()}
add_batch_to(event, fields, extends_lists=True)
# convert sets into sorted lists
contents = [sorted(set(get_dotted_field_value(event, target))) for target in targets]
add_batch_to(event, targets, contents, overwrite_target_field=True)
fields = {
key: sorted(set(get_dotted_field_value(event, key)))
for key, _ in rule.prefixed_label.items()
}
add_batch_to(event, fields, overwrite_target_field=True)
4 changes: 4 additions & 0 deletions logprep/processor/labeler/rule.py
@@ -60,6 +60,10 @@ def label(self) -> dict:

# pylint: enable=C0111

@property
def prefixed_label(self) -> dict:
return {f"label.{key}": value for key, value in self.label.items()}

def conforms_to_schema(self, schema: LabelingSchema) -> bool:
"""Check if labels are valid."""
return schema.validate_labels(self._config.label)
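
The new prefixed_label property centralizes the "label."-prefixing the labeler processor previously did inline. A minimal sketch of the intended mapping, with hypothetical labels:

```python
label = {"reporter": ["windows"], "action": ["login"]}

# Equivalent of the new property on the labeler rule:
prefixed_label = {f"label.{key}": value for key, value in label.items()}
assert prefixed_label == {"label.reporter": ["windows"], "label.action": ["login"]}
# add_batch_to then extends these lists on the event, and a second batch
# call sorts and de-duplicates them.
```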