Skip to content

Commit

Permalink
feat: improve exit handling
Browse files Browse the repository at this point in the history
- stop database befor exiting
- return more promises where available
- stop all server
- do no call process.exit somewhere else, instead call gracefulShutdown
  • Loading branch information
mytlogos committed Aug 5, 2022
1 parent 3c6e48d commit 5b5a22f
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 16 deletions.
1 change: 1 addition & 0 deletions packages/core/src/database/storages/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ export const notificationStorage = createStorage<NotificationContext>("notificat
*/
export const startStorage = (): void => poolProvider.start();

// gets called by gracefulShutdown in exit.ts, after every handler was called, so do not register a handler
export const stopStorage = (): EmptyPromise => poolProvider.stop();

export const waitStorage = (): EmptyPromise => poolProvider.startPromise;
23 changes: 15 additions & 8 deletions packages/core/src/exit.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import process, { nextTick } from "process";
import { stopStorage } from "./database/storages/storage";
import logger from "./logger";

const registeredHandlers = new Set<ExitHandler>();
Expand All @@ -17,7 +18,7 @@ export function registerOnExitHandler(handler: ExitHandler) {
/**
* Call every exit handler and exit after all result promises are settled.
*/
function signalHandler() {
export function gracefulShutdown() {
const promises: Array<Promise<void>> = [];
const nowMillis = Date.now();

Expand All @@ -32,12 +33,18 @@ function signalHandler() {
Promise.allSettled(promises).finally(() => {
const timeLeft = Math.max(500, 5000 - (Date.now() - nowMillis));
logger.info(`Exiting in ${timeLeft}ms`);
// close logger and wait a bit for all transports to finish before exiting
logger.close();
// wait at least 500ms but at most 5000ms to exit from now on, depending how much time
// the listeners already took
setTimeout(() => nextTick(() => process.exit(1)), timeLeft);
// close the database
stopStorage()
.catch(console.log)
// close logger and wait a bit for all transports to finish before exiting
.then(() => logger.close())
.finally(() => {
// wait at least 500ms but at most 5000ms to exit from now on, depending how much time
// the listeners already took
// never use exitCode 0, every exit is more or less an exceptional case
setTimeout(() => nextTick(() => process.exit(1)), timeLeft);
});
});
}
process.on("SIGTERM", signalHandler);
process.on("SIGINT", signalHandler);
process.on("SIGTERM", gracefulShutdown);
process.on("SIGINT", gracefulShutdown);
5 changes: 5 additions & 0 deletions packages/core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MESSAGE } from "triple-beam";

let filePrefix: string;
const appName = process.env.NODE_APP_NAME || process.env.name;

if (appName) {
filePrefix = appName.replace(/[^\w-]/g, "") + "-";
} else {
Expand Down Expand Up @@ -249,6 +250,10 @@ export default {
log("silly", value, meta);
},
close() {
const promise = new Promise((resolve) => {
logger.addListener("close", resolve);
});
logger.close();
return promise;
},
};
7 changes: 4 additions & 3 deletions packages/scraper/src/externals/jobScraperManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { ScrapeJob } from "./scrapeJobs";
import diagnostic_channel from "diagnostics_channel";
import { SchedulingStrategy, Strategies } from "./scheduling";
import { JobError } from "enterprise-core/dist/error";
import { gracefulShutdown } from "enterprise-core/dist/exit";

const missingConnections = new Set<Date>();
const jobChannel = diagnostic_channel.channel("enterprise-jobs");
Expand Down Expand Up @@ -346,13 +347,13 @@ export class JobScraperManager {
if (maxDate) {
if (maxDate < now && this.queue.invalidRunning(maxDate, 5)) {
logger.error("Restarting Process due to long running jobs");
process.exit(1);
gracefulShutdown();
return;
}
now.setHours(-2);
if (maxDate < now && this.queue.invalidRunning(maxDate, 1)) {
logger.error("Restarting Process due to long running jobs");
process.exit(1);
gracefulShutdown();
return;
}
}
Expand All @@ -377,7 +378,7 @@ export class JobScraperManager {
storageRunning: runningJobs.length,
queueRunning: this.queue.runningJobs,
});
process.exit(1);
gracefulShutdown();
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions packages/scraper/src/externals/scraperTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import {
tocScraperEntries,
} from "./hookManager";
import { DatabaseError, MissingEntityError, ValidationError } from "enterprise-core/dist/error";
import { registerOnExitHandler } from "enterprise-core/dist/exit";

interface ScrapeableFilterResult {
available: string[];
Expand Down Expand Up @@ -718,6 +719,12 @@ export function checkTocContent(content: TocContent, allowMinusOne = false): voi
const cache = new Cache({ size: 500, deleteOnExpire: true, stdTTL: 60 * 60 * 2 });
const errorCache = new Cache({ size: 500, deleteOnExpire: true, stdTTL: 60 * 60 * 2 });

// clear any timers
registerOnExitHandler(() => {
cache.close();
errorCache.close();
});

export interface ListScrapeEvent {
external: { cookies: string; uuid: Uuid; userUuid: Uuid; type: number };
lists: ListScrapeResult;
Expand Down
20 changes: 16 additions & 4 deletions packages/scraper/src/startCrawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import path from "path";
import { readFileSync } from "fs";
import "./metrics";

// start websocket server
// eslint-disable-next-line import/first
import "./websocket";
import { registerOnExitHandler } from "enterprise-core/dist/exit";

collectDefaultMetrics({
labels: {
NODE_APP_INSTANCE: "enterprise-crawler",
},
});

// start websocket server
// eslint-disable-next-line import/first
import "./websocket";

const debugMessenger = debug("enterprise-lister:crawler");
logger.info(`Process PID: ${process.pid} in environment '${process.env.NODE_ENV || ""}'`);
// first start storage, then crawler, as crawler depends on storage
Expand Down Expand Up @@ -156,6 +157,17 @@ function onError(error: any) {
}

function onListening() {
registerOnExitHandler(() => {
return new Promise((resolve, reject) => {
server.close((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
});
const address = server.address();
console.log("Listening: ", address);

Expand Down
2 changes: 2 additions & 0 deletions packages/scraper/src/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import diagnostic_channel from "diagnostics_channel";
import { ScraperChannel, WSRequest } from "./externals/types";
import { DefaultJobScraper } from "./externals/jobScraperManager";
import { publishQueues } from "./externals/queueManager";
import { registerOnExitHandler } from "enterprise-core/dist/exit";

const ws = new Websocket.Server({
port: 3001,
});

ws.on("connection", (socket) => {
registerOnExitHandler(() => new Promise((resolve, reject) => ws.close((err) => (err ? reject(err) : resolve()))));
socket.on("message", (data) => {
try {
let msg: WSRequest;
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/deviceVerificator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import diagram from "dgram";
import { isString } from "enterprise-core/dist/tools";
import env from "enterprise-core/dist/env";
import logger from "enterprise-core/dist/logger";
import { registerOnExitHandler } from "enterprise-core/dist/exit";

const PORT = env.port;

const server = diagram.createSocket("udp4");

server.on("listening", () => {
registerOnExitHandler(() => new Promise((resolve) => server.close(resolve)));
const address = server.address();
server.setBroadcast(true);
if (isString(address)) {
Expand Down
14 changes: 13 additions & 1 deletion packages/server/src/startServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "./deviceVerificator";
import logger from "enterprise-core/dist/logger";
import { getMainInterface } from "enterprise-core/dist/tools";
import { AppStatus } from "enterprise-core/dist/status";
import { registerOnExitHandler } from "enterprise-core/dist/exit";
import process from "node:process";

const port = env.port;
Expand All @@ -24,7 +25,7 @@ app.set("port", port);
* Listen on provided port, on all network interfaces.
* Add event listener on server.
*/
app.listen(port).on("error", onError).on("listening", onListening);
const server = app.listen(port).on("error", onError).on("listening", onListening);

/**
* Event listener for HTTP server "error" event.
Expand Down Expand Up @@ -57,6 +58,17 @@ const status = new AppStatus("server");
* Event listener for HTTP server "listening" event.
*/
function onListening() {
registerOnExitHandler(() => {
return new Promise((resolve, reject) => {
server.close((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
});
const bind = "port " + port;

const interfaceIp = getMainInterface() || "";
Expand Down

0 comments on commit 5b5a22f

Please sign in to comment.