From a4a7be7168b36c6cf4efe1e1075102f483710af4 Mon Sep 17 00:00:00 2001 From: yadhap Dahal Date: Fri, 5 Jan 2024 09:35:14 -0500 Subject: [PATCH] re-factored job-scheduler file --- server/job-scheduler.js | 936 ++++-------------- server/jobSchedularMethods/apiKeys.js | 40 + server/jobSchedularMethods/breeJobs.js | 203 ++++ server/jobSchedularMethods/clusterJobs.js | 82 ++ server/jobSchedularMethods/hpccFiles.js | 219 ++++ server/jobSchedularMethods/hpccJobs.js | 71 ++ .../routineJobsMonitoring.js | 47 + server/jobSchedularMethods/workFlowJobs.js | 332 +++++++ 8 files changed, 1168 insertions(+), 762 deletions(-) create mode 100644 server/jobSchedularMethods/apiKeys.js create mode 100644 server/jobSchedularMethods/breeJobs.js create mode 100644 server/jobSchedularMethods/clusterJobs.js create mode 100644 server/jobSchedularMethods/hpccFiles.js create mode 100644 server/jobSchedularMethods/hpccJobs.js create mode 100644 server/jobSchedularMethods/routineJobsMonitoring.js create mode 100644 server/jobSchedularMethods/workFlowJobs.js diff --git a/server/job-scheduler.js b/server/job-scheduler.js index 58526f27e..4b33d7d18 100644 --- a/server/job-scheduler.js +++ b/server/job-scheduler.js @@ -1,38 +1,51 @@ -const { v4: uuidv4 } = require("uuid"); const Bree = require("bree"); -const path = require("path"); -const workflowUtil = require("./utils/workflow-util.js"); const logger = require("./config/logger"); -const models = require("./models"); - -const Job = models.job; -const MessageBasedJobs = models.message_based_jobs; -const DataflowVersions = models.dataflow_versions; -const JobExecution = models.job_execution; -const FileMonitoring = models.fileMonitoring; -const ClusterMonitoring = models.clusterMonitoring; -const JobMonitoring = models.jobMonitoring; -const filemonitoring_superfile = models.filemonitoring_superfiles; - -const SUBMIT_JOB_FILE_NAME = "submitJob.js"; -const SUBMIT_QUERY_PUBLISH = "submitPublishQuery.js"; -const SUBMIT_SPRAY_JOB_FILE_NAME = 
"submitSprayJob.js"; -const SUBMIT_SCRIPT_JOB_FILE_NAME = "submitScriptJob.js"; -const SUBMIT_MANUAL_JOB_FILE_NAME = "submitManualJob.js"; -const SUBMIT_GITHUB_JOB_FILE_NAME = "submitGithubJob.js"; -const SUBMIT_LANDINGZONE_FILEMONITORING_FILE_NAME = - "submitLandingZoneFileMonitoring.js"; -const SUBMIT_LOGICAL_FILEMONITORING_FILE_NAME = - "submitLogicalFileMonitoring.js"; -const JOB_STATUS_POLLER = "statusPoller.js"; -const FILE_MONITORING = "fileMonitoringPoller.js"; -const CLUSTER_TIMEZONE_OFFSET = "clustertimezoneoffset.js"; -const SUBMIT_CLUSTER_MONITORING_JOB = "submitClusterMonitoring.js"; -const APIKEY_MONITORING = "submitApiKeyMonitoring.js"; -const JOB_MONITORING = "submitJobMonitoring.js"; -const SUBMIT_SUPER_FILEMONITORING_FILE_NAME = "submitSuperFileMonitoring.js"; -const CLUSTER_USAGE_HISTORY_TRACKER = "submitClusterUsageTracker.js"; + +const { + logBreeJobs, + createNewBreeJob, + removeJobFromScheduler, + removeAllFromBree, + getAllJobs, + stopJob, + stopAllJobs, + startJob, + startAllJobs, +} = require("./jobSchedularMethods/breeJobs.js"); + +const { + scheduleCheckForJobsWithSingleDependency, + executeJob, + scheduleActiveCronJobs, + scheduleMessageBasedJobs, + addJobToScheduler, +} = require("./jobSchedularMethods/workFlowJobs.js"); +const { + scheduleClusterTimezoneOffset, + createClusterUsageHistoryJob, + createClusterMonitoringBreeJob, + scheduleClusterMonitoringOnServerStart, +} = require("./jobSchedularMethods/clusterJobs.js"); + +const { + createJobMonitoringBreeJob, + scheduleJobMonitoringOnServerStart, + scheduleJobStatusPolling, +} = require("./jobSchedularMethods/hpccJobs.js"); + +const { + createLandingZoneFileMonitoringBreeJob, + createLogicalFileMonitoringBreeJob, + createSuperFileMonitoringBreeJob, + scheduleSuperFileMonitoringOnServerStart, + scheduleFileMonitoringBreeJob, + scheduleFileMonitoringOnServerStart, + scheduleFileMonitoring, +} = require("./jobSchedularMethods/hpccFiles.js"); + +const { scheduleKeyCheck } = 
require("./jobSchedularMethods/apiKeys.js"); + class JobScheduler { constructor() { @@ -102,318 +115,13 @@ class JobScheduler { await this.scheduleKeyCheck(); await this.scheduleJobMonitoringOnServerStart(); await this.createClusterUsageHistoryJob(); - logger.info("✔️ JOB SCHEDULER BOOTSTRAPPED..."); })(); } - async scheduleCheckForJobsWithSingleDependency({ - dependsOnJobId, - dataflowId, - dataflowVersionId, - jobExecutionGroupId, - }) { - try { - const dataflowVersion = await DataflowVersions.findOne({ - where: { id: dataflowVersionId }, - attributes: ["graph"], - }); - if (!dataflowVersion) throw new Error("Dataflow version does not exist"); - - let dependentJobs = dataflowVersion.graph.cells.reduce((acc, cell) => { - if (cell?.data?.schedule?.dependsOn?.includes(dependsOnJobId)) - acc.push({ jobId: cell.data.assetId }); - return acc; - }, []); - - if (dependentJobs.length === 0 && dataflowId) { - try { - logger.info( - "WORKFLOW EXECUTION COMPLETE, Checking if subscribed for notifications." 
- ); - await workflowUtil.notifyWorkflow({ - dataflowId, - jobExecutionGroupId, - status: "completed", - }); - } catch (error) { - logger.error( - "WORKFLOW EXECUTION COMPLETE NOTIFICATION FAILED", - error - ); - } - } else { - logger.verbose(`✔️ FOUND ${dependentJobs.length} DEPENDENT JOB/S`); - //List of dependent job ids - let dependentJobsIds = dependentJobs.map((job) => job.jobId); - //Check if any of the dependent job are already in submitted state - const activeJobs = await JobExecution.findAll({ - where: { - dataflowId: dataflowId, - jobId: dependentJobsIds, - status: ["submitted", "blocked", "cow"], - }, - attributes: ["jobId"], - raw: true, - }); - const activeJobIds = activeJobs.map((activeJob) => activeJob.jobId); - //Remove already submitted jobs from dependent jobs array - dependentJobs = dependentJobs.filter( - (dependentJob) => !activeJobIds.includes(dependentJob.jobId) - ); - - for (const dependentJob of dependentJobs) { - try { - let job = await Job.findOne({ where: { id: dependentJob.jobId } }); - let status; - const isSprayJob = job.jobType == "Spray"; - const isScriptJob = job.jobType == "Script"; - const isManualJob = job.jobType === "Manual"; - const isQueryPublishJob = job.jobType === "Query Publish"; - - const isGitHubJob = job.metaData?.isStoredOnGithub; - - logger.info( - `🔄 EXECUTING DEPENDANT JOB "${job.name}" id:${job.id}; dflow: ${dataflowId};` - ); - - const commonWorkerData = { - applicationId: job.application_id, - clusterId: job.cluster_id, - dataflowId: dataflowId, - jobExecutionGroupId, - jobType: job.jobType, - dataflowVersionId, - jobName: job.name, - title: job.title, - jobId: job.id, - }; - - if (isSprayJob) { - status = this.executeJob({ - ...commonWorkerData, - sprayFileName: job.sprayFileName, - sprayDropZone: job.sprayDropZone, - sprayedFileScope: job.sprayedFileScope, - jobfileName: SUBMIT_SPRAY_JOB_FILE_NAME, - }); - } else if (isScriptJob) { - status = this.executeJob({ - jobfileName: SUBMIT_SCRIPT_JOB_FILE_NAME, - 
...commonWorkerData, - }); - } else if (isManualJob) { - status = this.executeJob({ - ...commonWorkerData, - status: "wait", - jobfileName: SUBMIT_MANUAL_JOB_FILE_NAME, - manualJob_meta: { - jobType: "Manual", - jobName: job.name, - notifiedTo: job.contact, - }, - }); - } else if (isGitHubJob) { - status = this.executeJob({ - ...commonWorkerData, - metaData: job.metaData, - jobfileName: SUBMIT_GITHUB_JOB_FILE_NAME, - }); - } else if (isQueryPublishJob) { - status = this.executeJob({ - jobfileName: SUBMIT_QUERY_PUBLISH, - ...commonWorkerData, - }); - } else { - status = this.executeJob({ - jobfileName: SUBMIT_JOB_FILE_NAME, - ...commonWorkerData, - }); - } - if (!status.success) throw status; - } catch (error) { - // failed to execute dependent job through bree. User will be notified inside worker - logger.error("Failed to execute dependent job through bree", error); - } - } - } - } catch (error) { - logger.error(error); - const message = `Error happened while trying to execute workflow, try to 'Save version' and execute it again. 
| Error: ${error.message} `; - await workflowUtil.notifyWorkflow({ - dataflowId, - jobExecutionGroupId, - status: "error", - exceptions: message, - }); - } - } - - executeJob(jobData) { - try { - let uniqueJobName = - jobData.jobName + - "-" + - jobData.dataflowId + - "-" + - jobData.jobId + - "-" + - uuidv4(); - this.createNewBreeJob({ ...jobData, uniqueJobName }); - this.bree.start(uniqueJobName); - logger.info(`✔️ BREE HAS STARTED JOB: "${uniqueJobName}"`); - this.logBreeJobs(); - return { - success: true, - message: `Successfully executed ${jobData.jobName}`, - }; - } catch (err) { - logger.error(err); - return { - success: false, - contact: jobData.contact, - jobName: jobData.jobName, - clusterId: jobData.clusterId, - dataflowId: jobData.dataflowId, - message: `Error executing ${jobName} - ${err.message}`, - }; - } - } - - async scheduleActiveCronJobs() { - try { - // get all graphs active graphs - const dataflowsVersions = await DataflowVersions.findAll({ - where: { isLive: true }, - attributes: ["id", "graph", "dataflowId"], - }); - - for (const dataflowsVersion of dataflowsVersions) { - const cronScheduledNodes = - dataflowsVersion.graph?.cells?.filter( - (cell) => cell.data?.schedule?.cron - ) || []; - if (cronScheduledNodes.length > 0) { - for (const node of cronScheduledNodes) { - try { - const job = await Job.findOne({ - where: { id: node.data.assetId }, - }); - if (!job) throw new Error(`Failed to schedule job ${job.name}`); - - const isSprayJob = job.jobType == "Spray"; - const isScriptJob = job.jobType == "Script"; - const isManualJob = job.jobType === "Manual"; - const isGitHubJob = job.metaData?.isStoredOnGithub; - - const workerData = { - dataflowVersionId: dataflowsVersion.id, - dataflowId: dataflowsVersion.dataflowId, - applicationId: job.application_id, - cron: node.data.schedule.cron, - clusterId: job.cluster_id, - jobType: job.jobType, - jobName: job.name, - title: job.title, - jobId: job.id, - skipLog: true, - }; - - workerData.jobfileName = 
SUBMIT_JOB_FILE_NAME; - - if (isScriptJob) jobfileName = SUBMIT_SCRIPT_JOB_FILE_NAME; - if (isSprayJob) { - workerData.jobfileName = SUBMIT_SPRAY_JOB_FILE_NAME; - workerData.sprayedFileScope = job.sprayedFileScope; - workerData.sprayFileName = job.sprayFileName; - workerData.sprayDropZone = job.sprayDropZone; - } - if (isManualJob) { - workerData.manualJob_meta = { - jobType: "Manual", - jobName: job.name, - notifiedTo: job.contact, - notifiedOn: new Date().getTime(), - }; - workerData.jobfileName = SUBMIT_MANUAL_JOB_FILE_NAME; - workerData.contact = job.contact; - } - if (isGitHubJob) { - workerData.jobfileName = SUBMIT_GITHUB_JOB_FILE_NAME; - workerData.metaData = job.metaData; - } - //finally add the job to the scheduler - this.addJobToScheduler(workerData); - } catch (error) { - logger.error(error); - } - } - } - } - } catch (error) { - logger.error(error); - } - logger.verbose( - `📢 ACTIVE CRON JOBS (${this.bree.config.jobs.length}) (does not include dependent jobs):` - ); - this.logBreeJobs(); - } - - async scheduleMessageBasedJobs(message) { - try { - let job = await Job.findOne({ - where: { name: message.jobName }, - attributes: { exclude: ["assetId"] }, - }); - if (job) { - let messageBasedJobs = await MessageBasedJobs.findAll({ - where: { jobId: job.id }, - }); - for (const messageBasedjob of messageBasedJobs) { - this.executeJob({ - jobId: job.id, - jobName: job.name, - jobType: job.jobType, - clusterId: job.cluster_id, - sprayFileName: job.sprayFileName, - sprayDropZone: job.sprayDropZone, - sprayedFileScope: job.sprayedFileScope, - dataflowId: messageBasedjob.dataflowId, - applicationId: messageBasedjob.applicationId, - jobfileName: - job.jobType == "Script" - ? 
SUBMIT_SCRIPT_JOB_FILE_NAME - : SUBMIT_JOB_FILE_NAME, - }); - } - } else { - logger.warn("📢 COULD NOT FIND JOB WITH NAME " + message.jobName); - } - } catch (err) { - logger.error(err); - } - } - - addJobToScheduler({ skipLog = false, ...jobData }) { - try { - let uniqueJobName = - jobData.jobName + "-" + jobData.dataflowId + "-" + jobData.jobId; - this.createNewBreeJob({ uniqueJobName, ...jobData }); - this.bree.start(uniqueJobName); - - logger.info( - `📢 JOB WAS SCHEDULED AS - ${uniqueJobName}, job-scheduler.js: addJobToScheduler` - ); - !skipLog && this.logBreeJobs(); - - return { success: true }; - } catch (err) { - logger.error(err); - const part2 = err.message.split(" an ")?.[1]; // error message is not user friendly, we will trim it to have everything after "an". - if (part2) err.message = part2; - return { success: false, error: err.message }; - } + //Bree related methods + logBreeJobs() { + return logBreeJobs.call(this); } - createNewBreeJob({ uniqueJobName, cron, @@ -435,469 +143,173 @@ class JobScheduler { title, jobExecutionGroupId, }) { - const job = { - name: uniqueJobName, - path: path.join(__dirname, "jobs", jobfileName), - worker: { - workerData: { - WORKER_CREATED_AT: Date.now(), - dataflowVersionId, - sprayedFileScope, - manualJob_meta, - sprayFileName, - sprayDropZone, - applicationId, - dataflowId, - clusterId, - metaData, - jobName, - contact, - jobType, - status, - jobId, - title, - jobExecutionGroupId, - }, - }, - }; - if (cron) { - job.cron = cron; - job.worker.workerData.isCronJob = true; - } else { - job.timeout = 0; - job.worker.workerData.isCronJob = false; - } - this.bree.add(job); - } - // ------------------------------------------------------------------------------------------- - //CREATE - LZ File Monitoring Bree Job - createLandingZoneFileMonitoringBreeJob({ filemonitoring_id, name, cron }) { - const job = { + return createNewBreeJob.call(this, { + uniqueJobName, cron, - name, - path: path.join( - __dirname, - "jobs", - 
SUBMIT_LANDINGZONE_FILEMONITORING_FILE_NAME - ), - worker: { - workerData: { filemonitoring_id }, - }, - }; - this.bree.add(job); + jobfileName, + sprayedFileScope, + manualJob_meta, + sprayFileName, + sprayDropZone, + applicationId, + dataflowId, + dataflowVersionId, + clusterId, + metaData, + jobName, + contact, + jobType, + status, + jobId, + title, + jobExecutionGroupId, + }); } - //CREATE - Logical File Monitoring Bree Job - createLogicalFileMonitoringBreeJob({ filemonitoring_id, name, cron }) { - const job = { - cron, - name, - path: path.join( - __dirname, - "jobs", - SUBMIT_LOGICAL_FILEMONITORING_FILE_NAME - ), - worker: { - workerData: { filemonitoring_id }, - }, - }; - this.bree.add(job); - } // --------------------------------------------------------------------------------------------- - createSuperFileMonitoringBreeJob({ filemonitoring_id, cron }) { - const uniqueJobName = `Superfile Monitoring - ${filemonitoring_id}`; - const job = { - cron, - name: uniqueJobName, - path: path.join(__dirname, "jobs", SUBMIT_SUPER_FILEMONITORING_FILE_NAME), - worker: { - workerData: { filemonitoring_id }, - }, - }; - this.bree.add(job); - this.bree.start(uniqueJobName); + async removeJobFromScheduler(name) { + return await removeJobFromScheduler.call(this, name); } - async scheduleSuperFileMonitoringOnServerStart() { - try { - logger.info("📺 SUPER FILE MONITORING STARTED ..."); - const superfileMonitoring = await filemonitoring_superfile.findAll({ - raw: true, - }); - for (let monitoring of superfileMonitoring) { - const { id, cron, monitoringActive } = monitoring; - if (monitoringActive) { - this.createSuperFileMonitoringBreeJob({ - filemonitoring_id: id, - cron, - }); - } - } - } catch (err) { - logger.error(err); - } + + async removeAllFromBree() { + return await removeAllFromBree.call(this); } - // -------------------------------------------------------------------------------------------- - // SCHEDULE - LZ File Monitoring Bree Job - async 
scheduleFileMonitoringBreeJob({ - filemonitoring_id, - name, - cron, - monitoringAssetType, + getAllJobs() { + return getAllJobs.call(this); + } + + async stopJob(jobName) { + return await stopJob.call(this, jobName); + } + + async stopAllJobs() { + return await stopAllJobs.call(this); + } + + startJob(jobName) { + return startJob.call(this, jobName); + } + + startAllJobs() { + return startAllJobs.call(this); + } + + // Jobs in a workflow + scheduleCheckForJobsWithSingleDependency({ + dependsOnJobId, + dataflowId, + dataflowVersionId, + jobExecutionGroupId, }) { - const uniqueName = `${name}-${filemonitoring_id}`; - if (monitoringAssetType === "landingZoneFile") { - this.createLandingZoneFileMonitoringBreeJob({ - filemonitoring_id, - name: uniqueName, - cron, - }); - this.bree.start(uniqueName); // Starts the recently added bree job - } else if (monitoringAssetType === "logicalFiles") { - this.createLogicalFileMonitoringBreeJob({ - filemonitoring_id, - name: uniqueName, - cron, - }); - this.bree.start(uniqueName); - } else if (monitoringAssetType === "superFiles") { - this.createSuperFileMonitoringBreeJob({ - filemonitoring_id, - name: uniqueName, - cron, - }); - } + scheduleCheckForJobsWithSingleDependency.call(this, { + dependsOnJobId, + dataflowId, + dataflowVersionId, + jobExecutionGroupId, + }); } - // When server starts - check file monitoring and add all those jobs to bree - async scheduleFileMonitoringOnServerStart() { - try { - const activeLandingZoneFileMonitoring = await FileMonitoring.findAll({ - where: { - monitoringActive: true, - // monitoringAssetType: "landingZoneFile", - }, - raw: true, - }); - for (const monitoring of activeLandingZoneFileMonitoring) { - await this.scheduleFileMonitoringBreeJob({ - filemonitoring_id: monitoring.id, - name: monitoring.name, - cron: monitoring.cron, - monitoringAssetType: monitoring.monitoringAssetType, - }); - } - } catch (err) { - logger.error(err); - } + executeJob(jobData) { + return executeJob.call(this, 
jobData); } - // When server starts - start key monitoring job - async scheduleKeyCheck() { - try { - let jobName = "key-check-" + new Date().getTime(); - this.bree.add({ - name: jobName, - interval: "at 05:30am also at 05:30pm", - path: path.join(__dirname, "jobs", APIKEY_MONITORING), - worker: { - workerData: { - jobName: jobName, - WORKER_CREATED_AT: Date.now(), - }, - }, - }); - - this.bree.start(jobName); - logger.info("📺 KEY MONITORING STARTED ..."); - } catch (err) { - logger.error(err); - } + scheduleActiveCronJobs() { + return scheduleActiveCronJobs.call(this); } - // --------------------------------------------------------------------------------------------- - // Cluster monitoring bree job - createClusterMonitoringBreeJob({ clusterMonitoring_id, cron }) { - const uniqueJobName = `Cluster Monitoring - ${clusterMonitoring_id}`; - const job = { - cron, - name: uniqueJobName, - path: path.join(__dirname, "jobs", SUBMIT_CLUSTER_MONITORING_JOB), - worker: { - workerData: { clusterMonitoring_id }, - }, - }; - this.bree.add(job); - this.bree.start(uniqueJobName); + scheduleMessageBasedJobs(message) { + return scheduleMessageBasedJobs.call(this, message); } - async scheduleClusterMonitoringOnServerStart() { - try { - logger.info("📺 CLUSTER MONITORING STARTED ..."); - const clusterMonitoring = await ClusterMonitoring.findAll({ raw: true }); - for (let monitoring of clusterMonitoring) { - const { id, cron, isActive } = monitoring; - if (isActive) { - this.createClusterMonitoringBreeJob({ - clusterMonitoring_id: id, - cron, - }); - } - } - } catch (err) { - logger.error(err); - } + addJobToScheduler(jobData) { + return addJobToScheduler.call(this, jobData); } - // -------------------------------------------------------------------------------------------- - // Job monitoring bree job - createJobMonitoringBreeJob({ jobMonitoring_id, cron }) { - const uniqueJobName = `Job Monitoring - ${jobMonitoring_id}`; - const job = { - cron, - name: uniqueJobName, - path: 
path.join(__dirname, "jobs", JOB_MONITORING), - worker: { - jobMonitoring_id, - }, - }; - this.bree.add(job); - this.bree.start(uniqueJobName); + + // Cluster jobs + scheduleClusterTimezoneOffset() { + return scheduleClusterTimezoneOffset.call(this); + } + createClusterUsageHistoryJob() { + return createClusterUsageHistoryJob.call(this); } - async scheduleJobMonitoringOnServerStart() { - try { - logger.info("🕗 JOB MONITORING STARTED ..."); - const jobMonitoring = await JobMonitoring.findAll({ raw: true }); - for (let monitoring of jobMonitoring) { - const { id, cron, isActive } = monitoring; - if (isActive) { - this.createJobMonitoringBreeJob({ - jobMonitoring_id: id, - cron, - }); - } - } - } catch (err) { - logger.error(err); - } + createClusterMonitoringBreeJob({ clusterMonitoring_id, cron }) { + return createClusterMonitoringBreeJob.call(this, { + clusterMonitoring_id, + cron, + }); } - // -------------------------------------------------------------------------------------------- - // Cluster monitoring bree job - async createClusterUsageHistoryJob(){ - const uniqueJobName = `Cluster Usage History Tracker`; - const job = { - interval: 14400000, // 4 hours - name: uniqueJobName, - path: path.join(__dirname, "jobs", CLUSTER_USAGE_HISTORY_TRACKER), - }; - this.bree.add(job); - this.bree.start(uniqueJobName); - logger.info("📈 CLUSTER USAGE HISTORY TRACKER JOB STARTED ..."); -} - // -------------------------------------------------------------------------------------------- + scheduleClusterMonitoringOnServerStart() { + return scheduleClusterMonitoringOnServerStart.call(this); + } - async removeJobFromScheduler(name) { - try { - const existingJob = this.bree.config.jobs.find( - (job) => job.name === name - ); - if (existingJob) { - await this.bree.remove(name); - logger.info(`📢 -Job removed from Bree ${existingJob.name}`); - return { success: true, job: existingJob, jobs: this.bree.config.jobs }; - } - } catch (err) { - logger.error(err); - return { - success: 
false,
-        message: err.message,
-        jobs: this.bree.config.jobs,
-      };
-    }
+  // Job Monitoring
+  createJobMonitoringBreeJob({ jobMonitoring_id, cron }) {
+    return createJobMonitoringBreeJob.call(this, { jobMonitoring_id, cron });
   }
 
-  async removeAllFromBree(namePart) {
-    try {
-      const existingJobs = this.bree.config.jobs.filter((job) =>
-        job.name.includes(namePart)
-      );
-      if (existingJobs.length > 0) {
-        for (const job of existingJobs) {
-          try {
-            await this.bree.remove(job.name);
-            logger.info(`📢 -Job removed from Bree ${job.name}`);
-          } catch (error) {
-            logger.error(error);
-          }
-        }
-      }
-    } catch (err) {
-      logger.error(err);
-    }
+  scheduleJobMonitoringOnServerStart() {
+    return scheduleJobMonitoringOnServerStart.call(this);
   }
 
-  async scheduleJobStatusPolling() {
-    logger.info("📢 STATUS POLLING SCHEDULER STARTED...");
-
-    try {
-      let jobName = "job-status-poller-" + new Date().getTime();
-
-      this.bree.add({
-        name: jobName,
-        interval: "20s",
-        path: path.join(__dirname, "jobs", JOB_STATUS_POLLER),
-        worker: {
-          workerData: {
-            jobName: jobName,
-            WORKER_CREATED_AT: Date.now(),
-          },
-        },
-      });
-
-      this.bree.start(jobName);
-    } catch (err) {
-      logger.error(err);
-    }
+  scheduleJobStatusPolling() {
+    return scheduleJobStatusPolling.call(this);
   }
 
-  // FILE MONITORING POLLER
-  async scheduleFileMonitoring() {
-    logger.info("📂 FILE MONITORING STARTED ...");
-    try {
-      let jobName = "file-monitoring-" + new Date().getTime();
-      this.bree.add({
-        name: jobName,
-        interval: "500s",
-        path: path.join(__dirname, "jobs", FILE_MONITORING),
-        worker: {
-          workerData: {
-            jobName: jobName,
-            WORKER_CREATED_AT: Date.now(),
-          },
-        },
-      });
-
-      this.bree.start(jobName);
-    } catch (err) {
-      logger.error(err);
-    }
+  // file Monitoring
+  createLandingZoneFileMonitoringBreeJob({ filemonitoring_id, name, cron }) {
+    return createLandingZoneFileMonitoringBreeJob.call(this, {
+      filemonitoring_id,
+      name,
+      cron,
+    });
   }
 
-  // Cluster Timezone Offset Checker
-  async scheduleClusterTimezoneOffset() {
-    
logger.info("☸ CLUSTER TIMEZONE OFFSET STARTED ..."); - try { - let jobName = "cluster-timezone-offset-" + new Date().getTime(); - this.bree.add({ - name: jobName, - interval: "at 02:30am also at 02:30pm", - path: path.join(__dirname, "jobs", CLUSTER_TIMEZONE_OFFSET), - worker: { - workerData: { - jobName: jobName, - WORKER_CREATED_AT: Date.now(), - }, - }, - }); - - this.bree.start(jobName); - } catch (err) { - logger.error(err); - } + createLogicalFileMonitoringBreeJob({ filemonitoring_id, name, cron }) { + return createLogicalFileMonitoringBreeJob.call(this, { + filemonitoring_id, + name, + cron, + }); } - logBreeJobs() { - if (process.env.NODE_ENV === "production") return; //do not polute logs during production; - const jobs = this.bree.config.jobs; - logger.verbose("📢 Bree jobs:"); - for (const job of jobs) { - if (job.name.includes("job-status-poller")) continue; // hide status poller from logs - if (job.name.includes("file-monitoring")) continue; // hide file monitoring from logs - logger.verbose({ - name: job.name, - cron: job.cron, - jobName: job.worker?.workerData?.jobName, - dataflowId: job.worker?.workerData?.dataflowId, - dataflowVersionId: job.worker?.workerData?.dataflowVersionId, - group: job.worker?.workerData?.jobExecutionGroupId, - }); - } + createSuperFileMonitoringBreeJob({ filemonitoring_id, cron }) { + return createSuperFileMonitoringBreeJob.call(this, { + filemonitoring_id, + cron, + }); } - getAllJobs() { - return this.bree.config.jobs; + scheduleSuperFileMonitoringOnServerStart() { + return scheduleSuperFileMonitoringOnServerStart.call(this); } - async stopJob(jobName) { - const job = this.bree.config.jobs.find((job) => job.name === jobName); - try { - if (job) { - await this.bree.stop(jobName); - return { success: true, job, jobs: this.bree.config.jobs }; - } else { - return { - success: false, - message: "job is not found", - jobs: this.bree.config.jobs, - }; - } - } catch (err) { - return { - success: false, - message: err.message, - 
jobs: this.bree.config.jobs, - }; - } + scheduleFileMonitoringBreeJob({ + filemonitoring_id, + name, + cron, + monitoringAssetType, + }) { + return scheduleFileMonitoringBreeJob.call(this, { + filemonitoring_id, + name, + cron, + monitoringAssetType, + }); } - async stopAllJobs() { - try { - const allJobs = [...this.bree.config.jobs]; - await this.bree.stop(); - return { success: true, jobs: allJobs }; - } catch (err) { - return { - success: false, - message: err.message, - jobs: this.bree.config.jobs, - }; - } + scheduleFileMonitoringOnServerStart() { + return scheduleFileMonitoringOnServerStart.call(this); } - startJob(jobName) { - const job = this.bree.config.jobs.find((job) => job.name === jobName); - try { - if (job) { - this.bree.start(jobName); - return { success: true, job, jobs: this.bree.config.jobs }; - } else { - return { - success: false, - message: "job is not found", - jobs: this.bree.config.jobs, - }; - } - } catch (err) { - return { - success: false, - message: err.message, - jobs: this.bree.config.jobs, - }; - } + scheduleFileMonitoring() { + return scheduleFileMonitoring.call(this); } - startAllJobs() { - try { - const allJobs = [...this.bree.config.jobs]; - this.bree.start(); - return { success: true, jobs: allJobs }; - } catch (err) { - return { - success: false, - message: err.message, - jobs: this.bree.config.jobs, - }; - } + // API keys check + scheduleKeyCheck() { + return scheduleKeyCheck.call(this); } } diff --git a/server/jobSchedularMethods/apiKeys.js b/server/jobSchedularMethods/apiKeys.js new file mode 100644 index 000000000..1a421d3f9 --- /dev/null +++ b/server/jobSchedularMethods/apiKeys.js @@ -0,0 +1,40 @@ +const path = require("path"); + +const logger = require("../config/logger"); + +const APIKEY_MONITORING = "submitApiKeyMonitoring.js"; + +/** + * Schedule key check job. + * This function adds a job to the job scheduler to check the keys at specific intervals. 
+ * @async + * @function scheduleKeyCheck + * @memberof module:jobSchedularMethods/apiKeys + * @throws {Error} If an error occurs while scheduling the job. + */ + +async function scheduleKeyCheck() { + try { + let jobName = "key-check-" + new Date().getTime(); + this.bree.add({ + name: jobName, + interval: "at 05:30am also at 05:30pm", + path: path.join(__dirname, "..", "jobs", APIKEY_MONITORING), + worker: { + workerData: { + jobName: jobName, + WORKER_CREATED_AT: Date.now(), + }, + }, + }); + + this.bree.start(jobName); + logger.info("📺 KEY MONITORING STARTED ..."); + } catch (err) { + logger.error(err); + } +} + +module.exports = { + scheduleKeyCheck, +}; diff --git a/server/jobSchedularMethods/breeJobs.js b/server/jobSchedularMethods/breeJobs.js new file mode 100644 index 000000000..7a981b0ad --- /dev/null +++ b/server/jobSchedularMethods/breeJobs.js @@ -0,0 +1,203 @@ +const logger = require("../config/logger"); +const path = require("path"); + +function createNewBreeJob({ + uniqueJobName, + cron, + jobfileName, + sprayedFileScope, + manualJob_meta, + sprayFileName, + sprayDropZone, + applicationId, + dataflowId, + dataflowVersionId = null, + clusterId, + metaData, + jobName, + contact, + jobType, + status, + jobId, + title, + jobExecutionGroupId, +}) { + const job = { + name: uniqueJobName, + path: path.join(__dirname, "..", "jobs", jobfileName), + worker: { + workerData: { + WORKER_CREATED_AT: Date.now(), + dataflowVersionId, + sprayedFileScope, + manualJob_meta, + sprayFileName, + sprayDropZone, + applicationId, + dataflowId, + clusterId, + metaData, + jobName, + contact, + jobType, + status, + jobId, + title, + jobExecutionGroupId, + }, + }, + }; + if (cron) { + job.cron = cron; + job.worker.workerData.isCronJob = true; + } else { + job.timeout = 0; + job.worker.workerData.isCronJob = false; + } + this.bree.add(job); +} + +async function removeJobFromScheduler(name) { + try { + const existingJob = this.bree.config.jobs.find((job) => job.name === name); + if 
(existingJob) { + await this.bree.remove(name); + logger.info(`📢 -Job removed from Bree ${existingJob.name}`); + return { success: true, job: existingJob, jobs: this.bree.config.jobs }; + } + } catch (err) { + // Handle error + logger.error(err); + return { + success: false, + message: err.message, + jobs: this.bree.config.jobs, + }; + } +} + +async function removeAllFromBree(namePart) { + try { + const existingJobs = this.bree.config.jobs.filter((job) => + job.name.includes(namePart) + ); + if (existingJobs.length > 0) { + for (const job of existingJobs) { + try { + await this.bree.remove(job.name); + logger.info(`📢 -Job removed from Bree ${job.name}`); + } catch (error) { + logger.error(error); + } + } + } + } catch (err) { + logger.error(err); + } +} + +function getAllJobs() { + return this.bree.config.jobs; +} + +async function stopJob(jobName) { + const job = this.bree.config.jobs.find((job) => job.name === jobName); + try { + if (job) { + await this.bree.stop(jobName); + return { success: true, job, jobs: this.bree.config.jobs }; + } else { + return { + success: false, + message: "job is not found", + jobs: this.bree.config.jobs, + }; + } + } catch (err) { + return { + success: false, + message: err.message, + jobs: this.bree.config.jobs, + }; + } +} + +async function stopAllJobs() { + try { + const allJobs = [...this.bree.config.jobs]; + await this.bree.stop(); + return { success: true, jobs: allJobs }; + } catch (err) { + return { + success: false, + message: err.message, + jobs: this.bree.config.jobs, + }; + } +} + +function startJob(jobName) { + const job = this.bree.config.jobs.find((job) => job.name === jobName); + try { + if (job) { + this.bree.start(jobName); + return { success: true, job, jobs: this.bree.config.jobs }; + } else { + return { + success: false, + message: "job is not found", + jobs: this.bree.config.jobs, + }; + } + } catch (err) { + return { + success: false, + message: err.message, + jobs: this.bree.config.jobs, + }; + } +} + 
+function startAllJobs() { + try { + const allJobs = [...this.bree.config.jobs]; + this.bree.start(); + return { success: true, jobs: allJobs }; + } catch (err) { + return { + success: false, + message: err.message, + jobs: this.bree.config.jobs, + }; + } +} + +function logBreeJobs() { + if (process.env.NODE_ENV === "production") return; //do not polute logs during production; + const jobs = this.bree.config.jobs; + logger.verbose("📢 Bree jobs:"); + for (const job of jobs) { + if (job.name.includes("job-status-poller")) continue; // hide status poller from logs + if (job.name.includes("file-monitoring")) continue; // hide file monitoring from logs + logger.verbose({ + name: job.name, + cron: job.cron, + jobName: job.worker?.workerData?.jobName, + dataflowId: job.worker?.workerData?.dataflowId, + dataflowVersionId: job.worker?.workerData?.dataflowVersionId, + group: job.worker?.workerData?.jobExecutionGroupId, + }); + } +} + +module.exports = { + createNewBreeJob, + removeJobFromScheduler, + removeAllFromBree, + getAllJobs, + stopJob, + stopAllJobs, + startJob, + startAllJobs, + logBreeJobs, +}; diff --git a/server/jobSchedularMethods/clusterJobs.js b/server/jobSchedularMethods/clusterJobs.js new file mode 100644 index 000000000..ac1fdb4e3 --- /dev/null +++ b/server/jobSchedularMethods/clusterJobs.js @@ -0,0 +1,82 @@ +const path = require("path"); + +const logger = require("../config/logger"); +const models = require("../models"); + +const ClusterMonitoring = models.clusterMonitoring; +const CLUSTER_TIMEZONE_OFFSET = "clustertimezoneoffset.js"; +const CLUSTER_USAGE_HISTORY_TRACKER = "submitClusterUsageTracker.js"; +const SUBMIT_CLUSTER_MONITORING_JOB = "submitClusterMonitoring.js"; + +async function scheduleClusterTimezoneOffset() { + logger.info("☸ CLUSTER TIMEZONE OFFSET STARTED ..."); + try { + let jobName = "cluster-timezone-offset-" + new Date().getTime(); + this.bree.add({ + name: jobName, + interval: "at 02:30am also at 02:30pm", + path: path.join(__dirname, 
"..", "jobs", CLUSTER_TIMEZONE_OFFSET), + worker: { + workerData: { + jobName: jobName, + WORKER_CREATED_AT: Date.now(), + }, + }, + }); + + this.bree.start(jobName); + } catch (err) { + logger.error(err); + } +} + +async function createClusterUsageHistoryJob() { + const uniqueJobName = `Cluster Usage History Tracker`; + const job = { + interval: 14400000, // 4 hours + name: uniqueJobName, + path: path.join(__dirname, "..", "jobs", CLUSTER_USAGE_HISTORY_TRACKER), + }; + this.bree.add(job); + this.bree.start(uniqueJobName); + logger.info("📈 CLUSTER USAGE HISTORY TRACKER JOB STARTED ..."); +} + +function createClusterMonitoringBreeJob({ clusterMonitoring_id, cron }) { + const uniqueJobName = `Cluster Monitoring - ${clusterMonitoring_id}`; + const job = { + cron, + name: uniqueJobName, + path: path.join(__dirname, "jobs", SUBMIT_CLUSTER_MONITORING_JOB), + worker: { + workerData: { clusterMonitoring_id }, + }, + }; + this.bree.add(job); + this.bree.start(uniqueJobName); +} + +async function scheduleClusterMonitoringOnServerStart() { + try { + logger.info("📺 CLUSTER MONITORING STARTED ..."); + const clusterMonitoring = await ClusterMonitoring.findAll({ raw: true }); + for (let monitoring of clusterMonitoring) { + const { id, cron, isActive } = monitoring; + if (isActive) { + this.createClusterMonitoringBreeJob({ + clusterMonitoring_id: id, + cron, + }); + } + } + } catch (err) { + logger.error(err); + } +} + +module.exports = { + scheduleClusterTimezoneOffset, + createClusterUsageHistoryJob, + createClusterMonitoringBreeJob, + scheduleClusterMonitoringOnServerStart, +}; diff --git a/server/jobSchedularMethods/hpccFiles.js b/server/jobSchedularMethods/hpccFiles.js new file mode 100644 index 000000000..5a9af6d65 --- /dev/null +++ b/server/jobSchedularMethods/hpccFiles.js @@ -0,0 +1,219 @@ +const path = require("path"); + +const models = require("../models"); +const logger = require("../config/logger"); + +const SUBMIT_LANDINGZONE_FILEMONITORING_FILE_NAME = + 
"submitLandingZoneFileMonitoring.js"; +const SUBMIT_LOGICAL_FILEMONITORING_FILE_NAME = + "submitLogicalFileMonitoring.js"; +const SUBMIT_SUPER_FILEMONITORING_FILE_NAME = "submitSuperFileMonitoring.js"; +const FILE_MONITORING = "fileMonitoringPoller.js"; + +const filemonitoring_superfile = models.filemonitoring_superfiles; +const FileMonitoring = models.fileMonitoring; + +function createLandingZoneFileMonitoringBreeJob({ + filemonitoring_id, + name, + cron, +}) { + const job = { + cron, + name, + path: path.join( + __dirname, + "..", + "jobs", + SUBMIT_LANDINGZONE_FILEMONITORING_FILE_NAME + ), + worker: { + workerData: { filemonitoring_id }, + }, + }; + this.bree.add(job); +} + +function createLogicalFileMonitoringBreeJob({ filemonitoring_id, name, cron }) { + const job = { + cron, + name, + path: path.join( + __dirname, + "..", + "jobs", + SUBMIT_LOGICAL_FILEMONITORING_FILE_NAME + ), + worker: { + workerData: { filemonitoring_id }, + }, + }; + this.bree.add(job); +} + +function createSuperFileMonitoringBreeJob({ filemonitoring_id, cron }) { + const uniqueJobName = `Superfile Monitoring - ${filemonitoring_id}`; + const job = { + cron, + name: uniqueJobName, + path: path.join( + __dirname, + "..", + "jobs", + SUBMIT_SUPER_FILEMONITORING_FILE_NAME + ), + worker: { + workerData: { filemonitoring_id }, + }, + }; + this.bree.add(job); + this.bree.start(uniqueJobName); +} + +async function scheduleSuperFileMonitoringOnServerStart() { + try { + logger.info("📺 SUPER FILE MONITORING STARTED ..."); + const superfileMonitoring = await filemonitoring_superfile.findAll({ + raw: true, + }); + for (let monitoring of superfileMonitoring) { + const { id, cron, monitoringActive } = monitoring; + if (monitoringActive) { + this.createSuperFileMonitoringBreeJob({ + filemonitoring_id: id, + cron, + }); + } + } + } catch (err) { + logger.error(err); + } +} + +async function scheduleFileMonitoringBreeJob({ + filemonitoring_id, + name, + cron, + monitoringAssetType, +}) { + const uniqueName 
= `${name}-${filemonitoring_id}`; + if (monitoringAssetType === "landingZoneFile") { + this.createLandingZoneFileMonitoringBreeJob({ + filemonitoring_id, + name: uniqueName, + cron, + }); + this.bree.start(uniqueName); // Starts the recently added bree job + } else if (monitoringAssetType === "logicalFiles") { + this.createLogicalFileMonitoringBreeJob({ + filemonitoring_id, + name: uniqueName, + cron, + }); + this.bree.start(uniqueName); + } else if (monitoringAssetType === "superFiles") { + this.createSuperFileMonitoringBreeJob({ + filemonitoring_id, + name: uniqueName, + cron, + }); + } +} + +async function scheduleFileMonitoringOnServerStart() { + try { + const activeLandingZoneFileMonitoring = await FileMonitoring.findAll({ + where: { + monitoringActive: true, + // monitoringAssetType: "landingZoneFile", + }, + raw: true, + }); + for (const monitoring of activeLandingZoneFileMonitoring) { + await this.scheduleFileMonitoringBreeJob({ + filemonitoring_id: monitoring.id, + name: monitoring.name, + cron: monitoring.cron, + monitoringAssetType: monitoring.monitoringAssetType, + }); + } + } catch (err) { + logger.error(err); + } +} + +async function scheduleFileMonitoringOnServerStart() { + try { + const activeLandingZoneFileMonitoring = await FileMonitoring.findAll({ + where: { + monitoringActive: true, + // monitoringAssetType: "landingZoneFile", + }, + raw: true, + }); + for (const monitoring of activeLandingZoneFileMonitoring) { + await this.scheduleFileMonitoringBreeJob({ + filemonitoring_id: monitoring.id, + name: monitoring.name, + cron: monitoring.cron, + monitoringAssetType: monitoring.monitoringAssetType, + }); + } + } catch (err) { + logger.error(err); + } +} + +async function scheduleFileMonitoringOnServerStart() { + try { + const activeLandingZoneFileMonitoring = await FileMonitoring.findAll({ + where: { + monitoringActive: true, + // monitoringAssetType: "landingZoneFile", + }, + raw: true, + }); + for (const monitoring of 
activeLandingZoneFileMonitoring) { + await this.scheduleFileMonitoringBreeJob({ + filemonitoring_id: monitoring.id, + name: monitoring.name, + cron: monitoring.cron, + monitoringAssetType: monitoring.monitoringAssetType, + }); + } + } catch (err) { + logger.error(err); + } +} + +async function scheduleFileMonitoring() { + logger.info("📂 FILE MONITORING STARTED ..."); + try { + let jobName = "file-monitoring-" + new Date().getTime(); + this.bree.add({ + name: jobName, + interval: "500s", + path: path.join(__dirname, "..", "jobs", FILE_MONITORING), + worker: { + workerData: { + jobName: jobName, + WORKER_CREATED_AT: Date.now(), + }, + }, + }); + + this.bree.start(jobName); + } catch (err) { + logger.error(err); + } +} + +module.exports = { + createLandingZoneFileMonitoringBreeJob, + createLogicalFileMonitoringBreeJob, + createSuperFileMonitoringBreeJob, + scheduleSuperFileMonitoringOnServerStart, + scheduleFileMonitoringBreeJob, + scheduleFileMonitoringOnServerStart, + scheduleFileMonitoring, +}; diff --git a/server/jobSchedularMethods/hpccJobs.js b/server/jobSchedularMethods/hpccJobs.js new file mode 100644 index 000000000..ce83274ba --- /dev/null +++ b/server/jobSchedularMethods/hpccJobs.js @@ -0,0 +1,71 @@ +const path = require("path"); + +const models = require("../models"); +const logger = require("../config/logger"); + +const JobMonitoring = models.jobMonitoring; +const JOB_MONITORING = "submitJobMonitoring.js"; +const JOB_STATUS_POLLER = "statusPoller.js"; +const ROUTINE_JOBS = "routineJobs.js"; + +function createJobMonitoringBreeJob({ jobMonitoring_id, cron }) { + const uniqueJobName = `Job Monitoring - ${jobMonitoring_id}`; + const job = { + cron, + name: uniqueJobName, + path: path.join(__dirname, "..", "jobs", JOB_MONITORING), + worker: { + jobMonitoring_id, + }, + }; + this.bree.add(job); + this.bree.start(uniqueJobName); +} + +async function scheduleJobMonitoringOnServerStart() { + try { + logger.info("🕗 JOB MONITORING STARTED ..."); + const 
jobMonitoring = await JobMonitoring.findAll({ raw: true });
+    for (let monitoring of jobMonitoring) {
+      const { id, cron, isActive } = monitoring;
+      if (isActive) {
+        this.createJobMonitoringBreeJob({
+          jobMonitoring_id: id,
+          cron,
+        });
+      }
+    }
+  } catch (err) {
+    logger.error(err);
+  }
+}
+
+// Start the 20-second workunit status poller worker.
+async function scheduleJobStatusPolling() {
+  logger.info("📢 STATUS POLLING SCHEDULER STARTED...");
+
+  try {
+    let jobName = "job-status-poller-" + new Date().getTime();
+
+    this.bree.add({
+      name: jobName,
+      interval: "20s",
+      path: path.join(__dirname, "..", "jobs", JOB_STATUS_POLLER),
+      worker: {
+        workerData: {
+          jobName: jobName,
+          WORKER_CREATED_AT: Date.now(),
+        },
+      },
+    });
+
+    this.bree.start(jobName);
+  } catch (err) {
+    logger.error(err);
+  }
+}
+
+module.exports = {
+  createJobMonitoringBreeJob,
+  scheduleJobMonitoringOnServerStart,
+  scheduleJobStatusPolling,
+};
diff --git a/server/jobSchedularMethods/routineJobsMonitoring.js b/server/jobSchedularMethods/routineJobsMonitoring.js
new file mode 100644
index 000000000..b46acae77
--- /dev/null
+++ b/server/jobSchedularMethods/routineJobsMonitoring.js
@@ -0,0 +1,47 @@
+const path = require("path");
+const logger = require("../config/logger");
+
+const FLAG_JOBS_TO_RUN_TODAY = "flagJobsToRunToday.js";
+const ROUTINE_JOBS_STATUS = "routineJobsStatus.js";
+
+// At midnight each day, flag the jobs that are scheduled to run today.
+async function flagJobsToRunToday() {
+  const uniqueJobName = `Flag Jobs To Run Today`;
+  const job = {
+    cron: "0 0 * * *",
+    name: uniqueJobName,
+    path: path.join(__dirname, "..", "jobs", FLAG_JOBS_TO_RUN_TODAY),
+  };
+  this.bree.add(job);
+  this.bree.start(uniqueJobName);
+  logger.info("⛳ FLAGGING JOBS THAT ARE SCHEDULED FOR TODAY ...");
+}
+
+// Check whether the jobs that are supposed to run today have already run today.
+async function checkRoutineJobsStatus() { + logger.info("📢 POOLING STARTED FOR ROUTINE JOBS"); + + try { + let jobName = "routine-Jobs-poller-" + new Date().getTime(); + + this.bree.add({ + name: jobName, + interval: "20s", //TODO - Increase to few hrs + path: path.join(__dirname, "..", "jobs", ROUTINE_JOBS_STATUS), + worker: { + workerData: { + jobName: jobName, + WORKER_CREATED_AT: Date.now(), + }, + }, + }); + + this.bree.start(jobName); + } catch (err) { + logger.error(err); + } +} + +module.exports = { + flagJobsToRunToday, + checkRoutineJobsStatus, +}; diff --git a/server/jobSchedularMethods/workFlowJobs.js b/server/jobSchedularMethods/workFlowJobs.js new file mode 100644 index 000000000..24638ed8c --- /dev/null +++ b/server/jobSchedularMethods/workFlowJobs.js @@ -0,0 +1,332 @@ +const { v4: uuidv4 } = require("uuid"); + +const models = require("../models"); +const logger = require("../config/logger"); +const workflowUtil = require("../utils/workflow-util"); + +const SUBMIT_JOB_FILE_NAME = "submitJob.js"; +const SUBMIT_SPRAY_JOB_FILE_NAME = "submitSprayJob.js"; +const SUBMIT_SCRIPT_JOB_FILE_NAME = "submitScriptJob.js"; +const SUBMIT_MANUAL_JOB_FILE_NAME = "submitManualJob.js"; +const SUBMIT_GITHUB_JOB_FILE_NAME = "submitGithubJob.js"; +const SUBMIT_QUERY_PUBLISH = "submitQueryPublish.js"; + +const DataflowVersions = models.dataflow_versions; +const JobExecution = models.job_execution; +const Job = models.job; +const MessageBasedJobs = models.message_based_jobs; + +async function scheduleCheckForJobsWithSingleDependency({ + dependsOnJobId, + dataflowId, + dataflowVersionId, + jobExecutionGroupId, +}) { + try { + const dataflowVersion = await DataflowVersions.findOne({ + where: { id: dataflowVersionId }, + attributes: ["graph"], + }); + if (!dataflowVersion) throw new Error("Dataflow version does not exist"); + + let dependentJobs = dataflowVersion.graph.cells.reduce((acc, cell) => { + if (cell?.data?.schedule?.dependsOn?.includes(dependsOnJobId)) + acc.push({ 
jobId: cell.data.assetId });
+      return acc;
+    }, []);
+
+    // No dependents remain and a dataflow is known: the workflow is complete.
+    if (dependentJobs.length === 0 && dataflowId) {
+      try {
+        logger.info(
+          "WORKFLOW EXECUTION COMPLETE, Checking if subscribed for notifications."
+        );
+        await workflowUtil.notifyWorkflow({
+          dataflowId,
+          jobExecutionGroupId,
+          status: "completed",
+        });
+      } catch (error) {
+        logger.error("WORKFLOW EXECUTION COMPLETE NOTIFICATION FAILED", error);
+      }
+    } else {
+      logger.verbose(`✔️ FOUND ${dependentJobs.length} DEPENDENT JOB/S`);
+      //List of dependent job ids
+      let dependentJobsIds = dependentJobs.map((job) => job.jobId);
+      //Check if any of the dependent job are already in submitted state
+      const activeJobs = await JobExecution.findAll({
+        where: {
+          dataflowId: dataflowId,
+          jobId: dependentJobsIds,
+          status: ["submitted", "blocked"],
+        },
+        attributes: ["jobId"],
+        raw: true,
+      });
+      const activeJobIds = activeJobs.map((activeJob) => activeJob.jobId);
+      //Remove already submitted jobs from dependent jobs array
+      dependentJobs = dependentJobs.filter(
+        (dependentJob) => !activeJobIds.includes(dependentJob.jobId)
+      );
+
+      // Dispatch each remaining dependent job to the worker matching its type.
+      for (const dependentJob of dependentJobs) {
+        try {
+          let job = await Job.findOne({ where: { id: dependentJob.jobId } });
+          let status;
+          const isSprayJob = job.jobType == "Spray";
+          const isScriptJob = job.jobType == "Script";
+          const isManualJob = job.jobType === "Manual";
+          const isQueryPublishJob = job.jobType === "Query Publish";
+
+          const isGitHubJob = job.metaData?.isStoredOnGithub;
+
+          logger.info(
+            `🔄 EXECUTING DEPENDANT JOB "${job.name}" id:${job.id}; dflow: ${dataflowId};`
+          );
+
+          const commonWorkerData = {
+            applicationId: job.application_id,
+            clusterId: job.cluster_id,
+            dataflowId: dataflowId,
+            jobExecutionGroupId,
+            jobType: job.jobType,
+            dataflowVersionId,
+            jobName: job.name,
+            title: job.title,
+            jobId: job.id,
+          };
+
+          // NOTE(review): a GitHub-hosted job wins over "Query Publish" here
+          // because isGitHubJob is tested first — confirm that is intended.
+          if (isSprayJob) {
+            status = this.executeJob({
+              ...commonWorkerData,
+              sprayFileName: job.sprayFileName,
+              sprayDropZone: job.sprayDropZone,
+              sprayedFileScope: job.sprayedFileScope,
+              jobfileName: SUBMIT_SPRAY_JOB_FILE_NAME,
+            });
+          } else if (isScriptJob) {
+            status = this.executeJob({
+              jobfileName: SUBMIT_SCRIPT_JOB_FILE_NAME,
+              ...commonWorkerData,
+            });
+          } else if (isManualJob) {
+            status = this.executeJob({
+              ...commonWorkerData,
+              status: "wait",
+              jobfileName: SUBMIT_MANUAL_JOB_FILE_NAME,
+              manualJob_meta: {
+                jobType: "Manual",
+                jobName: job.name,
+                notifiedTo: job.contact,
+              },
+            });
+          } else if (isGitHubJob) {
+            status = this.executeJob({
+              ...commonWorkerData,
+              metaData: job.metaData,
+              jobfileName: SUBMIT_GITHUB_JOB_FILE_NAME,
+            });
+          } else if (isQueryPublishJob) {
+            status = this.executeJob({
+              jobfileName: SUBMIT_QUERY_PUBLISH,
+              ...commonWorkerData,
+            });
+          } else {
+            status = this.executeJob({
+              jobfileName: SUBMIT_JOB_FILE_NAME,
+              ...commonWorkerData,
+            });
+          }
+          if (!status.success) throw status;
+        } catch (error) {
+          // failed to execute dependent job through bree. User will be notified inside worker
+          logger.error("Failed to execute dependent job through bree", error);
+        }
+      }
+    }
+  } catch (error) {
+    logger.error(error);
+    const message = `Error happened while trying to execute workflow, try to 'Save version' and execute it again. 
| Error: ${error.message} `; + await workflowUtil.notifyWorkflow({ + dataflowId, + jobExecutionGroupId, + status: "error", + exceptions: message, + }); + } +} + +function executeJob(jobData) { + try { + let uniqueJobName = + jobData.jobName + + "-" + + jobData.dataflowId + + "-" + + jobData.jobId + + "-" + + uuidv4(); + this.createNewBreeJob({ ...jobData, uniqueJobName }); + this.bree.start(uniqueJobName); + logger.info(`✔️ BREE HAS STARTED JOB: "${uniqueJobName}"`); + this.logBreeJobs(); + + return { + success: true, + message: `Successfully executed ${jobData.jobName}`, + }; + } catch (err) { + logger.error(err); + return { + success: false, + contact: jobData.contact, + jobName: jobData.jobName, + clusterId: jobData.clusterId, + dataflowId: jobData.dataflowId, + message: `Error executing ${jobName} - ${err.message}`, + }; + } +} + +async function scheduleActiveCronJobs() { + try { + // get all active graphs + const dataflowsVersions = await DataflowVersions.findAll({ + where: { isLive: true }, + attributes: ["id", "graph", "dataflowId"], + }); + + for (const dataflowsVersion of dataflowsVersions) { + const cronScheduledNodes = + dataflowsVersion.graph?.cells?.filter( + (cell) => cell.data?.schedule?.cron + ) || []; + if (cronScheduledNodes.length > 0) { + for (const node of cronScheduledNodes) { + try { + const job = await Job.findOne({ + where: { id: node.data.assetId }, + }); + if (!job) throw new Error(`Failed to schedule job ${job.name}`); + + const isSprayJob = job.jobType == "Spray"; + const isScriptJob = job.jobType == "Script"; + const isManualJob = job.jobType === "Manual"; + const isGitHubJob = job.metaData?.isStoredOnGithub; + + const workerData = { + dataflowVersionId: dataflowsVersion.id, + dataflowId: dataflowsVersion.dataflowId, + applicationId: job.application_id, + cron: node.data.schedule.cron, + clusterId: job.cluster_id, + jobType: job.jobType, + jobName: job.name, + title: job.title, + jobId: job.id, + skipLog: true, + }; + + 
workerData.jobfileName = SUBMIT_JOB_FILE_NAME;
+
+            if (isScriptJob)
+              workerData.jobfileName = SUBMIT_SCRIPT_JOB_FILE_NAME;
+            if (isSprayJob) {
+              workerData.jobfileName = SUBMIT_SPRAY_JOB_FILE_NAME;
+              workerData.sprayedFileScope = job.sprayedFileScope;
+              workerData.sprayFileName = job.sprayFileName;
+              workerData.sprayDropZone = job.sprayDropZone;
+            }
+            if (isManualJob) {
+              workerData.manualJob_meta = {
+                jobType: "Manual",
+                jobName: job.name,
+                notifiedTo: job.contact,
+                notifiedOn: new Date().getTime(),
+              };
+              workerData.jobfileName = SUBMIT_MANUAL_JOB_FILE_NAME;
+              workerData.contact = job.contact;
+            }
+            if (isGitHubJob) {
+              workerData.jobfileName = SUBMIT_GITHUB_JOB_FILE_NAME;
+              workerData.metaData = job.metaData;
+            }
+            // finally add the job to the scheduler
+            this.addJobToScheduler(workerData);
+          } catch (error) {
+            logger.error(error);
+          }
+        }
+      }
+    }
+  } catch (error) {
+    logger.error(error);
+  }
+  logger.verbose(
+    `📢 ACTIVE CRON JOBS (${this.bree.config.jobs.length}) (does not include dependent jobs):`
+  );
+  this.logBreeJobs();
+}
+
+// Execute the job matching an incoming message's jobName for every dataflow
+// registered against it in message_based_jobs.
+async function scheduleMessageBasedJobs(message) {
+  try {
+    let job = await Job.findOne({
+      where: { name: message.jobName },
+      attributes: { exclude: ["assetId"] },
+    });
+    if (job) {
+      let messageBasedJobs = await MessageBasedJobs.findAll({
+        where: { jobId: job.id },
+      });
+      for (const messageBasedjob of messageBasedJobs) {
+        this.executeJob({
+          jobId: job.id,
+          jobName: job.name,
+          jobType: job.jobType,
+          clusterId: job.cluster_id,
+          sprayFileName: job.sprayFileName,
+          sprayDropZone: job.sprayDropZone,
+          sprayedFileScope: job.sprayedFileScope,
+          dataflowId: messageBasedjob.dataflowId,
+          applicationId: messageBasedjob.applicationId,
+          jobfileName:
+            job.jobType == "Script"
+              ? SUBMIT_SCRIPT_JOB_FILE_NAME
+              : SUBMIT_JOB_FILE_NAME,
+        });
+      }
+    } else {
+      logger.warn("📢 COULD NOT FIND JOB WITH NAME " + message.jobName);
+    }
+  } catch (err) {
+    logger.error(err);
+  }
+}
+
+// Register a Bree job under a deterministic name (jobName-dataflowId-jobId)
+// and start it; unlike executeJob, the name carries no uuid suffix.
+function addJobToScheduler({ skipLog = false, ...jobData }) {
+  try {
+    let uniqueJobName =
+      jobData.jobName + "-" + jobData.dataflowId + "-" + jobData.jobId;
+    this.createNewBreeJob({ uniqueJobName, ...jobData });
+    this.bree.start(uniqueJobName);
+
+    logger.info(
+      `📢 JOB WAS SCHEDULED AS - ${uniqueJobName}, job-scheduler.js: addJobToScheduler`
+    );
+    !skipLog && this.logBreeJobs();
+
+    return { success: true };
+  } catch (err) {
+    logger.error(err);
+    const part2 = err.message.split(" an ")?.[1]; // error message is not user friendly, we will trim it to have everything after "an".
+    if (part2) err.message = part2;
+    return { success: false, error: err.message };
+  }
+}
+
+module.exports = {
+  scheduleCheckForJobsWithSingleDependency,
+  executeJob,
+  scheduleActiveCronJobs,
+  scheduleMessageBasedJobs,
+  addJobToScheduler,
+};