Skip to content

Commit

Permalink
fix(eventstream-handler-node): add system clock offset to event signi…
Browse files Browse the repository at this point in the history
…ng streams
  • Loading branch information
kuhe committed Jun 26, 2024
1 parent fb98e0c commit 1745389
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe("EventSigningStream", () => {
Date = originalDate;
});

it("should sign a eventstream payload properly", (done) => {
it("should sign an eventstream payload properly", (done) => {
const eventStreamCodec = new EventStreamCodec(toUtf8, fromUtf8);
const message1: Message = {
headers: {},
Expand Down
5 changes: 4 additions & 1 deletion packages/eventstream-handler-node/src/EventSigningStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface EventSigningStreamOptions extends TransformOptions {
priorSignature: string;
messageSigner: MessageSigner;
eventStreamCodec: EventStreamCodec;
systemClockOffset?: number;
}

/**
Expand All @@ -20,6 +21,7 @@ export class EventSigningStream extends Transform {
private priorSignature: string;
private messageSigner: MessageSigner;
private eventStreamCodec: EventStreamCodec;
private readonly systemClockOffset: number;

constructor(options: EventSigningStreamOptions) {
super({
Expand All @@ -32,11 +34,12 @@ export class EventSigningStream extends Transform {
this.priorSignature = options.priorSignature;
this.eventStreamCodec = options.eventStreamCodec;
this.messageSigner = options.messageSigner;
this.systemClockOffset = options.systemClockOffset ?? 0;
}

async _transform(chunk: Uint8Array, encoding: string, callback: TransformCallback): Promise<void> {
try {
const now = new Date();
const now = new Date(new Date().getTime() + this.systemClockOffset);
const dateHeader: MessageHeaders = {
":date": { type: "timestamp", value: now },
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ describe(EventStreamPayloadHandler.name, () => {
priorSignature,
eventStreamCodec: expect.anything(),
messageSigner: expect.anything(),
systemClockOffset: 0,
});
});

Expand Down Expand Up @@ -121,6 +122,7 @@ describe(EventStreamPayloadHandler.name, () => {
priorSignature,
eventStreamCodec: expect.anything(),
messageSigner: expect.anything(),
systemClockOffset: 0,
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface EventStreamPayloadHandlerOptions {
messageSigner: Provider<MessageSigner>;
utf8Encoder: Encoder;
utf8Decoder: Decoder;
systemClockOffset?: number;
}

/**
Expand All @@ -37,10 +38,12 @@ export interface EventStreamPayloadHandlerOptions {
export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
private readonly messageSigner: Provider<MessageSigner>;
private readonly eventStreamCodec: EventStreamCodec;
private readonly systemClockOffset: number;

constructor(options: EventStreamPayloadHandlerOptions) {
this.messageSigner = options.messageSigner;
this.eventStreamCodec = new EventStreamCodec(options.utf8Encoder, options.utf8Decoder);
this.systemClockOffset = options.systemClockOffset ?? 0;
}

async handle<T extends MetadataBearer>(
Expand Down Expand Up @@ -79,6 +82,7 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
priorSignature,
eventStreamCodec: this.eventStreamCodec,
messageSigner: await this.messageSigner(),
systemClockOffset: this.systemClockOffset,
});

pipeline(payloadStream, signingStream, request.body, (err: NodeJS.ErrnoException | null) => {
Expand Down
1 change: 1 addition & 0 deletions packages/eventstream-handler-node/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ export const eventStreamPayloadHandlerProvider: EventStreamPayloadHandlerProvide
utf8Encoder: Encoder;
utf8Decoder: Decoder;
messageSigner: Provider<MessageSigner>;
systemClockOffset?: number;
}) => new EventStreamPayloadHandler(options);
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ describe(EventStreamPayloadHandler.name, () => {
});

expect(getEventSigningTransformStream).toHaveBeenCalledTimes(1);
expect(getEventSigningTransformStream).toHaveBeenCalledWith(priorSignature, expect.anything(), expect.anything());
expect(getEventSigningTransformStream).toHaveBeenCalledWith(
priorSignature,
expect.anything(),
expect.anything(),
0
);
});

it("should call event signer with request signature from query string if no signature headers are found", async () => {
Expand All @@ -118,7 +123,12 @@ describe(EventStreamPayloadHandler.name, () => {
});

expect(getEventSigningTransformStream).toHaveBeenCalledTimes(1);
expect(getEventSigningTransformStream).toHaveBeenCalledWith(priorSignature, expect.anything(), expect.anything());
expect(getEventSigningTransformStream).toHaveBeenCalledWith(
priorSignature,
expect.anything(),
expect.anything(),
0
);
});

it("should start piping to request payload through event signer if downstream middleware returns", async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface EventStreamPayloadHandlerOptions {
messageSigner: Provider<MessageSigner>;
utf8Encoder: Encoder;
utf8Decoder: Decoder;
systemClockOffset?: number;
}

/**
Expand All @@ -31,10 +32,12 @@ export interface EventStreamPayloadHandlerOptions {
export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
private readonly messageSigner: Provider<MessageSigner>;
private readonly eventStreamCodec: EventStreamCodec;
private readonly systemClockOffset: number;

constructor(options: EventStreamPayloadHandlerOptions) {
this.messageSigner = options.messageSigner;
this.eventStreamCodec = new EventStreamCodec(options.utf8Encoder, options.utf8Decoder);
this.systemClockOffset = options.systemClockOffset ?? 0;
}

async handle<T extends MetadataBearer>(
Expand Down Expand Up @@ -69,7 +72,8 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
const signingStream = getEventSigningTransformStream(
priorSignature,
await this.messageSigner(),
this.eventStreamCodec
this.eventStreamCodec,
this.systemClockOffset
);

const signedPayload = payload.pipeThrough(signingStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export const eventStreamPayloadHandlerProvider: EventStreamPayloadHandlerProvide
utf8Encoder: Encoder;
utf8Decoder: Decoder;
messageSigner: Provider<MessageSigner>;
systemClockOffset?: number;
}) => new EventStreamPayloadHandler(options);
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ describe(getEventSigningTransformStream.name, () => {
sign: mockMessageSigner,
signMessage: mockMessageSigner,
},
eventStreamCodec
eventStreamCodec,
0
);
const output: Array<MessageHeaders> = [];

Expand Down
5 changes: 3 additions & 2 deletions packages/middleware-websocket/src/get-event-signing-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import { fromHex } from "@smithy/util-hex-encoding";
export const getEventSigningTransformStream = (
initialSignature: string,
messageSigner: MessageSigner,
eventStreamCodec: EventStreamCodec
eventStreamCodec: EventStreamCodec,
systemClockOffset: number
): TransformStream<Uint8Array, Uint8Array> => {
let priorSignature = initialSignature;
const transformer: Transformer<Uint8Array, Uint8Array> = {
start() {},
async transform(chunk, controller) {
try {
const now = new Date();
const now = new Date(new Date().getTime() + systemClockOffset);
const dateHeader: MessageHeaders = {
":date": { type: "timestamp", value: now },
};
Expand Down

0 comments on commit 1745389

Please sign in to comment.