Skip to content

Commit

Permalink
ref(metrics-extraction): Reduce cold cache peak w/ rolling cache (#68727
Browse files Browse the repository at this point in the history
)

### Summary
This splits up the cache into 6 chunks and spreads out the ttl so at any
one extraction only part of the cache is cold.
  • Loading branch information
k-fish authored and c298lee committed Apr 12, 2024
1 parent 1369000 commit 95bfba3
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 30 deletions.
53 changes: 35 additions & 18 deletions src/sentry/relay/config/metric_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from sentry.search.events.types import ParamsType, QueryBuilderConfig
from sentry.snuba.dataset import Dataset
from sentry.snuba.metrics.extraction import (
WIDGET_QUERY_CACHE_MAX_CHUNKS,
MetricSpec,
MetricSpecType,
OnDemandMetricSpec,
Expand Down Expand Up @@ -204,20 +205,39 @@ def _get_alert_metric_specs(
return specs


def _bulk_cache_query_key(project: Project) -> str:
return f"on-demand.bulk-query-cache.{project.organization.id}"
def _bulk_cache_query_key(project: Project, chunk: int) -> str:
return f"on-demand.bulk-query-cache.{chunk}.{project.organization.id}"


def _get_bulk_cached_query(project: Project) -> dict[str, Any]:
query_bulk_cache_key = _bulk_cache_query_key(project)
cache_result = cache.get(query_bulk_cache_key, None)
sentry_sdk.set_tag("on_demand_metrics.query_cache", cache_result is None)
return cache_result
def _get_bulk_cached_query(project: Project) -> tuple[dict[int, dict[str, bool]], list[int]]:
cache_result = {}
cold_cache_chunks = []
for i in range(WIDGET_QUERY_CACHE_MAX_CHUNKS):
query_bulk_cache_key = _bulk_cache_query_key(project, i)
chunk_result = cache.get(query_bulk_cache_key, None)
if chunk_result is None:
cold_cache_chunks.append(i)
sentry_sdk.set_tag(f"on_demand_metrics.query_cache.{i}", chunk_result is None)
cache_result[i] = chunk_result or {}
sentry_sdk.set_extra("cold_cache_chunks", cold_cache_chunks)
metrics.incr("on_demand_metrics.query_cache_cold_keys", amount=len(cold_cache_chunks))
return cache_result, cold_cache_chunks


def _set_bulk_cached_query(project: Project, query_cache: dict[str, Any]) -> None:
query_bulk_cache_key = _bulk_cache_query_key(project)
cache.set(query_bulk_cache_key, query_cache, timeout=5400)
def _set_bulk_cached_query_chunk(
project: Project, chunk_cache: dict[str, bool], chunk: int
) -> None:
query_bulk_cache_key = _bulk_cache_query_key(project, chunk)
cache.set(
query_bulk_cache_key, chunk_cache, timeout=900 + (137 * chunk)
) # Add prime number jitter per cache. All cache turns over between 15-25 mins


def _set_bulk_cached_query(
project: Project, query_cache: dict[int, dict[str, bool]], cold_cache_chunks: list[int]
) -> None:
for i in cold_cache_chunks:
_set_bulk_cached_query_chunk(project, query_cache[i], i)


@metrics.wraps("on_demand_metrics._get_widget_metric_specs")
Expand Down Expand Up @@ -247,9 +267,7 @@ def _get_widget_metric_specs(
"on_demand_metrics.widgets_to_process", amount=len(widget_queries), sample_rate=1.0
)

organization_bulk_query_cache = _get_bulk_cached_query(project)
save_organization_bulk_cache = not bool(organization_bulk_query_cache)
organization_bulk_query_cache = {}
organization_bulk_query_cache, cold_bulk_cache_chunks = _get_bulk_cached_query(project)

ignored_widget_ids: dict[int, bool] = {}
specs_for_widget: dict[int, list[HashedMetricSpec]] = defaultdict(list)
Expand Down Expand Up @@ -309,8 +327,7 @@ def _get_widget_metric_specs(
_update_state_with_spec_limit(trimmed_specs, widget_query_for_spec_hash)
metrics.incr("on_demand_metrics.widget_query_specs", amount=len(specs))
if in_random_rollout("on_demand_metrics.cache_should_use_on_demand"):
if save_organization_bulk_cache:
_set_bulk_cached_query(project, organization_bulk_query_cache)
_set_bulk_cached_query(project, organization_bulk_query_cache, cold_bulk_cache_chunks)
return specs


Expand Down Expand Up @@ -439,7 +456,7 @@ def convert_widget_query_to_metric(
project: Project,
widget_query: DashboardWidgetQuery,
prefilling: bool,
organization_bulk_query_cache: dict[str, Any] | None = None,
organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None,
) -> list[HashedMetricSpec]:
"""
Converts a passed metrics widget query to one or more MetricSpecs.
Expand Down Expand Up @@ -467,7 +484,7 @@ def _generate_metric_specs(
project: Project,
prefilling: bool,
groupbys: Sequence[str] | None = None,
organization_bulk_query_cache: dict[str, Any] | None = None,
organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None,
) -> list[HashedMetricSpec]:
metrics_specs = []
metrics.incr("on_demand_metrics.before_widget_spec_generation")
Expand Down Expand Up @@ -739,7 +756,7 @@ def _convert_aggregate_and_query_to_metrics(
prefilling: bool,
spec_type: MetricSpecType = MetricSpecType.SIMPLE_QUERY,
groupbys: Sequence[str] | None = None,
organization_bulk_query_cache: dict[str, Any] | None = None,
organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None,
) -> Sequence[HashedMetricSpec] | None:
"""
Converts an aggregate and a query to a metric spec with its hash value.
Expand Down
17 changes: 12 additions & 5 deletions src/sentry/snuba/metrics/extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
"user_misery": SPEC_VERSION_TWO_FLAG,
}

# Splits the bulk cache for on-demand resolution into N chunks
WIDGET_QUERY_CACHE_MAX_CHUNKS = 6


# This helps us control the different spec versions
# in order to migrate customers from invalid specs
Expand Down Expand Up @@ -670,18 +673,22 @@ def should_use_on_demand_metrics(
query: str | None,
groupbys: Sequence[str] | None = None,
prefilling: bool = False,
organization_bulk_query_cache: dict[str, Any] | None = None,
organization_bulk_query_cache: dict[int, dict[str, bool]] | None = None,
) -> bool:
if in_random_rollout("on_demand_metrics.cache_should_use_on_demand"):
if organization_bulk_query_cache is None:
organization_bulk_query_cache = {}

dataset_str = dataset.value if isinstance(dataset, Enum) else str(dataset or "")
groupbys_str = ",".join(sorted(groupbys)) if groupbys else ""
local_cache_key = md5_text(
local_cache_md5 = md5_text(
f"{dataset_str}-{aggregate}-{query or ''}-{groupbys_str}-prefilling={prefilling}"
).hexdigest()
cached_result = organization_bulk_query_cache.get(local_cache_key, None)
)
local_cache_digest_chunk = local_cache_md5.digest()[0] % WIDGET_QUERY_CACHE_MAX_CHUNKS
local_cache_key = local_cache_md5.hexdigest()
cached_result = organization_bulk_query_cache.get(local_cache_digest_chunk, {}).get(
local_cache_key, None
)
if cached_result:
metrics.incr("on_demand_metrics.should_use_on_demand_metrics.cache_hit")
return cached_result
Expand All @@ -700,7 +707,7 @@ def should_use_on_demand_metrics(
prefilling=prefilling,
)
metrics.incr("on_demand_metrics.should_use_on_demand_metrics.cache_miss")
organization_bulk_query_cache[local_cache_key] = result
organization_bulk_query_cache[local_cache_digest_chunk][local_cache_key] = result
return result

return _should_use_on_demand_metrics(
Expand Down
14 changes: 7 additions & 7 deletions tests/sentry/relay/config/test_metric_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sentry.models.transaction_threshold import ProjectTransactionThreshold, TransactionMetric
from sentry.relay.config.experimental import TimeChecker
from sentry.relay.config.metric_extraction import (
_set_bulk_cached_query,
_set_bulk_cached_query_chunk,
get_current_widget_specs,
get_metric_extraction_config,
)
Expand Down Expand Up @@ -766,24 +766,24 @@ def test_get_metric_extraction_config_alerts_and_widgets_off(default_project: Pr
@django_db_all
def test_get_metric_extraction_config_uses_cache_for_widgets(default_project: Project) -> None:
# widgets should be skipped if the feature is off
original_set_bulk_cached_query = _set_bulk_cached_query
original_set_bulk_cached_query = _set_bulk_cached_query_chunk

with (
Feature({ON_DEMAND_METRICS: True, ON_DEMAND_METRICS_WIDGETS: True}),
override_options({"on_demand_metrics.cache_should_use_on_demand": 1.0}),
mock.patch(
"sentry.relay.config.metric_extraction._set_bulk_cached_query"
) as mock_set_cache_spy,
"sentry.relay.config.metric_extraction._set_bulk_cached_query_chunk"
) as mock_set_cache_chunk_spy,
):
mock_set_cache_spy.side_effect = original_set_bulk_cached_query
mock_set_cache_chunk_spy.side_effect = original_set_bulk_cached_query
create_widget(["count()"], "transaction.duration:>=1000", default_project)

get_metric_extraction_config(TimeChecker(timedelta(seconds=0)), default_project)

assert mock_set_cache_spy.call_count == 1
assert mock_set_cache_chunk_spy.call_count == 6 # One for each chunk

get_metric_extraction_config(TimeChecker(timedelta(seconds=0)), default_project)
assert mock_set_cache_spy.call_count == 1
assert mock_set_cache_chunk_spy.call_count == 6


@django_db_all
Expand Down

0 comments on commit 95bfba3

Please sign in to comment.