feat: allow pgboss schema name to be configurable (#28)
* feat: allow schema name to be configured via environment variable

* rename to consumerSchema
jenbutongit authored Sep 18, 2024
1 parent ebe5c7a commit 888f6f2
Showing 5 changed files with 23 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -79,5 +79,6 @@ See [TROUBLESHOOTING.md](./TROUBLESHOOTING.md) on how to troubleshoot the worker
| DELETE_ARCHIVED_IN_DAYS | string/int | 7 | How many days to keep jobs in pgboss.archive before deleting them |
| SUBMISSION_REQUEST_TIMEOUT | string/int | 2000 | How long to keep the POST request alive for in milliseconds. This should be higher (20-30s) if integrating into CASEBOOK/Orbit which has long response times |
| NEW_JOB_CHECK_INTERVAL | string/int | 2000 | The frequency to check for new jobs in milliseconds |
| QUEUE_SCHEMA | string | pgboss | The schema name for pgboss to use. If it does not exist, pgboss will create the schema and related tables in this schema. |

Types are described as string/int since Kubernetes only accepts strings. Strings are parsed into integers.
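
The string-to-int parsing described above can be sketched as follows. This is a minimal illustration, not code from this commit: `parseIntSetting` is a hypothetical helper, and it is stricter than a bare `parseInt` call because it rejects values such as `"20s"` instead of silently reading them as `20`.

```typescript
// Hypothetical helper (not part of the commit): converts a string setting,
// as supplied through an environment variable, into an integer.
function parseIntSetting(value: string | undefined, fallback: number): number {
  // An unset variable falls back to the documented default.
  if (value === undefined) {
    return fallback;
  }
  const trimmed = value.trim();
  // Accept only an optional minus sign followed by digits.
  if (!/^-?\d+$/.test(trimmed)) {
    throw new Error(`Expected an integer but received "${value}"`);
  }
  return Number.parseInt(trimmed, 10);
}

// Example: DELETE_ARCHIVED_IN_DAYS arrives as the string "7".
const deleteArchivedInDays = parseIntSetting("7", 7);
// Example: NEW_JOB_CHECK_INTERVAL is unset, so the default of 2000 applies.
const newJobCheckInterval = parseIntSetting(undefined, 2000);
```

Failing loudly on a malformed value is a design choice made for this sketch; the worker itself may behave differently.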
7 changes: 6 additions & 1 deletion TROUBLESHOOTING.md
@@ -20,9 +20,14 @@ kubectl run -it --rm --env PGPASSWORD='<PASSWORD>' --env PAGER= --image=postgres
Replace PASSWORD with the password for the database, ENDPOINT_URL with the endpoint URL for the database.



[pgboss](https://github.com/timgit/pg-boss) is used to manage queueing jobs. On application start, pgboss will automatically create necessary tables in the database.

By default, pgboss will create the schema `pgboss`. The tables will be created in this schema.
This is configured using the environment variable `QUEUE_SCHEMA`. If you have set this to a different schema, you will need to change the schema in the following queries.

For example, if `QUEUE_SCHEMA` is set to `pgboss_v9`, query the job table with `select * from pgboss_v9.job;`


### Jobs table
The jobs table `pgboss.job` is where all the current jobs are stored. Jobs remain here until they are completed or failed, then they move to `pgboss.archive`.
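
When `QUEUE_SCHEMA` is not `pgboss`, every table name in the troubleshooting queries changes with it. The sketch below shows one way to build the schema-qualified names. `qualifiedTable` and `selectAll` are hypothetical helpers for illustration only, and the identifier check is an assumption about which schema names are safe to interpolate into SQL.

```typescript
// Hypothetical helpers (not part of the commit) for building
// schema-qualified queries against the pgboss tables.
type QueueTable = "job" | "archive";

// Conservative pattern for an unquoted SQL identifier.
const IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/;

function qualifiedTable(schema: string, table: QueueTable): string {
  // Refuse anything that is not a plain identifier, since the value
  // is interpolated directly into the query text.
  if (!IDENTIFIER.test(schema)) {
    throw new Error(`Invalid schema name: ${schema}`);
  }
  return `${schema}.${table}`;
}

function selectAll(schema: string, table: QueueTable): string {
  return `select * from ${qualifiedTable(schema, table)};`;
}

// With QUEUE_SCHEMA=pgboss_v9, as in the example above:
const jobQuery = selectAll("pgboss_v9", "job");
const archiveQuery = selectAll("pgboss_v9", "archive");
```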

1 change: 1 addition & 0 deletions worker/config/custom-environment-variables.json
@@ -3,6 +3,7 @@
"url": "QUEUE_URL",
"archiveFailedAfterDays": "ARCHIVE_FAILED_AFTER_DAYS",
"deleteArchivedAfterDays": "DELETE_ARCHIVED_IN_DAYS",
"schema": "QUEUE_SCHEMA"
},
"Submission": {
"requestTimeout": "SUBMISSION_REQUEST_TIMEOUT"
1 change: 1 addition & 0 deletions worker/config/default.js
@@ -3,6 +3,7 @@ module.exports = {
url: "postgres://user:root@localhost:5432/queue",
archiveFailedInDays: 30,
deleteArchivedAfterDays: 7,
schema: "pgboss",
},
Submission: {
requestTimeout: 2000,
20 changes: 14 additions & 6 deletions worker/src/Consumer/getConsumer.ts
@@ -5,25 +5,32 @@ import pino from "pino";

 const URL = config.get<string>("Queue.url");
 const logger = pino().child({ method: "Consumer.create" });
-let consumer;

 const MINUTE_IN_S = 60;
 const HOUR_IN_S = MINUTE_IN_S * 60;
 const DAY_IN_S = HOUR_IN_S * 24;

 const archiveFailedAfterDays = parseInt(config.get<string>("Queue.archiveFailedInDays"));
 const deleteAfterDays = parseInt(config.get<string>("Queue.deleteArchivedAfterDays"));
+const schema = config.get<string>("Queue.schema");
+
+const registeredConsumers: Record<string, PgBoss> = {};

 logger.info(`archiveFailedAfterDays: ${archiveFailedAfterDays}, deleteAfterDays: ${deleteAfterDays}`);

+type ConsumerOptions = {
+  schema?: string;
+};
 /**
  * Sets up database connection via PgBoss and creates an instance of a "consumer" (consumes the queue).
  */
-export async function create() {
+export async function create(options: ConsumerOptions) {
+  const { schema } = options;
   const boss = new PgBoss({
     connectionString: URL,
     archiveFailedAfterSeconds: archiveFailedAfterDays * DAY_IN_S,
     deleteAfterDays,
+    schema,
   });

   boss.on("error", (error) => {
@@ -47,13 +54,14 @@ export async function create() {
  * `getConsumer` should be used whenever an instance of a consumer is needed.
  * This is to prevent too many database connections from being opened unnecessarily.
  */
-export async function getConsumer() {
+export async function getConsumer(name?: string) {
+  const consumerSchema = name ?? schema;
+  const consumer = registeredConsumers[consumerSchema];
   try {
     if (!consumer) {
-      const boss = await create();
-      consumer = boss;
+      registeredConsumers[consumerSchema] = await create({ schema: consumerSchema });
     }
-    return consumer;
+    return registeredConsumers[consumerSchema];
   } catch (e) {
     logger.error(e);
     process.exit(1);
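
The per-schema registry introduced in `getConsumer` can be tried in isolation with the sketch below. It is a hedged illustration rather than the committed code: `FakeBoss`, `createFake` and `getFakeConsumer` are hypothetical stand-ins so the example runs without a database, and it deliberately differs from the commit by caching the promise instead of the resolved instance, so two callers that ask for the same schema at the same time share one creation.

```typescript
// Hypothetical stand-in for PgBoss so the sketch needs no database.
class FakeBoss {
  constructor(readonly schema: string) {}
}

// Assumed default, mirroring the `schema: "pgboss"` entry in default.js.
const DEFAULT_SCHEMA = "pgboss";

// One cached creation promise per schema name.
const registered = new Map<string, Promise<FakeBoss>>();
let createdCount = 0;

// Stand-in for `create`: counts how many instances were requested.
async function createFake(options: { schema: string }): Promise<FakeBoss> {
  createdCount += 1;
  return new FakeBoss(options.schema);
}

function getFakeConsumer(name?: string): Promise<FakeBoss> {
  const schema = name ?? DEFAULT_SCHEMA;
  const existing = registered.get(schema);
  if (existing !== undefined) {
    return existing; // reuse the instance already requested for this schema
  }
  const pending = createFake({ schema });
  registered.set(schema, pending);
  return pending;
}

// Two lookups of the default schema share one instance;
// a different schema name gets its own.
const first = getFakeConsumer();
const second = getFakeConsumer();
const other = getFakeConsumer("pgboss_v9");
```

Keying the registry by schema name is what lets a single worker process talk to more than one pgboss schema while still opening only one connection pool per schema.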