Skip to content

Commit

Permalink
Merge pull request #685 from hpcc-systems/yadhap/decoupling-notificat…
Browse files Browse the repository at this point in the history
…ion-flow

Decoupling Notification Flow
  • Loading branch information
FancMa01 authored Feb 1, 2024
2 parents d5fe218 + 2cbf02b commit 8855868
Show file tree
Hide file tree
Showing 16 changed files with 1,168 additions and 16 deletions.
61 changes: 61 additions & 0 deletions server/config/emailConfig.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
1. The send Email function treats emails as sent if the SMTP server accepts the email for delivery.
2. It does not guarantee that the email will be delivered to the recipient's inbox.
For example - if the user provided email is correctly formatted but not a valid email address,
the SMTP server will accept the email for delivery but will not deliver it to the recipient's inbox.
3. To Debug email delivery issues, first check notification_queue table. if it does not exists there check table that stores sent notification.
Next application Logs and SMTP server logs if available.
*/

//Packages imports
const nodemailer = require("nodemailer");

//Local imports
const logger = require("./logger");

// SMTP configuration
const smtpConfig = {
host: process.env.EMAIL_SMTP_HOST,
port: process.env.EMAIL_PORT,
secure: false, // use SSL,
tls: { rejectUnauthorized: false },
sender: process.env.EMAIL_SENDER,
timeout: 10000, // in milliseconds
debug: true, // set debug to true to see debug logs
};


//Create transporter
const transporter = nodemailer.createTransport(smtpConfig);

// Send email function
const sendEmail = ({receiver, cc, subject, plainTextBody, htmlBody}) => {
return new Promise((resolve, reject) => {
const mailOptions = {
from: smtpConfig.sender,
to: receiver,
cc: cc,
subject: subject,
text: plainTextBody ? plainTextBody : null,
html: htmlBody ? htmlBody : null,
};
transporter.sendMail(mailOptions, function (error, info) {
if (error) {
reject(error);
}
resolve(info);
});
});
};

// Re-try options
const retryOptions = {
maxRetries: 3,
retryDelays: [1, 2, 3], // in minutes - Exponential backoff strategy
};

// Exports
module.exports = {
sendEmail,
retryOptions,
};
13 changes: 12 additions & 1 deletion server/config/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ const { createLogger, format, transports } = require('winston');
// docs: https://github.com/winstonjs/winston
const isProduction = process.env.NODE_ENV === 'production';

// Print logs in color depending on log type
format.colorize().addColors({
error: "red",
warn: "yellow",
info: "green",
http: "magenta",
});

const getFormat = () =>
format.combine(
isProduction ? format.uncolorize() : format.colorize({ all: true }), // adding or removing colors depending on logs type;
Expand All @@ -27,7 +35,10 @@ let DEFAULT_LOG_LEVEL = 'http';
// Initialize logger
const logger = createLogger({
exitOnError: false,
format: format.combine(format.errors({ stack: true }), format.timestamp()), // this will be common setting for all transports;
format: format.combine(
format.errors({ stack: true }),
format.timestamp()
), // this will be common setting for all transports;
transports: [
new transports.Console({
...common,
Expand Down
17 changes: 12 additions & 5 deletions server/job-scheduler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const Bree = require("bree");

const logger = require("./config/logger");

const {
logBreeJobs,
createNewBreeJob,
Expand All @@ -13,7 +12,6 @@ const {
startJob,
startAllJobs,
} = require("./jobSchedularMethods/breeJobs.js");

const {
scheduleCheckForJobsWithSingleDependency,
executeJob,
Expand All @@ -27,13 +25,11 @@ const {
createClusterMonitoringBreeJob,
scheduleClusterMonitoringOnServerStart,
} = require("./jobSchedularMethods/clusterJobs.js");

const {
createJobMonitoringBreeJob,
scheduleJobMonitoringOnServerStart,
scheduleJobStatusPolling,
} = require("./jobSchedularMethods/hpccJobs.js");

const {
createLandingZoneFileMonitoringBreeJob,
createLogicalFileMonitoringBreeJob,
Expand All @@ -43,6 +39,8 @@ const {
scheduleFileMonitoringOnServerStart,
scheduleFileMonitoring,
} = require("./jobSchedularMethods/hpccFiles.js");
const { scheduleKeyCheck } = require("./jobSchedularMethods/apiKeys.js");
const {scheduleEmailNotificationProcessing, scheduleTeamsNotificationProcessing} = require("./jobSchedularMethods/notificationJobs.js");

const {
createOrbitMegaphoneJob,
Expand Down Expand Up @@ -121,8 +119,10 @@ class JobScheduler {
await this.scheduleSuperFileMonitoringOnServerStart();
await this.scheduleClusterMonitoringOnServerStart();
await this.scheduleKeyCheck();
await this.scheduleJobMonitoringOnServerStart();
// await this.scheduleJobMonitoringOnServerStart();
await this.createClusterUsageHistoryJob();
await this.scheduleEmailNotificationProcessing();
await this.scheduleTeamsNotificationProcessing();
await this.scheduleOrbitMonitoringOnServerStart();
await this.createOrbitMegaphoneJob();

Expand Down Expand Up @@ -325,6 +325,13 @@ class JobScheduler {
return scheduleKeyCheck.call(this);
}

//Process notification queue
scheduleEmailNotificationProcessing() {
return scheduleEmailNotificationProcessing.call(this);
}
scheduleTeamsNotificationProcessing(){
return scheduleTeamsNotificationProcessing.call(this);
}
//orbit jobs
createOrbitMegaphoneJob() {
return createOrbitMegaphoneJob.call(this);
Expand Down
56 changes: 56 additions & 0 deletions server/jobSchedularMethods/notificationJobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

const path = require("path");
const logger = require("../config/logger");
const PROCESS_EMAIL_NOTIFICATIONS = path.join("notifications", "processEmailNotifications.js");
const PROCESS_TEAMS_NOTIFICATIONS = path.join("notifications", "processTeamsNotifications.js");

async function scheduleEmailNotificationProcessing() {
try {
let jobName = "email-notification-processing-" + new Date().getTime();
this.bree.add({
name: jobName,
interval: "10s", // Make it 120 seconds in production
path: path.join(__dirname, "..", "jobs", PROCESS_EMAIL_NOTIFICATIONS),
worker: {
workerData: {
jobName: jobName,
WORKER_CREATED_AT: Date.now(),
},
},
});

this.bree.start(jobName);
logger.info("🔔 E-MAIL NOTIFICATION PROCESSING STARTED ...");
} catch (err) {
console.error(err);
}
}

async function scheduleTeamsNotificationProcessing() {
try {
let jobName = "teams-notification-processing-" + new Date().getTime();
this.bree.add({
name: jobName,
interval: "10s", // Make it 120 seconds in production
path: path.join(__dirname, "..", "jobs", PROCESS_TEAMS_NOTIFICATIONS),
worker: {
workerData: {
jobName: jobName,
WORKER_CREATED_AT: Date.now(),
},
},
});

this.bree.start(jobName);
logger.info("🔔 TEAMS NOTIFICATION PROCESSING STARTED ...");
} catch (err) {
console.error(err);
}
}



module.exports = {
scheduleEmailNotificationProcessing,
scheduleTeamsNotificationProcessing,
};
61 changes: 61 additions & 0 deletions server/jobs/notifications/notificationsHelperFunctions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const path = require("path");
const ejs = require("ejs");
const fs = require("fs");
const models = require("../../models");
const logger = require("../../config/logger");

const NotificationQueue = models.notification_queue;
const { retryOptions: { maxRetries, retryDelays } } = require("../../config/emailConfig");

// Renders HTML template for email notification
const renderEmailBody = ({ notificationOrigin, emailData }) => {
const templatePath = path.join( __dirname, "..", "..", "notificationTemplates","email", `${notificationOrigin}.ejs`);
const template = fs.readFileSync(templatePath, "utf-8")
return ejs.render(template, emailData);
};

// Function to calculate the retryAfter time
const calculateRetryAfter = ({
attemptCount,
retryDelays, // Configs related to emails should not be passed as params
maxRetries,
currentDateTime,
}) => {
if (attemptCount === maxRetries - 1) {
return null;
} else {
return new Date(currentDateTime + retryDelays[attemptCount] * 60000);
}
};

//Update notification queue on error
async function updateNotificationQueueOnError({ notificationId,
attemptCount,
notification,
error
}) {
try {
await NotificationQueue.update(
{
attemptCount: attemptCount + 1,
failureMessage: { err: error.message, notification },
reTryAfter: calculateRetryAfter({
attemptCount,
retryDelays: retryDelays,
maxRetries: maxRetries,
currentDateTime: Date.now(),
}),
},
{ where: { id: notificationId } }
);
} catch (updateError) {
logger.error(updateError);
}
}


module.exports = {
renderEmailBody,
calculateRetryAfter,
updateNotificationQueueOnError,
};
Loading

0 comments on commit 8855868

Please sign in to comment.