Skip to content

Commit

Permalink
Fixing signal dedup and snooze conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
kevgliss committed Dec 15, 2023
1 parent ba4b53c commit f0d45f8
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
],
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports": false
"source.organizeImports": "never"
}
},
}
161 changes: 89 additions & 72 deletions src/dispatch/signal/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,88 @@ def update_instance(
return signal_instance


def filter_snooze(*, db_session: Session, signal_instance: SignalInstance) -> bool:
for f in signal_instance.signal.filters:
if f.mode != SignalFilterMode.active:
continue

if f.action != SignalFilterAction.snooze:
continue

if f.expiration.replace(tzinfo=timezone.utc) <= datetime.now(timezone.utc):
continue

query = db_session.query(SignalInstance).filter(
SignalInstance.signal_id == signal_instance.signal_id
)
query = apply_filter_specific_joins(SignalInstance, f.expression, query)
query = apply_filters(query, f.expression)
# an expression is not required for snoozing, if absent we snooze regardless of entity
if f.expression:
instances = query.filter(SignalInstance.id == signal_instance.id).all()

if instances:
signal_instance.filter_action = SignalFilterAction.snooze
break
else:
signal_instance.filter_action = SignalFilterAction.snooze
break

return signal_instance


def filter_dedup(*, db_session: Session, signal_instance: SignalInstance) -> bool:
for f in signal_instance.signal.filters:
if f.mode != SignalFilterMode.active:
continue

if f.action != SignalFilterAction.deduplicate:
continue

query = db_session.query(SignalInstance).filter(
SignalInstance.signal_id == signal_instance.signal_id
)
query = apply_filter_specific_joins(SignalInstance, f.expression, query)
query = apply_filters(query, f.expression)

window = datetime.now(timezone.utc) - timedelta(minutes=f.window)
query = query.filter(SignalInstance.created_at >= window)
query = query.join(SignalInstance.entities).filter(
Entity.id.in_([e.id for e in signal_instance.entities])
)
query = query.filter(SignalInstance.id != signal_instance.id)

# get the earliest instance
query = query.order_by(asc(SignalInstance.created_at))
instances = query.all()

if instances:
# associate with existing case
signal_instance.case_id = instances[0].case_id
signal_instance.filter_action = SignalFilterAction.deduplicate
break
# apply default deduplication rule
else:
default_dedup_window = datetime.now(timezone.utc) - timedelta(hours=1)
instance = (
db_session.query(SignalInstance)
.filter(
SignalInstance.signal_id == signal_instance.signal_id,
SignalInstance.created_at >= default_dedup_window,
SignalInstance.id != signal_instance.id,
SignalInstance.case_id.isnot(None), # noqa
)
.with_entities(SignalInstance.case_id)
.order_by(desc(SignalInstance.created_at))
.first()
)
if instance:
signal_instance.case_id = instance.case_id
signal_instance.filter_action = SignalFilterAction.deduplicate

return signal_instance


def filter_signal(*, db_session: Session, signal_instance: SignalInstance) -> bool:
"""
Apply filter actions to the signal instance.
Expand All @@ -585,83 +667,18 @@ def filter_signal(*, db_session: Session, signal_instance: SignalInstance) -> bo
Returns:
bool: True if the signal instance is filtered, False otherwise.
"""

filtered = False
for f in signal_instance.signal.filters:
if f.mode != SignalFilterMode.active:
continue

query = db_session.query(SignalInstance).filter(
SignalInstance.signal_id == signal_instance.signal_id
)
query = apply_filter_specific_joins(SignalInstance, f.expression, query)
query = apply_filters(query, f.expression)

# order matters, check for snooze before deduplication
# we check to see if the current instances match's it's signals snooze filter
if f.action == SignalFilterAction.snooze:
if f.expiration.replace(tzinfo=timezone.utc) <= datetime.now(timezone.utc):
continue

# an expression is not required for snoozing, if absent we snooze regardless of entity
if f.expression:
instances = query.filter(SignalInstance.id == signal_instance.id).all()

if instances:
signal_instance.filter_action = SignalFilterAction.snooze
filtered = True
break
else:
signal_instance.filter_action = SignalFilterAction.snooze
filtered = True
break

elif f.action == SignalFilterAction.deduplicate:
window = datetime.now(timezone.utc) - timedelta(minutes=f.window)
query = query.filter(SignalInstance.created_at >= window)
query = query.join(SignalInstance.entities).filter(
Entity.id.in_([e.id for e in signal_instance.entities])
)
query = query.filter(SignalInstance.id != signal_instance.id)

# get the earliest instance
query = query.order_by(asc(SignalInstance.created_at))
instances = query.all()
signal_instance = filter_snooze(db_session=db_session, signal_instance=signal_instance)

if instances:
# associate with existing case
signal_instance.case_id = instances[0].case_id
signal_instance.filter_action = SignalFilterAction.deduplicate
filtered = True
break
else:
# Check if there's a deduplication rule set on the signal
has_dedup_filter = any(
f.action == SignalFilterAction.deduplicate for f in signal_instance.signal.filters
)
# Apply the default deduplication rule if there's no deduplication rule set on the signal
# and the signal instance is not snoozed
if not has_dedup_filter and not filtered:
default_dedup_window = datetime.now(timezone.utc) - timedelta(hours=1)
instance = (
db_session.query(SignalInstance)
.filter(
SignalInstance.signal_id == signal_instance.signal_id,
SignalInstance.created_at >= default_dedup_window,
SignalInstance.id != signal_instance.id,
SignalInstance.case_id.isnot(None), # noqa
)
.with_entities(SignalInstance.case_id)
.order_by(desc(SignalInstance.created_at))
.first()
)
if instance:
signal_instance.case_id = instance.case_id
signal_instance.filter_action = SignalFilterAction.deduplicate
filtered = True
# we only dedupe if we haven't been snoozed
if not signal_instance.filter_action:
signal_instance = filter_dedup(db_session=db_session, signal_instance=signal_instance)

if not filtered:
if not signal_instance.filter_action:
signal_instance.filter_action = SignalFilterAction.none
else:
filtered = True

db_session.commit()
return filtered
61 changes: 61 additions & 0 deletions tests/signal/test_signal_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,67 @@ def test_filter_actions_deduplicate(session, signal, project):
assert signal_instance_2.filter_action == SignalFilterAction.deduplicate


def test_filter_action_with_dedupe_and_snooze(session, signal, project):
from datetime import datetime, timedelta, timezone
from dispatch.signal.models import (
SignalFilter,
SignalInstance,
SignalFilterAction,
)
from dispatch.signal.service import filter_signal
from dispatch.entity_type.models import EntityType
from dispatch.entity.models import Entity

entity_type = EntityType(
name="dedupe1+snooze",
jpath="id",
regular_expression=None,
project=project,
)
session.add(entity_type)

entity = Entity(name="dedupe1+snooze", description="test", value="foo", entity_type=entity_type)
session.add(entity)

# create instance
signal_instance_1 = SignalInstance(
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity]
)
session.add(signal_instance_1)

signal_instance_2 = SignalInstance(
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity]
)
session.add(signal_instance_2)
session.commit()
# create deduplicate signal filter
signal_filter = SignalFilter(
name="dedupe1",
description="test",
expression=[
{"or": [{"model": "EntityType", "field": "id", "op": "==", "value": entity_type.id}]}
],
action=SignalFilterAction.deduplicate,
window=5,
project=project,
)
signal.filters.append(signal_filter)

signal_filter = SignalFilter(
name="snooze0",
description="test",
expression=[{"or": [{"model": "Entity", "field": "id", "op": "==", "value": entity.id}]}],
action=SignalFilterAction.snooze,
expiration=datetime.now(tz=timezone.utc) + timedelta(minutes=5),
project=project,
)
signal.filters.append(signal_filter)

session.commit()
assert filter_signal(db_session=session, signal_instance=signal_instance_2)
assert signal_instance_2.filter_action == SignalFilterAction.snooze


def test_filter_actions_snooze(session, entity, signal, project):
from datetime import datetime, timedelta, timezone
from dispatch.signal.models import (
Expand Down

0 comments on commit f0d45f8

Please sign in to comment.