diff --git a/README.md b/README.md index e4e33b0..4d9dd6f 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,12 @@ custom: queryStringParam: myKey # use query string param streamName: { Ref: 'YourStream' } cors: true + - kinesis: # PutRecords + path: /kinesis + method: post + action: PutRecords + streamName: { Ref: 'YourStream' } + cors: true resources: Resources: diff --git a/__tests__/integration/kinesis/multiple-integrations/service/serverless.yml b/__tests__/integration/kinesis/multiple-integrations/service/serverless.yml index 1583643..e66f11b 100644 --- a/__tests__/integration/kinesis/multiple-integrations/service/serverless.yml +++ b/__tests__/integration/kinesis/multiple-integrations/service/serverless.yml @@ -43,6 +43,18 @@ custom: queryStringParam: myKey # use query string param streamName: { Ref: 'YourStream' } cors: true + - kinesis: + path: /kinesis6 + action: PutRecord + method: post + streamName: { Ref: 'YourStream' } + cors: true + - kinesis: + path: /kinesis7 + action: PutRecords + method: post + streamName: { Ref: 'YourStream' } + cors: true resources: Resources: diff --git a/__tests__/integration/kinesis/multiple-integrations/tests.js b/__tests__/integration/kinesis/multiple-integrations/tests.js index 189d3f9..09af99f 100644 --- a/__tests__/integration/kinesis/multiple-integrations/tests.js +++ b/__tests__/integration/kinesis/multiple-integrations/tests.js @@ -96,4 +96,44 @@ describe('Multiple Kinesis Proxy Integrations Test', () => { expect(body).to.have.own.property('ShardId') expect(body).to.have.own.property('SequenceNumber') }) + + it('should get correct response from kinesis proxy endpoints with action "PutRecord" with default partitionkey', async () => { + const stream = 'kinesis6' + const testEndpoint = `${endpoint}/${stream}` + const response = await fetch(testEndpoint, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ message: `data for stream ${stream}` }) + }) + expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*') + expect(response.status).to.be.equal(200) + const body = await response.json() + expect(body).to.have.own.property('ShardId') + expect(body).to.have.own.property('SequenceNumber') + }) + + it('should get correct response from kinesis proxy endpoints with action "PutRecords" with default partitionkey', async () => { + const stream = 'kinesis7' + const testEndpoint = `${endpoint}/${stream}` + const response = await fetch(testEndpoint, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + records: [ + { data: 'some data', 'partition-key': 'some key' }, + { data: 'some other data', 'partition-key': 'some key' } + ] + }) + }) + expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*') + expect(response.status).to.be.equal(200) + const body = await response.json() + expect(body).to.have.own.property('FailedRecordCount') + expect(body).to.have.own.property('Records') + expect(body.Records.length).to.be.equal(2) + expect(body.Records[0]).to.have.own.property('ShardId') + expect(body.Records[0]).to.have.own.property('SequenceNumber') + expect(body.Records[1]).to.have.own.property('ShardId') + expect(body.Records[1]).to.have.own.property('SequenceNumber') + }) }) diff --git a/lib/apiGateway/schema.js b/lib/apiGateway/schema.js index 5049828..cd6778d 100644 --- a/lib/apiGateway/schema.js +++ b/lib/apiGateway/schema.js @@ -240,7 +240,13 @@ const allowedProxies = ['kinesis', 'sqs', 's3', 'sns', 'dynamodb', 'eventbridge' const proxiesSchemas = { kinesis: Joi.object({ - kinesis: proxy.append({ streamName: stringOrRef.required(), partitionKey, request, response }) + kinesis: proxy.append({ + action: Joi.string().valid('PutRecord', 'PutRecords'), + streamName: stringOrRef.required(), + partitionKey, + request, + response + }) }), s3: Joi.object({ s3: proxy.append({ diff --git a/lib/apiGateway/validate.test.js b/lib/apiGateway/validate.test.js index 7ac3207..0a2add1 100644 --- a/lib/apiGateway/validate.test.js +++ b/lib/apiGateway/validate.test.js @@ -1068,6 +1068,27 @@ describe('#validateServiceProxies()', () => { expect(() => serverlessApigatewayServiceProxy.validateServiceProxies()).to.not.throw() }) + + it('should throw error if action is not "PutRecord" and "PutRecords"', () => { + serverlessApigatewayServiceProxy.serverless.service.custom = { + apiGatewayServiceProxies: [ + { + kinesis: { + streamName: 'yourStream', + path: 'kinesis', + method: 'post', + action: 'xxxxxx', + request: { template: [] } + } + } + ] + } + + expect(() => serverlessApigatewayServiceProxy.validateServiceProxies()).to.throw( + serverless.classes.Error, + 'child "kinesis" fails because [child "action" fails because ["action" must be one of [PutRecord, PutRecords]]]' + ) + }) }) describe('s3', () => { diff --git a/lib/package/kinesis/compileIamRoleToKinesis.js b/lib/package/kinesis/compileIamRoleToKinesis.js index 42269d0..423f776 100644 --- a/lib/package/kinesis/compileIamRoleToKinesis.js +++ b/lib/package/kinesis/compileIamRoleToKinesis.js @@ -52,7 +52,7 @@ module.exports = { }, { Effect: 'Allow', - Action: ['kinesis:PutRecord'], + Action: ['kinesis:PutRecord', 'kinesis:PutRecords'], Resource: policyResource } ] diff --git a/lib/package/kinesis/compileIamRoleToKinesis.test.js b/lib/package/kinesis/compileIamRoleToKinesis.test.js index d1fd010..2549dd5 100644 --- a/lib/package/kinesis/compileIamRoleToKinesis.test.js +++ b/lib/package/kinesis/compileIamRoleToKinesis.test.js @@ -79,7 +79,7 @@ describe('#compileIamRoleToKinesis()', () => { }, { Effect: 'Allow', - Action: ['kinesis:PutRecord'], + Action: ['kinesis:PutRecord', 'kinesis:PutRecords'], Resource: [ { 'Fn::Sub': [ diff --git a/lib/package/kinesis/compileMethodsToKinesis.js b/lib/package/kinesis/compileMethodsToKinesis.js index 55191fe..05ebad0 100644 --- a/lib/package/kinesis/compileMethodsToKinesis.js +++ b/lib/package/kinesis/compileMethodsToKinesis.js @@ -53,7 +53,9 @@ module.exports = { Type: 'AWS', Credentials: roleArn, Uri: { - 'Fn::Sub': 'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/PutRecord' + 'Fn::Sub': + 'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/' + + (http.action || 'PutRecord') }, PassthroughBehavior: 'NEVER', RequestTemplates: this.getKinesisIntegrationRequestTemplates(http) @@ -133,15 +135,36 @@ module.exports = { buildDefaultKinesisRequestTemplate(http) { const objectRequestParam = this.getKinesisObjectRequestParameter(http) - return { - 'Fn::Sub': [ - '{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}', - { - StreamName: http.streamName, - Data: '$util.base64Encode($input.body)', - PartitionKey: `${objectRequestParam}` + switch (http.action) { + case 'PutRecords': + return { + 'Fn::Sub': [ + '{"StreamName":"${StreamName}","Records":${Records}}', + { + StreamName: http.streamName, + Records: `[ + #foreach($elem in $input.path('$.records')) + { + "Data": "$util.base64Encode($elem.data)", + "PartitionKey": "$elem.partition-key" + }#if($foreach.hasNext),#end + #end +]` + } + ] + } + case 'PutRecord': + default: + return { + 'Fn::Sub': [ + '{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}', + { + StreamName: http.streamName, + Data: '$util.base64Encode($input.body)', + PartitionKey: `${objectRequestParam}` + } + ] } - ] } } } diff --git a/lib/package/kinesis/compileMethodsToKinesis.test.js b/lib/package/kinesis/compileMethodsToKinesis.test.js index 0351cce..77d6337 100644 --- a/lib/package/kinesis/compileMethodsToKinesis.test.js +++ b/lib/package/kinesis/compileMethodsToKinesis.test.js @@ -908,4 +908,215 @@ describe('#compileMethodsToKinesis()', () => { .ApiGatewayMethodkinesisPost.Properties.RequestParameters ).to.be.deep.equal({ 'method.request.header.Custom-Header': true }) }) + + it('should create corresponding resources when kinesis proxies with custome action "PutRecord" are given', () => { + serverlessApigatewayServiceProxy.validated = { + events: [ + { + serviceName: 'kinesis', + http: { + streamName: 'myStream', + path: 'kinesis', + method: 'post', + action: 'PutRecord', + auth: { + authorizationType: 'NONE' + } + } + } + ] + } + serverlessApigatewayServiceProxy.apiGatewayRestApiLogicalId = 'ApiGatewayRestApi' + serverlessApigatewayServiceProxy.apiGatewayResources = { + kinesis: { + name: 'kinesis', + resourceLogicalId: 'ApiGatewayResourceKinesis' + } + } + + serverlessApigatewayServiceProxy.compileMethodsToKinesis() + expect(serverless.service.provider.compiledCloudFormationTemplate.Resources).to.deep.equal({ + ApiGatewayMethodkinesisPost: { + Type: 'AWS::ApiGateway::Method', + Properties: { + HttpMethod: 'POST', + RequestParameters: {}, + AuthorizationType: 'NONE', + AuthorizationScopes: undefined, + AuthorizerId: undefined, + ApiKeyRequired: false, + ResourceId: { Ref: 'ApiGatewayResourceKinesis' }, + RestApiId: { Ref: 'ApiGatewayRestApi' }, + Integration: { + IntegrationHttpMethod: 'POST', + Type: 'AWS', + Credentials: { 'Fn::GetAtt': ['ApigatewayToKinesisRole', 'Arn'] }, + Uri: { + 'Fn::Sub': 'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/PutRecord' + }, + PassthroughBehavior: 'NEVER', + RequestTemplates: { + 'application/json': { + 'Fn::Sub': [ + '{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}', + { + StreamName: 'myStream', + Data: '$util.base64Encode($input.body)', + PartitionKey: '$context.requestId' + } + ] + }, + 'application/x-www-form-urlencoded': { + 'Fn::Sub': [ + '{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}', + { + StreamName: 'myStream', + Data: '$util.base64Encode($input.body)', + PartitionKey: '$context.requestId' + } + ] + } + }, + IntegrationResponses: [ + { + StatusCode: 200, + SelectionPattern: 200, + ResponseParameters: {}, + ResponseTemplates: {} + }, + { + StatusCode: 400, + SelectionPattern: 400, + ResponseParameters: {}, + ResponseTemplates: {} + }, + { + StatusCode: 500, + SelectionPattern: 500, + ResponseParameters: {}, + ResponseTemplates: {} + } + ] + }, + MethodResponses: [ + { ResponseParameters: {}, ResponseModels: {}, StatusCode: 200 }, + { ResponseParameters: {}, ResponseModels: {}, StatusCode: 400 }, + { ResponseParameters: {}, ResponseModels: {}, StatusCode: 500 } + ] + } + } + }) + }) + + it('should create corresponding resources when kinesis proxies with custome action "PutRecords" are given', () => { + serverlessApigatewayServiceProxy.validated = { + events: [ + { + serviceName: 'kinesis', + http: { + streamName: 'myStream', + path: 'kinesis', + method: 'post', + action: 'PutRecords', + auth: { + authorizationType: 'NONE' + } + } + } + ] + } + serverlessApigatewayServiceProxy.apiGatewayRestApiLogicalId = 'ApiGatewayRestApi' + serverlessApigatewayServiceProxy.apiGatewayResources = { + kinesis: { + name: 'kinesis', + resourceLogicalId: 'ApiGatewayResourceKinesis' + } + } + + serverlessApigatewayServiceProxy.compileMethodsToKinesis() + + expect(serverless.service.provider.compiledCloudFormationTemplate.Resources).to.deep.equal({ + ApiGatewayMethodkinesisPost: { + Type: 'AWS::ApiGateway::Method', + Properties: { + HttpMethod: 'POST', + RequestParameters: {}, + AuthorizationType: 'NONE', + AuthorizationScopes: undefined, + AuthorizerId: undefined, + ApiKeyRequired: false, + ResourceId: { Ref: 'ApiGatewayResourceKinesis' }, + RestApiId: { Ref: 'ApiGatewayRestApi' }, + Integration: { + IntegrationHttpMethod: 'POST', + Type: 'AWS', + Credentials: { 'Fn::GetAtt': ['ApigatewayToKinesisRole', 'Arn'] }, + Uri: { + 'Fn::Sub': 'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/PutRecords' + }, + PassthroughBehavior: 'NEVER', + RequestTemplates: { + 'application/json': { + 'Fn::Sub': [ + '{"StreamName":"${StreamName}","Records":${Records}}', + { + StreamName: 'myStream', + Records: `[ + #foreach($elem in $input.path('$.records')) + { + "Data": "$util.base64Encode($elem.data)", + "PartitionKey": "$elem.partition-key" + }#if($foreach.hasNext),#end + #end +]` + } + ] + }, + 'application/x-www-form-urlencoded': { + 'Fn::Sub': [ + '{"StreamName":"${StreamName}","Records":${Records}}', + { + StreamName: 'myStream', + Records: `[ + #foreach($elem in $input.path('$.records')) + { + "Data": "$util.base64Encode($elem.data)", + "PartitionKey": "$elem.partition-key" + }#if($foreach.hasNext),#end + #end +]` + } + ] + } + }, + IntegrationResponses: [ + { + StatusCode: 200, + SelectionPattern: 200, + ResponseParameters: {}, + ResponseTemplates: {} + }, + { + StatusCode: 400, + SelectionPattern: 400, + ResponseParameters: {}, + ResponseTemplates: {} + }, + { + StatusCode: 500, + SelectionPattern: 500, + ResponseParameters: {}, + ResponseTemplates: {} + } + ] + }, + MethodResponses: [ + { ResponseParameters: {}, ResponseModels: {}, StatusCode: 200 }, + { ResponseParameters: {}, ResponseModels: {}, StatusCode: 400 }, + { ResponseParameters: {}, ResponseModels: {}, StatusCode: 500 } + ] + } + } + }) + }) })