diff --git a/src/sentry/relay/config/metric_extraction.py b/src/sentry/relay/config/metric_extraction.py index d086b2f603f344..76d09b4db75f25 100644 --- a/src/sentry/relay/config/metric_extraction.py +++ b/src/sentry/relay/config/metric_extraction.py @@ -35,6 +35,7 @@ from sentry.search.events.types import ParamsType, QueryBuilderConfig from sentry.snuba.dataset import Dataset from sentry.snuba.metrics.extraction import ( + WIDGET_QUERY_CACHE_MAX_CHUNKS, MetricSpec, MetricSpecType, OnDemandMetricSpec, @@ -204,20 +205,39 @@ def _get_alert_metric_specs( return specs -def _bulk_cache_query_key(project: Project) -> str: - return f"on-demand.bulk-query-cache.{project.organization.id}" +def _bulk_cache_query_key(project: Project, chunk: int) -> str: + return f"on-demand.bulk-query-cache.{chunk}.{project.organization.id}" -def _get_bulk_cached_query(project: Project) -> dict[str, Any]: - query_bulk_cache_key = _bulk_cache_query_key(project) - cache_result = cache.get(query_bulk_cache_key, None) - sentry_sdk.set_tag("on_demand_metrics.query_cache", cache_result is None) - return cache_result +def _get_bulk_cached_query(project: Project) -> tuple[dict[int, dict[str, bool]], list[int]]: + cache_result = {} + cold_cache_chunks = [] + for i in range(WIDGET_QUERY_CACHE_MAX_CHUNKS): + query_bulk_cache_key = _bulk_cache_query_key(project, i) + chunk_result = cache.get(query_bulk_cache_key, None) + if chunk_result is None: + cold_cache_chunks.append(i) + sentry_sdk.set_tag(f"on_demand_metrics.query_cache.{i}", chunk_result is None) + cache_result[i] = chunk_result or {} + sentry_sdk.set_extra("cold_cache_chunks", cold_cache_chunks) + metrics.incr("on_demand_metrics.query_cache_cold_keys", amount=len(cold_cache_chunks)) + return cache_result, cold_cache_chunks -def _set_bulk_cached_query(project: Project, query_cache: dict[str, Any]) -> None: - query_bulk_cache_key = _bulk_cache_query_key(project) - cache.set(query_bulk_cache_key, query_cache, timeout=5400) +def _set_bulk_cached_query_chunk( + project: Project, chunk_cache: dict[str, bool], chunk: int +) -> None: + query_bulk_cache_key = _bulk_cache_query_key(project, chunk) + cache.set( + query_bulk_cache_key, chunk_cache, timeout=900 + (137 * chunk) + ) # Add prime number jitter per cache. All cache turns over between 15-25 mins + + +def _set_bulk_cached_query( + project: Project, query_cache: dict[int, dict[str, bool]], cold_cache_chunks: list[int] +) -> None: + for i in cold_cache_chunks: + _set_bulk_cached_query_chunk(project, query_cache[i], i) @metrics.wraps("on_demand_metrics._get_widget_metric_specs") @@ -247,9 +267,7 @@ def _get_widget_metric_specs( "on_demand_metrics.widgets_to_process", amount=len(widget_queries), sample_rate=1.0 ) - organization_bulk_query_cache = _get_bulk_cached_query(project) - save_organization_bulk_cache = not bool(organization_bulk_query_cache) - organization_bulk_query_cache = {} + organization_bulk_query_cache, cold_bulk_cache_chunks = _get_bulk_cached_query(project) ignored_widget_ids: dict[int, bool] = {} specs_for_widget: dict[int, list[HashedMetricSpec]] = defaultdict(list) @@ -309,8 +327,7 @@ def _get_widget_metric_specs( _update_state_with_spec_limit(trimmed_specs, widget_query_for_spec_hash) metrics.incr("on_demand_metrics.widget_query_specs", amount=len(specs)) if in_random_rollout("on_demand_metrics.cache_should_use_on_demand"): - if save_organization_bulk_cache: - _set_bulk_cached_query(project, organization_bulk_query_cache) + _set_bulk_cached_query(project, organization_bulk_query_cache, cold_bulk_cache_chunks) return specs @@ -439,7 +456,7 @@ def convert_widget_query_to_metric( project: Project, widget_query: DashboardWidgetQuery, prefilling: bool, - organization_bulk_query_cache: dict[str, Any] | None = None, + organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None, ) -> list[HashedMetricSpec]: """ Converts a passed metrics widget query to one or more MetricSpecs. @@ -467,7 +484,7 @@ def _generate_metric_specs( project: Project, prefilling: bool, groupbys: Sequence[str] | None = None, - organization_bulk_query_cache: dict[str, Any] | None = None, + organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None, ) -> list[HashedMetricSpec]: metrics_specs = [] metrics.incr("on_demand_metrics.before_widget_spec_generation") @@ -739,7 +756,7 @@ def _convert_aggregate_and_query_to_metrics( prefilling: bool, spec_type: MetricSpecType = MetricSpecType.SIMPLE_QUERY, groupbys: Sequence[str] | None = None, - organization_bulk_query_cache: dict[str, Any] | None = None, + organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None, ) -> Sequence[HashedMetricSpec] | None: """ Converts an aggregate and a query to a metric spec with its hash value. diff --git a/src/sentry/snuba/metrics/extraction.py b/src/sentry/snuba/metrics/extraction.py index 3c14460999f4bf..a5d3fda635bb24 100644 --- a/src/sentry/snuba/metrics/extraction.py +++ b/src/sentry/snuba/metrics/extraction.py @@ -59,6 +59,9 @@ "user_misery": SPEC_VERSION_TWO_FLAG, } +# Splits the bulk cache for on-demand resolution into N chunks +WIDGET_QUERY_CACHE_MAX_CHUNKS = 6 + # This helps us control the different spec versions # in order to migrate customers from invalid specs @@ -670,7 +673,7 @@ def should_use_on_demand_metrics( query: str | None, groupbys: Sequence[str] | None = None, prefilling: bool = False, - organization_bulk_query_cache: dict[str, Any] | None = None, + organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None, ) -> bool: if in_random_rollout("on_demand_metrics.cache_should_use_on_demand"): if organization_bulk_query_cache is None: @@ -678,10 +681,14 @@ def should_use_on_demand_metrics( dataset_str = dataset.value if isinstance(dataset, Enum) else str(dataset or "") groupbys_str = ",".join(sorted(groupbys)) if groupbys else "" - local_cache_key = md5_text( + local_cache_md5 = md5_text( f"{dataset_str}-{aggregate}-{query or ''}-{groupbys_str}-prefilling={prefilling}" - ).hexdigest() - cached_result = organization_bulk_query_cache.get(local_cache_key, None) + ) + local_cache_digest_chunk = local_cache_md5.digest()[0] % WIDGET_QUERY_CACHE_MAX_CHUNKS + local_cache_key = local_cache_md5.hexdigest() + cached_result = organization_bulk_query_cache.get(local_cache_digest_chunk, {}).get( + local_cache_key, None + ) if cached_result: metrics.incr("on_demand_metrics.should_use_on_demand_metrics.cache_hit") return cached_result @@ -700,7 +707,7 @@ def should_use_on_demand_metrics( prefilling=prefilling, ) metrics.incr("on_demand_metrics.should_use_on_demand_metrics.cache_miss") - organization_bulk_query_cache[local_cache_key] = result + organization_bulk_query_cache[local_cache_digest_chunk][local_cache_key] = result return result return _should_use_on_demand_metrics( diff --git a/tests/sentry/relay/config/test_metric_extraction.py b/tests/sentry/relay/config/test_metric_extraction.py index 863d5756efb00d..bf1ca7b6a5829b 100644 --- a/tests/sentry/relay/config/test_metric_extraction.py +++ b/tests/sentry/relay/config/test_metric_extraction.py @@ -12,7 +12,7 @@ from sentry.models.transaction_threshold import ProjectTransactionThreshold, TransactionMetric from sentry.relay.config.experimental import TimeChecker from sentry.relay.config.metric_extraction import ( - _set_bulk_cached_query, + _set_bulk_cached_query_chunk, get_current_widget_specs, get_metric_extraction_config, ) @@ -766,24 +766,24 @@ def test_get_metric_extraction_config_alerts_and_widgets_off(default_project: Pr @django_db_all def test_get_metric_extraction_config_uses_cache_for_widgets(default_project: Project) -> None: # widgets should be skipped if the feature is off - original_set_bulk_cached_query = _set_bulk_cached_query + original_set_bulk_cached_query = _set_bulk_cached_query_chunk with ( Feature({ON_DEMAND_METRICS: True, ON_DEMAND_METRICS_WIDGETS: True}), override_options({"on_demand_metrics.cache_should_use_on_demand": 1.0}), mock.patch( - "sentry.relay.config.metric_extraction._set_bulk_cached_query" - ) as mock_set_cache_spy, + "sentry.relay.config.metric_extraction._set_bulk_cached_query_chunk" + ) as mock_set_cache_chunk_spy, ): - mock_set_cache_spy.side_effect = original_set_bulk_cached_query + mock_set_cache_chunk_spy.side_effect = original_set_bulk_cached_query create_widget(["count()"], "transaction.duration:>=1000", default_project) get_metric_extraction_config(TimeChecker(timedelta(seconds=0)), default_project) - assert mock_set_cache_spy.call_count == 1 + assert mock_set_cache_chunk_spy.call_count == 6 # One for each chunk get_metric_extraction_config(TimeChecker(timedelta(seconds=0)), default_project) - assert mock_set_cache_spy.call_count == 1 + assert mock_set_cache_chunk_spy.call_count == 6 @django_db_all