diff --git a/Dockerfile b/Dockerfile
index 9c88809..c1fed6f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,27 @@
-FROM python:3-slim
+FROM python:3-alpine
+
 WORKDIR /app
+
+# Install build dependencies
+RUN apk add --no-cache --virtual .build-deps gcc musl-dev
+
+# Copy and install requirements
 COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt
+
+# Remove build dependencies
+RUN apk del .build-deps
+
+# Copy the script
 COPY adguard_exporter.py .
+
+# Expose the metrics port
 EXPOSE 8000
+
+# Set environment variables (can be overridden at runtime)
+ENV LOG_FILE_PATH=/adguard/work/data/querylog.json
+ENV METRICS_PORT=8000
+ENV UPDATE_INTERVAL=10
+
+# Run the exporter
 CMD ["python", "./adguard_exporter.py"]
diff --git a/adguard_exporter.py b/adguard_exporter.py
index 8adc49b..291f2d2 100755
--- a/adguard_exporter.py
+++ b/adguard_exporter.py
@@ -3,11 +3,26 @@
 import os
 import sys
 import threading
+import logging
 from prometheus_client import make_wsgi_app, Counter, Gauge
 from collections import Counter as CollectionsCounter, defaultdict
 from wsgiref.simple_server import make_server
+import heapq
+import orjson
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+import signal
 
-# Define separate Prometheus metrics
+# Set up logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Configuration (can be overridden by environment variables)
+LOG_FILE_PATH = os.environ.get('LOG_FILE_PATH', '/adguard/work/data/querylog.json')
+METRICS_PORT = int(os.environ.get('METRICS_PORT', 8000))
+UPDATE_INTERVAL = int(os.environ.get('UPDATE_INTERVAL', 10))
+
+# Define Prometheus metrics
 dns_queries = Counter('agh_dns_queries', 'Total number of DNS queries')
 blocked_queries = Counter('agh_blocked_dns_queries', 'Total number of blocked DNS queries')
 query_types = Counter('agh_dns_query_types', 'Types of DNS queries', ['query_type'])
@@ -17,147 +32,234 @@
 average_response_time = Gauge('agh_dns_average_response_time', 'Average response time for DNS queries in milliseconds')
 average_upstream_response_time = Gauge('agh_dns_average_upstream_response_time', 'Average response time by upstream server', ['server'])
 
-# Define counters to track hosts and upstream response times
-host_counter = CollectionsCounter()
-blocked_host_counter = CollectionsCounter()
-total_response_time = 0
-total_queries = 0
-upstream_response_times = defaultdict(list)
+class TopHosts:
+    def __init__(self, max_size=100):
+        self.max_size = max_size
+        self.counter = CollectionsCounter()
+        self.heap = []
+        self.lock = threading.Lock()
 
-log_file_path = '/opt/adguardhome/work/data/querylog.json'
-position_file_path = '/opt/adguardhome/work/data/.position'
+    def add(self, host):
+        with self.lock:
+            self.counter[host] += 1
+            count = self.counter[host]
+            self.heap = [(c, h) for c, h in self.heap if h != host]
+            if len(self.heap) < self.max_size:
+                heapq.heappush(self.heap, (count, host))
+            elif count > self.heap[0][0]:
+                heapq.heappushpop(self.heap, (count, host))
 
-def get_last_position():
-    if os.path.exists(position_file_path):
-        try:
-            with open(position_file_path, 'r') as f:
-                pos, inode = f.read().strip().split('\n')
-                pos = int(pos)
-                inode = int(inode)
-                print(f"Read last position: {pos}, inode: {inode}")
-                sys.stdout.flush()
-                return pos, inode
-        except (ValueError, OSError) as e:
-            print(f"Error reading last position: {e}")
-            sys.stdout.flush()
-            return 0, None
-    else:
-        print("Position file not found, starting from the beginning.")
-        sys.stdout.flush()
-        return 0, None
-
-def save_last_position(pos, inode):
-    with open(position_file_path, 'w') as f:
-        f.write(f"{pos}\n{inode}")
-
-def read_new_lines(file, start_pos):
-    file.seek(start_pos)
-    lines = file.readlines()
-    new_pos = file.tell()
-    return lines, new_pos
-
-def update_top_hosts(counter, metric, top_n):
-    top_items = counter.most_common(top_n)
-    metric._metrics.clear()
-    for item in top_items:
-        metric.labels(item[0]).inc(item[1])
-
-def calculate_averages():
-    if total_queries > 0:
-        avg_response_time = total_response_time / total_queries
-        average_response_time.set(avg_response_time)
-
-    for upstream, times in upstream_response_times.items():
-        if times:
-            avg_upstream_time = sum(times) / len(times)
-            average_upstream_response_time.labels(upstream).set(avg_upstream_time)
-
-def parse_and_export(lines):
-    global host_counter, blocked_host_counter, total_response_time, total_queries, upstream_response_times
-
-    for line in lines:
-        if line.strip():
-            try:
-                data = json.loads(line)
-                host = data.get('QH', 'unknown')
-                query_type = data.get('QT', 'unknown')
-                is_blocked = data.get('Result', {}).get('IsFiltered', False)
-                result_reason = str(data.get('Result', {}).get('Reason', 'unknown'))
-                status = 'blocked' if is_blocked else 'success'
-                elapsed_ns = data.get('Elapsed', 0)
-                upstream = data.get('Upstream', 'unknown')
-
-                dns_queries.inc()
-                query_types.labels(query_type).inc()
-
-                if not is_blocked:
-                    host_counter[host] += 1
-
-                # Convert nanoseconds to milliseconds
-                elapsed_ms = elapsed_ns / 1_000_000
-                total_response_time += elapsed_ms
-                total_queries += 1
-
-                if upstream != 'unknown':
-                    upstream_response_times[upstream].append(elapsed_ms)
-
-                if is_blocked and result_reason == '3':
-                    blocked_queries.inc()
-                    blocked_host_counter[host] += 1
-                if is_blocked and result_reason == '7':
-                    safe_search_enforced_hosts.labels(host).inc()
-
-                # Update Prometheus metrics with top 100 hosts
-                update_top_hosts(host_counter, top_hosts, 100)
-                update_top_hosts(blocked_host_counter, top_blocked_hosts, 100)
-            except json.JSONDecodeError as e:
-                print(f"Error decoding JSON: {e}, line: {line}")
-                sys.stdout.flush()
-                pass
-
-    calculate_averages()
-
-def start_metrics_server():
-    app = make_wsgi_app()
-    httpd = make_server('', 8000, app)
-    print("Prometheus metrics server started on port 8000, /metrics endpoint")
-    sys.stdout.flush()
-    httpd.serve_forever()
+    def get_top(self):
+        with self.lock:
+            return sorted(self.heap, reverse=True)
 
-if __name__ == '__main__':
-    metrics_thread = threading.Thread(target=start_metrics_server)
-    metrics_thread.daemon = True
-    metrics_thread.start()
+class MetricsCollector:
+    def __init__(self):
+        self.top_hosts = TopHosts(max_size=100)
+        self.top_blocked_hosts = TopHosts(max_size=100)
+        self.response_times = []
+        self.upstream_response_times = defaultdict(list)
+        self.window_size = 300  # 5 minutes in seconds
+        self.lock = threading.Lock()
 
-    while not os.path.exists(log_file_path):
-        print(f"Waiting for {log_file_path} to be created...")
-        sys.stdout.flush()
-        time.sleep(10)
+    def update_metrics(self, data):
+        current_time = time.time()
+        host = data.get('QH', 'unknown')
+        query_type = data.get('QT', 'unknown')
+        is_blocked = data.get('Result', {}).get('IsFiltered', False)
+        result_reason = str(data.get('Result', {}).get('Reason', 'unknown'))
+        elapsed_ns = data.get('Elapsed', 0)
+        upstream = data.get('Upstream', 'unknown')
 
-    print(f"Log file {log_file_path} found")
-    sys.stdout.flush()
-    last_position, last_inode = get_last_position()
+        dns_queries.inc()
+        query_types.labels(query_type).inc()
 
-    while True:
-        try:
-            current_inode = os.stat(log_file_path).st_ino
+        if not is_blocked:
+            self.top_hosts.add(host)
+
+        elapsed_ms = elapsed_ns / 1_000_000  # Convert nanoseconds to milliseconds
+        with self.lock:
+            self.response_times.append((current_time, elapsed_ms))
+
+        if upstream != 'unknown':
+            with self.lock:
+                self.upstream_response_times[upstream].append((current_time, elapsed_ms))
+
+        if is_blocked and result_reason == '3':
+            blocked_queries.inc()
+            self.top_blocked_hosts.add(host)
+        if is_blocked and result_reason == '7':
+            safe_search_enforced_hosts.labels(host).inc()
+
+        self.update_prometheus_metrics()
+
+    def update_prometheus_metrics(self):
+        current_time = time.time()
+        cutoff_time = current_time - self.window_size
+
+        # Update top hosts metrics
+        top_hosts._metrics.clear()
+        for count, host in self.top_hosts.get_top():
+            top_hosts.labels(host).inc(count)
+
+        # Update top blocked hosts metrics
+        top_blocked_hosts._metrics.clear()
+        for count, host in self.top_blocked_hosts.get_top():
+            top_blocked_hosts.labels(host).inc(count)
+
+        with self.lock:
+            recent_response_times = [rt for t, rt in self.response_times if t > cutoff_time]
+            if recent_response_times:
+                avg_response_time = sum(recent_response_times) / len(recent_response_times)
+                average_response_time.set(avg_response_time)
+
+            for upstream, times in self.upstream_response_times.items():
+                recent_times = [rt for t, rt in times if t > cutoff_time]
+                if recent_times:
+                    avg_upstream_time = sum(recent_times) / len(recent_times)
+                    average_upstream_response_time.labels(upstream).set(avg_upstream_time)
+
+            self.response_times = [(t, rt) for t, rt in self.response_times if t > cutoff_time]
+            for upstream in self.upstream_response_times:
+                self.upstream_response_times[upstream] = [(t, rt) for t, rt in self.upstream_response_times[upstream] if t > cutoff_time]
+
+class LogHandler(FileSystemEventHandler):
+    def __init__(self, log_file_path, metrics_collector):
+        self.log_file_path = log_file_path
+        self.metrics_collector = metrics_collector
+        self.last_position = 0
+        self.last_update_time = time.time()
+        self.is_initialized = False
+        self.start_time = time.time()
+        self.last_inode = None
+        self.initial_load()
+
+    def get_inode(self):
+        return os.stat(self.log_file_path).st_ino if os.path.exists(self.log_file_path) else None
 
-            if last_inode and last_inode != current_inode:
-                last_position = 0
-                print(f"Log file rotated, resetting position to {last_position}")
-                sys.stdout.flush()
+    def initial_load(self):
+        logger.info(f"Performing initial load of log file: {self.log_file_path}")
+        self.last_inode = self.get_inode()
+        if self.last_inode and os.path.exists(self.log_file_path):
+            with open(self.log_file_path, 'r') as log_file:
+                for line in log_file:
+                    if line.strip():
+                        try:
+                            data = orjson.loads(line)
+                            self.metrics_collector.update_metrics(data)
+                        except orjson.JSONDecodeError:
+                            logger.error(f"Error decoding JSON: {line}")
+                self.last_position = log_file.tell()
+                self.last_update_time = time.time()
+                self.is_initialized = True
+                logger.info(f"Initial load complete. Processed up to position {self.last_position}")
+        else:
+            logger.warning(f"Log file does not exist: {self.log_file_path}")
 
-            with open(log_file_path, 'r') as log_file:
-                new_lines, new_position = read_new_lines(log_file, last_position)
-                if new_lines:
-                    parse_and_export(new_lines)
-                    save_last_position(new_position, current_inode)
+    def on_created(self, event):
+        if event.src_path == self.log_file_path:
+            logger.info(f"Log file created: {self.log_file_path}")
+            self.process_new_lines()
 
-                last_position = new_position
-                last_inode = current_inode
+    def on_modified(self, event):
+        if event.src_path == self.log_file_path:
+            logger.info(f"Log file modified: {self.log_file_path}")
+            self.process_new_lines()
 
+    def process_new_lines(self):
+        try:
+            current_inode = self.get_inode()
+            if current_inode != self.last_inode:
+                logger.info(f"Log file rotated. New inode detected: {current_inode}")
+                self.last_position = 0  # Reset position to start of the new file
+                self.last_inode = current_inode
+
+            if not os.path.exists(self.log_file_path):
+                logger.warning(f"Log file does not exist: {self.log_file_path}")
+                return
+
+            with open(self.log_file_path, 'r') as log_file:
+                log_file.seek(self.last_position)
+                lines = log_file.readlines()
+                logger.info(f"Processing {len(lines)} new lines")
+                for line in lines:
+                    if line.strip():
+                        try:
+                            data = orjson.loads(line)
+                            self.metrics_collector.update_metrics(data)
+                        except orjson.JSONDecodeError:
+                            logger.error(f"Error decoding JSON: {line}")
+                self.last_position = log_file.tell()
+                self.last_update_time = time.time()
+                self.is_initialized = True
         except Exception as e:
-            print(f"Error during processing: {e}")
-            sys.stdout.flush()
+            logger.error(f"Error processing log file: {e}")
+
+    def is_ready(self):
+        return self.is_initialized or time.time() - self.start_time < 120
+
+    def is_healthy(self):
+        return (self.is_initialized or time.time() - self.start_time < 120) and \
+               (not os.path.exists(self.log_file_path) or time.time() - self.last_update_time < UPDATE_INTERVAL * 2)
+
+class HealthServer:
+    def __init__(self, log_handler):
+        self.log_handler = log_handler
+
+    def livez(self, environ, start_response):
+        status = '200 OK' if self.log_handler.is_healthy() else '503 Service Unavailable'
+        headers = [('Content-type', 'text/plain; charset=utf-8')]
+        start_response(status, headers)
+        return [b"Alive" if status == '200 OK' else b"Unhealthy"]
+
+    def readyz(self, environ, start_response):
+        status = '200 OK' if self.log_handler.is_ready() else '503 Service Unavailable'
+        headers = [('Content-type', 'text/plain; charset=utf-8')]
+        start_response(status, headers)
+        return [b"Ready" if status == '200 OK' else b"Not Ready"]
+
+def start_metrics_server(port, health_server):
+    def combined_app(environ, start_response):
+        if environ['PATH_INFO'] == '/livez':
+            return health_server.livez(environ, start_response)
+        elif environ['PATH_INFO'] == '/readyz':
+            return health_server.readyz(environ, start_response)
+        return make_wsgi_app()(environ, start_response)
+
+    httpd = make_server('', port, combined_app)
+    logger.info(f"Prometheus metrics server started on port {port}, /metrics, /livez, and /readyz endpoints")
+    httpd.serve_forever()
+
+def graceful_shutdown(signum, frame):
+    logger.info("Received shutdown signal. Exiting...")
+    observer.stop()
+    observer.join()
+    sys.exit(0)
+
+if __name__ == '__main__':
+    metrics_collector = MetricsCollector()
+    log_handler = LogHandler(LOG_FILE_PATH, metrics_collector)
+    health_server = HealthServer(log_handler)
+
+    observer = Observer()
+    observer.schedule(log_handler, path=os.path.dirname(LOG_FILE_PATH), recursive=False)
+    observer.start()
+
+    metrics_thread = threading.Thread(target=start_metrics_server, args=(METRICS_PORT, health_server))
+    metrics_thread.daemon = True
+    metrics_thread.start()
+
+    signal.signal(signal.SIGTERM, graceful_shutdown)
+    signal.signal(signal.SIGINT, graceful_shutdown)
 
-        time.sleep(10)
+    try:
+        while True:
+            time.sleep(UPDATE_INTERVAL)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        observer.stop()
+        observer.join()
+        # Make sure the metrics server is properly stopped
+        logger.info("Shutting down the metrics server.")
+        sys.exit(0)
diff --git a/requirements.txt b/requirements.txt
index a0753df..44cc46e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,3 @@
 prometheus_client
+watchdog
+orjson