Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yadhap/job schedular refactor #679

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
936 changes: 174 additions & 762 deletions server/job-scheduler.js

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions server/jobSchedularMethods/apiKeys.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const path = require("path");

const logger = require("../config/logger");

const APIKEY_MONITORING = "submitApiKeyMonitoring.js";

/**
* Schedule key check job.
* This function adds a job to the job scheduler to check the keys at specific intervals.
* @async
* @function scheduleKeyCheck
* @memberof module:jobSchedularMethods/apiKeys
* @throws {Error} If an error occurs while scheduling the job.
*/

async function scheduleKeyCheck() {
try {
let jobName = "key-check-" + new Date().getTime();
this.bree.add({
name: jobName,
interval: "at 05:30am also at 05:30pm",
path: path.join(__dirname, "..", "jobs", APIKEY_MONITORING),
worker: {
workerData: {
jobName: jobName,
WORKER_CREATED_AT: Date.now(),
},
},
});

this.bree.start(jobName);
logger.info("📺 KEY MONITORING STARTED ...");
} catch (err) {
logger.error(err);
}
}

module.exports = {
scheduleKeyCheck,
};
203 changes: 203 additions & 0 deletions server/jobSchedularMethods/breeJobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
const logger = require("../config/logger");
const path = require("path");

function createNewBreeJob({
uniqueJobName,
cron,
jobfileName,
sprayedFileScope,
manualJob_meta,
sprayFileName,
sprayDropZone,
applicationId,
dataflowId,
dataflowVersionId = null,
clusterId,
metaData,
jobName,
contact,
jobType,
status,
jobId,
title,
jobExecutionGroupId,
}) {
const job = {
name: uniqueJobName,
path: path.join(__dirname, "..", "jobs", jobfileName),
worker: {
workerData: {
WORKER_CREATED_AT: Date.now(),
dataflowVersionId,
sprayedFileScope,
manualJob_meta,
sprayFileName,
sprayDropZone,
applicationId,
dataflowId,
clusterId,
metaData,
jobName,
contact,
jobType,
status,
jobId,
title,
jobExecutionGroupId,
},
},
};
if (cron) {
job.cron = cron;
job.worker.workerData.isCronJob = true;
} else {
job.timeout = 0;
job.worker.workerData.isCronJob = false;
}
this.bree.add(job);
}

async function removeJobFromScheduler(name) {
try {
const existingJob = this.bree.config.jobs.find((job) => job.name === name);
if (existingJob) {
await this.bree.remove(name);
logger.info(`📢 -Job removed from Bree ${existingJob.name}`);
return { success: true, job: existingJob, jobs: this.bree.config.jobs };
}
} catch (err) {
// Handle error
logger.error(err);
return {
success: false,
message: err.message,
jobs: this.bree.config.jobs,
};
}
}

async function removeAllFromBree(namePart) {
try {
const existingJobs = this.bree.config.jobs.filter((job) =>
job.name.includes(namePart)
);
if (existingJobs.length > 0) {
for (const job of existingJobs) {
try {
await this.bree.remove(job.name);
logger.info(`📢 -Job removed from Bree ${job.name}`);
} catch (error) {
logger.error(error);
}
}
}
} catch (err) {
logger.error(err);
}
}

function getAllJobs() {
return this.bree.config.jobs;
}

async function stopJob(jobName) {
const job = this.bree.config.jobs.find((job) => job.name === jobName);
try {
if (job) {
await this.bree.stop(jobName);
return { success: true, job, jobs: this.bree.config.jobs };
} else {
return {
success: false,
message: "job is not found",
jobs: this.bree.config.jobs,
};
}
} catch (err) {
return {
success: false,
message: err.message,
jobs: this.bree.config.jobs,
};
}
}

async function stopAllJobs() {
try {
const allJobs = [...this.bree.config.jobs];
await this.bree.stop();
return { success: true, jobs: allJobs };
} catch (err) {
return {
success: false,
message: err.message,
jobs: this.bree.config.jobs,
};
}
}

function startJob(jobName) {
const job = this.bree.config.jobs.find((job) => job.name === jobName);
try {
if (job) {
this.bree.start(jobName);
return { success: true, job, jobs: this.bree.config.jobs };
} else {
return {
success: false,
message: "job is not found",
jobs: this.bree.config.jobs,
};
}
} catch (err) {
return {
success: false,
message: err.message,
jobs: this.bree.config.jobs,
};
}
}

function startAllJobs() {
try {
const allJobs = [...this.bree.config.jobs];
this.bree.start();
return { success: true, jobs: allJobs };
} catch (err) {
return {
success: false,
message: err.message,
jobs: this.bree.config.jobs,
};
}
}

function logBreeJobs() {
if (process.env.NODE_ENV === "production") return; //do not polute logs during production;
const jobs = this.bree.config.jobs;
logger.verbose("📢 Bree jobs:");
for (const job of jobs) {
if (job.name.includes("job-status-poller")) continue; // hide status poller from logs
if (job.name.includes("file-monitoring")) continue; // hide file monitoring from logs
logger.verbose({
name: job.name,
cron: job.cron,
jobName: job.worker?.workerData?.jobName,
dataflowId: job.worker?.workerData?.dataflowId,
dataflowVersionId: job.worker?.workerData?.dataflowVersionId,
group: job.worker?.workerData?.jobExecutionGroupId,
});
}
}

module.exports = {
createNewBreeJob,
removeJobFromScheduler,
removeAllFromBree,
getAllJobs,
stopJob,
stopAllJobs,
startJob,
startAllJobs,
logBreeJobs,
};
82 changes: 82 additions & 0 deletions server/jobSchedularMethods/clusterJobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
const path = require("path");

const logger = require("../config/logger");
const models = require("../models");

const ClusterMonitoring = models.clusterMonitoring;
const CLUSTER_TIMEZONE_OFFSET = "clustertimezoneoffset.js";
const CLUSTER_USAGE_HISTORY_TRACKER = "submitClusterUsageTracker.js";
const SUBMIT_CLUSTER_MONITORING_JOB = "submitClusterMonitoring.js";

async function scheduleClusterTimezoneOffset() {
logger.info("☸ CLUSTER TIMEZONE OFFSET STARTED ...");
try {
let jobName = "cluster-timezone-offset-" + new Date().getTime();
this.bree.add({
name: jobName,
interval: "at 02:30am also at 02:30pm",
path: path.join(__dirname, "..", "jobs", CLUSTER_TIMEZONE_OFFSET),
worker: {
workerData: {
jobName: jobName,
WORKER_CREATED_AT: Date.now(),
},
},
});

this.bree.start(jobName);
} catch (err) {
logger.error(err);
}
}

async function createClusterUsageHistoryJob() {
const uniqueJobName = `Cluster Usage History Tracker`;
const job = {
interval: 14400000, // 4 hours
name: uniqueJobName,
path: path.join(__dirname, "..", "jobs", CLUSTER_USAGE_HISTORY_TRACKER),
};
this.bree.add(job);
this.bree.start(uniqueJobName);
logger.info("📈 CLUSTER USAGE HISTORY TRACKER JOB STARTED ...");
}

function createClusterMonitoringBreeJob({ clusterMonitoring_id, cron }) {
const uniqueJobName = `Cluster Monitoring - ${clusterMonitoring_id}`;
const job = {
cron,
name: uniqueJobName,
path: path.join(__dirname, "jobs", SUBMIT_CLUSTER_MONITORING_JOB),
worker: {
workerData: { clusterMonitoring_id },
},
};
this.bree.add(job);
this.bree.start(uniqueJobName);
}

async function scheduleClusterMonitoringOnServerStart() {
try {
logger.info("📺 CLUSTER MONITORING STARTED ...");
const clusterMonitoring = await ClusterMonitoring.findAll({ raw: true });
for (let monitoring of clusterMonitoring) {
const { id, cron, isActive } = monitoring;
if (isActive) {
this.createClusterMonitoringBreeJob({
clusterMonitoring_id: id,
cron,
});
}
}
} catch (err) {
logger.error(err);
}
}

module.exports = {
scheduleClusterTimezoneOffset,
createClusterUsageHistoryJob,
createClusterMonitoringBreeJob,
scheduleClusterMonitoringOnServerStart,
};
Loading
Loading