Skip to content

Commit

Permalink
Better logging for backpressure (#81648)
Browse files Browse the repository at this point in the history
This just adds host/port of the service to the logs.
  • Loading branch information
kneeyo1 authored Dec 10, 2024
1 parent c7ee897 commit 040076d
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 1 deletion.
31 changes: 30 additions & 1 deletion src/sentry/processing/backpressure/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class ServiceMemory:
used: int
available: int
percentage: float
host: str | None = None
port: int | None = None

def __init__(self, name: str, used: int, available: int):
self.name = name
Expand All @@ -21,6 +23,12 @@ def __init__(self, name: str, used: int, available: int):
self.percentage = used / available


@dataclass
class NodeInfo:
host: str | None
port: int | None


def query_rabbitmq_memory_usage(host: str) -> ServiceMemory:
"""Returns the currently used memory and the memory limit of a
RabbitMQ host.
Expand Down Expand Up @@ -51,6 +59,23 @@ def get_memory_usage(node_id: str, info: Mapping[str, Any]) -> ServiceMemory:
return ServiceMemory(node_id, memory_used, memory_available)


def get_host_port_info(node_id: str, cluster: Cluster) -> NodeInfo:
"""
Extract the host and port of the redis node in the cluster.
"""
try:
if isinstance(cluster, RedisCluster):
# RedisCluster node mapping
node = cluster.connection_pool.nodes.nodes.get(node_id)
return NodeInfo(node["host"], node["port"])
else:
# rb.Cluster node mapping
node = cluster.hosts[node_id]
return NodeInfo(node.host, node.port)
except Exception:
return NodeInfo(None, None)


def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None, None]:
"""
A generator that yields redis `INFO` results for each of the nodes in the `cluster`.
Expand All @@ -65,4 +90,8 @@ def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None
cluster_info = promise.value

for node_id, info in cluster_info.items():
yield get_memory_usage(node_id, info)
node_info = get_host_port_info(node_id, cluster)
memory_usage = get_memory_usage(node_id, info)
memory_usage.host = node_info.host
memory_usage.port = node_info.port
yield memory_usage
10 changes: 10 additions & 0 deletions src/sentry/processing/backpressure/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ def check_service_health(services: Mapping[str, Service]) -> MutableMapping[str,
reasons = []

logger.info("Checking service `%s` (configured high watermark: %s):", name, high_watermark)
memory = None
try:
for memory in check_service_memory(service):
if memory.percentage >= high_watermark:
reasons.append(memory)
logger.info("Checking node: %s:%s", memory.host, memory.port)
logger.info(
" name: %s, used: %s, available: %s, percentage: %s",
memory.name,
Expand All @@ -101,6 +103,14 @@ def check_service_health(services: Mapping[str, Service]) -> MutableMapping[str,
scope.set_tag("service", name)
sentry_sdk.capture_exception(e)
unhealthy_services[name] = e
host = memory.host if memory else "unknown"
port = memory.port if memory else "unknown"
logger.exception(
"Error while processing node %s:%s for service %s",
host,
port,
service,
)
else:
unhealthy_services[name] = reasons

Expand Down
24 changes: 24 additions & 0 deletions tests/sentry/processing/backpressure/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def test_rb_cluster_returns_some_usage() -> None:
assert memory.used > 0
assert memory.available > 0
assert 0.0 < memory.percentage < 1.0
assert memory.host == "localhost"
assert memory.port == 6379


@use_redis_cluster()
Expand All @@ -35,6 +37,8 @@ def test_redis_cluster_cluster_returns_some_usage() -> None:
assert memory.used > 0
assert memory.available > 0
assert 0.0 < memory.percentage < 1.0
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}


@use_redis_cluster(high_watermark=100)
Expand All @@ -47,6 +51,14 @@ def test_redis_health():
assert isinstance(redis_services, list)
assert len(redis_services) == 0

usage = list(iter_cluster_memory_usage(services["redis"].cluster))
for memory in usage:
assert memory.used >= 0
assert memory.available > 0
assert 0.0 < memory.percentage <= 1.0
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}


@use_redis_cluster(high_watermark=0)
def test_redis_unhealthy_state():
Expand All @@ -57,3 +69,15 @@ def test_redis_unhealthy_state():
redis_services = unhealthy_services.get("redis")
assert isinstance(redis_services, list)
assert len(redis_services) == 6

usage = list(iter_cluster_memory_usage(services["redis"].cluster))
for memory in usage:
assert memory.used >= 0
assert memory.available > 0
assert 0.0 < memory.percentage <= 1.0
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}

for memory in redis_services:
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}

0 comments on commit 040076d

Please sign in to comment.