Skip to content

Commit

Permalink
Merge pull request #95 from redcordlau/master
Browse files Browse the repository at this point in the history
feat(kinesis): support Kinesis PutRecords
  • Loading branch information
horike37 authored Sep 4, 2020
2 parents a1c6cb7 + 7d80b8c commit 6185b13
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 12 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ custom:
queryStringParam: myKey # use query string param
streamName: { Ref: 'YourStream' }
cors: true
- kinesis: # PutRecords
path: /kinesis
method: post
action: PutRecords
streamName: { Ref: 'YourStream' }
cors: true

resources:
Resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ custom:
queryStringParam: myKey # use query string param
streamName: { Ref: 'YourStream' }
cors: true
- kinesis:
path: /kinesis6
action: PutRecord
method: post
streamName: { Ref: 'YourStream' }
cors: true
- kinesis:
path: /kinesis7
action: PutRecords
method: post
streamName: { Ref: 'YourStream' }
cors: true

resources:
Resources:
Expand Down
40 changes: 40 additions & 0 deletions __tests__/integration/kinesis/multiple-integrations/tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,44 @@ describe('Multiple Kinesis Proxy Integrations Test', () => {
expect(body).to.have.own.property('ShardId')
expect(body).to.have.own.property('SequenceNumber')
})

it('should get correct response from kinesis proxy endpoints with action "PutRecord" with default partitionkey', async () => {
const stream = 'kinesis6'
const testEndpoint = `${endpoint}/${stream}`
const response = await fetch(testEndpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: `data for stream ${stream}` })
})
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
expect(response.status).to.be.equal(200)
const body = await response.json()
expect(body).to.have.own.property('ShardId')
expect(body).to.have.own.property('SequenceNumber')
})

it('should get correct response from kinesis proxy endpoints with action "PutRecords" with default partitionkey', async () => {
const stream = 'kinesis7'
const testEndpoint = `${endpoint}/${stream}`
const response = await fetch(testEndpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
records: [
{ data: 'some data', 'partition-key': 'some key' },
{ data: 'some other data', 'partition-key': 'some key' }
]
})
})
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
expect(response.status).to.be.equal(200)
const body = await response.json()
expect(body).to.have.own.property('FailedRecordCount')
expect(body).to.have.own.property('Records')
expect(body.Records.length).to.be.equal(2)
expect(body.Records[0]).to.have.own.property('ShardId')
expect(body.Records[0]).to.have.own.property('SequenceNumber')
expect(body.Records[1]).to.have.own.property('ShardId')
expect(body.Records[1]).to.have.own.property('SequenceNumber')
})
})
8 changes: 7 additions & 1 deletion lib/apiGateway/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,13 @@ const allowedProxies = ['kinesis', 'sqs', 's3', 'sns', 'dynamodb', 'eventbridge'

const proxiesSchemas = {
kinesis: Joi.object({
kinesis: proxy.append({ streamName: stringOrRef.required(), partitionKey, request, response })
kinesis: proxy.append({
action: Joi.string().valid('PutRecord', 'PutRecords'),
streamName: stringOrRef.required(),
partitionKey,
request,
response
})
}),
s3: Joi.object({
s3: proxy.append({
Expand Down
21 changes: 21 additions & 0 deletions lib/apiGateway/validate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,27 @@ describe('#validateServiceProxies()', () => {

expect(() => serverlessApigatewayServiceProxy.validateServiceProxies()).to.not.throw()
})

it('should throw error if action is not "PutRecord" and "PutRecords"', () => {
serverlessApigatewayServiceProxy.serverless.service.custom = {
apiGatewayServiceProxies: [
{
kinesis: {
streamName: 'yourStream',
path: 'kinesis',
method: 'post',
action: 'xxxxxx',
request: { template: [] }
}
}
]
}

expect(() => serverlessApigatewayServiceProxy.validateServiceProxies()).to.throw(
serverless.classes.Error,
'child "kinesis" fails because [child "action" fails because ["action" must be one of [PutRecord, PutRecords]]]'
)
})
})

describe('s3', () => {
Expand Down
2 changes: 1 addition & 1 deletion lib/package/kinesis/compileIamRoleToKinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ module.exports = {
},
{
Effect: 'Allow',
Action: ['kinesis:PutRecord'],
Action: ['kinesis:PutRecord', 'kinesis:PutRecords'],
Resource: policyResource
}
]
Expand Down
2 changes: 1 addition & 1 deletion lib/package/kinesis/compileIamRoleToKinesis.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe('#compileIamRoleToKinesis()', () => {
},
{
Effect: 'Allow',
Action: ['kinesis:PutRecord'],
Action: ['kinesis:PutRecord', 'kinesis:PutRecords'],
Resource: [
{
'Fn::Sub': [
Expand Down
41 changes: 32 additions & 9 deletions lib/package/kinesis/compileMethodsToKinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ module.exports = {
Type: 'AWS',
Credentials: roleArn,
Uri: {
'Fn::Sub': 'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/PutRecord'
'Fn::Sub':
'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/' +
(http.action || 'PutRecord')
},
PassthroughBehavior: 'NEVER',
RequestTemplates: this.getKinesisIntegrationRequestTemplates(http)
Expand Down Expand Up @@ -133,15 +135,36 @@ module.exports = {
buildDefaultKinesisRequestTemplate(http) {
const objectRequestParam = this.getKinesisObjectRequestParameter(http)

return {
'Fn::Sub': [
'{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}',
{
StreamName: http.streamName,
Data: '$util.base64Encode($input.body)',
PartitionKey: `${objectRequestParam}`
switch (http.action) {
case 'PutRecords':
return {
'Fn::Sub': [
'{"StreamName":"${StreamName}","Records":${Records}}',
{
StreamName: http.streamName,
Records: `[
#foreach($elem in $input.path('$.records'))
{
"Data": "$util.base64Encode($elem.data)",
"PartitionKey": "$elem.partition-key"
}#if($foreach.hasNext),#end
#end
]`
}
]
}
case 'PutRecord':
default:
return {
'Fn::Sub': [
'{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}',
{
StreamName: http.streamName,
Data: '$util.base64Encode($input.body)',
PartitionKey: `${objectRequestParam}`
}
]
}
]
}
}
}
Loading

0 comments on commit 6185b13

Please sign in to comment.