Skip to content

Commit

Permalink
Add support for attributes and delay seconds for SQS (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickcaballero authored Oct 5, 2023
1 parent 7007e62 commit baed8a3
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 23 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ export default async function () {

// Send message to test queue
await sqs.sendMessage(testQueue, JSON.stringify({ value: '123' }))

// Send message with attributes to test queue
await sqs.sendMessage(testQueue, JSON.stringify({ value: '123' }), {
messageAttributes: {
'my-attribute': {
type: 'String',
value: 'my-attribute-value'
}
}
})
}
```

Expand Down
2 changes: 1 addition & 1 deletion build/aws.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/aws.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/index.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/index.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/sqs.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/sqs.js.map

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions examples/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,14 @@ export default async function () {

// Send message to test queue
await sqs.sendMessage(testQueue, JSON.stringify({ value: '123' }))

// Send message with attributes to test queue
await sqs.sendMessage(testQueue, JSON.stringify({ value: '123' }), {
messageAttributes: {
'my-attribute': {
type: 'String',
value: 'my-attribute-value'
}
}
})
}
42 changes: 38 additions & 4 deletions src/internal/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AWSClient } from './client'
import { AWSConfig } from './config'
import { SignatureV4, InvalidSignatureError } from './signature'
import { InvalidSignatureError, SignatureV4 } from './signature'
import { HTTPHeaders, SignedHTTPRequest } from './http'
import http, { RefinedResponse, ResponseType } from 'k6/http'
import { toFormUrlEncoded } from './utils'
Expand Down Expand Up @@ -45,7 +45,7 @@ export class SQSClient extends AWSClient {
async sendMessage(
queueUrl: string,
messageBody: string,
options: { messageDeduplicationId?: string; messageGroupId?: string } = {}
options: SendMessageOptions = {}
): Promise<Message> {
const method = 'POST'

Expand All @@ -64,6 +64,28 @@ export class SQSClient extends AWSClient {
body = { ...body, MessageGroupId: options.messageGroupId }
}

if (typeof options.messageAttributes !== 'undefined') {
/*
* A single message attribute is represented as 3 separate parameters: name, value, and type.
* The name of the value parameter varies based on the data type.
* See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#SQS-SendMessage-request-MessageAttributes
* for more information.
*/
const attributeParameters = Object.entries(options.messageAttributes).reduce((params, [name, attribute], i) => {
const valueParameterSuffix = attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue'
return Object.assign(params, {
[`MessageAttribute.${i + 1}.Name`]: name,
[`MessageAttribute.${i + 1}.Value.${valueParameterSuffix}`]: attribute.value,
[`MessageAttribute.${i + 1}.Value.DataType`]: attribute.type
})
}, {} as Record<string, string>)
body = { ...body, ...attributeParameters };
}

if (typeof options.delaySeconds !== 'undefined') {
body = { ...body, DelaySeconds: options.delaySeconds };
}

const signedRequest: SignedHTTPRequest = this.signature.sign(
{
method: 'POST',
Expand Down Expand Up @@ -213,15 +235,27 @@ export class SQSServiceError extends AWSError {
type SQSOperation = 'ListQueues' | 'SendMessage'

export interface SendMessageOptions {
/*
/**
* The message deduplication ID for FIFO queues
*/
messageDeduplicationId?: string

/*
/**
* The message group ID for FIFO queues
*/
messageGroupId?: string

/**
* The message attributes
*/
messageAttributes?: {
[name: string]: { type: 'String' | 'Number' | 'Binary', value: string }
}

/**
* The length of time, in seconds, for which to delay a specific message.
*/
delaySeconds?: number
}

export interface ListQueuesRequestParameters {
Expand Down
18 changes: 9 additions & 9 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ const testData = {
},
}

export default async function testSuite(data) {
signatureV4TestSuite(data)
await s3TestSuite(data)
await secretsManagerTestSuite(data)
await kmsTestSuite(data)
await sqsTestSuite(data)
await ssmTestSuite(data)
await kinesisTestSuite(data)
await eventBridgeTestSuite(data)
export default async function testSuite() {
signatureV4TestSuite()
await s3TestSuite(testData)
await secretsManagerTestSuite(testData)
await kmsTestSuite(testData)
await sqsTestSuite(testData)
await ssmTestSuite(testData)
await kinesisTestSuite(testData)
await eventBridgeTestSuite(testData)
}
32 changes: 28 additions & 4 deletions tests/internal/sqs.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { asyncDescribe } from './helpers.js'
import { b64encode } from 'k6/encoding'
import { SQSClient, SQSServiceError } from '../../build/sqs.js'

export async function sqsTestSuite(data) {
const sqsClient = new SQSClient(data.awsConfig)
sqsClient.host = data.awsConfig.endpoint

// As initialized in the setup script
const queueUrl = 'test1'
const fifoQueueUrl = 'test-queue.fifo'

await asyncDescribe('sqs.listQueues', async (expect) => {
// Act
const queues = await sqsClient.listQueues()
Expand Down Expand Up @@ -39,6 +36,33 @@ export async function sqsTestSuite(data) {
expect(message.bodyMD5).to.equal('098f6bcd4621d373cade4e832627b4f6')
})

await asyncDescribe('sqs.sendMessageWithAttributes', async (expect) => {
// Arrange
const queues = await sqsClient.listQueues()
const standardQueueUrl = queues.urls[0]

// Act
const message = await sqsClient.sendMessage(standardQueueUrl, 'test', {
messageAttributes: {
'test-string': {
type: 'String',
value: 'test'
},
'test-number': {
type: 'Number',
value: '23'
},
'test-binary': {
type: 'Binary',
value: b64encode('test')
}
}
})

// Assert
expect(message.id).to.be.a('string')
})

await asyncDescribe('sqs.sendFIFOMessage', async (expect) => {
// Arrange
const queues = await sqsClient.listQueues()
Expand Down

0 comments on commit baed8a3

Please sign in to comment.