Skip to content

Commit

Permalink
feat: break out (de)serializer (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
eliassjogreen authored May 6, 2023
1 parent 505d3ce commit 5f800b3
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 35 deletions.
2 changes: 1 addition & 1 deletion deps.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from "https://deno.land/std@0.170.0/async/mod.ts";
export * from "https://deno.land/std@0.186.0/async/mod.ts";
export * from "https://deno.land/x/[email protected]/mod.ts";
40 changes: 8 additions & 32 deletions message_queue/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
BasicDeliver,
BasicProperties,
connect,
} from "https://deno.land/x/amqp@v0.21.0/mod.ts";
} from "https://deno.land/x/amqp@v0.23.1/mod.ts";

import { MessageEvent } from "./message_event.ts";
import { MessageQueue, MessageQueueOptions } from "./message_queue.ts";
Expand Down Expand Up @@ -52,7 +52,7 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
async #consume(
args: BasicDeliver,
props: BasicProperties,
rawData: Uint8Array,
data: Uint8Array,
): Promise<void> {
if (
props.headers &&
Expand All @@ -62,7 +62,6 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
return;
}

// TODO: Break the message decoding out into a helper function
if (this.encoderDecoder?.contentEncoding !== props.contentEncoding) {
throw new TypeError(
`Expected message to have the contentEncoding ${this.encoderDecoder?.contentEncoding} but found ${props.contentEncoding}`,
Expand All @@ -75,17 +74,8 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
);
}

let data: T | Uint8Array = rawData;
if (this.encoderDecoder) {
data = this.encoderDecoder.decode(data);
}

if (this.serializerDeserializer) {
data = this.serializerDeserializer.deserialize(data);
}

const event = new MessageEvent<T>({
data: data as T,
data: this.decode(data),
origin: args.consumerTag,
});

Expand All @@ -103,7 +93,10 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
break;
}
case "rejected": {
await this.#channel!.nack({ deliveryTag: args.deliveryTag, requeue });
await this.#channel!.reject({
deliveryTag: args.deliveryTag,
requeue,
});
break;
}
case "pending":
Expand All @@ -124,24 +117,7 @@ export class AMQPMessageQueue<T = any> extends MessageQueue<T> {
}

async queueMessage(message: T): Promise<void> {
// TODO: Break the message encoding out into a helper function
let data: Uint8Array;

if (this.serializerDeserializer) {
data = this.serializerDeserializer.serialize(message);
} else {
if (message instanceof Uint8Array) {
data = new Uint8Array(message);
} else {
throw new TypeError(
"Expected an Uint8Array when calling queueMessage without an serializerDeserializer configured",
);
}
}

if (this.encoderDecoder) {
data = this.encoderDecoder.encode(data);
}
const data = this.encode(message);

if (this.#closed) {
throw new Error("This AMQPMessageQueue has already been closed");
Expand Down
36 changes: 36 additions & 0 deletions message_queue/message_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,42 @@ export abstract class MessageQueue<T = any>
this.#id = options.id ?? crypto.randomUUID();
}

protected encode(message: T): Uint8Array {
let data: Uint8Array;

if (this.serializerDeserializer) {
data = this.serializerDeserializer.serialize(message);
} else {
if (message instanceof Uint8Array) {
data = new Uint8Array(message);
} else {
throw new TypeError(
"Expected an Uint8Array when calling queueMessage without an serializerDeserializer configured",
);
}
}

if (this.encoderDecoder) {
data = this.encoderDecoder.encode(data);
}

return data;
}

protected decode(data: Uint8Array): T {
let message: T | Uint8Array = data;

if (this.encoderDecoder) {
message = this.encoderDecoder.decode(data);
}

if (this.serializerDeserializer) {
message = this.serializerDeserializer.deserialize(data);
}

return message as T;
}

abstract close(): Promise<void>;
abstract queueMessage(message: T): Promise<void>;
}
3 changes: 2 additions & 1 deletion serializer_deserializer/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export interface SerializerDeserializer<T> {
// deno-lint-ignore no-explicit-any
export interface SerializerDeserializer<T = any> {
contentType?: string;
serialize(value: T): Uint8Array;
deserialize(data: Uint8Array): T;
Expand Down
2 changes: 1 addition & 1 deletion serializer_deserializer/v8.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {
deserializeAny,
serializeAny,
} from "https://raw.githubusercontent.com/MierenManz/v8_format/main/references/mod.ts";
} from "https://raw.githubusercontent.com/MierenManz/v8_format/8352edf/src/mod.ts";
import { SerializerDeserializer } from "./types.ts";

const v8SerializerDeserializer = {
Expand Down

0 comments on commit 5f800b3

Please sign in to comment.