diff --git a/lib/stacks/data.ts b/lib/stacks/data.ts index 98168b270a..b40dcc322c 100644 --- a/lib/stacks/data.ts +++ b/lib/stacks/data.ts @@ -612,6 +612,28 @@ export class Data extends cdk.NestedStack { }, ).next(failureState); + const checkSeaDataProgressTask = + new cdk.aws_stepfunctions_tasks.LambdaInvoke( + this, + "CheckSeaDataProgress", + { + lambdaFunction: checkConsumerLag, + outputPath: "$.Payload", + payload: cdk.aws_stepfunctions.TaskInput.fromObject({ + brokerString, + triggers: [ + { + function: lambdaFunctions.sinkMain.functionName, + topics: ["aws.seatool.ksql.onemac.agg.State_Plan"], + }, + ], + }), + }, + ).addCatch(notifyOfFailureStep, { + errors: ["States.ALL"], + resultPath: "$.error", + }); + const checkDataProgressTask = new cdk.aws_stepfunctions_tasks.LambdaInvoke( this, "CheckDataProgress", @@ -624,7 +646,6 @@ export class Data extends cdk.NestedStack { { function: lambdaFunctions.sinkMain.functionName, topics: [ - "aws.seatool.ksql.onemac.agg.State_Plan", "aws.onemac.migration.cdc", `${topicNamespace}aws.onemac.migration.cdc`, "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan", @@ -708,7 +729,7 @@ export class Data extends cdk.NestedStack { .next( new cdk.aws_stepfunctions_tasks.LambdaInvoke( this, - "StartIndexingData", + "StartIndexingSeaData", { lambdaFunction: createTriggers, outputPath: "$.Payload", @@ -722,33 +743,7 @@ export class Data extends cdk.NestedStack { triggers: [ { function: lambdaFunctions.sinkMain.functionName, - topics: [ - "aws.seatool.ksql.onemac.agg.State_Plan", - "aws.onemac.migration.cdc", - `${topicNamespace}aws.onemac.migration.cdc`, - "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan", - ], - }, - { - function: lambdaFunctions.sinkChangelog.functionName, - topics: [ - "aws.onemac.migration.cdc", - `${topicNamespace}aws.onemac.migration.cdc`, - ], - }, - { - function: lambdaFunctions.sinkTypes.functionName, - topics: ["aws.seatool.debezium.cdc.SEA.dbo.SPA_Type"], - batchSize: 10000, - }, - { - function: lambdaFunctions.sinkSubtypes.functionName, - topics: ["aws.seatool.debezium.cdc.SEA.dbo.Type"], - batchSize: 10000, - }, - { - function: lambdaFunctions.sinkCpocs.functionName, - topics: ["aws.seatool.debezium.cdc.SEA.dbo.Officers"], + topics: ["aws.seatool.ksql.onemac.agg.State_Plan"], }, ], }), @@ -758,37 +753,20 @@ export class Data extends cdk.NestedStack { resultPath: "$.error", }), ) - .next(checkDataProgressTask) + .next(checkSeaDataProgressTask) .next( - new cdk.aws_stepfunctions.Choice(this, "IsDataReady") + new cdk.aws_stepfunctions.Choice(this, "IsSeaDataReady") .when( cdk.aws_stepfunctions.Condition.booleanEquals("$.ready", true), new cdk.aws_stepfunctions_tasks.LambdaInvoke( this, - "StartIndexingInsights", + "DeleteSeaDataTriggers", { - lambdaFunction: createTriggers, + lambdaFunction: deleteTriggers, outputPath: "$.Payload", payload: cdk.aws_stepfunctions.TaskInput.fromObject({ "Context.$": "$$", - osDomain: `https://${openSearchDomainEndpoint}`, - brokerString, - securityGroup: lambdaSecurityGroup.securityGroupId, - consumerGroupPrefix, - subnets: privateSubnets.map((subnet) => subnet.subnetId), - triggers: [ - { - function: lambdaFunctions.sinkInsights.functionName, - topics: ["aws.seatool.ksql.onemac.agg.State_Plan"], - }, - { - function: lambdaFunctions.sinkLegacyInsights.functionName, - topics: [ - "aws.onemac.migration.cdc", - `${topicNamespace}aws.onemac.migration.cdc`, - ], - }, - ], + functions: [lambdaFunctions["sinkMain"].functionName], }), }, ) @@ -796,16 +774,122 @@ export class Data extends cdk.NestedStack { errors: ["States.ALL"], resultPath: "$.error", }) - .next(notifyState("NotifyOfSuccess", true)) - .next(new cdk.aws_stepfunctions.Succeed(this, "SuccessState")), + .next( + new cdk.aws_stepfunctions_tasks.LambdaInvoke( + this, + "StartIndexingData", + { + lambdaFunction: createTriggers, + outputPath: "$.Payload", + payload: cdk.aws_stepfunctions.TaskInput.fromObject({ + "Context.$": "$$", + osDomain: `https://${openSearchDomainEndpoint}`, + brokerString, + securityGroup: lambdaSecurityGroup.securityGroupId, + consumerGroupPrefix, + subnets: privateSubnets.map((subnet) => subnet.subnetId), + triggers: [ + { + function: lambdaFunctions.sinkMain.functionName, + topics: [ + "aws.onemac.migration.cdc", + `${topicNamespace}aws.onemac.migration.cdc`, + "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan", + ], + }, + { + function: lambdaFunctions.sinkChangelog.functionName, + topics: [ + "aws.onemac.migration.cdc", + `${topicNamespace}aws.onemac.migration.cdc`, + ], + }, + { + function: lambdaFunctions.sinkTypes.functionName, + topics: ["aws.seatool.debezium.cdc.SEA.dbo.SPA_Type"], + batchSize: 10000, + }, + { + function: lambdaFunctions.sinkSubtypes.functionName, + topics: ["aws.seatool.debezium.cdc.SEA.dbo.Type"], + batchSize: 10000, + }, + { + function: lambdaFunctions.sinkCpocs.functionName, + topics: ["aws.seatool.debezium.cdc.SEA.dbo.Officers"], + }, + ], + }), + }, + ).addCatch(notifyOfFailureStep, { + errors: ["States.ALL"], + resultPath: "$.error", + }), + ) + .next(checkDataProgressTask) + .next( + new cdk.aws_stepfunctions.Choice(this, "IsDataReady") + .when( + cdk.aws_stepfunctions.Condition.booleanEquals( + "$.ready", + true, + ), + // here we conditionally slap seatoolbackon + new cdk.aws_stepfunctions_tasks.LambdaInvoke( + this, + "StartConditionallyIndexingSeaData", + { + lambdaFunction: createTriggers, + outputPath: "$.Payload", + payload: cdk.aws_stepfunctions.TaskInput.fromObject({ + "Context.$": "$$", + osDomain: `https://${openSearchDomainEndpoint}`, + brokerString, + securityGroup: lambdaSecurityGroup.securityGroupId, + consumerGroupPrefix, + subnets: privateSubnets.map( + (subnet) => subnet.subnetId, + ), + triggers: [ + { + function: lambdaFunctions.sinkMain.functionName, + topics: [ + "aws.seatool.ksql.onemac.agg.State_Plan", + ], + }, + ], + }), + }, + ) + .addCatch(notifyOfFailureStep, { + errors: ["States.ALL"], + resultPath: "$.error", + }) + .next(notifyState("NotifyOfSuccess", true)) + .next( + new cdk.aws_stepfunctions.Succeed(this, "SuccessState"), + ), + ) + .when( + cdk.aws_stepfunctions.Condition.booleanEquals( + "$.ready", + false, + ), + new cdk.aws_stepfunctions.Wait(this, "WaitForData", { + time: cdk.aws_stepfunctions.WaitTime.duration( + cdk.Duration.seconds(3), + ), + }).next(checkDataProgressTask), + ), + ), ) .when( cdk.aws_stepfunctions.Condition.booleanEquals("$.ready", false), - new cdk.aws_stepfunctions.Wait(this, "WaitForData", { + new cdk.aws_stepfunctions.Wait(this, "WaitForSeaData", { time: cdk.aws_stepfunctions.WaitTime.duration( cdk.Duration.seconds(3), ), - }).next(checkDataProgressTask), + }).next(checkSeaDataProgressTask), ), );