Skip to content

Commit

Permalink
refactor: remove event in favour of a simpler solution (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
eliassjogreen authored May 21, 2023
1 parent 5f800b3 commit 09e7cbc
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 88 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ This example demonstrates how to use the AMQP message queue with the json
serializer and deserializer:

```ts
import { AMQPMessageQueue } from "https://deno.land/x/mess/message_queue/amqp.ts";
import { AMQPMessageQueue } from "https://deno.land/x/mess/message_queue/implementations/amqp/mod.ts";
import * as json from "https://deno.land/x/mess/serializer_deserializer/json.ts";

const queue = new AMQPMessageQueue("test", {
serializerDeserializer: json,
connection: Deno.env.get("AMQP"),
serializerDeserializer: deno,
connection: "amqp://guest:guest@localhost:5672",
});

for await (const [event] of queue.on("message")) {
for await (const event of queue) {
console.log(event.data);
event.deferred.resolve();
await event.deferred.resolve();
}
```

Expand Down
3 changes: 1 addition & 2 deletions deps.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from "https://deno.land/[email protected]/async/mod.ts";
export * from "https://deno.land/x/[email protected]/mod.ts";
export * from "https://deno.land/[email protected]/async/mod.ts";
12 changes: 12 additions & 0 deletions examples/amqp_dequeue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { AMQPMessageQueue } from "../message_queue/implementations/amqp/mod.ts";
import * as deno from "../serializer_deserializer/deno.ts";

const queue = new AMQPMessageQueue("test", {
serializerDeserializer: deno,
connection: "amqp://guest:guest@localhost:5672",
});

for await (const event of queue) {
console.log(event.data);
await event.deferred.resolve();
}
12 changes: 12 additions & 0 deletions examples/amqp_enqueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { AMQPMessageQueue } from "../message_queue/implementations/amqp/mod.ts";
import * as deno from "../serializer_deserializer/deno.ts";

const queue = new AMQPMessageQueue("test", {
serializerDeserializer: deno,
connection: "amqp://guest:guest@localhost:5672",
});

while (true) {
await queue.queueMessage({ t: Math.random() });
}
//await queue.close();
12 changes: 0 additions & 12 deletions examples/simple_amqp.ts

This file was deleted.

155 changes: 102 additions & 53 deletions message_queue/amqp.ts → message_queue/implementations/amqp/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import {
connect,
} from "https://deno.land/x/[email protected]/mod.ts";

import { MessageEvent } from "./message_event.ts";
import { MessageQueue, MessageQueueOptions } from "./message_queue.ts";
import { MessageEvent } from "../../message_event.ts";
import { MessageQueue, MessageQueueOptions } from "../../message_queue.ts";
import { deferred } from "../../../deps.ts";

export type AMQPMessageQueueOptions<T> = MessageQueueOptions<T> & {
connection?: AmqpConnectOptions | string;
Expand All @@ -21,10 +22,27 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
#initialized = false;
#initialization: Promise<void>;
#closed = false;
#deferred = deferred<MessageEvent<T>>();

get #ready(): boolean {
return this.#initialized && !this.#closed &&
this.#connection != undefined && this.#channel != undefined;
get connection() {
return this.#connection;
}

get channel() {
return this.#channel;
}

get closed() {
return this.#closed;
}

get ready(): boolean {
return (
this.#initialized &&
!this.closed &&
this.connection != undefined &&
this.channel != undefined
);
}

constructor(name: string, options: AMQPMessageQueueOptions<T>) {
Expand All @@ -35,25 +53,16 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
async #initialize(options: AMQPMessageQueueOptions<T>) {
// @ts-ignore This is actually ok, but technically not according to typescript
this.#connection = await connect(options.connection);
this.#connection.closed().then(() => this.#closed = true);
this.#connection.closed().then(() => (this.#closed = true));

this.#channel = await this.#connection.openChannel();
this.#channel.closed().then(() => this.#closed = true);
this.#channel.closed().then(() => (this.#closed = true));
await this.#channel.declareQueue({ queue: this.name });

this.#channel.consume(
{ queue: this.name, consumerTag: this.id },
this.#consume.bind(this),
);

this.#initialized = true;
}

async #consume(
args: BasicDeliver,
props: BasicProperties,
data: Uint8Array,
): Promise<void> {
#consume(args: BasicDeliver, props: BasicProperties, data: Uint8Array): void {
if (
props.headers &&
"origin" in props.headers &&
Expand All @@ -74,66 +83,90 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
);
}

const settle = deferred<void>();
const event = new MessageEvent<T>({
data: this.decode(data),
origin: args.consumerTag,
settle,
});

await this.emit("message", event);

let requeue = true;
try {
await event.deferred;
} catch (shouldRequeue) {
requeue = shouldRequeue;
} finally {
switch (event.deferred.state) {
case "fulfilled": {
await this.#channel!.ack({ deliveryTag: args.deliveryTag });
break;
event.deferred
.catch((shouldRequeue) => {
requeue = shouldRequeue;
})
.finally(async () => {
this.#assertReady();

switch (event.deferred.state) {
case "fulfilled": {
await this.channel.ack({ deliveryTag: args.deliveryTag });
break;
}
case "rejected": {
await this.channel.reject({
deliveryTag: args.deliveryTag,
requeue,
});
break;
}
case "pending":
case "ignored": {
break;
}
}
case "rejected": {
await this.#channel!.reject({
deliveryTag: args.deliveryTag,
requeue,
});
break;
}
case "pending":
case "ignored": {
break;
}
}
}
}

async close(): Promise<void> {
if (this.#ready && this.#connection) {
await this.#connection.close();
}
settle.resolve();
});

await this.off("message");
this.#closed = true;
this.#deferred.resolve(event);
}

async queueMessage(message: T): Promise<void> {
const data = this.encode(message);

#assertOpen(): asserts this is this & { closed: false } {
if (this.#closed) {
throw new Error("This AMQPMessageQueue has already been closed");
}
}

async #ensureInitialized() {
if (!this.#initialized) {
await this.#initialization;
}
}

if (!this.#ready) {
#assertReady(): asserts this is this & {
ready: true;
closed: false;
connection: AmqpConnection;
channel: AmqpChannel;
} {
if (!this.ready) {
throw new Error(
"This AMQPMessageQueue is not ready for an unknown reason",
);
}
}

async close(): Promise<void> {
if (this.channel) {
await this.channel.close();
}

if (this.connection) {
await this.connection.close();
}

this.#closed = true;
}

async queueMessage(message: T): Promise<void> {
const data = this.encode(message);

this.#assertOpen();
await this.#ensureInitialized();
this.#assertReady();

await this.#channel!.publish(
await this.channel.publish(
{ routingKey: this.name },
{
contentType: this.serializerDeserializer?.contentType,
Expand All @@ -144,4 +177,20 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
data,
);
}

async *[Symbol.asyncIterator](): AsyncIterator<MessageEvent<T>> {
this.#assertOpen();
await this.#ensureInitialized();
this.#assertReady();

await this.channel.consume(
{ queue: this.name, consumerTag: this.id },
this.#consume.bind(this),
);

while (this.ready) {
yield await this.#deferred;
this.#deferred = deferred();
}
}
}
3 changes: 2 additions & 1 deletion message_queue/message_event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
export type MessageEventInit<T> = {
data: T;
origin: string;
settle: Promise<void>;
};

export class MessageEvent<T> {
Expand All @@ -28,6 +29,6 @@ export class MessageEvent<T> {
constructor(init: MessageEventInit<T>) {
this.#data = init.data;
this.#origin = init.origin;
this.#deferred = messageEventDeferred();
this.#deferred = messageEventDeferred(init.settle);
}
}
19 changes: 12 additions & 7 deletions message_queue/message_event_deferred.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,36 @@ export type MessageEventDeferredState =
export interface MessageEventDeferred extends Promise<void> {
readonly state: MessageEventDeferredState;

resolve(): void;
reject(requeue?: boolean): void;
ignore(): void;
resolve(): Promise<void>;
reject(requeue?: boolean): Promise<void>;
ignore(): Promise<void>;
}

/**
* Creates a Promise with the `reject`, `resolve` and `ignore` functions placed as
* methods on the promise object itself.
*/
export function messageEventDeferred(): MessageEventDeferred {
export function messageEventDeferred(
settle: Promise<void>,
): MessageEventDeferred {
let methods;
let state: MessageEventDeferredState = "pending";
const promise = new Promise<void>((resolve, reject) => {
methods = {
resolve() {
async resolve() {
state = "fulfilled";
resolve();
await settle;
},
reject(requeue?: boolean) {
async reject(requeue?: boolean) {
state = "rejected";
reject(requeue ?? true);
await settle;
},
ignore() {
async ignore() {
state = "ignored";
resolve();
await settle;
},
};
});
Expand Down
11 changes: 3 additions & 8 deletions message_queue/message_queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { EventEmitter } from "../deps.ts";
import { EncoderDecoder } from "../encoder_decoder/types.ts";
import { SerializerDeserializer } from "../serializer_deserializer/types.ts";
import { MessageEvent } from "./message_event.ts";
Expand All @@ -17,13 +16,9 @@ export type MaybeSerializerDeserializer<T> = T extends Uint8Array
? SerializerDeserializer<T> | undefined
: SerializerDeserializer<T>;

export type MessageQueueEvents<T> = {
"message": [MessageEvent<T>];
};

// deno-lint-ignore no-explicit-any
export abstract class MessageQueue<T = any>
extends EventEmitter<MessageQueueEvents<T>> {
implements AsyncIterable<MessageEvent<T>> {
#name: string;
#encoderDecoder?: EncoderDecoder;
#serializerDeserializer: MaybeSerializerDeserializer<T>;
Expand All @@ -46,8 +41,6 @@ export abstract class MessageQueue<T = any>
}

constructor(name: string, options: MessageQueueOptions<T>) {
super(options.maxListenersPerEvent ?? 1);

this.#name = name;
this.#encoderDecoder = options.encoderDecoder;
this.#serializerDeserializer = options
Expand Down Expand Up @@ -93,4 +86,6 @@ export abstract class MessageQueue<T = any>

abstract close(): Promise<void>;
abstract queueMessage(message: T): Promise<void>;

abstract [Symbol.asyncIterator](): AsyncIterator<MessageEvent<T>>;
}

0 comments on commit 09e7cbc

Please sign in to comment.