Skip to content

Commit

Permalink
feat(*): migrate to aws sdk v3
Browse files Browse the repository at this point in the history
  • Loading branch information
Filipe Torrado committed Mar 24, 2022
1 parent 65de65e commit 4bb2bd0
Show file tree
Hide file tree
Showing 7 changed files with 8,981 additions and 4,512 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ jobs:
if: ${{ github.ref != 'refs/heads/master' }}
uses: wagoid/commitlint-github-action@v3

- name: Use Node.js 12.x
- name: Use Node.js 16.x
uses: actions/setup-node@v1
with:
node-version: 12.x
node-version: 16.x

- name: Install packages & build & run tests
run: |
Expand Down
13,272 changes: 8,875 additions & 4,397 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@
"messages"
],
"dependencies": {
"@aws-sdk/client-sqs": "^3.55.0",
"@types/lodash": "^4.14.173",
"lodash": "^4.17.21"
},
"devDependencies": {
"@types/chai": "^4.2.22",
"@types/mocha": "^9.0.0",
"@types/node": "^16.9.6",
"@types/sinon": "^10.0.3",
"@types/sinon": "^10.0.11",
"@typescript-eslint/eslint-plugin": "^4.31.2",
"@typescript-eslint/parser": "^4.31.2",
"aws-sdk": "^2.993.0",
"aws-sdk-mock": "^5.3.0",
"aws-sdk-client-mock": "^0.6.2",
"chai": "^4.3.4",
"coveralls": "^3.1.1",
"eslint": "^7.32.0",
Expand Down
46 changes: 25 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { SQS } from 'aws-sdk';
import { Binary } from 'aws-sdk/clients/sqs';
import {
MessageAttributeValue,
SendMessageRequest,
SQS
} from '@aws-sdk/client-sqs';
import { cloneDeep, get } from 'lodash';

const partialCompare = (
Expand All @@ -15,17 +18,13 @@ const partialCompare = (
return Object.keys(criteria).some((key) => get(body, key) === criteria[key]);
};

export type ParsedAttributeValue = number | string | Buffer | Uint8Array;

const composeMessageAttributes = (
attributes: Record<string, number | string | SQS.Binary>
attributes: Record<string, ParsedAttributeValue>
) =>
Object.keys(attributes).reduce(
(
result: Record<
string,
{ DataType: string; StringValue?: string; BinaryValue?: Buffer }
>,
key: string
) => {
(result: Record<string, MessageAttributeValue>, key: string) => {
const attributeValue = attributes[key];

if (typeof attributeValue === 'string') {
Expand All @@ -39,7 +38,10 @@ const composeMessageAttributes = (
StringValue: attributeValue.toString(),
DataType: 'Number'
};
} else if (attributeValue instanceof Buffer) {
} else if (
attributeValue instanceof Buffer ||
attributeValue instanceof Uint8Array
) {
result[key] = {
BinaryValue: attributeValue,
DataType: 'Binary'
Expand All @@ -53,9 +55,11 @@ const composeMessageAttributes = (
{}
);

const parseMessageAttributes = (attributes: SQS.MessageBodyAttributeMap) =>
const parseMessageAttributes = (
attributes: Record<string, MessageAttributeValue>
) =>
Object.keys(attributes).reduce(
(result: Record<string, number | string | Binary>, key) => {
(result: Record<string, ParsedAttributeValue>, key) => {
const attribute = attributes[key];

switch (attribute.DataType) {
Expand Down Expand Up @@ -87,12 +91,12 @@ interface SqsMoveOptions {
excludes?: string | Record<string, unknown>;
transformBody?: (
body: string | Record<string, unknown>,
messageAttributes: Record<string, number | string | SQS.Binary>
messageAttributes: Record<string, ParsedAttributeValue>
) => string | Record<string, unknown>;
transformMessageAttributes?: (
body: string | Record<string, unknown>,
messageAttributes: Record<string, number | string | SQS.Binary>
) => Record<string, number | string | SQS.Binary>;
messageAttributes: Record<string, ParsedAttributeValue>
) => Record<string, ParsedAttributeValue>;
json: boolean;
}

Expand All @@ -102,7 +106,7 @@ const SqsMoveOptionsDefaults: SqsMoveOptions = {
};

export async function sqsMove(
sqsInstance: AWS.SQS,
sqsInstance: SQS,
fromQueueUrl: string,
toQueueUrl: string,
options: Partial<SqsMoveOptions> = {}
Expand Down Expand Up @@ -138,7 +142,7 @@ export async function sqsMove(
let filteredCount = 0;

while (true) {
const response = await sqsInstance.receiveMessage(receiveOptions).promise();
const response = await sqsInstance.receiveMessage(receiveOptions);

if (!Array.isArray(response.Messages)) {
break;
Expand Down Expand Up @@ -190,7 +194,7 @@ export async function sqsMove(
}

if (shouldMove) {
const sendOptions: SQS.SendMessageRequest = {
const sendOptions: SendMessageRequest = {
QueueUrl: toQueueUrl,
MessageBody: messageBody
};
Expand All @@ -213,8 +217,8 @@ export async function sqsMove(
ReceiptHandle: message.ReceiptHandle
};

await sqsInstance.sendMessage(sendOptions).promise();
await sqsInstance.deleteMessage(deleteOptions).promise();
await sqsInstance.sendMessage(sendOptions);
await sqsInstance.deleteMessage(deleteOptions);

movedCount++;
} else {
Expand Down
106 changes: 52 additions & 54 deletions test/index.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,67 @@
import awsMock from 'aws-sdk-mock';
import { ParsedAttributeValue } from './../src/index';
import sqsMock from './mocks/sqs';
import { expect } from 'chai';
import sinon from 'sinon';
import { sqsMove } from '../src/index';
import { fakeTextMessages, fakeJsonMessages } from './mocks/sqsMessages';
import type Sinon from 'sinon';
import type { SQS } from 'aws-sdk';
import {
DeleteMessageCommand,
DeleteMessageCommandInput,
DeleteMessageCommandOutput,
SendMessageCommand,
SendMessageCommandInput,
SendMessageCommandOutput,
SQS
} from '@aws-sdk/client-sqs';

const fakeFromQueueUrl =
'https://sqs.eu-west-1.amazonaws.com/123456789012/some-dead-letter-queue';
const fakeToQueueUrl =
'https://sqs.eu-west-1.amazonaws.com/123456789012/some-original-queue';

describe('SQS Move', () => {
let deleteMessageSpy: Sinon.SinonSpy<
[
params: SQS.DeleteMessageRequest,
callback: (error: AWS.AWSError, data: Record<string, unknown>) => void
],
(
params: SQS.DeleteMessageRequest,
callback: (error: AWS.AWSError, data: Record<string, unknown>) => void
) => void
>;
let sendMessageSpy: Sinon.SinonSpy<
[
params: SQS.SendMessageRequest,
callback: (error: AWS.AWSError, data: Record<string, unknown>) => void
],
(
params: SQS.SendMessageRequest,
callback: (error: AWS.AWSError, data: Record<string, unknown>) => void
) => void
[SendMessageCommandInput],
SendMessageCommandOutput
>;
let deleteMessageSpy: Sinon.SinonSpy<
[DeleteMessageCommandInput],
DeleteMessageCommandOutput
>;
const sendMessageCallback = (_input: SendMessageCommandInput) => ({
$metadata: {},
MessageId: '123'
});
const deleteMessageCallback = (_input: DeleteMessageCommandInput) => ({
$metadata: {}
});

beforeEach(() => {
sendMessageSpy = sinon.spy((_params, callback) =>
callback(null, { MessageId: 123 })
);
deleteMessageSpy = sinon.spy((_params, callback) =>
callback(null, { MessageId: 123 })
);
});
afterEach(() => {
awsMock.restore('SQS');
sendMessageSpy = sinon.spy(sendMessageCallback);
deleteMessageSpy = sinon.spy(deleteMessageCallback);
});

it('should move message from queue to queue', async () => {
const sqs = sqsMock(fakeTextMessages, {
sendCallback: sendMessageSpy,
deleteCallback: deleteMessageSpy
});
const sqs = new SQS({});
const mock = sqsMock(fakeTextMessages, sqs);
mock.on(SendMessageCommand).callsFake(sendMessageSpy);
mock.on(DeleteMessageCommand).callsFake(deleteMessageSpy);

const counts = await sqsMove(sqs, fakeFromQueueUrl, fakeToQueueUrl);

expect(counts).to.deep.equal({ movedCount: 3, filteredCount: 0 });
expect(sendMessageSpy.callCount).to.equal(3);
expect(deleteMessageSpy.callCount).to.equal(3);

mock.reset();
});

it('should preserve MessageAttributes, MessageDeduplicationId & MessageGroupId', async () => {
const sqs = sqsMock(fakeJsonMessages, {
sendCallback: sendMessageSpy,
deleteCallback: deleteMessageSpy
});
const sqs = new SQS({});
const mock = sqsMock(fakeJsonMessages, sqs);
mock.on(SendMessageCommand).callsFake(sendMessageSpy);
mock.on(DeleteMessageCommand).callsFake(deleteMessageSpy);

await sqsMove(sqs, fakeFromQueueUrl, fakeToQueueUrl);

Expand Down Expand Up @@ -109,10 +107,10 @@ describe('SQS Move', () => {
});

it('should move messages with text includes & excludes', async () => {
const sqs = sqsMock(fakeTextMessages, {
sendCallback: sendMessageSpy,
deleteCallback: deleteMessageSpy
});
const sqs = new SQS({});
const mock = sqsMock(fakeTextMessages, sqs);
mock.on(SendMessageCommand).callsFake(sendMessageSpy);
mock.on(DeleteMessageCommand).callsFake(deleteMessageSpy);

const counts = await sqsMove(sqs, fakeFromQueueUrl, fakeToQueueUrl, {
includes: 'message',
Expand All @@ -126,10 +124,10 @@ describe('SQS Move', () => {
});

it('should move messages with object includes & excludes', async () => {
const sqs = sqsMock(fakeJsonMessages, {
sendCallback: sendMessageSpy,
deleteCallback: deleteMessageSpy
});
const sqs = new SQS({});
const mock = sqsMock(fakeJsonMessages, sqs);
mock.on(SendMessageCommand).callsFake(sendMessageSpy);
mock.on(DeleteMessageCommand).callsFake(deleteMessageSpy);

const counts = await sqsMove(sqs, fakeFromQueueUrl, fakeToQueueUrl, {
includes: { 'user.country': 'BE' },
Expand All @@ -143,16 +141,16 @@ describe('SQS Move', () => {
});

it('should move messages with transformBody', async () => {
const sqs = sqsMock(fakeJsonMessages, {
sendCallback: sendMessageSpy,
deleteCallback: deleteMessageSpy
});
const sqs = new SQS({});
const mock = sqsMock(fakeJsonMessages, sqs);
mock.on(SendMessageCommand).callsFake(sendMessageSpy);
mock.on(DeleteMessageCommand).callsFake(deleteMessageSpy);

await sqsMove(sqs, fakeFromQueueUrl, fakeToQueueUrl, {
transformBody: (
body: {
user: Record<string, string>;
traceId: number | string | SQS.Binary;
traceId: ParsedAttributeValue;
},
messageAttributes
) => {
Expand All @@ -178,10 +176,10 @@ describe('SQS Move', () => {
});

it('should move messages with transformMessageAttributes', async () => {
const sqs = sqsMock(fakeJsonMessages, {
sendCallback: sendMessageSpy,
deleteCallback: deleteMessageSpy
});
const sqs = new SQS({});
const mock = sqsMock(fakeJsonMessages, sqs);
mock.on(SendMessageCommand).callsFake(sendMessageSpy);
mock.on(DeleteMessageCommand).callsFake(deleteMessageSpy);

await sqsMove(sqs, fakeFromQueueUrl, fakeToQueueUrl, {
transformMessageAttributes: (
Expand Down
53 changes: 21 additions & 32 deletions test/mocks/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,30 @@
import AWSMock from 'aws-sdk-mock';
import AWS, { SQS } from 'aws-sdk';

type Options = {
sendCallback?: (
params: AWS.SQS.SendMessageRequest,
callback: (error: AWS.AWSError, data: Record<string, unknown>) => void
) => void;
deleteCallback?: (
params: AWS.SQS.DeleteMessageRequest,
callback: (error: AWS.AWSError, data: Record<string, unknown>) => void
) => void;
};
import { AwsStub, mockClient } from 'aws-sdk-client-mock';
import {
Message,
ReceiveMessageCommand,
ServiceInputTypes,
ServiceOutputTypes,
SQSClient
} from '@aws-sdk/client-sqs';

export default (
messages: SQS.MessageList,
{ sendCallback, deleteCallback }: Options = {}
): AWS.SQS => {
messages: Array<Message>,
sqsClient: SQSClient
): AwsStub<ServiceInputTypes, ServiceOutputTypes> => {
let receiveCount = 0;

AWSMock.mock('SQS', 'receiveMessage', (_params, callback) => {
const RequestId = `r${receiveCount + 1}`;
const sqsMock = mockClient(sqsClient);

if (receiveCount >= messages.length) {
// Simulate empty queue
callback(null, { ResponseMetadata: { RequestId } });
}
sqsMock.on(ReceiveMessageCommand).callsFake(() => {
const result = {
$metadata: { requestId: `r${receiveCount + 1}` },
Messages:
receiveCount >= messages.length ? undefined : [messages[receiveCount]]
};
receiveCount += 1;

callback(null, {
ResponseMetadata: { RequestId },
Messages: [messages[receiveCount]]
});

receiveCount++;
return result;
});

AWSMock.mock('SQS', 'sendMessage', sendCallback);
AWSMock.mock('SQS', 'deleteMessage', deleteCallback);

return new AWS.SQS();
return sqsMock;
};
Loading

0 comments on commit 4bb2bd0

Please sign in to comment.