diff --git a/docker-compose.yml b/docker-compose.yml
index 1e36f3d86..d4bdc85ec 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -127,6 +127,32 @@ services:
             timeout: 30s
             retries: 3
 
+    #
+    # OPTIONAL SERVICES
+    # Start separately if you need them via `docker compose up `
+    #
+
+    datadog:
+        image: gcr.io/datadoghq/agent:latest
+        ports:
+            - 8125:8125/udp
+        expose:
+            - 8125/udp
+        environment:
+            DD_DOGSTATSD_NON_LOCAL_TRAFFIC: true
+            DD_USE_DOGSTATSD: true
+            DD_APM_ENABLED: false
+            DD_AC_EXCLUDE: '.*'
+            DD_ENV: 'dev'
+            DD_SERVICE: 'querybook'
+            DD_VERSION: 'latest'
+        volumes:
+            - /var/run/docker.sock:/var/run/docker.sock
+            - /proc/:/host/proc/:ro
+            - /sys/fs/cgroup:/host/sys/fs/cgroup:ro
+        profiles:
+            - datadog
+
     # EMAIL SERVER EXAMPLE
     # If you need email to work use this
     # dockerhostforward:
diff --git a/querybook/server/env.py b/querybook/server/env.py
index b634adf15..99b819ddb 100644
--- a/querybook/server/env.py
+++ b/querybook/server/env.py
@@ -139,3 +139,10 @@ class QuerybookSettings(object):
     VECTOR_STORE_CONFIG = get_env_config("VECTOR_STORE_CONFIG") or {}
     EMBEDDINGS_PROVIDER = get_env_config("EMBEDDINGS_PROVIDER")
     EMBEDDINGS_CONFIG = get_env_config("EMBEDDINGS_CONFIG") or {}
+
+    # Datadog
+    DD_AGENT_HOST = get_env_config("DD_AGENT_HOST", optional=True)
+    DD_DOGSTATSD_PORT = int(get_env_config("DD_DOGSTATSD_PORT", optional=True) or 8125)
+    DD_PREFIX = get_env_config("DD_PREFIX", optional=True)
+    DD_SERVICE = get_env_config("DD_SERVICE", optional=True) or "querybook"
+    DD_TAGS = get_env_config("DD_TAGS", optional=True) or []
diff --git a/querybook/server/lib/celery/celery_stats.py b/querybook/server/lib/celery/celery_stats.py
new file mode 100644
index 000000000..43fce3366
--- /dev/null
+++ b/querybook/server/lib/celery/celery_stats.py
@@ -0,0 +1,27 @@
+import time
+import threading
+from lib.stats_logger import stats_logger, ACTIVE_WORKERS, ACTIVE_TASKS
+
+
+def send_stats_logger_metrics(celery):
+    while True:
+        i = celery.control.inspect()
+
+        active = i.active() or {}
+
+        active_workers = list(active.keys())
+        active_tasks = 0
+        for worker in active_workers:
+            if worker in active:
+                active_tasks += len(active[worker])
+
+        stats_logger.gauge(ACTIVE_WORKERS, len(active_workers))
+        stats_logger.gauge(ACTIVE_TASKS, active_tasks)
+        time.sleep(5)
+
+
+def start_stats_logger_monitor(celery):
+    thread = threading.Thread(
+        target=send_stats_logger_metrics, args=[celery], daemon=True
+    )
+    thread.start()
diff --git a/querybook/server/lib/stats_logger/__init__.py b/querybook/server/lib/stats_logger/__init__.py
index 687043bc9..0a184d7f6 100644
--- a/querybook/server/lib/stats_logger/__init__.py
+++ b/querybook/server/lib/stats_logger/__init__.py
@@ -7,8 +7,12 @@
 WS_CONNECTIONS = "ws.connections"
 SQL_SESSION_FAILURES = "sql_session.failures"
 TASK_FAILURES = "task.failures"
+TASK_SUCCESSES = "task.successes"
+TASK_RECEIVED = "task.received"
 REDIS_OPERATIONS = "redis.operations"
 QUERY_EXECUTIONS = "query.executions"
+ACTIVE_WORKERS = "celery.active_workers"
+ACTIVE_TASKS = "celery.active_tasks"
 
 logger_name = QuerybookSettings.STATS_LOGGER_NAME
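
The monitor in celery_stats.py derives both gauges from a single celery.control.inspect().active() call. A minimal sketch of that computation, using a hypothetical payload (worker names and task entries are invented for illustration):

    # Hypothetical shape of celery.control.inspect().active(): a dict that
    # maps each worker hostname to the list of tasks it is currently running.
    active = {
        "celery@worker-1": [{"id": "a1", "name": "tasks.run_query.run_query_task"}],
        "celery@worker-2": [],
    }

    active_workers = list(active.keys())
    active_tasks = sum(len(tasks) for tasks in active.values())

    assert len(active_workers) == 2  # reported as the celery.active_workers gauge
    assert active_tasks == 1  # reported as the celery.active_tasks gauge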
diff --git a/querybook/server/lib/stats_logger/all_stats_loggers.py b/querybook/server/lib/stats_logger/all_stats_loggers.py
index 3f570786b..0fba53a03 100644
--- a/querybook/server/lib/stats_logger/all_stats_loggers.py
+++ b/querybook/server/lib/stats_logger/all_stats_loggers.py
@@ -1,4 +1,6 @@
 from lib.utils.import_helper import import_module_with_default
+
+from .loggers.datadog_stats_logger import DatadogStatsLogger
 from .loggers.null_stats_logger import NullStatsLogger
 from .loggers.console_stats_logger import ConsoleStatsLogger
 
@@ -8,11 +10,19 @@
     default=[],
 )
 
-ALL_STATS_LOGGERS = [NullStatsLogger(), ConsoleStatsLogger()] + ALL_PLUGIN_STATS_LOGGERS
+ALL_STATS_LOGGERS = [
+    NullStatsLogger(),
+    ConsoleStatsLogger(),
+    DatadogStatsLogger(),
+] + ALL_PLUGIN_STATS_LOGGERS
 
 
 def get_stats_logger_class(name: str):
     for logger in ALL_STATS_LOGGERS:
         if logger.logger_name == name:
+            if hasattr(logger, "initialize") and callable(
+                getattr(logger, "initialize")
+            ):
+                logger.initialize()
             return logger
     raise ValueError(f"Unknown stats logger name {name}")
diff --git a/querybook/server/lib/stats_logger/loggers/datadog_stats_logger.py b/querybook/server/lib/stats_logger/loggers/datadog_stats_logger.py
new file mode 100644
index 000000000..699fa54a5
--- /dev/null
+++ b/querybook/server/lib/stats_logger/loggers/datadog_stats_logger.py
@@ -0,0 +1,93 @@
+from lib.stats_logger.base_stats_logger import BaseStatsLogger
+from env import QuerybookSettings
+from lib.logger import get_logger
+
+LOG = get_logger(__file__)
+
+
+class DatadogStatsLogger(BaseStatsLogger):
+    """
+    Stats Logger implementation for Datadog using DogStatsD.
+
+    Required environment variables:
+        - DD_API_KEY: The API key for Datadog.
+        - DD_AGENT_HOST: The host of the Datadog agent.
+        - DD_DOGSTATSD_PORT: The port of the Datadog agent, defaults to 8125.
+        - DD_SERVICE: The service name.
+
+    Optional environment variables:
+        - DD_PREFIX: The prefix for all metrics.
+        - DD_TAGS: Additional tags to be added to all metrics.
+    """
+
+    metric_prefix = ""
+    dd_tags = []
+    _statsd = None
+
+    def metric_prefix_helper(self, key):
+        return self.metric_prefix + "." + key
+
+    def tag_helper(self, tags):
+        if tags:
+            return [f"{k}:{v}" for k, v in tags.items()]
+        return []
+
+    def initialize(self):
+        try:
+            from datadog import initialize, statsd
+
+            self._statsd = statsd
+        except ImportError:
+            raise ImportError(
+                "Datadog is not installed. Please install `requirements/datadog/datadog.txt` to use the Datadog stats logger."
+            )
+
+        if QuerybookSettings.DD_AGENT_HOST and QuerybookSettings.DD_DOGSTATSD_PORT:
+            LOG.info("Initializing Datadog")
+
+            self.dd_tags = (
+                QuerybookSettings.DD_TAGS.split(",")
+                if QuerybookSettings.DD_TAGS
+                else []
+            )
+            self.metric_prefix = (
+                QuerybookSettings.DD_PREFIX or QuerybookSettings.DD_SERVICE
+            )
+
+            options = {
+                "statsd_host": QuerybookSettings.DD_AGENT_HOST,
+                "statsd_port": QuerybookSettings.DD_DOGSTATSD_PORT,
+                "statsd_constant_tags": self.dd_tags,
+            }
+
+            initialize(**options)
+        else:
+            LOG.info("Datadog environment variables are not set")
+
+    @property
+    def logger_name(self) -> str:
+        return "datadog"
+
+    def incr(self, key: str, tags: dict[str, str] = None) -> None:
+        self._statsd.increment(
+            self.metric_prefix_helper(key),
+            1,
+            tags=self.tag_helper(tags),
+        )
+
+    def decr(self, key: str, tags: dict[str, str] = None) -> None:
+        self._statsd.decrement(
+            self.metric_prefix_helper(key),
+            1,
+            tags=self.tag_helper(tags),
+        )
+
+    def timing(self, key: str, value: float, tags: dict[str, str] = None) -> None:
+        self._statsd.histogram(
+            self.metric_prefix_helper(key), value, tags=self.tag_helper(tags)
+        )
+
+    def gauge(self, key: str, value: float, tags: dict[str, str] = None) -> None:
+        self._statsd.gauge(
+            self.metric_prefix_helper(key), value, tags=self.tag_helper(tags)
+        )
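
For a sense of what the logger emits, a sketch of one counter increment under the settings above (assuming DD_PREFIX is unset, so the prefix falls back to DD_SERVICE, i.e. "querybook"; the tag value is illustrative):

    from lib.stats_logger import stats_logger, QUERY_EXECUTIONS

    # With STATS_LOGGER_NAME set to "datadog", this call goes through
    # DatadogStatsLogger.incr and becomes a DogStatsD increment of
    # "querybook.query.executions" tagged with ["env:dev"].
    stats_logger.incr(QUERY_EXECUTIONS, tags={"env": "dev"})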
diff --git a/querybook/server/tasks/all_tasks.py b/querybook/server/tasks/all_tasks.py
index 609eac5ea..f9afc3535 100644
--- a/querybook/server/tasks/all_tasks.py
+++ b/querybook/server/tasks/all_tasks.py
@@ -1,12 +1,23 @@
-from celery.signals import celeryd_init, task_failure
+from celery.signals import (
+    celeryd_init,
+    task_failure,
+    task_success,
+    task_received,
+    beat_init,
+)
 from celery.utils.log import get_task_logger
 from importlib import import_module
 
 from app.flask_app import celery
 from env import QuerybookSettings
 from lib.logger import get_logger
-from lib.stats_logger import TASK_FAILURES, stats_logger
-from logic.schedule import get_schedule_task_type
+from lib.stats_logger import (
+    TASK_FAILURES,
+    TASK_SUCCESSES,
+    TASK_RECEIVED,
+    stats_logger,
+)
+from lib.celery.celery_stats import start_stats_logger_monitor
 
 from .export_query_execution import export_query_execution_task
 from .run_query import run_query_task
@@ -20,6 +31,7 @@
 from .presto_hive_function_scrapper import presto_hive_function_scrapper
 from .db_clean_up_jobs import run_all_db_clean_up_jobs
 from .disable_scheduled_docs import disable_scheduled_docs
+from logic.schedule import get_schedule_task_type
 
 LOG = get_logger(__file__)
 
@@ -60,5 +72,24 @@ def configure_workers(sender=None, conf=None, **kwargs):
 
 @task_failure.connect
 def handle_task_failure(sender, signal, *args, **kwargs):
-    task_type = get_schedule_task_type(sender.name)
-    stats_logger.incr(TASK_FAILURES, tags={"task_type": task_type})
+    tags = {"task_name": sender.name, "task_type": get_schedule_task_type(sender.name)}
+    stats_logger.incr(TASK_FAILURES, tags=tags)
+
+
+@task_success.connect
+def handle_task_success(sender, signal, *args, **kwargs):
+    tags = {"task_name": sender.name, "task_type": get_schedule_task_type(sender.name)}
+    stats_logger.incr(TASK_SUCCESSES, tags=tags)
+
+
+@task_received.connect
+def handle_task_received(sender, signal, *args, **kwargs):
+    tags = {"task_name": kwargs["request"].name}
+    tags["task_type"] = get_schedule_task_type(tags["task_name"])
+    stats_logger.incr(TASK_RECEIVED, tags=tags)
+
+
+@beat_init.connect
+def start_celery_stats_logging(sender=None, conf=None, **kwargs):
+    LOG.info("Starting Celery Beat")
+    start_stats_logger_monitor(celery)
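
Note that task_received fires on the worker consumer rather than on the task itself, which is why handle_task_received reads the task name from kwargs["request"] instead of sender.name. A sketch of the counter one successful task increments (the tag values are assumptions, not taken from the diff):

    # Rough equivalent of what handle_task_success records for one finished
    # task; the task_type value depends on get_schedule_task_type and is
    # assumed here for illustration.
    stats_logger.incr(
        TASK_SUCCESSES,
        tags={"task_name": "tasks.run_query.run_query_task", "task_type": "adhoc"},
    )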
diff --git a/requirements/datadog/datadog.txt b/requirements/datadog/datadog.txt
new file mode 100644
index 000000000..2b9866e0f
--- /dev/null
+++ b/requirements/datadog/datadog.txt
@@ -0,0 +1,2 @@
+datadog==0.47.0
+ddtrace==2.0.2
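
To turn this on end to end, install requirements/datadog/datadog.txt and set the container environment. The names below match the settings read in env.py; the values are illustrative assumptions, not part of the diff:

    # Illustrative environment for the web/worker containers (values assumed):
    environment = {
        "STATS_LOGGER_NAME": "datadog",  # selects DatadogStatsLogger by logger_name
        "DD_AGENT_HOST": "datadog",      # the compose service added above
        "DD_DOGSTATSD_PORT": "8125",     # matches the env.py default
        "DD_SERVICE": "querybook",       # also the default metric prefix
        "DD_TAGS": "env:dev,team:data",  # comma-separated; split in initialize()
    }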