diff --git a/README.md b/README.md index 7cfa8d0..c2baafb 100644 --- a/README.md +++ b/README.md @@ -15,17 +15,17 @@ This example demonstrates how to use the AMQP message queue with the json serializer and deserializer: ```ts -import { AMQPMessageQueue } from "https://deno.land/x/mess/message_queue/amqp.ts"; +import { AMQPMessageQueue } from "https://deno.land/x/mess/message_queue/implementations/amqp/mod.ts"; import * as json from "https://deno.land/x/mess/serializer_deserializer/json.ts"; const queue = new AMQPMessageQueue("test", { - serializerDeserializer: json, - connection: Deno.env.get("AMQP"), + serializerDeserializer: deno, + connection: "amqp://guest:guest@localhost:5672", }); -for await (const [event] of queue.on("message")) { +for await (const event of queue) { console.log(event.data); - event.deferred.resolve(); + await event.deferred.resolve(); } ``` diff --git a/deps.ts b/deps.ts index 4b22a07..fa69194 100644 --- a/deps.ts +++ b/deps.ts @@ -1,2 +1 @@ -export * from "https://deno.land/std@0.186.0/async/mod.ts"; -export * from "https://deno.land/x/event@2.0.1/mod.ts"; +export * from "https://deno.land/std@0.188.0/async/mod.ts"; diff --git a/examples/amqp_dequeue.ts b/examples/amqp_dequeue.ts new file mode 100644 index 0000000..5ab4243 --- /dev/null +++ b/examples/amqp_dequeue.ts @@ -0,0 +1,12 @@ +import { AMQPMessageQueue } from "../message_queue/implementations/amqp/mod.ts"; +import * as deno from "../serializer_deserializer/deno.ts"; + +const queue = new AMQPMessageQueue("test", { + serializerDeserializer: deno, + connection: "amqp://guest:guest@localhost:5672", +}); + +for await (const event of queue) { + console.log(event.data); + await event.deferred.resolve(); +} diff --git a/examples/amqp_enqueue.ts b/examples/amqp_enqueue.ts new file mode 100644 index 0000000..2af408b --- /dev/null +++ b/examples/amqp_enqueue.ts @@ -0,0 +1,12 @@ +import { AMQPMessageQueue } from "../message_queue/implementations/amqp/mod.ts"; +import * as deno from "../serializer_deserializer/deno.ts"; + +const queue = new AMQPMessageQueue("test", { + serializerDeserializer: deno, + connection: "amqp://guest:guest@localhost:5672", +}); + +while (true) { + await queue.queueMessage({ t: Math.random() }); +} +//await queue.close(); diff --git a/examples/simple_amqp.ts b/examples/simple_amqp.ts deleted file mode 100644 index ad912cd..0000000 --- a/examples/simple_amqp.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { AMQPMessageQueue } from "../message_queue/amqp.ts"; -import * as deno from "../serializer_deserializer/deno.ts"; - -const queue = new AMQPMessageQueue("test", { - serializerDeserializer: deno, - connection: Deno.env.get("AMQP"), -}); - -for await (const [event] of queue.on("message")) { - console.log(event.data); - event.deferred.resolve(); -} diff --git a/message_queue/amqp.ts b/message_queue/implementations/amqp/mod.ts similarity index 53% rename from message_queue/amqp.ts rename to message_queue/implementations/amqp/mod.ts index cb347e5..e3fdb1a 100644 --- a/message_queue/amqp.ts +++ b/message_queue/implementations/amqp/mod.ts @@ -7,8 +7,9 @@ import { connect, } from "https://deno.land/x/amqp@v0.23.1/mod.ts"; -import { MessageEvent } from "./message_event.ts"; -import { MessageQueue, MessageQueueOptions } from "./message_queue.ts"; +import { MessageEvent } from "../../message_event.ts"; +import { MessageQueue, MessageQueueOptions } from "../../message_queue.ts"; +import { deferred } from "../../../deps.ts"; export type AMQPMessageQueueOptions = MessageQueueOptions & { connection?: AmqpConnectOptions | string; @@ -21,10 +22,27 @@ export class AMQPMessageQueue extends MessageQueue { #initialized = false; #initialization: Promise; #closed = false; + #deferred = deferred>(); - get #ready(): boolean { - return this.#initialized && !this.#closed && - this.#connection != undefined && this.#channel != undefined; + get connection() { + return this.#connection; + } + + get channel() { + return this.#channel; + } + + get closed() { + return this.#closed; + } + + get ready(): boolean { + return ( + this.#initialized && + !this.closed && + this.connection != undefined && + this.channel != undefined + ); } constructor(name: string, options: AMQPMessageQueueOptions) { @@ -35,25 +53,16 @@ export class AMQPMessageQueue extends MessageQueue { async #initialize(options: AMQPMessageQueueOptions) { // @ts-ignore This is actually ok, but technically not according to typescript this.#connection = await connect(options.connection); - this.#connection.closed().then(() => this.#closed = true); + this.#connection.closed().then(() => (this.#closed = true)); this.#channel = await this.#connection.openChannel(); - this.#channel.closed().then(() => this.#closed = true); + this.#channel.closed().then(() => (this.#closed = true)); await this.#channel.declareQueue({ queue: this.name }); - this.#channel.consume( - { queue: this.name, consumerTag: this.id }, - this.#consume.bind(this), - ); - this.#initialized = true; } - async #consume( - args: BasicDeliver, - props: BasicProperties, - data: Uint8Array, - ): Promise { + #consume(args: BasicDeliver, props: BasicProperties, data: Uint8Array): void { if ( props.headers && "origin" in props.headers && @@ -74,66 +83,90 @@ export class AMQPMessageQueue extends MessageQueue { ); } + const settle = deferred(); const event = new MessageEvent({ data: this.decode(data), origin: args.consumerTag, + settle, }); - await this.emit("message", event); - let requeue = true; - try { - await event.deferred; - } catch (shouldRequeue) { - requeue = shouldRequeue; - } finally { - switch (event.deferred.state) { - case "fulfilled": { - await this.#channel!.ack({ deliveryTag: args.deliveryTag }); - break; + event.deferred + .catch((shouldRequeue) => { + requeue = shouldRequeue; + }) + .finally(async () => { + this.#assertReady(); + + switch (event.deferred.state) { + case "fulfilled": { + await this.channel.ack({ deliveryTag: args.deliveryTag }); + break; + } + case "rejected": { + await this.channel.reject({ + deliveryTag: args.deliveryTag, + requeue, + }); + break; + } + case "pending": + case "ignored": { + break; + } } - case "rejected": { - await this.#channel!.reject({ - deliveryTag: args.deliveryTag, - requeue, - }); - break; - } - case "pending": - case "ignored": { - break; - } - } - } - } - async close(): Promise { - if (this.#ready && this.#connection) { - await this.#connection.close(); - } + settle.resolve(); + }); - await this.off("message"); - this.#closed = true; + this.#deferred.resolve(event); } - async queueMessage(message: T): Promise { - const data = this.encode(message); - + #assertOpen(): asserts this is this & { closed: false } { if (this.#closed) { throw new Error("This AMQPMessageQueue has already been closed"); } + } + async #ensureInitialized() { if (!this.#initialized) { await this.#initialization; } + } - if (!this.#ready) { + #assertReady(): asserts this is this & { + ready: true; + closed: false; + connection: AmqpConnection; + channel: AmqpChannel; + } { + if (!this.ready) { throw new Error( "This AMQPMessageQueue is not ready for an unknown reason", ); } + } + + async close(): Promise { + if (this.channel) { + await this.channel.close(); + } + + if (this.connection) { + await this.connection.close(); + } + + this.#closed = true; + } + + async queueMessage(message: T): Promise { + const data = this.encode(message); + + this.#assertOpen(); + await this.#ensureInitialized(); + this.#assertReady(); - await this.#channel!.publish( + await this.channel.publish( { routingKey: this.name }, { contentType: this.serializerDeserializer?.contentType, @@ -144,4 +177,20 @@ export class AMQPMessageQueue extends MessageQueue { data, ); } + + async *[Symbol.asyncIterator](): AsyncIterator> { + this.#assertOpen(); + await this.#ensureInitialized(); + this.#assertReady(); + + await this.channel.consume( + { queue: this.name, consumerTag: this.id }, + this.#consume.bind(this), + ); + + while (this.ready) { + yield await this.#deferred; + this.#deferred = deferred(); + } + } } diff --git a/message_queue/message_event.ts b/message_queue/message_event.ts index e7da4c4..cba478f 100644 --- a/message_queue/message_event.ts +++ b/message_queue/message_event.ts @@ -6,6 +6,7 @@ import { export type MessageEventInit = { data: T; origin: string; + settle: Promise; }; export class MessageEvent { @@ -28,6 +29,6 @@ export class MessageEvent { constructor(init: MessageEventInit) { this.#data = init.data; this.#origin = init.origin; - this.#deferred = messageEventDeferred(); + this.#deferred = messageEventDeferred(init.settle); } } diff --git a/message_queue/message_event_deferred.ts b/message_queue/message_event_deferred.ts index b398e78..6dedcba 100644 --- a/message_queue/message_event_deferred.ts +++ b/message_queue/message_event_deferred.ts @@ -10,31 +10,36 @@ export type MessageEventDeferredState = export interface MessageEventDeferred extends Promise { readonly state: MessageEventDeferredState; - resolve(): void; - reject(requeue?: boolean): void; - ignore(): void; + resolve(): Promise; + reject(requeue?: boolean): Promise; + ignore(): Promise; } /** * Creates a Promise with the `reject`, `resolve` and `ignore` functions placed as * methods on the promise object itself. */ -export function messageEventDeferred(): MessageEventDeferred { +export function messageEventDeferred( + settle: Promise, +): MessageEventDeferred { let methods; let state: MessageEventDeferredState = "pending"; const promise = new Promise((resolve, reject) => { methods = { - resolve() { + async resolve() { state = "fulfilled"; resolve(); + await settle; }, - reject(requeue?: boolean) { + async reject(requeue?: boolean) { state = "rejected"; reject(requeue ?? true); + await settle; }, - ignore() { + async ignore() { state = "ignored"; resolve(); + await settle; }, }; }); diff --git a/message_queue/message_queue.ts b/message_queue/message_queue.ts index 2302a03..a745395 100644 --- a/message_queue/message_queue.ts +++ b/message_queue/message_queue.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from "../deps.ts"; import { EncoderDecoder } from "../encoder_decoder/types.ts"; import { SerializerDeserializer } from "../serializer_deserializer/types.ts"; import { MessageEvent } from "./message_event.ts"; @@ -17,13 +16,9 @@ export type MaybeSerializerDeserializer = T extends Uint8Array ? SerializerDeserializer | undefined : SerializerDeserializer; -export type MessageQueueEvents = { - "message": [MessageEvent]; -}; - // deno-lint-ignore no-explicit-any export abstract class MessageQueue - extends EventEmitter> { + implements AsyncIterable> { #name: string; #encoderDecoder?: EncoderDecoder; #serializerDeserializer: MaybeSerializerDeserializer; @@ -46,8 +41,6 @@ export abstract class MessageQueue } constructor(name: string, options: MessageQueueOptions) { - super(options.maxListenersPerEvent ?? 1); - this.#name = name; this.#encoderDecoder = options.encoderDecoder; this.#serializerDeserializer = options @@ -93,4 +86,6 @@ export abstract class MessageQueue abstract close(): Promise; abstract queueMessage(message: T): Promise; + + abstract [Symbol.asyncIterator](): AsyncIterator>; }