Skip to content

Commit

Permalink
fix(core): reject all promises in pool during shutdown (nrwl#22188)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrozenPandaz authored Mar 6, 2024
1 parent d2cd822 commit 84d96cc
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions packages/nx/src/project-graph/plugins/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ const pool: Set<ChildProcess> = new Set();

const pidMap = new Map<number, { name: string; pending: Set<string> }>();

interface PromiseBankEntry {
promise: Promise<unknown>;
resolver: (result: any) => void;
rejector: (err: any) => void;
}

// transaction id (tx) -> Promise, Resolver, Rejecter
// Makes sure that we can resolve the correct promise when the worker sends back the result
const promiseBank = new Map<
string,
{
promise: Promise<unknown>;
resolver: (result: any) => void;
rejecter: (err: any) => void;
}
>();
const promiseBank = new Map<string, PromiseBankEntry>();

export function loadRemoteNxPlugin(plugin: PluginConfiguration, root: string) {
// this should only really be true when running unit tests within
Expand Down Expand Up @@ -71,17 +70,14 @@ export async function shutdownPluginWorkers() {

const pending = getPendingPromises(pool, pidMap);

if (pending.length > 0) {
// logger.verbose(
// `[plugin-pool] waiting for ${pending.length} pending operations to complete`
// );
await Promise.all(pending);
for (const pendingPromise of pending) {
pendingPromise.rejector(new Error('Shutting down'));
}

// logger.verbose(`[plugin-pool] all pending operations completed`);

for (const p of pool) {
p.kill('SIGINT');
for (const childProcess of pool) {
childProcess.kill('SIGINT');
}

// logger.verbose(`[plugin-pool] all workers killed`);
Expand Down Expand Up @@ -167,31 +163,31 @@ function createWorkerHandler(
}
},
createDependenciesResult: ({ tx, ...result }) => {
const { resolver, rejecter } = promiseBank.get(tx);
const { resolver, rejector } = promiseBank.get(tx);
if (result.success) {
resolver(result.dependencies);
} else if (result.success === false) {
rejecter(result.error);
rejector(result.error);
}
pidMap.get(worker.pid)?.pending.delete(tx);
promiseBank.delete(tx);
},
createNodesResult: ({ tx, ...result }) => {
const { resolver, rejecter } = promiseBank.get(tx);
const { resolver, rejector } = promiseBank.get(tx);
if (result.success) {
resolver(result.result);
} else if (result.success === false) {
rejecter(result.error);
rejector(result.error);
}
pidMap.get(worker.pid)?.pending.delete(tx);
promiseBank.delete(tx);
},
processProjectGraphResult: ({ tx, ...result }) => {
const { resolver, rejecter } = promiseBank.get(tx);
const { resolver, rejector } = promiseBank.get(tx);
if (result.success) {
resolver(result.graph);
} else if (result.success === false) {
rejecter(result.error);
rejector(result.error);
}
pidMap.get(worker.pid)?.pending.delete(tx);
promiseBank.delete(tx);
Expand All @@ -204,8 +200,8 @@ function createWorkerExitHandler(worker: ChildProcess) {
return () => {
if (!pluginWorkersShutdown) {
pidMap.get(worker.pid)?.pending.forEach((tx) => {
const { rejecter } = promiseBank.get(tx);
rejecter(
const { rejector } = promiseBank.get(tx);
rejector(
new Error(
`Plugin worker ${
pidMap.get(worker.pid).name ?? worker.pid
Expand All @@ -228,11 +224,11 @@ function getPendingPromises(
pool: Set<ChildProcess>,
pidMap: Map<number, { name: string; pending: Set<string> }>
) {
const pendingTxs: Array<Promise<unknown>> = [];
const pendingTxs: Array<PromiseBankEntry> = [];
for (const p of pool) {
const { pending } = pidMap.get(p.pid) ?? { pending: new Set() };
for (const tx of pending) {
pendingTxs.push(promiseBank.get(tx)?.promise);
pendingTxs.push(promiseBank.get(tx));
}
}
return pendingTxs;
Expand All @@ -243,11 +239,11 @@ function registerPendingPromise(
pending: Set<string>,
callback: () => void
): Promise<any> {
let resolver, rejecter;
let resolver, rejector;

const promise = new Promise((res, rej) => {
resolver = res;
rejecter = rej;
rejector = rej;

callback();
}).then((val) => {
Expand All @@ -261,7 +257,7 @@ function registerPendingPromise(
promiseBank.set(tx, {
promise,
resolver,
rejecter,
rejector,
});
return promise;
}

0 comments on commit 84d96cc

Please sign in to comment.