From 4f3eb89818a3be7a4f09f4d7c89a6c086d3330db Mon Sep 17 00:00:00 2001 From: mytlogos Date: Tue, 10 May 2022 17:56:11 +0200 Subject: [PATCH] feat(scraper): add job metrics --- .../src/externals/jobScraperManager.ts | 1 + packages/scraper/src/externals/types.ts | 2 + packages/scraper/src/metrics.ts | 91 +++++++++++++++++++ packages/scraper/src/startCrawler.ts | 1 + 4 files changed, 95 insertions(+) create mode 100644 packages/scraper/src/metrics.ts diff --git a/packages/scraper/src/externals/jobScraperManager.ts b/packages/scraper/src/externals/jobScraperManager.ts index c5d98bc1..b1968852 100644 --- a/packages/scraper/src/externals/jobScraperManager.ts +++ b/packages/scraper/src/externals/jobScraperManager.ts @@ -572,6 +572,7 @@ export class JobScraperManager { type: "finished", jobName: item.name, jobId: item.id, + jobType: item.type, jobTrack: { modifications: store.get("modifications") || {}, network: store.get("network") || { diff --git a/packages/scraper/src/externals/types.ts b/packages/scraper/src/externals/types.ts index 6443ef63..44f4356b 100644 --- a/packages/scraper/src/externals/types.ts +++ b/packages/scraper/src/externals/types.ts @@ -11,6 +11,7 @@ import { Id, ExternalStorageUser, JobTrack, + ScrapeName, } from "enterprise-core/dist/types"; import { MediaType } from "enterprise-core/dist/tools"; import { JobCallback } from "../jobManager"; @@ -74,6 +75,7 @@ export type JobChannelMessage = StartJobChannelMessage | EndJobChannelMessage; export interface EndJobChannelMessage extends BasicJobChannelMessage { messageType: "jobs"; type: "finished"; + jobType: ScrapeName; jobName: string; jobId: number; timestamp: number; diff --git a/packages/scraper/src/metrics.ts b/packages/scraper/src/metrics.ts new file mode 100644 index 00000000..aafce329 --- /dev/null +++ b/packages/scraper/src/metrics.ts @@ -0,0 +1,91 @@ +import { Counter, Gauge } from "prom-client"; +import { channel } from "diagnostics_channel"; +import { EndJobChannelMessage, JobQueueChannelMessage } from "./externals/types"; + +const jobMaxCount = new Gauge({ + name: "scraper_job_count_max", + help: "Maximum number of jobs", +}); + +const jobQueueCount = new Gauge({ + name: "scraper_job_count_queue", + help: "Number of queued jobs", +}); + +const jobActiveCount = new Gauge({ + name: "scraper_job_count_active", + help: "Number of active jobs", +}); + +const jobResultCount = new Counter({ + name: "scraper_job_result_count", + help: "Count of finished jobs", + labelNames: ["result", "jobType"], +}); + +const jobModificationsCount = new Counter({ + name: "scraper_job_modification_count", + help: "Count of modified database entities", + labelNames: ["type", "entity", "jobType"], +}); + +const jobDBQueryCount = new Counter({ + name: "scraper_job_db_queries_count", + help: "Number of database queries", + labelNames: ["jobType"], +}); + +const jobNetworkQueryCount = new Counter({ + name: "scraper_job_network_query_count", + help: "Number of network queries", + labelNames: ["jobType"], +}); + +const jobNetworkSendCount = new Counter({ + name: "scraper_job_bytes_send_count", + help: "Network bytes send", + labelNames: ["jobType"], +}); + +const jobNetworkReceivedCount = new Counter({ + name: "scraper_job_bytes_received_count", + help: "Network bytes received", + labelNames: ["jobType"], +}); + +channel("enterprise-jobqueue").subscribe((message) => { + if (message && typeof message !== "object") { + return; + } + // @ts-expect-error + if ("messageType" in message && message.messageType === "jobqueue") { + const item = message as any as JobQueueChannelMessage; + + jobMaxCount.set(item.max); + jobQueueCount.set(item.queued); + jobActiveCount.set(item.active); + } +}); + +channel("enterprise-jobs").subscribe((message) => { + if (message && typeof message !== "object") { + return; + } + // @ts-expect-error + if ("messageType" in message && message.messageType === "jobs" && message.type === "finished") { + const item = message as any as EndJobChannelMessage; + + jobResultCount.inc({ result: item.result, jobType: item.jobType }, 1); + + for (const [key, value] of Object.entries(item.jobTrack.modifications)) { + jobModificationsCount.inc({ type: "delete", entity: key, jobType: item.jobType }, value.deleted); + jobModificationsCount.inc({ type: "update", entity: key, jobType: item.jobType }, value.updated); + jobModificationsCount.inc({ type: "insert", entity: key, jobType: item.jobType }, value.created); + } + + jobDBQueryCount.inc({ jobType: item.jobType }, item.jobTrack.queryCount); + jobNetworkQueryCount.inc({ jobType: item.jobType }, item.jobTrack.network.count); + jobNetworkSendCount.inc({ jobType: item.jobType }, item.jobTrack.network.sent); + jobNetworkReceivedCount.inc({ jobType: item.jobType }, item.jobTrack.network.received); + } +}); diff --git a/packages/scraper/src/startCrawler.ts b/packages/scraper/src/startCrawler.ts index 81611c9e..1f293e9d 100644 --- a/packages/scraper/src/startCrawler.ts +++ b/packages/scraper/src/startCrawler.ts @@ -19,6 +19,7 @@ import { } from "./externals/hookManager"; import path from "path"; import { readFileSync } from "fs"; +import "./metrics"; collectDefaultMetrics({ labels: {