Skip to content

Commit

Permalink
Restored mail job (#306)
Browse files Browse the repository at this point in the history
Co-authored-by: Rajat Saxena <[email protected]>
  • Loading branch information
rajat1saxena and Rajat Saxena authored Nov 19, 2023
1 parent 85c57dc commit 57af357
Show file tree
Hide file tree
Showing 70 changed files with 751 additions and 32 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
4 changes: 3 additions & 1 deletion apps/queue/.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ EMAIL_USER=email_user
EMAIL_PASS=email_pass
EMAIL_HOST=email_host
DB_CONNECTION_STRING=mongodb://db.string
SEQUENCE_BOUNCE_LIMIT=3
REDIS_HOST=localhost
REDIS_PORT=6379
SEQUENCE_BOUNCE_LIMIT=3
2 changes: 2 additions & 0 deletions apps/queue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
},
"dependencies": {
"@courselit/common-models": "workspace:^",
"bullmq": "^4.14.0",
"express": "^4.18.2",
"mongoose": "^8.0.0",
"nodemailer": "^6.9.2",
"pino": "^8.14.1",
Expand Down
13 changes: 13 additions & 0 deletions apps/queue/src/domain/mail/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { MailJob } from "./model/mail-job";
import mailQueue from "./queue";

export async function addMailJob({ to, subject, body, from }: MailJob) {
for (let recipient of to) {
await mailQueue.add("mail", {
to: recipient,
subject,
body,
from,
});
}
}
10 changes: 10 additions & 0 deletions apps/queue/src/domain/mail/model/mail-job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { z } from "zod";

export const MailJob = z.object({
to: z.string().array(),
from: z.string(),
subject: z.string(),
body: z.string(),
});

export type MailJob = z.infer<typeof MailJob>;
8 changes: 6 additions & 2 deletions apps/queue/src/domain/mail/process-ongoing-sequences.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const transporter = nodemailer.createTransport({
},
});

const sequenceBounceLimit = process.env.SEQUENCE_BOUNCE_LIMIT
? +process.env.SEQUENCE_BOUNCE_LIMIT
: 3;

export async function processOngoingSequences(): Promise<void> {
while (true) {
const currentTime = new Date().getTime();
Expand All @@ -25,7 +29,7 @@ export async function processOngoingSequences(): Promise<void> {
const dueOngoingSequences: OngoingSequence[] =
await OngoingSequenceModel.find({
nextEmailScheduledTime: { $lt: currentTime },
retryCount: { $lt: +process.env.SEQUENCE_BOUNCE_LIMIT },
retryCount: { $lt: sequenceBounceLimit },
});

for (const ongoingSequence of dueOngoingSequences) {
Expand Down Expand Up @@ -134,7 +138,7 @@ async function attemptMailSending({
});
} catch (err: any) {
ongoingSequence.retryCount++;
if (ongoingSequence.retryCount >= +process.env.SEQUENCE_BOUNCE_LIMIT) {
if (ongoingSequence.retryCount >= sequenceBounceLimit) {
sequence.report.sequence.failed = [
...sequence.report.sequence.failed,
ongoingSequence.userId,
Expand Down
6 changes: 6 additions & 0 deletions apps/queue/src/domain/mail/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Queue } from "bullmq";
import redis from "../../redis";

const mailQueue = new Queue("mail", { connection: redis });

export default mailQueue;
34 changes: 34 additions & 0 deletions apps/queue/src/domain/mail/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Worker } from "bullmq";
import nodemailer from "nodemailer";
import redis from "../../redis";
import { logger } from "../../logger";

const transporter = nodemailer.createTransport({
host: process.env.EMAIL_HOST,
port: +(process.env.EMAIL_PORT || 587),
auth: {
user: process.env.EMAIL_USER,
pass: process.env.EMAIL_PASS,
},
});

const worker = new Worker(
"mail",
async (job) => {
const { to, from, subject, body } = job.data;

try {
await transporter.sendMail({
from,
to,
subject,
html: body,
});
} catch (err: any) {
logger.error(err);
}
},
{ connection: redis },
);

export default worker;
24 changes: 16 additions & 8 deletions apps/queue/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import { connectToDatabase } from "./db";
import { processOngoingSequences } from "./domain/mail/process-ongoing-sequences";
import { processRules } from "./domain/mail/process-rules";
import express from "express";
import jobRoutes from "./job/routes";

(async () => {
await connectToDatabase();
// start workers
import "./domain/mail/worker";
import { startEmailAutomation } from "./start-email-automation";

processOngoingSequences();
processRules();
})();
const app = express();
app.use(express.json());

app.use("/job", jobRoutes);

startEmailAutomation();

const port = process.env.PORT || 80;
app.listen(port, () => {
console.log(`Queue server running at ${port}`);
});
22 changes: 22 additions & 0 deletions apps/queue/src/job/routes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import express from "express";
import { addMailJob } from "../domain/mail/handler";
import { logger } from "../logger";
import { MailJob } from "../domain/mail/model/mail-job";

const router = express.Router();

router.post("/mail", async (req: express.Request, res: express.Response) => {
try {
const { to, from, subject, body } = req.body;
MailJob.parse({ to, from, subject, body });

await addMailJob({ to, from, subject, body });

res.status(200).json({ message: "Success" });
} catch (err: any) {
logger.error(err);
res.status(500).json({ error: err.message });
}
});

export default router;
4 changes: 4 additions & 0 deletions apps/queue/src/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export default {
host: process.env.REDIS_HOST || "localhost",
port: +(process.env.REDIS_PORT || 6379),
};
10 changes: 10 additions & 0 deletions apps/queue/src/start-email-automation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { connectToDatabase } from "./db";
import { processOngoingSequences } from "./domain/mail/process-ongoing-sequences";
import { processRules } from "./domain/mail/process-rules";

export async function startEmailAutomation() {
await connectToDatabase();

processOngoingSequences();
processRules();
}
Loading

1 comment on commit 57af357

@vercel
Copy link

@vercel vercel bot commented on 57af357 Nov 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

courselit – ./

courselit-codelit.vercel.app
courselit-git-main-codelit.vercel.app
*.clqa.xyz

Please sign in to comment.