diff --git a/README.md b/README.md index 60f5beb..7ece583 100644 --- a/README.md +++ b/README.md @@ -79,5 +79,6 @@ See [TROUBLESHOOTING.md](./TROUBLESHOOTING.md) on how to troubleshoot the worker | DELETE_ARCHIVED_IN_DAYS | string/int | 7 | How long to keep jobs in pgboss.archive before deleting it | | SUBMISSION_REQUEST_TIMEOUT | string/int | 2000 | How long to keep the POST request alive for in milliseconds. This should be higher (20-30s) if integrating into CASEBOOK/Orbit which has long response times | | NEW_JOB_CHECK_INTERVAL | string/int | 2000 | The frequency to check for new jobs in milliseconds | +| QUEUE_SCHEMA | string | pgboss | The schema name for pgboss to use. If it does not exist, pgboss will create the schema and related tables in this schema. | Types are described as string/int since kubernetes only accepts strings. Strings are parsed into int. \ No newline at end of file diff --git a/TROUBLESHOOTING.md b/TROUBLESHOOTING.md index f2f6329..0ac57bd 100644 --- a/TROUBLESHOOTING.md +++ b/TROUBLESHOOTING.md @@ -20,9 +20,14 @@ kubectl run -it --rm --env PGPASSWORD='' --env PAGER= --image=postgres Replace PASSWORD with the password for the database, ENDPOINT_URL with the endpoint URL for the database. - [pgboss](https://github.com/timgit/pg-boss) is used to manage queueing jobs. On application start, pgboss will automatically create necessary tables in the database. +By default, pgboss will create the schema `pgboss`. The tables will be created in this schema. +This is configured using the environment variable `QUEUE_SCHEMA`. If you have set this to a different schema, you will need to change the schema in the following queries. + +For example, if `QUEUE_SCHEMA` is set to `pgboss_v9`, query the job table with `select * from pgboss_v9.job;` + + ### Jobs table The jobs table `pgboss.job` is where all the current jobs are stored. Jobs will remain here, until they are completed or failed. Then they will move to `pgboss.archive` diff --git a/worker/config/custom-environment-variables.json b/worker/config/custom-environment-variables.json index 3525bfb..1956283 100644 --- a/worker/config/custom-environment-variables.json +++ b/worker/config/custom-environment-variables.json @@ -3,6 +3,7 @@ "url": "QUEUE_URL", "archiveFailedAfterDays": "ARCHIVE_FAILED_AFTER_DAYS", "deleteArchivedAfterDays": "DELETE_ARCHIVED_IN_DAYS", + "schema": "QUEUE_SCHEMA" }, "Submission": { "requestTimeout": "SUBMISSION_REQUEST_TIMEOUT" diff --git a/worker/config/default.js b/worker/config/default.js index 43b9196..75f048e 100644 --- a/worker/config/default.js +++ b/worker/config/default.js @@ -3,6 +3,7 @@ module.exports = { url: "postgres://user:root@localhost:5432/queue", archiveFailedInDays: 30, deleteArchivedAfterDays: 7, + schema: "pgboss", }, Submission: { requestTimeout: 2000, diff --git a/worker/src/Consumer/getConsumer.ts b/worker/src/Consumer/getConsumer.ts index 1a0d230..94a821a 100644 --- a/worker/src/Consumer/getConsumer.ts +++ b/worker/src/Consumer/getConsumer.ts @@ -5,7 +5,6 @@ import pino from "pino"; const URL = config.get("Queue.url"); const logger = pino().child({ method: "Consumer.create" }); -let consumer; const MINUTE_IN_S = 60; const HOUR_IN_S = MINUTE_IN_S * 60; @@ -13,17 +12,25 @@ const DAY_IN_S = HOUR_IN_S * 24; const archiveFailedAfterDays = parseInt(config.get("Queue.archiveFailedInDays")); const deleteAfterDays = parseInt(config.get("Queue.deleteArchivedAfterDays")); +const schema = config.get("Queue.schema"); + +const registeredConsumers: Record = {}; logger.info(`archiveFailedAfterDays: ${archiveFailedAfterDays}, deleteAfterDays: ${deleteAfterDays}`); +type ConsumerOptions = { + schema?: string; +}; /** * Sets up database connection via PgBoss and creates an instance of a "consumer" (consumes the queue). */ -export async function create() { +export async function create(options: ConsumerOptions) { + const { schema } = options; const boss = new PgBoss({ connectionString: URL, archiveFailedAfterSeconds: archiveFailedAfterDays * DAY_IN_S, deleteAfterDays, + schema, }); boss.on("error", (error) => { @@ -47,13 +54,14 @@ export async function create() { * `getConsumer` should be used whenever an instance of a consumer is needed. * This is to prevent too many database connections from being opened unnecessarily. */ -export async function getConsumer() { +export async function getConsumer(name?: string) { + const consumerSchema = name ?? schema; + const consumer = registeredConsumers[consumerSchema]; try { if (!consumer) { - const boss = await create(); - consumer = boss; + registeredConsumers[consumerSchema] = await create({ schema: consumerSchema }); } - return consumer; + return registeredConsumers[consumerSchema]; } catch (e) { logger.error(e); process.exit(1);