Skip to content

Commit

Permalink
feat(metrics): Add ability for indexer cache to write new schema (#58414
Browse files Browse the repository at this point in the history
)

# This PR

- Read the sentry option
`sentry-metrics.indexer.write-new-cache-namespace` (set to False
default), and if it's True, write to the new cache schema along with the
old cache schema.

**Previous change in this stack PR**
#58178
  • Loading branch information
john-z-yang authored Oct 26, 2023
1 parent dd79f1d commit 15af691
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 23 deletions.
47 changes: 37 additions & 10 deletions src/sentry/sentry_metrics/indexer/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import random
from datetime import datetime
from typing import Collection, Iterable, Mapping, MutableMapping, Optional, Sequence, Set

from django.conf import settings
Expand Down Expand Up @@ -33,7 +34,10 @@
_INDEXER_CACHE_FETCH_METRIC = "sentry_metrics.indexer.memcache.fetch"


BULK_RECORD_CACHE_NAME_SPACE = "br"
NAMESPACED_WRITE_FEAT_FLAG = "sentry-metrics.indexer.write-new-cache-namespace"
NAMESPACED_READ_FEAT_FLAG = "sentry-metrics.indexer.read-new-cache-namespace"

BULK_RECORD_CACHE_NAMESPACE = "br"
RESOLVE_CACHE_NAMESPACE = "res"


Expand Down Expand Up @@ -67,6 +71,9 @@ def _make_namespaced_cache_key(self, namespace: str, key: str) -> str:

return f"indexer:{self.partition_key}:{namespace}:org:str:{use_case_id}:{hashed}"

def _make_cache_val(self, val: int, timestamp: int):
return f"{val}:{timestamp}"

def _format_results(
self, keys: Iterable[str], results: Mapping[str, Optional[int]]
) -> MutableMapping[str, Optional[int]]:
Expand Down Expand Up @@ -109,11 +116,10 @@ def _validate_result(self, result: Optional[str]) -> Optional[int]:
if result is None:
return None
result, _ = result.split(":")

return int(result)

def get(self, namespace: str, key: str) -> Optional[int]:
if options.get("sentry-metrics.indexer.read-new-cache-namespace"):
if options.get(NAMESPACED_READ_FEAT_FLAG):
result = self.cache.get(
self._make_namespaced_cache_key(namespace, key), version=self.version
)
Expand All @@ -127,9 +133,16 @@ def set(self, namespace: str, key: str, value: int) -> None:
timeout=self.randomized_ttl,
version=self.version,
)
if options.get(NAMESPACED_WRITE_FEAT_FLAG):
self.cache.set(
key=self._make_namespaced_cache_key(namespace, key),
value=self._make_cache_val(value, int(datetime.utcnow().timestamp())),
timeout=self.randomized_ttl,
version=self.version,
)

def get_many(self, namespace: str, keys: Iterable[str]) -> MutableMapping[str, Optional[int]]:
if options.get("sentry-metrics.indexer.read-new-cache-namespace"):
if options.get(NAMESPACED_READ_FEAT_FLAG):
cache_keys = {self._make_namespaced_cache_key(namespace, key): key for key in keys}
namespaced_results: MutableMapping[str, Optional[int]] = {
k: self._validate_result(v)
Expand All @@ -150,14 +163,28 @@ def get_many(self, namespace: str, keys: Iterable[str]) -> MutableMapping[str, O
def set_many(self, namespace: str, key_values: Mapping[str, int]) -> None:
cache_key_values = {self._make_cache_key(k): v for k, v in key_values.items()}
self.cache.set_many(cache_key_values, timeout=self.randomized_ttl, version=self.version)
if options.get(NAMESPACED_WRITE_FEAT_FLAG):
timestamp = int(datetime.utcnow().timestamp())
namespaced_cache_key_values = {
self._make_namespaced_cache_key(namespace, k): self._make_cache_val(v, timestamp)
for k, v in key_values.items()
}
self.cache.set_many(
namespaced_cache_key_values, timeout=self.randomized_ttl, version=self.version
)

def delete(self, namespace: str, key: str) -> None:
cache_key = self._make_cache_key(key)
self.cache.delete(cache_key, version=self.version)
self.cache.delete(self._make_cache_key(key), version=self.version)
if options.get(NAMESPACED_WRITE_FEAT_FLAG):
self.cache.delete(self._make_namespaced_cache_key(namespace, key), version=self.version)

def delete_many(self, namespace: str, keys: Sequence[str]) -> None:
cache_keys = [self._make_cache_key(key) for key in keys]
self.cache.delete_many(cache_keys, version=self.version)
self.cache.delete_many([self._make_cache_key(key) for key in keys], version=self.version)
if options.get(NAMESPACED_WRITE_FEAT_FLAG):
self.cache.delete_many(
[self._make_namespaced_cache_key(namespace, key) for key in keys],
version=self.version,
)


class CachingIndexer(StringIndexer):
Expand All @@ -172,7 +199,7 @@ def bulk_record(
cache_keys = UseCaseKeyCollection(strings)
metrics.gauge("sentry_metrics.indexer.lookups_per_batch", value=cache_keys.size)
cache_key_strs = cache_keys.as_strings()
cache_results = self.cache.get_many(BULK_RECORD_CACHE_NAME_SPACE, cache_key_strs)
cache_results = self.cache.get_many(BULK_RECORD_CACHE_NAMESPACE, cache_key_strs)

hits = [k for k, v in cache_results.items() if v is not None]

Expand Down Expand Up @@ -213,7 +240,7 @@ def bulk_record(
)

self.cache.set_many(
BULK_RECORD_CACHE_NAME_SPACE, db_record_key_results.get_mapped_strings_to_ints()
BULK_RECORD_CACHE_NAMESPACE, db_record_key_results.get_mapped_strings_to_ints()
)

return cache_key_results.merge(db_record_key_results)
Expand Down
42 changes: 36 additions & 6 deletions tests/sentry/sentry_metrics/test_all_indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ def test_static_and_non_static_strings_generic_metrics(indexer):


def test_indexer(indexer, indexer_cache, use_case_id):
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
org1_id = 1
org2_id = 2
strings = {"hello", "hey", "hi"}
Expand Down Expand Up @@ -257,7 +262,12 @@ def test_resolve_and_reverse_resolve(indexer, indexer_cache, use_case_id):
"""
Test `resolve` and `reverse_resolve` methods
"""
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
org1_id = 1
strings = {"hello", "hey", "hi"}

Expand Down Expand Up @@ -285,7 +295,12 @@ def test_already_created_plus_written_results(indexer, indexer_cache, use_case_i
Test that we correctly combine db read results with db write results
for the same organization.
"""
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
org_id = 1234

raw_indexer = indexer
Expand Down Expand Up @@ -332,7 +347,12 @@ def test_already_cached_plus_read_results(indexer, indexer_cache, use_case_id) -
Test that we correctly combine cached results with read results
for the same organization.
"""
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
org_id = 8
cached = {
f"{use_case_id.value}:{org_id}:beep": 10,
Expand Down Expand Up @@ -371,7 +391,12 @@ def test_already_cached_plus_read_results(indexer, indexer_cache, use_case_id) -


def test_read_when_bulk_record(indexer, use_case_id):
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
strings = {
use_case_id: {
1: {"a"},
Expand Down Expand Up @@ -483,7 +508,12 @@ def test_bulk_reverse_resolve(indexer):
Tests reverse resolve properly returns the corresponding strings
in the proper order when given a combination of shared and non-shared ids.
"""
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
org_id = 7
use_case_id = UseCaseID.SESSIONS # any use case would do
static_indexer = StaticStringIndexer(indexer)
Expand Down
102 changes: 95 additions & 7 deletions tests/sentry/sentry_metrics/test_indexer_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ def use_case_id() -> str:


def test_cache(use_case_id: str) -> None:
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
cache.clear()
namespace = "test"
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") is None
Expand All @@ -32,9 +37,67 @@ def test_cache(use_case_id: str) -> None:
indexer_cache.delete(namespace, f"{use_case_id}:1:blah:123")
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") is None

with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": True,
}
):
cache.clear()
namespace = "test"
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") is None
indexer_cache.set(namespace, f"{use_case_id}:1:blah:123", 1)
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") == 1

indexer_cache.delete(namespace, f"{use_case_id}:1:blah:123")
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") is None

with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": True,
"sentry-metrics.indexer.write-new-cache-namespace": True,
}
):
cache.clear()
namespace = "test"
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") is None
indexer_cache.set(namespace, f"{use_case_id}:1:blah:123", 1)
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") == 1

indexer_cache.delete(namespace, f"{use_case_id}:1:blah:123")
assert indexer_cache.get(namespace, f"{use_case_id}:1:blah:123") is None

with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": True,
"sentry-metrics.indexer.write-new-cache-namespace": True,
}
):
cache.clear()
namespace_1 = "1"
namespace_2 = "2"
assert indexer_cache.get(namespace_1, f"{use_case_id}:1:blah:123") is None
indexer_cache.set(namespace_1, f"{use_case_id}:1:blah:123", 1)
assert indexer_cache.get(namespace_1, f"{use_case_id}:1:blah:123") == 1

indexer_cache.delete(namespace_1, f"{use_case_id}:1:blah:123")
assert indexer_cache.get(namespace_1, f"{use_case_id}:1:blah:123") is None

assert indexer_cache.get(namespace_2, f"{use_case_id}:1:blah:123") is None
indexer_cache.set(namespace_2, f"{use_case_id}:1:blah:123", 2)
assert indexer_cache.get(namespace_2, f"{use_case_id}:1:blah:123") == 2

indexer_cache.delete(namespace_2, f"{use_case_id}:1:blah:123")
assert indexer_cache.get(namespace_2, f"{use_case_id}:1:blah:123") is None


def test_cache_many(use_case_id: str) -> None:
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
cache.clear()
namespace = "test"
values = {f"{use_case_id}:100:hello": 2, f"{use_case_id}:100:bye": 3}
Expand All @@ -53,7 +116,12 @@ def test_cache_many(use_case_id: str) -> None:


def test_make_cache_key(use_case_id: str) -> None:
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
cache.clear()
namespace = "test"
orgId = 1
Expand All @@ -64,7 +132,12 @@ def test_make_cache_key(use_case_id: str) -> None:

assert key == f"indexer:test:org:str:{use_case_id}:{hashed}"

with override_options({"sentry-metrics.indexer.read-new-cache-namespace": True}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": True,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
cache.clear()
namespace = "test"
orgId = 1
Expand All @@ -77,14 +150,24 @@ def test_make_cache_key(use_case_id: str) -> None:


def test_formatted_results(use_case_id: str) -> None:
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
cache.clear()
namespace = "test"
values = {f"{use_case_id}:1:::hello": 2, f"{use_case_id}:1:::bye": 3}
results = {indexer_cache._make_cache_key(k): v for k, v in values.items()}
assert indexer_cache._format_results(list(values.keys()), results) == values

with override_options({"sentry-metrics.indexer.read-new-cache-namespace": True}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": True,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
cache.clear()
namespace = "test"
values = {
Expand Down Expand Up @@ -114,7 +197,12 @@ def test_ttl_jitter() -> None:


def test_separate_namespacing() -> None:
with override_options({"sentry-metrics.indexer.read-new-cache-namespace": False}):
with override_options(
{
"sentry-metrics.indexer.read-new-cache-namespace": False,
"sentry-metrics.indexer.write-new-cache-namespace": False,
}
):
namespace = "test"
indexer_cache.set(namespace, "sessions:3:what", 1)
assert indexer_cache.get(namespace, "sessions:3:what") == 1
Expand Down

0 comments on commit 15af691

Please sign in to comment.