feat: Added support for sending custom datadog metrics from querybook #1390

Open · wants to merge 11 commits into `master`
26 changes: 26 additions & 0 deletions docker-compose.yml
@@ -127,6 +127,32 @@ services:
timeout: 30s
retries: 3

#
# OPTIONAL SERVICES
# Start separately if you need them via `docker compose up <service>`
#

datadog:
image: gcr.io/datadoghq/agent:latest
ports:
- 8125:8125/udp
expose:
- 8125/udp
environment:
      DD_DOGSTATSD_NON_LOCAL_TRAFFIC: "true"
      DD_USE_DOGSTATSD: "true"
      DD_APM_ENABLED: "false"
DD_AC_EXCLUDE: '.*'
DD_ENV: 'dev'
DD_SERVICE: 'querybook'
DD_VERSION: 'latest'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /proc/:/host/proc/:ro
- /sys/fs/cgroup:/host/sys/fs/cgroup:ro
profiles:
- datadog
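Note: because this service is gated behind the `datadog` profile, bring it up explicitly, e.g. `docker compose --profile datadog up -d`. The agent container also needs a `DD_API_KEY` in its environment before it can forward metrics to Datadog; this file does not set one.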

# EMAIL SERVER EXAMPLE
# If you need email to work use this
# dockerhostforward:
7 changes: 7 additions & 0 deletions querybook/server/env.py
@@ -139,3 +139,10 @@ class QuerybookSettings(object):
VECTOR_STORE_CONFIG = get_env_config("VECTOR_STORE_CONFIG") or {}
EMBEDDINGS_PROVIDER = get_env_config("EMBEDDINGS_PROVIDER")
EMBEDDINGS_CONFIG = get_env_config("EMBEDDINGS_CONFIG") or {}

# Datadog
DD_AGENT_HOST = get_env_config("DD_AGENT_HOST", optional=True)
DD_DOGSTATSD_PORT = int(get_env_config("DD_DOGSTATSD_PORT", optional=True) or 8125)
DD_PREFIX = get_env_config("DD_PREFIX", optional=True)
DD_SERVICE = get_env_config("DD_SERVICE", optional=True) or "querybook"
DD_TAGS = get_env_config("DD_TAGS", optional=True) or []
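For illustration, this is how the settings above combine downstream (the example env values are assumptions; the expressions mirror `DatadogStatsLogger.initialize()` later in this diff):

```python
from env import QuerybookSettings

# Assuming e.g. DD_AGENT_HOST=datadog, DD_TAGS="env:dev,team:data", DD_PREFIX unset:
prefix = QuerybookSettings.DD_PREFIX or QuerybookSettings.DD_SERVICE  # -> "querybook"
tags = (
    QuerybookSettings.DD_TAGS.split(",")  # -> ["env:dev", "team:data"]
    if QuerybookSettings.DD_TAGS
    else []
)
```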
27 changes: 27 additions & 0 deletions querybook/server/lib/celery/celery_stats.py
@@ -0,0 +1,27 @@
import time
import threading

from lib.logger import get_logger
from lib.stats_logger import stats_logger, ACTIVE_WORKERS, ACTIVE_TASKS

LOG = get_logger(__file__)


def send_stats_logger_metrics(celery):
    """Poll Celery every 5 seconds and emit active worker/task gauges."""
    while True:
        try:
            # inspect().active() maps each worker hostname to the list of
            # tasks it is currently executing
            active = celery.control.inspect().active() or {}

            stats_logger.gauge(ACTIVE_WORKERS, len(active))
            stats_logger.gauge(
                ACTIVE_TASKS, sum(len(tasks) for tasks in active.values())
            )
        except Exception:
            # Keep the daemon thread alive across transient broker errors
            LOG.exception("Failed to collect Celery stats")
        time.sleep(5)


def start_stats_logger_monitor(celery):
    # Daemon thread so the monitor never blocks process shutdown
    thread = threading.Thread(
        target=send_stats_logger_metrics, args=[celery], daemon=True
    )
    thread.start()
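For context, `celery.control.inspect().active()` returns a mapping from worker hostname to the tasks it is currently executing, which is what the two gauges reduce over. A minimal sketch with assumed worker and task names:

```python
active = {
    "celery@worker-1": [{"id": "abc123", "name": "tasks.run_query.run_query_task"}],
    "celery@worker-2": [],
}
assert len(active) == 2                                   # ACTIVE_WORKERS
assert sum(len(tasks) for tasks in active.values()) == 1  # ACTIVE_TASKS
```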
4 changes: 4 additions & 0 deletions querybook/server/lib/stats_logger/__init__.py
@@ -7,8 +7,12 @@
WS_CONNECTIONS = "ws.connections"
SQL_SESSION_FAILURES = "sql_session.failures"
TASK_FAILURES = "task.failures"
TASK_SUCCESSES = "task.successes"
TASK_RECEIVED = "task.received"
REDIS_OPERATIONS = "redis.operations"
QUERY_EXECUTIONS = "query.executions"
ACTIVE_WORKERS = "celery.active_workers"
ACTIVE_TASKS = "celery.active_tasks"


logger_name = QuerybookSettings.STATS_LOGGER_NAME
12 changes: 11 additions & 1 deletion querybook/server/lib/stats_logger/all_stats_loggers.py
@@ -1,4 +1,6 @@
from lib.utils.import_helper import import_module_with_default

from .loggers.datadog_stats_logger import DatadogStatsLogger
from .loggers.null_stats_logger import NullStatsLogger
from .loggers.console_stats_logger import ConsoleStatsLogger

@@ -8,11 +10,19 @@
default=[],
)

ALL_STATS_LOGGERS = [NullStatsLogger(), ConsoleStatsLogger()] + ALL_PLUGIN_STATS_LOGGERS
ALL_STATS_LOGGERS = [
NullStatsLogger(),
ConsoleStatsLogger(),
DatadogStatsLogger(),
] + ALL_PLUGIN_STATS_LOGGERS


def get_stats_logger_class(name: str):
for logger in ALL_STATS_LOGGERS:
if logger.logger_name == name:
            initialize = getattr(logger, "initialize", None)
            if callable(initialize):
                initialize()
return logger
raise ValueError(f"Unknown stats logger name {name}")
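A hedged sketch of how a deployment would opt into the new logger (in practice `lib/stats_logger/__init__.py` resolves this from `STATS_LOGGER_NAME`; the tag value here is illustrative):

```python
from lib.stats_logger import QUERY_EXECUTIONS
from lib.stats_logger.all_stats_loggers import get_stats_logger_class

# "datadog" matches DatadogStatsLogger.logger_name; note that
# get_stats_logger_class() now calls initialize() before returning
logger = get_stats_logger_class("datadog")
logger.incr(QUERY_EXECUTIONS, tags={"engine": "presto"})
```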
93 changes: 93 additions & 0 deletions querybook/server/lib/stats_logger/loggers/datadog_stats_logger.py
@@ -0,0 +1,93 @@
from lib.stats_logger.base_stats_logger import BaseStatsLogger
from env import QuerybookSettings
from lib.logger import get_logger

LOG = get_logger(__file__)


class DatadogStatsLogger(BaseStatsLogger):
"""
Stats Logger implemention for Datadog using DogStatsD.

Required environment variables:
- DD_API_KEY: The API key for Datadog.
- DD_AGENT_HOST: The host of the Datadog agent.
- DD_DOGSTATSD_PORT: The port of the Datadog agent, defaults to 8125.
- DD_SERVICE: The service name.

Optional environment variables:
- DD_PREFIX: The prefix for all metrics.
- DD_TAGS: Additional tags to be added to all metrics.
"""

metric_prefix = ""
dd_tags = []
_statsd = None

    def metric_prefix_helper(self, key):
        # Avoid emitting a leading "." when no prefix is configured
        return f"{self.metric_prefix}.{key}" if self.metric_prefix else key

def tag_helper(self, tags):
if tags:
return [f"{k}:{v}" for k, v in tags.items()]
return []

def initialize(self):
try:
from datadog import initialize, statsd

self._statsd = statsd
except ImportError:
raise ImportError(
"Datadog is not installed. Please install `requirements/datadog/datadog.txt` to use the Datadog stats logger."
)

if QuerybookSettings.DD_AGENT_HOST and QuerybookSettings.DD_DOGSTATSD_PORT:
LOG.info("Initializing Datadog")

self.dd_tags = (
QuerybookSettings.DD_TAGS.split(",")
if QuerybookSettings.DD_TAGS
else []
)
self.metric_prefix = (
QuerybookSettings.DD_PREFIX or QuerybookSettings.DD_SERVICE
)

options = {
"statsd_host": QuerybookSettings.DD_AGENT_HOST,
"statsd_port": QuerybookSettings.DD_DOGSTATSD_PORT,
"statsd_constant_tags": self.dd_tags,
}

initialize(**options)
else:
LOG.info("Datadog environment variables are not set")

@property
def logger_name(self) -> str:
return "datadog"

def incr(self, key: str, tags: dict[str, str] = None) -> None:
self._statsd.increment(
self.metric_prefix_helper(key),
1,
tags=self.tag_helper(tags),
)

def decr(self, key: str, tags: dict[str, str] = None) -> None:
self._statsd.decrement(
self.metric_prefix_helper(key),
1,
tags=self.tag_helper(tags),
)

def timing(self, key: str, value: float, tags: dict[str, str] = None) -> None:
self._statsd.histogram(
self.metric_prefix_helper(key), value, tags=self.tag_helper(tags)
)

def gauge(self, key: str, value: float, tags: dict[str, str] = None) -> None:
self._statsd.gauge(
self.metric_prefix_helper(key), value, tags=self.tag_helper(tags)
)
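For illustration, with `DD_SERVICE=querybook` and no `DD_PREFIX` (the defaults above), the two helpers format metric names and tags as follows: a minimal sketch, not a test from this PR.

```python
logger = DatadogStatsLogger()
logger.metric_prefix = "querybook"  # normally set by initialize()

assert logger.metric_prefix_helper("task.failures") == "querybook.task.failures"
assert logger.tag_helper({"task_type": "adhoc"}) == ["task_type:adhoc"]
assert logger.tag_helper(None) == []
```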
41 changes: 36 additions & 5 deletions querybook/server/tasks/all_tasks.py
@@ -1,12 +1,23 @@
from celery.signals import celeryd_init, task_failure
from celery.signals import (
celeryd_init,
task_failure,
task_success,
task_received,
beat_init,
)
from celery.utils.log import get_task_logger
from importlib import import_module

from app.flask_app import celery
from env import QuerybookSettings
from lib.logger import get_logger
from lib.stats_logger import TASK_FAILURES, stats_logger
from logic.schedule import get_schedule_task_type
from lib.stats_logger import (
TASK_FAILURES,
TASK_SUCCESSES,
TASK_RECEIVED,
stats_logger,
)
from lib.celery.celery_stats import start_stats_logger_monitor

from .export_query_execution import export_query_execution_task
from .run_query import run_query_task
@@ -20,6 +31,7 @@
from .presto_hive_function_scrapper import presto_hive_function_scrapper
from .db_clean_up_jobs import run_all_db_clean_up_jobs
from .disable_scheduled_docs import disable_scheduled_docs
from logic.schedule import get_schedule_task_type

LOG = get_logger(__file__)

@@ -60,5 +72,24 @@ def configure_workers(sender=None, conf=None, **kwargs):

@task_failure.connect
def handle_task_failure(sender, signal, *args, **kwargs):
task_type = get_schedule_task_type(sender.name)
stats_logger.incr(TASK_FAILURES, tags={"task_type": task_type})
tags = {"task_name": sender.name, "task_type": get_schedule_task_type(sender.name)}
stats_logger.incr(TASK_FAILURES, tags=tags)


@task_success.connect
def handle_task_success(sender, signal, *args, **kwargs):
tags = {"task_name": sender.name, "task_type": get_schedule_task_type(sender.name)}
stats_logger.incr(TASK_SUCCESSES, tags=tags)


@task_received.connect
def handle_task_received(sender, signal, *args, **kwargs):
tags = {"task_name": kwargs["request"].name}
tags["task_type"] = get_schedule_task_type(tags["task_name"])
stats_logger.incr(TASK_RECEIVED, tags=tags)


@beat_init.connect
def start_celery_stats_logging(sender=None, conf=None, **kwargs):
    # beat_init fires only in the single beat scheduler process, so the
    # monitor thread and its gauges run exactly once per deployment
    LOG.info("Starting Celery stats monitor")
    start_stats_logger_monitor(celery)
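End to end, a failed task would flow through the logger roughly like this (a sketch; the task name and the `get_schedule_task_type` return value are assumptions):

```python
# task_failure fires with sender set to the task object
tags = {
    "task_name": "tasks.run_query.run_query_task",  # assumed registered name
    "task_type": "adhoc",                           # assumed task type
}
stats_logger.incr(TASK_FAILURES, tags=tags)
# With the Datadog logger active, this becomes a DogStatsD increment of
# "querybook.task.failures" tagged ["task_name:...", "task_type:adhoc"]
```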
2 changes: 2 additions & 0 deletions requirements/datadog/datadog.txt
@@ -0,0 +1,2 @@
datadog==0.47.0
ddtrace==2.0.2
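These pins live outside the base requirements; the `ImportError` raised in `DatadogStatsLogger.initialize()` points at installing them via `pip install -r requirements/datadog/datadog.txt` when the Datadog logger is enabled without the package present.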