Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decoupling Notification Flow #685

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions server/config/emailConfig.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
1. The send Email function treats emails as sent if the SMTP server accepts the email for delivery.
2. It does not guarantee that the email will be delivered to the recipient's inbox.
For example - if the user provided email is correctly formatted but not a valid email address,
the SMTP server will accept the email for delivery but will not deliver it to the recipient's inbox.
3. To Debug email delivery issues, first check notification_queue table. if it does not exists there check table that stores sent notification.
Next application Logs and SMTP server logs if available.
*/

//Packages imports
const nodemailer = require("nodemailer");

//Local imports
const logger = require("./logger");

// SMTP configuration
const smtpConfig = {
host: process.env.EMAIL_SMTP_HOST,
port: process.env.EMAIL_PORT,
secure: false, // use SSL,
tls: { rejectUnauthorized: false },
sender: process.env.EMAIL_SENDER,
timeout: 10000, // in milliseconds
debug: true, // set debug to true to see debug logs
};


//Create transporter
const transporter = nodemailer.createTransport(smtpConfig);

// Send email function
const sendEmail = ({receiver, cc, subject, plainTextBody, htmlBody}) => {
return new Promise((resolve, reject) => {
const mailOptions = {
from: smtpConfig.sender,
to: receiver,
cc: cc,
subject: subject,
text: plainTextBody ? plainTextBody : null,
html: htmlBody ? htmlBody : null,
};
transporter.sendMail(mailOptions, function (error, info) {
if (error) {
reject(error);
}
resolve(info);
});
});
};

// Re-try options
const retryOptions = {
maxRetries: 3,
retryDelays: [1, 2, 3], // in minutes - Exponential backoff strategy
};

// Exports
module.exports = {
sendEmail,
retryOptions,
};
13 changes: 12 additions & 1 deletion server/config/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ const { createLogger, format, transports } = require('winston');
// docs: https://github.com/winstonjs/winston
const isProduction = process.env.NODE_ENV === 'production';

// Print logs in color depending on log type
format.colorize().addColors({
error: "red",
warn: "yellow",
info: "green",
http: "magenta",
});

const getFormat = () =>
format.combine(
isProduction ? format.uncolorize() : format.colorize({ all: true }), // adding or removing colors depending on logs type;
Expand All @@ -27,7 +35,10 @@ let DEFAULT_LOG_LEVEL = 'http';
// Initialize logger
const logger = createLogger({
exitOnError: false,
format: format.combine(format.errors({ stack: true }), format.timestamp()), // this will be common setting for all transports;
format: format.combine(
format.errors({ stack: true }),
format.timestamp()
), // this will be common setting for all transports;
transports: [
new transports.Console({
...common,
Expand Down
17 changes: 12 additions & 5 deletions server/job-scheduler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const Bree = require("bree");

const logger = require("./config/logger");

const {
logBreeJobs,
createNewBreeJob,
Expand All @@ -13,7 +12,6 @@ const {
startJob,
startAllJobs,
} = require("./jobSchedularMethods/breeJobs.js");

const {
scheduleCheckForJobsWithSingleDependency,
executeJob,
Expand All @@ -27,13 +25,11 @@ const {
createClusterMonitoringBreeJob,
scheduleClusterMonitoringOnServerStart,
} = require("./jobSchedularMethods/clusterJobs.js");

const {
createJobMonitoringBreeJob,
scheduleJobMonitoringOnServerStart,
scheduleJobStatusPolling,
} = require("./jobSchedularMethods/hpccJobs.js");

const {
createLandingZoneFileMonitoringBreeJob,
createLogicalFileMonitoringBreeJob,
Expand All @@ -43,6 +39,8 @@ const {
scheduleFileMonitoringOnServerStart,
scheduleFileMonitoring,
} = require("./jobSchedularMethods/hpccFiles.js");
const { scheduleKeyCheck } = require("./jobSchedularMethods/apiKeys.js");
const {scheduleEmailNotificationProcessing, scheduleTeamsNotificationProcessing} = require("./jobSchedularMethods/notificationJobs.js");

const {
createOrbitMegaphoneJob,
Expand Down Expand Up @@ -121,8 +119,10 @@ class JobScheduler {
await this.scheduleSuperFileMonitoringOnServerStart();
await this.scheduleClusterMonitoringOnServerStart();
await this.scheduleKeyCheck();
await this.scheduleJobMonitoringOnServerStart();
// await this.scheduleJobMonitoringOnServerStart();
await this.createClusterUsageHistoryJob();
await this.scheduleEmailNotificationProcessing();
await this.scheduleTeamsNotificationProcessing();
await this.scheduleOrbitMonitoringOnServerStart();
await this.createOrbitMegaphoneJob();

Expand Down Expand Up @@ -325,6 +325,13 @@ class JobScheduler {
return scheduleKeyCheck.call(this);
}

//Process notification queue
scheduleEmailNotificationProcessing() {
return scheduleEmailNotificationProcessing.call(this);
}
scheduleTeamsNotificationProcessing(){
return scheduleTeamsNotificationProcessing.call(this);
}
//orbit jobs
createOrbitMegaphoneJob() {
return createOrbitMegaphoneJob.call(this);
Expand Down
56 changes: 56 additions & 0 deletions server/jobSchedularMethods/notificationJobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

const path = require("path");
const logger = require("../config/logger");
const PROCESS_EMAIL_NOTIFICATIONS = path.join("notifications", "processEmailNotifications.js");
const PROCESS_TEAMS_NOTIFICATIONS = path.join("notifications", "processTeamsNotifications.js");

async function scheduleEmailNotificationProcessing() {
try {
let jobName = "email-notification-processing-" + new Date().getTime();
this.bree.add({
name: jobName,
interval: "10s", // Make it 120 seconds in production
path: path.join(__dirname, "..", "jobs", PROCESS_EMAIL_NOTIFICATIONS),
worker: {
workerData: {
jobName: jobName,
WORKER_CREATED_AT: Date.now(),
},
},
});

this.bree.start(jobName);
logger.info("🔔 E-MAIL NOTIFICATION PROCESSING STARTED ...");
} catch (err) {
console.error(err);
}
}

async function scheduleTeamsNotificationProcessing() {
try {
let jobName = "teams-notification-processing-" + new Date().getTime();
this.bree.add({
name: jobName,
interval: "10s", // Make it 120 seconds in production
path: path.join(__dirname, "..", "jobs", PROCESS_TEAMS_NOTIFICATIONS),
worker: {
workerData: {
jobName: jobName,
WORKER_CREATED_AT: Date.now(),
},
},
});

this.bree.start(jobName);
logger.info("🔔 TEAMS NOTIFICATION PROCESSING STARTED ...");
} catch (err) {
console.error(err);
}
}



module.exports = {
scheduleEmailNotificationProcessing,
scheduleTeamsNotificationProcessing,
};
61 changes: 61 additions & 0 deletions server/jobs/notifications/notificationsHelperFunctions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const path = require("path");
const ejs = require("ejs");
const fs = require("fs");
const models = require("../../models");
const logger = require("../../config/logger");

const NotificationQueue = models.notification_queue;
const { retryOptions: { maxRetries, retryDelays } } = require("../../config/emailConfig");

// Renders HTML template for email notification
const renderEmailBody = ({ notificationOrigin, emailData }) => {
const templatePath = path.join( __dirname, "..", "..", "notificationTemplates","email", `${notificationOrigin}.ejs`);
const template = fs.readFileSync(templatePath, "utf-8")
return ejs.render(template, emailData);
};

// Function to calculate the retryAfter time
const calculateRetryAfter = ({
attemptCount,
retryDelays, // Configs related to emails should not be passed as params
maxRetries,
currentDateTime,
}) => {
if (attemptCount === maxRetries - 1) {
return null;
} else {
return new Date(currentDateTime + retryDelays[attemptCount] * 60000);
}
};

//Update notification queue on error
async function updateNotificationQueueOnError({ notificationId,
attemptCount,
notification,
error
}) {
try {
await NotificationQueue.update(
{
attemptCount: attemptCount + 1,
failureMessage: { err: error.message, notification },
reTryAfter: calculateRetryAfter({
attemptCount,
retryDelays: retryDelays,
maxRetries: maxRetries,
currentDateTime: Date.now(),
}),
},
{ where: { id: notificationId } }
);
} catch (updateError) {
logger.error(updateError);
}
}


module.exports = {
renderEmailBody,
calculateRetryAfter,
updateNotificationQueueOnError,
};
Loading
Loading