Skip to content

Commit

Permalink
add namespace parameter to indexer cache
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Oct 18, 2023
1 parent 05f116b commit d73c1b9
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 206 deletions.
98 changes: 75 additions & 23 deletions src/sentry/sentry_metrics/indexer/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
_INDEXER_CACHE_FETCH_METRIC = "sentry_metrics.indexer.memcache.fetch"


# Cache-key namespace prefixes, used to segregate entries written via the
# bulk_record path ("br") from those written via the resolve path ("res").
# NOTE(review): spelling is inconsistent ("NAME_SPACE" vs "NAMESPACE"); left
# as-is because other code in this file references these exact names.
BULK_RECORD_CACHE_NAME_SPACE = "br"
RESOLVE_CACHE_NAMESPACE = "res"


class StringIndexerCache:
def __init__(self, cache_name: str, partition_key: str):
self.version = 1
Expand All @@ -47,13 +51,22 @@ def randomized_ttl(self) -> int:
jitter = random.uniform(0, 0.25) * cache_ttl
return int(cache_ttl + jitter)

def make_cache_key(self, key: str) -> str:
def _make_cache_key(self, key: str) -> str:
    """Translate an external "use_case_id:org_id:string" key into the
    internal, non-namespaced cache key.

    The org_id/string tail is md5-hashed to bound the key length.
    Raises ValueError if ``key`` has fewer than two colons.
    """
    use_case_id, org_id, string = key.split(":", 2)
    hashed = md5_text(f"{org_id}:{string}").hexdigest()
    return ":".join(("indexer", self.partition_key, "org", "str", use_case_id, hashed))

# The new namespaced version of the above function, eventually this will replace
# _make_cache_key
def _make_namespaced_cache_key(self, namespace: str, key: str) -> str:
    """Namespaced successor to ``_make_cache_key``; per the transition plan
    in this file it will eventually replace it.

    Identical to ``_make_cache_key`` except that ``namespace`` is inserted
    between the partition key and the "org:str" segment.
    """
    use_case_id, org_id, string = key.split(":", 2)
    hashed = md5_text(org_id + ":" + string).hexdigest()
    return ":".join(
        ("indexer", self.partition_key, namespace, "org", "str", use_case_id, hashed)
    )

def _format_results(
self, keys: Iterable[str], results: Mapping[str, Optional[int]]
) -> MutableMapping[str, Optional[int]]:
Expand All @@ -67,40 +80,76 @@ def _format_results(
"""
formatted: MutableMapping[str, Optional[int]] = {}
for key in keys:
cache_key = self.make_cache_key(key)
cache_key = self._make_cache_key(key)
formatted[key] = results.get(cache_key)

return formatted

# The new namespaced version of the above function, eventually this will replace
# _format_results
def _format_namespaced_results(
    self, namespace: str, keys: Iterable[str], results: Mapping[str, Optional[int]]
) -> MutableMapping[str, Optional[int]]:
    """
    Namespaced successor to ``_format_results``; per the transition plan in
    this file it will eventually replace it.

    Maps hashed internal cache keys back to the external
    "use_case_id:org_id:string" keys the caller supplied.  ``results`` is
    keyed by the namespaced internal key, e.g.
    {"indexer:<partition>:<namespace>:org:str:transactions:<md5>": 2},
    and the returned mapping is keyed externally, e.g. {"transactions:3:a": 2}.
    External keys absent from ``results`` map to None.
    """
    return {
        key: results.get(self._make_namespaced_cache_key(namespace, key))
        for key in keys
    }

def get(self, key: str) -> int:
result: int = self.cache.get(self.make_cache_key(key), version=self.version)
return result
def get(self, namespace: str, key: str) -> Optional[int]:
    """
    Fetch the id cached for the external key "use_case_id:org_id:string".

    When the "read-new-cache-namespace" option is enabled, the namespaced
    key is read and the stored value is parsed as "<id>[:...]": only the
    integer id before the first colon is returned.  Returns None on a
    cache miss.

    Bug fixed: the original namespaced path called ``result.split(...)``
    unconditionally, raising AttributeError on a miss (the backing cache
    returns None for absent keys, as the resolve() caller's
    ``isinstance(result, int)`` guard elsewhere in this file shows).
    """
    if options.get("sentry-metrics.indexer.read-new-cache-namespace"):
        result = self.cache.get(
            self._make_namespaced_cache_key(namespace, key), version=self.version
        )
        if result is None:
            return None
        # str() also tolerates values written as plain ints by the
        # (not yet namespaced) write path during the migration.
        return int(str(result).split(":", 1)[0])
    return self.cache.get(self._make_cache_key(key), version=self.version)

def set(self, key: str, value: int) -> None:
def set(self, namespace: str, key: str, value: int) -> None:
    """
    Store ``value`` under the external key "use_case_id:org_id:string",
    with a jittered TTL to avoid synchronized expiry.

    NOTE(review): ``namespace`` is accepted for interface symmetry with
    get() but is not used here — writes still go to the non-namespaced
    key; presumably the namespaced write path lands later in the
    migration. Confirm against the rollout plan.
    """
    cache_key = self._make_cache_key(key)
    self.cache.set(
        key=cache_key,
        value=value,
        version=self.version,
        timeout=self.randomized_ttl,
    )

def get_many(self, keys: Iterable[str]) -> MutableMapping[str, Optional[int]]:
cache_keys = {self.make_cache_key(key): key for key in keys}
results: Mapping[str, Optional[int]] = self.cache.get_many(
cache_keys.keys(), version=self.version
)
return self._format_results(keys, results)
def get_many(self, namespace: str, keys: Iterable[str]) -> MutableMapping[str, Optional[int]]:
    """
    Bulk counterpart of get(): fetch ids for many external
    "use_case_id:org_id:string" keys at once.  Every requested key appears
    in the returned mapping; misses map to None.

    Bugs fixed in the namespaced path: the original iterated the mapping
    returned by ``cache.get_many`` directly — which yields only keys, so
    the ``k, v`` tuple-unpack raised ValueError — instead of ``.items()``,
    and it called ``.split()`` on None for cache misses.
    """
    if options.get("sentry-metrics.indexer.read-new-cache-namespace"):
        cache_keys = {self._make_namespaced_cache_key(namespace, key): key for key in keys}
        raw = self.cache.get_many(cache_keys.keys(), version=self.version)
        # Stored values are "<id>[:...]" strings; keep only the integer id.
        # str() also tolerates plain-int values written by the not-yet-
        # namespaced write path during the migration.
        namespaced_results: MutableMapping[str, Optional[int]] = {
            cache_key: (None if value is None else int(str(value).split(":", 1)[0]))
            for cache_key, value in raw.items()
        }
        return self._format_namespaced_results(namespace, keys, namespaced_results)
    else:
        cache_keys = {self._make_cache_key(key): key for key in keys}
        results: Mapping[str, Optional[int]] = self.cache.get_many(
            cache_keys.keys(), version=self.version
        )
        return self._format_results(keys, results)

def set_many(self, key_values: Mapping[str, int]) -> None:
cache_key_values = {self.make_cache_key(k): v for k, v in key_values.items()}
def set_many(self, namespace: str, key_values: Mapping[str, int]) -> None:
    """
    Bulk write of external-key -> id pairs with a jittered TTL.

    NOTE(review): ``namespace`` is accepted for interface symmetry but is
    not used — writes still target the non-namespaced keys.
    """
    to_store: MutableMapping[str, int] = {}
    for external_key, value in key_values.items():
        to_store[self._make_cache_key(external_key)] = value
    self.cache.set_many(to_store, timeout=self.randomized_ttl, version=self.version)

def delete(self, key: str) -> None:
cache_key = self.make_cache_key(key)
def delete(self, namespace: str, key: str) -> None:
    """Evict the entry for one external key.

    NOTE(review): ``namespace`` is accepted for interface symmetry but is
    not used — the non-namespaced key is deleted.
    """
    self.cache.delete(self._make_cache_key(key), version=self.version)

def delete_many(self, keys: Sequence[str]) -> None:
cache_keys = [self.make_cache_key(key) for key in keys]
def delete_many(self, namespace: str, keys: Sequence[str]) -> None:
    """Evict the entries for many external keys in one call.

    NOTE(review): ``namespace`` is accepted for interface symmetry but is
    not used — the non-namespaced keys are deleted.
    """
    internal_keys = list(map(self._make_cache_key, keys))
    self.cache.delete_many(internal_keys, version=self.version)


Expand All @@ -112,10 +161,11 @@ def __init__(self, cache: StringIndexerCache, indexer: StringIndexer) -> None:
def bulk_record(
self, strings: Mapping[UseCaseID, Mapping[OrgId, Set[str]]]
) -> UseCaseKeyResults:

cache_keys = UseCaseKeyCollection(strings)
metrics.gauge("sentry_metrics.indexer.lookups_per_batch", value=cache_keys.size)
cache_key_strs = cache_keys.as_strings()
cache_results = self.cache.get_many(cache_key_strs)
cache_results = self.cache.get_many(BULK_RECORD_CACHE_NAME_SPACE, cache_key_strs)

hits = [k for k, v in cache_results.items() if v is not None]

Expand Down Expand Up @@ -155,7 +205,9 @@ def bulk_record(
}
)

self.cache.set_many(db_record_key_results.get_mapped_strings_to_ints())
self.cache.set_many(
BULK_RECORD_CACHE_NAME_SPACE, db_record_key_results.get_mapped_strings_to_ints()
)

return cache_key_results.merge(db_record_key_results)

Expand All @@ -166,7 +218,7 @@ def record(self, use_case_id: UseCaseID, org_id: int, string: str) -> Optional[i
@metric_path_key_compatible_resolve
def resolve(self, use_case_id: UseCaseID, org_id: int, string: str) -> Optional[int]:
key = f"{use_case_id.value}:{org_id}:{string}"
result = self.cache.get(key)
result = self.cache.get(RESOLVE_CACHE_NAMESPACE, key)

if result and isinstance(result, int):
metrics.incr(
Expand All @@ -188,7 +240,7 @@ def resolve(self, use_case_id: UseCaseID, org_id: int, string: str) -> Optional[
_INDEXER_CACHE_RESOLVE_CACHE_REPLENISHMENT_METRIC,
tags={"use_case": use_case_id.value},
)
self.cache.set(key, id)
self.cache.set(RESOLVE_CACHE_NAMESPACE, key, id)

return id

Expand Down
Loading

0 comments on commit d73c1b9

Please sign in to comment.