diff --git a/examples/kv_dequeue.ts b/examples/kv_dequeue.ts new file mode 100644 index 0000000..19ef5e2 --- /dev/null +++ b/examples/kv_dequeue.ts @@ -0,0 +1,8 @@ +import { KvMessageQueue } from "../message_queue/implementations/kv/mod.ts"; + +const queue = new KvMessageQueue("test", { path: "./queue" }); + +for await (const event of queue) { + console.log(event.data); + await event.deferred.resolve(); +} diff --git a/examples/kv_enqueue.ts b/examples/kv_enqueue.ts new file mode 100644 index 0000000..fc6c4e1 --- /dev/null +++ b/examples/kv_enqueue.ts @@ -0,0 +1,8 @@ +import { KvMessageQueue } from "../message_queue/implementations/kv/mod.ts"; + +const queue = new KvMessageQueue("test", { path: "./queue" }); + +while (true) { + await queue.queueMessage({ t: Math.random() }); +} +//await queue.close(); diff --git a/message_queue/implementations/kv/mod.ts b/message_queue/implementations/kv/mod.ts new file mode 100644 index 0000000..97a6c70 --- /dev/null +++ b/message_queue/implementations/kv/mod.ts @@ -0,0 +1,209 @@ +import { MessageEvent } from "../../message_event.ts"; +import { MessageQueue, MessageQueueOptions } from "../../message_queue.ts"; +import { deferred } from "../../../deps.ts"; + +type DenoKvEnqueueOptions = Parameters[1]; + +type KvMessageQueueMessage = { + queue: string; + options: DenoKvEnqueueOptions; + timestamp: number; + origin: string; + contentType?: string; + contentEncoding?: string; + data: unknown; +}; + +export type KvMessageQueueOptions = + & MessageQueueOptions + & ( + | { + kv: Deno.Kv; + } + | { + path?: string; + } + ); + +// deno-lint-ignore no-explicit-any +export class KvMessageQueue extends MessageQueue { + #kv?: Deno.Kv; + #initialized = false; + #initialization: Promise; + #closed = false; + #deferred = deferred>(); + + get kv() { + return this.#kv; + } + + get closed() { + return this.#closed; + } + + get ready(): boolean { + return this.#initialized && !this.closed && this.kv != undefined; + } + + constructor(name: string, options: KvMessageQueueOptions = {}) { + super(name, options); + this.#initialization = this.#initialize(options); + } + + async #initialize(options: KvMessageQueueOptions) { + if ("kv" in options) { + this.#kv = options.kv; + } else { + this.#kv = await Deno.openKv(options.path); + } + + this.#initialized = true; + } + + #consume(message: KvMessageQueueMessage): void { + if (message.origin === this.id) { + throw new Error( + "The KvMessageQueue recieved its own message, this should be unreachable", + ); + } + + if ( + this.encoderDecoder && + this.encoderDecoder.contentEncoding !== message.contentEncoding + ) { + throw new TypeError( + `Expected message to have the contentEncoding ${this.encoderDecoder?.contentEncoding} but found ${message.contentEncoding}`, + ); + } + + if ( + this.serializerDeserializer && + this.serializerDeserializer.contentType !== message.contentType + ) { + throw new TypeError( + `Expected message to have the contentType ${this.encoderDecoder?.contentEncoding} but found ${message.contentEncoding}`, + ); + } + + const settle = deferred(); + const event = new MessageEvent({ + data: this.serializerDeserializer != undefined || + this.encoderDecoder != undefined + ? this.decode(message.data as Uint8Array) + : (message.data as T), + origin: message.origin, + settle, + }); + + let requeue = true; + event.deferred + .catch((shouldRequeue) => { + requeue = shouldRequeue; + }) + .finally(async () => { + this.#assertReady(); + + switch (event.deferred.state) { + case "fulfilled": { + break; + } + case "rejected": { + if (!requeue) { + break; + } + + settle.resolve(); + this.#deferred.resolve(event); + throw new Error("KvMessageQueue rejection exception"); + } + case "pending": + case "ignored": { + await this.kv.enqueue( + message, + message.options, + ); + break; + } + } + + settle.resolve(); + }); + + this.#deferred.resolve(event); + } + + #assertOpen(): asserts this is this & { closed: false } { + if (this.#closed) { + throw new Error("This KvMessageQueue has already been closed"); + } + } + + async #ensureInitialized() { + if (!this.#initialized) { + await this.#initialization; + } + } + + #assertReady(): asserts this is this & { + ready: true; + closed: false; + kv: Deno.Kv; + } { + if (!this.ready) { + throw new Error("This KvMessageQueue is not ready for an unknown reason"); + } + } + + // deno-lint-ignore require-await + async close(): Promise { + if (this.kv) { + this.kv.close(); + } + + this.#closed = true; + } + + async queueMessage( + message: T, + options?: DenoKvEnqueueOptions, + ): Promise { + const data = this.serializerDeserializer != undefined || + this.encoderDecoder != undefined + ? this.encode(message) + : message; + + this.#assertOpen(); + await this.#ensureInitialized(); + this.#assertReady(); + + await this.kv.enqueue( + { + queue: this.name, + contentType: this.serializerDeserializer?.contentType, + contentEncoding: this.encoderDecoder?.contentEncoding, + timestamp: Date.now(), + origin: this.id, + options, + data, + } satisfies KvMessageQueueMessage, + options, + ); + } + + async *[Symbol.asyncIterator](): AsyncIterator> { + this.#assertOpen(); + await this.#ensureInitialized(); + this.#assertReady(); + + const listener = this.kv.listenQueue( + this.#consume.bind(this) as (message: unknown) => Promise, + ); + + while (this.ready) { + yield await this.#deferred; + this.#deferred = deferred(); + } + + await listener; + } +} diff --git a/message_queue/message_queue.ts b/message_queue/message_queue.ts index a745395..5046843 100644 --- a/message_queue/message_queue.ts +++ b/message_queue/message_queue.ts @@ -2,15 +2,12 @@ import { EncoderDecoder } from "../encoder_decoder/types.ts"; import { SerializerDeserializer } from "../serializer_deserializer/types.ts"; import { MessageEvent } from "./message_event.ts"; -export type MessageQueueOptions = - & (T extends Uint8Array - ? { serializerDeserializer?: SerializerDeserializer } - : { serializerDeserializer: SerializerDeserializer }) - & { - encoderDecoder?: EncoderDecoder; - maxListenersPerEvent?: number; - id?: string; - }; +export type MessageQueueOptions = { + serializerDeserializer?: SerializerDeserializer; +} & { + encoderDecoder?: EncoderDecoder; + id?: string; +}; export type MaybeSerializerDeserializer = T extends Uint8Array ? SerializerDeserializer | undefined @@ -40,7 +37,7 @@ export abstract class MessageQueue return this.#id; } - constructor(name: string, options: MessageQueueOptions) { + constructor(name: string, options: MessageQueueOptions = {}) { this.#name = name; this.#encoderDecoder = options.encoderDecoder; this.#serializerDeserializer = options @@ -85,7 +82,7 @@ export abstract class MessageQueue } abstract close(): Promise; - abstract queueMessage(message: T): Promise; + abstract queueMessage(message: T, options?: unknown): Promise; abstract [Symbol.asyncIterator](): AsyncIterator>; }