Skip to content
This repository has been archived by the owner on May 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #179 from davidahouse/house/176/stop-worker-cleanly
Browse files Browse the repository at this point in the history
✨ Improved graceful shutdown
  • Loading branch information
davidahouse authored Sep 10, 2020
2 parents 187aab2 + 9b43ec6 commit 83c213d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*
.DS_Store
.stampederc
.stampederc*
.*rc
startWorkers.sh
stopWorkers.sh
Expand Down
3 changes: 3 additions & 0 deletions .ubolt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ newversion:
version:
command: npm view stampede-worker version
description: Show the current version of the package
runlocal:
command: bin/stampede-worker.js --config .stampederc-local
description: Run worker locally against test docker containers
44 changes: 40 additions & 4 deletions bin/stampede-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ let lastTask = {};
let currentSpawnedTask = null;
let currentTaskStartTime = null;
let currentTaskTimeout = conf.taskTimeout;
let pendingShutdown = false;

logger.info(figlet.textSync("stampede", { horizontalLayout: "full" }));
logger.info(module.exports.version);
Expand Down Expand Up @@ -136,14 +137,24 @@ process.on("SIGINT", function () {
gracefulShutdown();
});

process.on("SIGHUP", function () {
gracefulShutdown();
});

/**
* gracefulShutdown
*/
async function gracefulShutdown() {
logger.info("Closing queues");
await workerQueue.close();
await responseQueue.close();
process.exit(0);
if (currentSpawnedTask != null) {
pendingShutdown = true;
} else {
logger.info("Closing worker queue");
await workerQueue.close();
logger.info("Closing response queue");
await responseQueue.close();
logger.info("Done");
process.exit(0);
}
}

/**
Expand Down Expand Up @@ -253,6 +264,17 @@ async function handleTask(task, responseQueue) {
);
currentSpawnedTask = null;
currentTaskStartTime = null;

// If we kill the task due to a pending shutdown then just close our worker queue and return
// so that the task will get re-queued
if (pendingShutdown == true) {
task.status = "queued";
task.stats.startedAt = null;
await updateTask(task, responseQueue);
await workerQueue.close();
return;
}

logger.verbose("Updating task record to capture completed state");
const finishedAt = new Date();
task.stats.finishedAt = finishedAt;
Expand Down Expand Up @@ -309,6 +331,20 @@ async function handleHeartbeat(queue) {
}
}

// If we are trying to shutdown during a task execution, go ahead and kill the task
if (pendingShutdown == true) {
if (currentSpawnedTask != null) {
currentSpawnedTask.kill();
currentTaskStartTime = null;
currentTaskTimeout = conf.taskTimeout;
} else {
logger.info("Closing response queue");
await responseQueue.close();
logger.info("Done");
process.exit(0);
}
}

setTimeout(handleHeartbeat, conf.heartbeatInterval, queue);
}

Expand Down

0 comments on commit 83c213d

Please sign in to comment.