Skip to content

Commit

Permalink
feat: deno kv queue support (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
eliassjogreen authored Aug 25, 2023
1 parent 09e7cbc commit 8483a58
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 11 deletions.
8 changes: 8 additions & 0 deletions examples/kv_dequeue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { KvMessageQueue } from "../message_queue/implementations/kv/mod.ts";

const queue = new KvMessageQueue("test", { path: "./queue" });

for await (const event of queue) {
console.log(event.data);
await event.deferred.resolve();
}
8 changes: 8 additions & 0 deletions examples/kv_enqueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { KvMessageQueue } from "../message_queue/implementations/kv/mod.ts";

const queue = new KvMessageQueue("test", { path: "./queue" });

while (true) {
await queue.queueMessage({ t: Math.random() });
}
//await queue.close();
209 changes: 209 additions & 0 deletions message_queue/implementations/kv/mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import { MessageEvent } from "../../message_event.ts";
import { MessageQueue, MessageQueueOptions } from "../../message_queue.ts";
import { deferred } from "../../../deps.ts";

type DenoKvEnqueueOptions = Parameters<Deno.Kv["enqueue"]>[1];

type KvMessageQueueMessage = {
queue: string;
options: DenoKvEnqueueOptions;
timestamp: number;
origin: string;
contentType?: string;
contentEncoding?: string;
data: unknown;
};

export type KvMessageQueueOptions<T> =
& MessageQueueOptions<T>
& (
| {
kv: Deno.Kv;
}
| {
path?: string;
}
);

// deno-lint-ignore no-explicit-any
export class KvMessageQueue<T = any> extends MessageQueue<T> {
#kv?: Deno.Kv;
#initialized = false;
#initialization: Promise<void>;
#closed = false;
#deferred = deferred<MessageEvent<T>>();

get kv() {
return this.#kv;
}

get closed() {
return this.#closed;
}

get ready(): boolean {
return this.#initialized && !this.closed && this.kv != undefined;
}

constructor(name: string, options: KvMessageQueueOptions<T> = {}) {
super(name, options);
this.#initialization = this.#initialize(options);
}

async #initialize(options: KvMessageQueueOptions<T>) {
if ("kv" in options) {
this.#kv = options.kv;
} else {
this.#kv = await Deno.openKv(options.path);
}

this.#initialized = true;
}

#consume(message: KvMessageQueueMessage): void {
if (message.origin === this.id) {
throw new Error(
"The KvMessageQueue recieved its own message, this should be unreachable",
);
}

if (
this.encoderDecoder &&
this.encoderDecoder.contentEncoding !== message.contentEncoding
) {
throw new TypeError(
`Expected message to have the contentEncoding ${this.encoderDecoder?.contentEncoding} but found ${message.contentEncoding}`,
);
}

if (
this.serializerDeserializer &&
this.serializerDeserializer.contentType !== message.contentType
) {
throw new TypeError(
`Expected message to have the contentType ${this.encoderDecoder?.contentEncoding} but found ${message.contentEncoding}`,
);
}

const settle = deferred<void>();
const event = new MessageEvent<T>({
data: this.serializerDeserializer != undefined ||
this.encoderDecoder != undefined
? this.decode(message.data as Uint8Array)
: (message.data as T),
origin: message.origin,
settle,
});

let requeue = true;
event.deferred
.catch((shouldRequeue) => {
requeue = shouldRequeue;
})
.finally(async () => {
this.#assertReady();

switch (event.deferred.state) {
case "fulfilled": {
break;
}
case "rejected": {
if (!requeue) {
break;
}

settle.resolve();
this.#deferred.resolve(event);
throw new Error("KvMessageQueue rejection exception");
}
case "pending":
case "ignored": {
await this.kv.enqueue(
message,
message.options,
);
break;
}
}

settle.resolve();
});

this.#deferred.resolve(event);
}

#assertOpen(): asserts this is this & { closed: false } {
if (this.#closed) {
throw new Error("This KvMessageQueue has already been closed");
}
}

async #ensureInitialized() {
if (!this.#initialized) {
await this.#initialization;
}
}

#assertReady(): asserts this is this & {
ready: true;
closed: false;
kv: Deno.Kv;
} {
if (!this.ready) {
throw new Error("This KvMessageQueue is not ready for an unknown reason");
}
}

// deno-lint-ignore require-await
async close(): Promise<void> {
if (this.kv) {
this.kv.close();
}

this.#closed = true;
}

async queueMessage(
message: T,
options?: DenoKvEnqueueOptions,
): Promise<void> {
const data = this.serializerDeserializer != undefined ||
this.encoderDecoder != undefined
? this.encode(message)
: message;

this.#assertOpen();
await this.#ensureInitialized();
this.#assertReady();

await this.kv.enqueue(
{
queue: this.name,
contentType: this.serializerDeserializer?.contentType,
contentEncoding: this.encoderDecoder?.contentEncoding,
timestamp: Date.now(),
origin: this.id,
options,
data,
} satisfies KvMessageQueueMessage,
options,
);
}

async *[Symbol.asyncIterator](): AsyncIterator<MessageEvent<T>> {
this.#assertOpen();
await this.#ensureInitialized();
this.#assertReady();

const listener = this.kv.listenQueue(
this.#consume.bind(this) as (message: unknown) => Promise<void>,
);

while (this.ready) {
yield await this.#deferred;
this.#deferred = deferred();
}

await listener;
}
}
19 changes: 8 additions & 11 deletions message_queue/message_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@ import { EncoderDecoder } from "../encoder_decoder/types.ts";
import { SerializerDeserializer } from "../serializer_deserializer/types.ts";
import { MessageEvent } from "./message_event.ts";

export type MessageQueueOptions<T> =
& (T extends Uint8Array
? { serializerDeserializer?: SerializerDeserializer<T> }
: { serializerDeserializer: SerializerDeserializer<T> })
& {
encoderDecoder?: EncoderDecoder;
maxListenersPerEvent?: number;
id?: string;
};
export type MessageQueueOptions<T> = {
serializerDeserializer?: SerializerDeserializer<T>;
} & {
encoderDecoder?: EncoderDecoder;
id?: string;
};

export type MaybeSerializerDeserializer<T> = T extends Uint8Array
? SerializerDeserializer<T> | undefined
Expand Down Expand Up @@ -40,7 +37,7 @@ export abstract class MessageQueue<T = any>
return this.#id;
}

constructor(name: string, options: MessageQueueOptions<T>) {
constructor(name: string, options: MessageQueueOptions<T> = {}) {
this.#name = name;
this.#encoderDecoder = options.encoderDecoder;
this.#serializerDeserializer = options
Expand Down Expand Up @@ -85,7 +82,7 @@ export abstract class MessageQueue<T = any>
}

abstract close(): Promise<void>;
abstract queueMessage(message: T): Promise<void>;
abstract queueMessage(message: T, options?: unknown): Promise<void>;

abstract [Symbol.asyncIterator](): AsyncIterator<MessageEvent<T>>;
}

0 comments on commit 8483a58

Please sign in to comment.