diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 1cec4caa5c..344c96014e 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -2,20 +2,11 @@ import { parseExpression } from 'cron-parser'; import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces'; import { JobsOptions, RepeatStrategy } from '../types'; import { Job } from './job'; +import { JobSchedulerJson, JobSchedulerTemplateJson } from '../interfaces'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; - -export interface JobSchedulerJson { - key: string; // key is actually the job scheduler id - name: string; - id?: string | null; - endDate: number | null; - tz: string | null; - pattern: string | null; - every?: string | null; - next?: number; -} +import { optsAsJSON, optsFromJSON } from '../utils'; export class JobScheduler extends QueueBase { private repeatStrategy: RepeatStrategy; @@ -103,6 +94,8 @@ export class JobScheduler extends QueueBase { (multi) as RedisClient, jobSchedulerId, nextMillis, + JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), + optsAsJSON(opts), { name: jobName, endDate: endDate ? new Date(endDate).getTime() : undefined, @@ -241,22 +234,47 @@ export class JobScheduler extends QueueBase { }; } - async getJobScheduler(id: string): Promise { + async getJobScheduler(id: string): Promise> { const client = await this.client; - const jobData = await client.hgetall(this.toKey('repeat:' + id)); + const schedulerAttributes = await client.hgetall( + this.toKey('repeat:' + id), + ); - if (jobData) { + if (schedulerAttributes) { return { key: id, - name: jobData.name, - endDate: parseInt(jobData.endDate) || null, - tz: jobData.tz || null, - pattern: jobData.pattern || null, - every: jobData.every || null, + name: schedulerAttributes.name, + endDate: parseInt(schedulerAttributes.endDate) || null, + tz: schedulerAttributes.tz || null, + pattern: schedulerAttributes.pattern || null, + every: schedulerAttributes.every || null, + ...(schedulerAttributes.data || schedulerAttributes.opts + ? { + template: this.getTemplateFromJSON( + schedulerAttributes.data, + schedulerAttributes.opts, + ), + } + : {}), }; } } + private getTemplateFromJSON( + rawData?: string, + rawOpts?: string, + ): JobSchedulerTemplateJson { + console.log(typeof rawOpts); + const template: JobSchedulerTemplateJson = {}; + if (rawData) { + template.data = JSON.parse(rawData); + } + if (rawOpts) { + template.opts = optsFromJSON(rawOpts); + } + return template; + } + async getJobSchedulers( start = 0, end = -1, diff --git a/src/classes/job.ts b/src/classes/job.ts index b63e5cdf49..21a73cff81 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -30,6 +30,8 @@ import { parseObjectValues, tryCatch, removeUndefinedFields, + optsAsJSON, + optsFromJSON, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -324,7 +326,7 @@ export class Job< jobId?: string, ): Job { const data = JSON.parse(json.data || '{}'); - const opts = Job.optsFromJSON(json.opts); + const opts = optsFromJSON(json.opts); const job = new this( queue, @@ -388,27 +390,6 @@ export class Job< this.scripts = new Scripts(this.queue); } - private static optsFromJSON(rawOpts?: string): JobsOptions { - const opts = JSON.parse(rawOpts || '{}'); - - const optionEntries = Object.entries(opts) as Array< - [keyof RedisJobOptions, any] - >; - - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsDecodeMap as Record)[attributeName]) { - options[(optsDecodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as JobsOptions; - } - /** * Fetches a Job from the queue given the passed job id. * @@ -469,7 +450,7 @@ export class Job< id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: removeUndefinedFields(this.optsAsJSON(this.opts)), + opts: optsAsJSON(this.opts), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, @@ -487,24 +468,6 @@ export class Job< }); } - private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { - const optionEntries = Object.entries(opts) as Array< - [keyof JobsOptions, any] - >; - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsEncodeMap as Record)[attributeName]) { - options[(optsEncodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as RedisJobOptions; - } - /** * Prepares a job to be passed to Sandbox. * @returns diff --git a/src/classes/queue.ts b/src/classes/queue.ts index f642233ac3..3afa5691c0 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -3,6 +3,7 @@ import { BaseJobOptions, BulkJobOptions, IoredisListener, + JobSchedulerJson, QueueOptions, RepeatableJob, RepeatOptions, @@ -571,7 +572,7 @@ export class Queue< * * @param id - identifier of scheduler. */ - async getJobScheduler(id: string): Promise { + async getJobScheduler(id: string): Promise> { return (await this.jobScheduler).getJobScheduler(id); } diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index c9156b291d..0897d905a9 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -311,6 +311,8 @@ export class Scripts { client: RedisClient, jobSchedulerId: string, nextMillis: number, + templateData: string, + templateOpts: RedisJobOptions, opts: RepeatableOptions, ): Promise { const queueKeys = this.queue.keys; @@ -319,7 +321,15 @@ export class Scripts { queueKeys.repeat, queueKeys.delayed, ]; - const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']]; + + const args = [ + nextMillis, + pack(opts), + jobSchedulerId, + templateData, + pack(templateOpts), + queueKeys[''], + ]; return this.execCommand(client, 'addJobScheduler', keys.concat(args)); } diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua index 8b7a8084eb..c5a84a641c 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-2.lua @@ -13,7 +13,9 @@ [4] endDate? [5] every? ARGV[3] jobs scheduler id - ARGV[4] prefix key + ARGV[4] Json stringified template data + ARGV[5] mspacked template opts + ARGV[6] prefix key Output: repeatableKey - OK @@ -24,13 +26,14 @@ local delayedKey = KEYS[2] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[3] -local prefixKey = ARGV[4] +local templateOpts = cmsgpack.unpack(ARGV[5]) +local prefixKey = ARGV[6] -- Includes --- @include "includes/removeJob" -local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) - rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) +local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts) + rcall("ZADD", repeatKey, nextMillis, schedulerId) local opts = cmsgpack.unpack(rawOpts) local optionalValues = {} @@ -54,7 +57,18 @@ local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) table.insert(optionalValues, opts['every']) end - rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'], + local jsonTemplateOpts = cjson.encode(templateOpts) + if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then + table.insert(optionalValues, "opts") + table.insert(optionalValues, jsonTemplateOpts) + end + + if templateData and templateData ~= '{}' then + table.insert(optionalValues, "data") + table.insert(optionalValues, templateData) + end + + rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], unpack(optionalValues)) end @@ -74,4 +88,4 @@ if prevMillis ~= false then end end -return storeRepeatableJob(repeatKey, nextMillis, ARGV[2]) +return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index bf8a6ac0be..44a4945006 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -7,6 +7,7 @@ export * from './debounce-options'; export * from './flow-job'; export * from './ioredis-events'; export * from './job-json'; +export * from './job-scheduler-json'; export * from './keep-jobs'; export * from './metrics-options'; export * from './metrics'; diff --git a/src/interfaces/job-scheduler-json.ts b/src/interfaces/job-scheduler-json.ts new file mode 100644 index 0000000000..90796f5dd5 --- /dev/null +++ b/src/interfaces/job-scheduler-json.ts @@ -0,0 +1,18 @@ +import { JobsOptions } from '../types'; + +export interface JobSchedulerTemplateJson { + data?: D; + opts?: Omit; +} + +export interface JobSchedulerJson { + key: string; // key is actually the job scheduler id + name: string; + id?: string | null; + endDate: number | null; + tz: string | null; + pattern: string | null; + every?: string | null; + next?: number; + template?: JobSchedulerTemplateJson; +} diff --git a/src/utils.ts b/src/utils.ts index 4efa2e3467..1bf2eac69c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,6 +17,7 @@ import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; +import { JobsOptions, RedisJobOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -270,6 +271,57 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; +const optsDecodeMap = { + de: 'deduplication', + fpof: 'failParentOnFailure', + idof: 'ignoreDependencyOnFailure', + kl: 'keepLogs', + rdof: 'removeDependencyOnFailure', + tm: 'telemetryMetadata', +}; + +const optsEncodeMap = invertObject(optsDecodeMap); +optsEncodeMap.debounce = 'de'; + +export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { + const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>; + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if (value !== undefined) { + if ((optsEncodeMap as Record)[attributeName]) { + options[(optsEncodeMap as Record)[attributeName]] = + value; + } else { + options[attributeName] = value; + } + } + } + + return options as RedisJobOptions; +} + +export function optsFromJSON(rawOpts?: string): JobsOptions { + const opts = JSON.parse(rawOpts || '{}'); + + const optionEntries = Object.entries(opts) as Array< + [keyof RedisJobOptions, any] + >; + + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if ((optsDecodeMap as Record)[attributeName]) { + options[(optsDecodeMap as Record)[attributeName]] = + value; + } else { + options[attributeName] = value; + } + } + + return options as JobsOptions; +} + export function removeUndefinedFields>( obj: Record, ) { diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index a1093895fd..f0f3244af8 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -360,6 +360,11 @@ describe('Job Scheduler', function () { tz: null, pattern: '*/2 * * * * *', every: null, + template: { + data: { + foo: 'bar', + }, + }, }); this.clock.tick(nextTick); @@ -687,6 +692,17 @@ describe('Job Scheduler', function () { name: 'rrule', }); + const scheduler = await queue.getJobScheduler('rrule'); + + expect(scheduler).to.deep.equal({ + key: 'rrule', + name: 'rrule', + endDate: null, + tz: null, + pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO', + every: null, + }); + this.clock.tick(nextTick); let prev: any;