Skip to content

Commit

Permalink
fix(eventstream-handler-node): add system clock offset to event signi…
Browse files Browse the repository at this point in the history
…ng streams
  • Loading branch information
kuhe committed Jun 26, 2024
1 parent fb98e0c commit 79ef154
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe("EventSigningStream", () => {
Date = originalDate;
});

it("should sign a eventstream payload properly", (done) => {
it("should sign an eventstream payload properly", (done) => {
const eventStreamCodec = new EventStreamCodec(toUtf8, fromUtf8);
const message1: Message = {
headers: {},
Expand Down Expand Up @@ -62,6 +62,7 @@ describe("EventSigningStream", () => {
signMessage: mockMessageSigner,
},
eventStreamCodec,
systemClockOffsetProvider: async () => 0,
});
const output: Array<MessageHeaders> = [];
signingStream.on("data", (chunk) => {
Expand Down
7 changes: 5 additions & 2 deletions packages/eventstream-handler-node/src/EventSigningStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventStreamCodec } from "@smithy/eventstream-codec";
import { MessageHeaders, MessageSigner } from "@smithy/types";
import { MessageHeaders, MessageSigner, Provider } from "@smithy/types";
import { Transform, TransformCallback, TransformOptions } from "stream";

/**
Expand All @@ -9,6 +9,7 @@ export interface EventSigningStreamOptions extends TransformOptions {
priorSignature: string;
messageSigner: MessageSigner;
eventStreamCodec: EventStreamCodec;
systemClockOffsetProvider: Provider<number>;
}

/**
Expand All @@ -20,6 +21,7 @@ export class EventSigningStream extends Transform {
private priorSignature: string;
private messageSigner: MessageSigner;
private eventStreamCodec: EventStreamCodec;
private readonly systemClockOffsetProvider: Provider<number>;

constructor(options: EventSigningStreamOptions) {
super({
Expand All @@ -32,11 +34,12 @@ export class EventSigningStream extends Transform {
this.priorSignature = options.priorSignature;
this.eventStreamCodec = options.eventStreamCodec;
this.messageSigner = options.messageSigner;
this.systemClockOffsetProvider = options.systemClockOffsetProvider;
}

async _transform(chunk: Uint8Array, encoding: string, callback: TransformCallback): Promise<void> {
try {
const now = new Date();
const now = new Date(new Date().getTime() + (await this.systemClockOffsetProvider()));
const dateHeader: MessageHeaders = {
":date": { type: "timestamp", value: now },
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ describe(EventStreamPayloadHandler.name, () => {
priorSignature,
eventStreamCodec: expect.anything(),
messageSigner: expect.anything(),
systemClockOffsetProvider: expect.any(Function),
});
});

Expand Down Expand Up @@ -121,6 +122,7 @@ describe(EventStreamPayloadHandler.name, () => {
priorSignature,
eventStreamCodec: expect.anything(),
messageSigner: expect.anything(),
systemClockOffsetProvider: expect.any(Function),
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface EventStreamPayloadHandlerOptions {
messageSigner: Provider<MessageSigner>;
utf8Encoder: Encoder;
utf8Decoder: Decoder;
systemClockOffset?: number;
}

/**
Expand All @@ -37,10 +38,12 @@ export interface EventStreamPayloadHandlerOptions {
export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
private readonly messageSigner: Provider<MessageSigner>;
private readonly eventStreamCodec: EventStreamCodec;
private readonly systemClockOffsetProvider: Provider<number>;

constructor(options: EventStreamPayloadHandlerOptions) {
this.messageSigner = options.messageSigner;
this.eventStreamCodec = new EventStreamCodec(options.utf8Encoder, options.utf8Decoder);
this.systemClockOffsetProvider = async () => options.systemClockOffset ?? 0;
}

async handle<T extends MetadataBearer>(
Expand Down Expand Up @@ -79,6 +82,7 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
priorSignature,
eventStreamCodec: this.eventStreamCodec,
messageSigner: await this.messageSigner(),
systemClockOffsetProvider: this.systemClockOffsetProvider,
});

pipeline(payloadStream, signingStream, request.body, (err: NodeJS.ErrnoException | null) => {
Expand Down
1 change: 1 addition & 0 deletions packages/eventstream-handler-node/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ export const eventStreamPayloadHandlerProvider: EventStreamPayloadHandlerProvide
utf8Encoder: Encoder;
utf8Decoder: Decoder;
messageSigner: Provider<MessageSigner>;
systemClockOffset?: number;
}) => new EventStreamPayloadHandler(options);
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ describe(EventStreamPayloadHandler.name, () => {
});

expect(getEventSigningTransformStream).toHaveBeenCalledTimes(1);
expect(getEventSigningTransformStream).toHaveBeenCalledWith(priorSignature, expect.anything(), expect.anything());
expect(getEventSigningTransformStream).toHaveBeenCalledWith(
priorSignature,
expect.anything(),
expect.anything(),
expect.anything()
);
});

it("should call event signer with request signature from query string if no signature headers are found", async () => {
Expand All @@ -118,7 +123,12 @@ describe(EventStreamPayloadHandler.name, () => {
});

expect(getEventSigningTransformStream).toHaveBeenCalledTimes(1);
expect(getEventSigningTransformStream).toHaveBeenCalledWith(priorSignature, expect.anything(), expect.anything());
expect(getEventSigningTransformStream).toHaveBeenCalledWith(
priorSignature,
expect.anything(),
expect.anything(),
expect.anything()
);
});

it("should start piping to request payload through event signer if downstream middleware returns", async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface EventStreamPayloadHandlerOptions {
messageSigner: Provider<MessageSigner>;
utf8Encoder: Encoder;
utf8Decoder: Decoder;
systemClockOffset?: number;
}

/**
Expand All @@ -31,10 +32,12 @@ export interface EventStreamPayloadHandlerOptions {
export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
private readonly messageSigner: Provider<MessageSigner>;
private readonly eventStreamCodec: EventStreamCodec;
private readonly systemClockOffsetProvider: Provider<number>;

constructor(options: EventStreamPayloadHandlerOptions) {
this.messageSigner = options.messageSigner;
this.eventStreamCodec = new EventStreamCodec(options.utf8Encoder, options.utf8Decoder);
this.systemClockOffsetProvider = async () => options.systemClockOffset ?? 0;
}

async handle<T extends MetadataBearer>(
Expand Down Expand Up @@ -69,7 +72,8 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
const signingStream = getEventSigningTransformStream(
priorSignature,
await this.messageSigner(),
this.eventStreamCodec
this.eventStreamCodec,
this.systemClockOffsetProvider
);

const signedPayload = payload.pipeThrough(signingStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export const eventStreamPayloadHandlerProvider: EventStreamPayloadHandlerProvide
utf8Encoder: Encoder;
utf8Decoder: Decoder;
messageSigner: Provider<MessageSigner>;
systemClockOffset?: number;
}) => new EventStreamPayloadHandler(options);
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ describe(getEventSigningTransformStream.name, () => {
sign: mockMessageSigner,
signMessage: mockMessageSigner,
},
eventStreamCodec
eventStreamCodec,
0
);
const output: Array<MessageHeaders> = [];

Expand Down
9 changes: 6 additions & 3 deletions packages/middleware-websocket/src/get-event-signing-stream.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import { EventStreamCodec } from "@smithy/eventstream-codec";
import { MessageHeaders, MessageSigner } from "@smithy/types";
import { MessageHeaders, MessageSigner, Provider } from "@smithy/types";
import { fromHex } from "@smithy/util-hex-encoding";

/**
* Get a transform stream that signs the eventstream
* Implementation replicated from @aws-sdk/eventstream-handler-node::EventSigningStream
* but modified to be compatible with WHATWG stream interface
*
* @internal
*/
export const getEventSigningTransformStream = (
initialSignature: string,
messageSigner: MessageSigner,
eventStreamCodec: EventStreamCodec
eventStreamCodec: EventStreamCodec,
systemClockOffsetProvider: Provider<number>
): TransformStream<Uint8Array, Uint8Array> => {
let priorSignature = initialSignature;
const transformer: Transformer<Uint8Array, Uint8Array> = {
start() {},
async transform(chunk, controller) {
try {
const now = new Date();
const now = new Date(new Date().getTime() + (await systemClockOffsetProvider()));
const dateHeader: MessageHeaders = {
":date": { type: "timestamp", value: now },
};
Expand Down

0 comments on commit 79ef154

Please sign in to comment.