feat(COGS): Add UsageAccumulator and use in Metrics Indexer #57524

Closed · wants to merge 6 commits
2 changes: 2 additions & 0 deletions requirements-base.txt
@@ -81,6 +81,8 @@ zstandard>=0.18.0
msgpack>=1.0.4
cryptography>=38.0.3

sentry-usage-accountant==0.0.10

# celery
billiard>=3.6.4
kombu>=4.6.11
1 change: 1 addition & 0 deletions requirements-dev-frozen.txt
@@ -174,6 +174,7 @@ sentry-kafka-schemas==0.1.29
sentry-redis-tools==0.1.7
sentry-relay==0.8.31
sentry-sdk==1.31.0
sentry-usage-accountant==0.0.10
simplejson==3.17.6
six==1.16.0
sniffio==1.2.0
1 change: 1 addition & 0 deletions requirements-frozen.txt
@@ -115,6 +115,7 @@ sentry-kafka-schemas==0.1.29
sentry-redis-tools==0.1.7
sentry-relay==0.8.31
sentry-sdk==1.31.0
sentry-usage-accountant==0.0.10
simplejson==3.17.6
six==1.16.0
snuba-sdk==2.0.3
67 changes: 67 additions & 0 deletions src/sentry/cogs/accountant.py
@@ -0,0 +1,67 @@
from __future__ import annotations

import logging

from arroyo.backends.kafka import KafkaProducer, build_kafka_configuration
from django.conf import settings
from usageaccountant import UsageAccumulator, UsageUnit

logger = logging.getLogger("usageaccountant")


accumulator: UsageAccumulator | None = None


def _accumulator(create: bool = False) -> UsageAccumulator | None:
    global accumulator
    if accumulator is None and create:
        from sentry.utils.kafka_config import (
            get_kafka_producer_cluster_options,
            get_topic_definition,
        )

        producer = KafkaProducer(
            build_kafka_configuration(
                get_kafka_producer_cluster_options(
                    get_topic_definition(settings.KAFKA_SHARED_RESOURCES_USAGE)["cluster"]
                )
            )
        )
        accumulator = UsageAccumulator(producer=producer)
    return accumulator


def record_cogs(resource_id: str, app_feature: str, amount: int, usage_type: UsageUnit) -> None:
    """
    Spins up a UsageAccumulator instance (if one does not already exist) and
    records COGS data to the configured shared-resources-usage topic.

    *GOTCHAS*
    - You must call `close_cogs_recorder` to flush and close the producer
      used by the UsageAccumulator.
    - E.g. if this is used in a consumer, call the close function in the
      consumer's close-out path.
    """
    try:
        accumulator = _accumulator(create=True)
        assert accumulator is not None
        accumulator.record(resource_id, app_feature, amount, usage_type)
    except Exception as err:
        logger.warning("Could not record COGS due to error: %r", err, exc_info=True)


def close_cogs_recorder() -> None:
    """
    Flushes and closes any producer used by UsageAccumulator.

    The producer only gets created if `record_cogs` has been called at least
    once.
    """
    logger.info("Flushing and closing COGS recorder if it exists...")
    try:
        accumulator = _accumulator()
        if accumulator is not None:
            accumulator.flush()
            accumulator.close()
    except Exception as err:
        logger.error("Error shutting down COGS producer: %r", err, exc_info=True)
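
For context, a minimal usage sketch of this module (the consumer loop, resource id, and app feature names here are illustrative, not part of this PR):

from usageaccountant import UsageUnit

from sentry.cogs.accountant import close_cogs_recorder, record_cogs


def run_consumer(batches):
    # Hypothetical consumer loop: record the byte size of each batch, then
    # flush and close the shared producer on the way out, per the gotcha
    # called out in the record_cogs docstring above.
    try:
        for batch in batches:
            record_cogs(
                resource_id="my_consumer",  # illustrative resource id
                app_feature="my_feature",  # illustrative app feature
                amount=len(batch),
                usage_type=UsageUnit.BYTES,
            )
    finally:
        close_cogs_recorder()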
2 changes: 2 additions & 0 deletions src/sentry/conf/server.py
@@ -3239,6 +3239,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]]
KAFKA_EVENTSTREAM_GENERIC = "generic-events"
KAFKA_GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
KAFKA_GROUP_ATTRIBUTES = "group-attributes"
KAFKA_SHARED_RESOURCES_USAGE = "shared-resources-usage"

# spans
KAFKA_INGEST_SPANS = "ingest-spans"
@@ -3291,6 +3292,7 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]]
    KAFKA_GROUP_ATTRIBUTES: {"cluster": "default"},
    KAFKA_INGEST_SPANS: {"cluster": "default"},
    KAFKA_SNUBA_SPANS: {"cluster": "default"},
    KAFKA_SHARED_RESOURCES_USAGE: {"cluster": "default"},
}


7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
@@ -975,6 +975,13 @@
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# An option to tune the fraction of indexer batches whose throughput is
# recorded and published to the shared-resources-usage topic for COGS analysis.
register(
    "sentry-metrics.indexer.record-throughput-cogs-rollout",
    default=0.0,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Global and per-organization limits on the writes to the string indexer's DB.
#
# Format is a list of dictionaries of format {
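A sketch of how this rollout fraction gates recording, mirroring the indexer change below (with the 0.0 default effectively nothing is recorded; a value of e.g. 0.1 records roughly 10% of batches):

import random

from sentry import options


def should_record_cogs() -> bool:
    # random.random() is uniform on [0.0, 1.0), so the option value is the
    # approximate probability that a given batch gets recorded.
    return random.random() <= options.get(
        "sentry-metrics.indexer.record-throughput-cogs-rollout"
    )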
12 changes: 12 additions & 0 deletions src/sentry/sentry_metrics/consumers/indexer/batch.py
@@ -27,8 +27,10 @@
from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric
from sentry_kafka_schemas.schema_types.snuba_generic_metrics_v1 import GenericMetric
from sentry_kafka_schemas.schema_types.snuba_metrics_v1 import Metric
from usageaccountant import UsageUnit

from sentry import options
from sentry.cogs.accountant import record_cogs
from sentry.sentry_metrics.aggregation_option_registry import get_aggregation_option
from sentry.sentry_metrics.configuration import MAX_INDEXED_COLUMN_LENGTH
from sentry.sentry_metrics.consumers.indexer.common import IndexerOutputMessageBatch, MessageBatch
@@ -502,4 +504,14 @@ def reconstruct_messages(
                self.__message_size_max[use_case_id],
                tags={"use_case_id": use_case_id.value},
            )
            self.__record_cogs(use_case_id)
        return new_messages

    def __record_cogs(self, use_case_id: UseCaseID) -> None:
        # Sample batches according to the rollout option; the recorded amount
        # is this batch's raw byte sum, not scaled by the sample rate.
        if random.random() <= options.get("sentry-metrics.indexer.record-throughput-cogs-rollout"):
            record_cogs(
                resource_id="sentry_metrics_indexer_processor",
                app_feature=f"sentrymetricsindexer_{use_case_id.value}",
                amount=self.__message_size_sum[use_case_id],
                usage_type=UsageUnit.BYTES,
            )
3 changes: 3 additions & 0 deletions src/sentry/utils/kafka.py
@@ -1,12 +1,15 @@
import logging
import signal

from sentry.cogs.accountant import close_cogs_recorder

logger = logging.getLogger(__name__)


def run_processor_with_signals(processor):
    def handler(signum, frame):
        processor.signal_shutdown()
        close_cogs_recorder()

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
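A minimal sketch of how the shutdown path fits together, assuming an Arroyo-style stream processor whose run() the helper invokes after registering the handlers (the hunk above is truncated, so that call is not shown):

from sentry.utils.kafka import run_processor_with_signals


class StubProcessor:
    # Stand-in for an Arroyo StreamProcessor; only the methods the helper
    # relies on are stubbed.
    def signal_shutdown(self) -> None:
        print("processor asked to shut down")

    def run(self) -> None:
        print("processing until signaled...")


# On SIGINT/SIGTERM the handler shuts the processor down and then flushes
# and closes the COGS producer via close_cogs_recorder().
run_processor_with_signals(StubProcessor())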