Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: eap EndpointTraceItemTable spare aggregate attribute transformer #6817

Merged
merged 6 commits into from
Jan 28, 2025

Conversation

kylemumma
Copy link
Member

@kylemumma kylemumma commented Jan 24, 2025

this pr addresses this ticket for EndpointTraceItemTable. https://github.com/getsentry/eap-planning/issues/158

It makes it so when we have a query like

SELECT sum(wing.count)
...
GROUP BY animal_type

it will add

SELECT sum(wing.count)
WHERE hasAttribute(wing.count)
...
GROUP BY animal_type

Major change

The major change in this PR is the addition of SparseAggregateAttributeTransformer this is a class responsible for transforming the original TraceItemTableRequest into the new one with the additional filter. This query transformation step is then added as a step in the resolver that all requests go through.

I think it should be sufficient to only do EndpointTraceItemTable and not EndpointTimeSeries since timeseries wont graph the 0 data but ill ask tony to make sure. If timeseries is needed it should only be 1-2 days of extra work.

@kylemumma kylemumma requested review from a team as code owners January 24, 2025 18:40
@@ -2101,6 +2136,94 @@ def test_bad_aggregation_filter(self, setup_teardown: Any) -> None:
with pytest.raises(BadSnubaRPCRequestException):
EndpointTraceItemTable().execute(message)

def test_sparse_aggregate(self, setup_teardown: Any) -> None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test ensures desired behavior as described in this ticket https://github.com/getsentry/eap-planning/issues/158
that there arent the 0 rows

@@ -131,6 +131,41 @@ def gen_message(
}


def write_eap_span(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is unrelated to the PR, I added it to make gen_message easier to use in my tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are unit tests for the transformer

Copy link

codecov bot commented Jan 24, 2025

❌ 2 Tests Failed:

Tests completed Failed Passed Skipped
2865 2 2863 15
View the top 2 failed tests by shortest run time
tests.clickhouse.optimize.test_optimize_scheduler::test_get_next_schedule_raises_exception
Stack Traces | 0.001s run time
Traceback (most recent call last):
  File ".../clickhouse/optimize/test_optimize_scheduler.py", line 242, in test_get_next_schedule_raises_exception
    with pytest.raises(OptimizedSchedulerTimeout):
  File ".../local/lib/python3.11.../site-packages/_pytest/python_api.py", line 970, in __exit__
    fail(self.message)
  File ".../local/lib/python3.11.../site-packages/_pytest/outcomes.py", line 196, in fail
    raise Failed(msg=reason, pytrace=pytrace)
Failed: DID NOT RAISE <class 'snuba.clickhouse.optimize.optimize_scheduler.OptimizedSchedulerTimeout'>
tests.datasets.test_errors_replacer.TestReplacer::test_process_offset_twice
Stack Traces | 0.297s run time
Traceback (most recent call last):
  File ".../tests/datasets/test_errors_replacer.py", line 370, in test_process_offset_twice
    assert self.replacer.process_message(message) is None
AssertionError: assert (ReplacementMessageMetadata(partition_index=1, offset=42, consumer_group='consumer_group'), UnmergeGroupsReplacement(state_name=<ReplacerState.ERRORS: 'errors'>, timestamp=datetime.datetime(2025, 1, 25, 0, 25, 3, 226182), hashes=['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], all_columns=[FlattenedColumn(None, 'project_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'event_id', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'platform', schemas.String(modifiers=None)), FlattenedColumn(None, 'environment', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'release', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'dist', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v4', schemas.IPv4(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v6', schemas.IPv6(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user', schemas.String(modifiers=None)), FlattenedColumn(None, 'user_id', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_email', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_method', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_referer', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn('tags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('tags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'transaction_name', schemas.String(modifiers=None)), FlattenedColumn(None, 'span_id', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'trace_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'partition', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'offset', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'message_timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'retention_days', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'deleted', schemas.UInt(8, modifiers=None)), FlattenedColumn(None, 'group_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'primary_hash', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'received', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'message', schemas.String(modifiers=None)), FlattenedColumn(None, 'title', schemas.String(modifiers=None)), FlattenedColumn(None, 'culprit', schemas.String(modifiers=None)), FlattenedColumn(None, 'level', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'location', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'type', schemas.String(modifiers=None)), FlattenedColumn('exception_stacks', 'type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'value', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_handled', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'abs_path', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'colno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'filename', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'function', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'lineno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'in_app', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'package', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'module', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'stack_level', schemas.Array(schemas.UInt(16, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn(None, 'exception_main_thread', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_integrations', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'name', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'version', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'trace_sampled', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'num_processing_errors', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'replay_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False)))], project_id=1, previous_group_id=1, new_group_id=2)) is None
 +  where (ReplacementMessageMetadata(partition_index=1, offset=42, consumer_group='consumer_group'), UnmergeGroupsReplacement(state_name=<ReplacerState.ERRORS: 'errors'>, timestamp=datetime.datetime(2025, 1, 25, 0, 25, 3, 226182), hashes=['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], all_columns=[FlattenedColumn(None, 'project_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'event_id', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'platform', schemas.String(modifiers=None)), FlattenedColumn(None, 'environment', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'release', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'dist', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v4', schemas.IPv4(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v6', schemas.IPv6(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user', schemas.String(modifiers=None)), FlattenedColumn(None, 'user_id', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_email', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_method', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_referer', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn('tags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('tags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('flags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'transaction_name', schemas.String(modifiers=None)), FlattenedColumn(None, 'span_id', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'trace_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'partition', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'offset', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'message_timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'retention_days', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'deleted', schemas.UInt(8, modifiers=None)), FlattenedColumn(None, 'group_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'primary_hash', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'received', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'message', schemas.String(modifiers=None)), FlattenedColumn(None, 'title', schemas.String(modifiers=None)), FlattenedColumn(None, 'culprit', schemas.String(modifiers=None)), FlattenedColumn(None, 'level', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'location', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'type', schemas.String(modifiers=None)), FlattenedColumn('exception_stacks', 'type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'value', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_handled', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'abs_path', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'colno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'filename', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'function', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'lineno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'in_app', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'package', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'module', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'stack_level', schemas.Array(schemas.UInt(16, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn(None, 'exception_main_thread', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_integrations', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'name', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'version', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'trace_sampled', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'num_processing_errors', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'replay_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False)))], project_id=1, previous_group_id=1, new_group_id=2)) = <bound method ReplacerWorker.process_message of <snuba.replacer.ReplacerWorker object at 0x7f414f1a2310>>(Message({Partition(topic=Topic(name='replacements'), index=1): 43}))
 +    where <bound method ReplacerWorker.process_message of <snuba.replacer.ReplacerWorker object at 0x7f414f1a2310>> = <snuba.replacer.ReplacerWorker object at 0x7f414f1a2310>.process_message
 +      where <snuba.replacer.ReplacerWorker object at 0x7f414f1a2310> = <tests.datasets.test_errors_replacer.TestReplacer object at 0x7f415891a210>.replacer

To view more test analytics, go to the Test Analytics Dashboard
📢 Thoughts on this report? Let us know!

Copy link
Contributor

@davidtsuk davidtsuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the following suggestion:
When we do aggregations we already transform them into aggregate_functionIf which has the needed conditions. The problem is that the If combinator in clickhouse returns the default value (0) if no rows match the condition. One way to address this is to add the orNull combinator to the aggregation so that it returns Null when no rows match the condition. We could then decide what to do with the null values depending on the behaviour we want. This would also be the first part of addressing https://github.com/getsentry/eap-planning/issues/169.

This would also ensure we don't add additional overhead from copying the request and adding conditions that we already add to the aggregations.

@onkar
Copy link
Member

onkar commented Jan 24, 2025

One way to address this is to add the orNull combinator to the aggregation so that it returns Null when no rows match the condition. We could then decide what to do with the null values depending on the behaviour we want. This would also be the first part of addressing getsentry/eap-planning#169.

@davidtsuk I don't think I follow it. https://github.com/getsentry/eap-planning/issues/169 requires us to distinguish between empty and null, whereas you seem to suggest that we should return null when no rows match the condition (i.e. empty result). Isn't that contradictory to what's expected in the ticket?

@davidtsuk
Copy link
Contributor

davidtsuk commented Jan 24, 2025

@davidtsuk I don't think I follow it. getsentry/eap-planning#169 requires us to distinguish between empty and null, whereas you seem to suggest that we should return null when no rows match the condition (i.e. empty result). Isn't that contradictory to what's expected in the ticket?

@onkar Our protobuf definition currently doesn't allow us to distinguish between a null result and a default result (i.e. empty). The point I was trying to make is that we should distinguish between them because a default value (e.g. 0) can actually represent a result (e.g. if you add -2 + 2) so we cannot use empty/default values to represent the case where no rows were aggregated.

kylemumma added a commit that referenced this pull request Jan 27, 2025
if you have the following filter in your `TraceItemTableRequest`
`exists(sentry.duration_ms)`
it will get translated to `isNotNull(sentry.duration_ms)`
which causes a clickhouse error bc that column doesnt actually exists,
everywhere else is is translated to `duration_ms AS sentry.duration_ms`

so this PR fixes that bug by translating these special columns properly
by using the `attribute_key_to_expression` function that is used
everywhere else.

honestly I dont know many internal details of this function so im
slightly worried that there will be edge cases that could break things.
but based on the name of the function and how i see it used elsewhere in
the codebase, I will assume that if the tests pass with this change then
its fine.

this bug is blocking #6817
assert measurement_reliabilities == [
Reliability.RELIABILITY_LOW,
Reliability.RELIABILITY_UNSPECIFIED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have these changed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you look to the top of the test, the messages with bar have no custom_measurement https://github.com/getsentry/snuba/pull/6817/files/cb324ebce1e4392d60bc65138a8ae15d0af652b9#diff-440653fbc080c88c781f59384bcd3722c93ec37540b50595da42233e67449c1aR478-R481

it used to be the case that those rows would still be included in the results, but due to the new query processor that adds the hasAttribute(custom_measurement) filter, they are no longer part of the result

@kylemumma kylemumma merged commit 5742336 into master Jan 28, 2025
31 checks passed
@kylemumma kylemumma deleted the krm/sparseaggatt branch January 28, 2025 18:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants