Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Serialize type #2578

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
RedisClient,
WorkerOptions,
} from '../interfaces';
import { MinimalQueue } from '../types';
import { MinimalQueue, Serialize } from '../types';
import {
delay,
DELAY_TIME_1,
Expand Down Expand Up @@ -55,7 +55,7 @@ export interface WorkerListener<
*
* This event is triggered when a job enters the 'active' state.
*/
active: (job: Job<DataType, ResultType, NameType>, prev: string) => void;
active: (job: Job<Serialize<DataType>, ResultType, NameType>, prev: string) => void;

/**
* Listen to 'closing' event.
Expand All @@ -77,7 +77,7 @@ export interface WorkerListener<
* This event is triggered when a job has successfully completed.
*/
completed: (
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
result: ResultType,
prev: string,
) => void;
Expand Down Expand Up @@ -106,7 +106,7 @@ export interface WorkerListener<
* reaches the stalled limit and it is deleted by the removeOnFail option.
*/
failed: (
job: Job<DataType, ResultType, NameType> | undefined,
job: Job<Serialize<DataType>, ResultType, NameType> | undefined,
error: Error,
prev: string,
) => void;
Expand All @@ -127,7 +127,7 @@ export interface WorkerListener<
* world.
*/
progress: (
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
progress: number | object,
) => void;

Expand Down Expand Up @@ -171,7 +171,7 @@ export class Worker<

private abortDelayController: AbortController | null = null;
private asyncFifoQueue: AsyncFifoQueue<void | Job<
DataType,
Serialize<DataType>,
ResultType,
NameType
>>;
Expand All @@ -187,7 +187,7 @@ export class Worker<
private _repeat: Repeat;

protected paused: Promise<void>;
protected processFn: Processor<DataType, ResultType, NameType>;
protected processFn: Processor<Serialize<DataType>, ResultType, NameType>;
protected running = false;

static RateLimitError(): Error {
Expand All @@ -196,7 +196,7 @@ export class Worker<

constructor(
name: string,
processor?: string | URL | null | Processor<DataType, ResultType, NameType>,
processor?: string | URL | null | Processor<Serialize<DataType>, ResultType, NameType>,
opts?: WorkerOptions,
Connection?: typeof RedisConnection,
) {
Expand Down Expand Up @@ -289,7 +289,7 @@ export class Worker<
useWorkerThreads: this.opts.useWorkerThreads,
});

this.processFn = sandbox<DataType, ResultType, NameType>(
this.processFn = sandbox<Serialize<DataType>, ResultType, NameType>(
processor,
this.childPool,
).bind(this);
Expand Down Expand Up @@ -348,7 +348,7 @@ export class Worker<
}

protected callProcessJob(
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
token: string,
): Promise<ResultType> {
return this.processFn(job, token);
Expand All @@ -357,9 +357,9 @@ export class Worker<
protected createJob(
data: JobJsonRaw,
jobId: string,
): Job<DataType, ResultType, NameType> {
): Job<Serialize<DataType>, ResultType, NameType> {
return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job<
DataType,
Serialize<DataType>,
ResultType,
NameType
>;
Expand Down Expand Up @@ -423,7 +423,7 @@ export class Worker<
this.startLockExtenderTimer(jobsInProgress);

const asyncFifoQueue = (this.asyncFifoQueue =
new AsyncFifoQueue<void | Job<DataType, ResultType, NameType>>());
new AsyncFifoQueue<void | Job<Serialize<DataType>, ResultType, NameType>>());

let tokenPostfix = 0;

Expand All @@ -450,7 +450,7 @@ export class Worker<
const token = `${this.id}:${tokenPostfix++}`;

const fetchedJob = this.retryIfFailed<void | Job<
DataType,
Serialize<DataType>,
ResultType,
NameType
>>(
Expand Down Expand Up @@ -484,18 +484,18 @@ export class Worker<

// Since there can be undefined jobs in the queue (when a job fails or queue is empty)
// we iterate until we find a job.
let job: Job<DataType, ResultType, NameType> | void;
let job: Job<Serialize<DataType>, ResultType, NameType> | void;
do {
job = await asyncFifoQueue.fetch();
} while (!job && asyncFifoQueue.numQueued() > 0);

if (job) {
const token = job.token;
asyncFifoQueue.add(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
this.retryIfFailed<void | Job<Serialize<DataType>, ResultType, NameType>>(
() =>
this.processJob(
<Job<DataType, ResultType, NameType>>job,
<Job<Serialize<DataType>, ResultType, NameType>>job,
token,
() => asyncFifoQueue.numTotal() <= this.opts.concurrency,
jobsInProgress,
Expand Down Expand Up @@ -533,7 +533,7 @@ export class Worker<
bclient: RedisClient,
token: string,
{ block = true }: GetNextJobOptions = {},
): Promise<Job<DataType, ResultType, NameType> | undefined> {
): Promise<Job<Serialize<DataType>, ResultType, NameType> | undefined> {
if (this.paused) {
if (block) {
await this.paused;
Expand Down Expand Up @@ -608,7 +608,7 @@ will never work with more accuracy than 1ms. */
client: RedisClient,
token: string,
name?: string,
): Promise<Job<DataType, ResultType, NameType>> {
): Promise<Job<Serialize<DataType>, ResultType, NameType>> {
const [jobData, id, limitUntil, delayUntil] =
await this.scripts.moveToActive(client, token, name);
this.updateDelays(limitUntil, delayUntil);
Expand Down Expand Up @@ -719,7 +719,7 @@ will never work with more accuracy than 1ms. */
jobData?: JobJsonRaw,
jobId?: string,
token?: string,
): Promise<Job<DataType, ResultType, NameType>> {
): Promise<Job<Serialize<DataType>, ResultType, NameType>> {
if (!jobData) {
if (!this.drained) {
this.emit('drained');
Expand All @@ -740,11 +740,11 @@ will never work with more accuracy than 1ms. */
}

async processJob(
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
token: string,
fetchNextCallback = () => true,
jobsInProgress: Set<{ job: Job; ts: number }>,
): Promise<void | Job<DataType, ResultType, NameType>> {
): Promise<void | Job<Serialize<DataType>, ResultType, NameType>> {
if (!job || this.closing || this.paused) {
return;
}
Expand Down Expand Up @@ -1051,10 +1051,10 @@ will never work with more accuracy than 1ms. */

stalled.forEach((jobId: string) => this.emit('stalled', jobId, 'active'));

const jobPromises: Promise<Job<DataType, ResultType, NameType>>[] = [];
const jobPromises: Promise<Job<Serialize<DataType>, ResultType, NameType>>[] = [];
for (let i = 0; i < failed.length; i++) {
jobPromises.push(
Job.fromId<DataType, ResultType, NameType>(
Job.fromId<Serialize<DataType>, ResultType, NameType>(
this as MinimalQueue,
failed[i],
),
Expand All @@ -1069,8 +1069,8 @@ will never work with more accuracy than 1ms. */
this.notifyFailedJobs(await Promise.all(jobPromises));
}

private notifyFailedJobs(failedJobs: Job<DataType, ResultType, NameType>[]) {
failedJobs.forEach((job: Job<DataType, ResultType, NameType>) =>
private notifyFailedJobs(failedJobs: Job<Serialize<DataType>, ResultType, NameType>[]) {
failedJobs.forEach((job: Job<Serialize<DataType>, ResultType, NameType>) =>
this.emit(
'failed',
job,
Expand All @@ -1081,7 +1081,7 @@ will never work with more accuracy than 1ms. */
}

private moveLimitedBackToWait(
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
token: string,
) {
return this.scripts.moveJobFromActiveToWait(job.id, token);
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './job-json-sandbox';
export * from './job-options';
export * from './job-type';
export * from './repeat-strategy';
export * from './serialize';
146 changes: 146 additions & 0 deletions src/types/serialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* The `Serialize` type emulates the operation `JSON.parse(JSON.stringify(x))` done
* when passing the `data` to a `Job` in a `Worker`.
*/
export type Serialize<T> =
/**
* If the input is any then the output must as well be any
*/
IsAny<T> extends true
? any
: /**
* Under the hood, JSON.stringify calls the `toJSON()` method on his parameter.
*/
T extends { toJSON(): infer U }
? U extends JsonValue
? U
: unknown
: /**
* Primitives
*/
T extends JsonPrimitive
? T
: /**
* Primitive wrappers
*/
T extends String
? string
: T extends Number
? number
: T extends Boolean
? boolean
: /**
* JSON.stringify returns always `{}` for a `Promise`
*/
T extends Promise<unknown>
? EmptyObject
: /**
* Map
*/
T extends Map<unknown, unknown>
? EmptyObject
: /**
* Set
*/
T extends Set<unknown>
? EmptyObject
: /**
* Array views
*/
T extends TypedArray
? Record<string, number>
: /**
* Some object can't be serialized, so we remove them.
*/
T extends NotJson
? never
: /**
* Arrays
*/
T extends []
? []
: T extends readonly [infer F, ...infer R]
? [NeverToNull<Serialize<F>>, ...Serialize<R>]
: T extends readonly unknown[]
? Array<NeverToNull<Serialize<T[number]>>>
: /**
* Objects
*/
T extends Record<keyof unknown, unknown>
? Prettify<SerializeObject<T>>
: /**
* Unknown
*/
unknown extends T
? unknown
: never;

/**
* Some utils.
*/

type NotJson = undefined | symbol | ((...args: any[]) => unknown);

// value is always not JSON => true
// value is always JSON => false
// value is somtimes JSON, sometimes not JSON => boolean
// note: cannot be inlined as logic requires union distribution
type ValueIsNotJson<T> = T extends NotJson ? true : false;

// note: remove optionality so that produced values are never `undefined`,
// only `true`, `false`, or `boolean`
type IsNotJson<T> = { [K in keyof T]-?: ValueIsNotJson<T[K]> };

type SerializeValues<T> = { [K in keyof T]: Serialize<T[K]> };

type SerializeObject<T extends Record<keyof unknown, unknown>> =
// required
{
[K in keyof T as unknown extends T[K]
? never
: IsNotJson<T>[K] extends false
? K
: never]: SerializeValues<T>[K];
} & {
// optional
[K in keyof T as unknown extends T[K]
? K
: // if the value is always JSON, then it's not optional
IsNotJson<T>[K] extends false
? never
: // if the value is always not JSON, omit it entirely
IsNotJson<T>[K] extends true
? never
: // if the value is mixed, then it's optional
K]?: SerializeValues<T>[K];
};

type JsonPrimitive = string | number | boolean | null;

type JsonArray = JsonValue[] | readonly JsonValue[];

type JsonObject = { [K in string]: JsonValue } & { [K in string]?: JsonValue };

type JsonValue = JsonPrimitive | JsonObject | JsonArray;

type TypedArray =
| Int8Array
| Uint8Array
| Uint8ClampedArray
| Int16Array
| Uint16Array
| Int32Array
| Uint32Array
| Float32Array
| Float64Array
| BigInt64Array
| BigUint64Array;

type Prettify<T> = { [K in keyof T]: T[K] } & {};

type NeverToNull<T> = [T] extends [never] ? null : T;

declare const emptyObjectSymbol: unique symbol;
type EmptyObject = { [emptyObjectSymbol]?: never };

type IsAny<T> = 0 extends 1 & T ? true : false;
Loading