Skip to content

Commit

Permalink
make it work...
Browse files Browse the repository at this point in the history
  • Loading branch information
13bfrancis committed Aug 6, 2024
1 parent f9ac131 commit 9794a99
Showing 1 changed file with 139 additions and 55 deletions.
194 changes: 139 additions & 55 deletions lib/stacks/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,28 @@ export class Data extends cdk.NestedStack {
},
).next(failureState);

const checkSeaDataProgressTask =
new cdk.aws_stepfunctions_tasks.LambdaInvoke(
this,
"CheckSeaDataProgress",
{
lambdaFunction: checkConsumerLag,
outputPath: "$.Payload",
payload: cdk.aws_stepfunctions.TaskInput.fromObject({
brokerString,
triggers: [
{
function: lambdaFunctions.sinkMain.functionName,
topics: ["aws.seatool.ksql.onemac.agg.State_Plan"],
},
],
}),
},
).addCatch(notifyOfFailureStep, {
errors: ["States.ALL"],
resultPath: "$.error",
});

const checkDataProgressTask = new cdk.aws_stepfunctions_tasks.LambdaInvoke(
this,
"CheckDataProgress",
Expand All @@ -624,7 +646,6 @@ export class Data extends cdk.NestedStack {
{
function: lambdaFunctions.sinkMain.functionName,
topics: [
"aws.seatool.ksql.onemac.agg.State_Plan",
"aws.onemac.migration.cdc",
`${topicNamespace}aws.onemac.migration.cdc`,
"aws.seatool.debezium.changed_date.SEA.dbo.State_Plan",
Expand Down Expand Up @@ -708,7 +729,7 @@ export class Data extends cdk.NestedStack {
.next(
new cdk.aws_stepfunctions_tasks.LambdaInvoke(
this,
"StartIndexingData",
"StartIndexingSeaData",
{
lambdaFunction: createTriggers,
outputPath: "$.Payload",
Expand All @@ -722,33 +743,7 @@ export class Data extends cdk.NestedStack {
triggers: [
{
function: lambdaFunctions.sinkMain.functionName,
topics: [
"aws.seatool.ksql.onemac.agg.State_Plan",
"aws.onemac.migration.cdc",
`${topicNamespace}aws.onemac.migration.cdc`,
"aws.seatool.debezium.changed_date.SEA.dbo.State_Plan",
],
},
{
function: lambdaFunctions.sinkChangelog.functionName,
topics: [
"aws.onemac.migration.cdc",
`${topicNamespace}aws.onemac.migration.cdc`,
],
},
{
function: lambdaFunctions.sinkTypes.functionName,
topics: ["aws.seatool.debezium.cdc.SEA.dbo.SPA_Type"],
batchSize: 10000,
},
{
function: lambdaFunctions.sinkSubtypes.functionName,
topics: ["aws.seatool.debezium.cdc.SEA.dbo.Type"],
batchSize: 10000,
},
{
function: lambdaFunctions.sinkCpocs.functionName,
topics: ["aws.seatool.debezium.cdc.SEA.dbo.Officers"],
topics: ["aws.seatool.ksql.onemac.agg.State_Plan"],
},
],
}),
Expand All @@ -758,54 +753,143 @@ export class Data extends cdk.NestedStack {
resultPath: "$.error",
}),
)
.next(checkDataProgressTask)
.next(checkSeaDataProgressTask)
.next(
new cdk.aws_stepfunctions.Choice(this, "IsDataReady")
new cdk.aws_stepfunctions.Choice(this, "IsSeaDataReady")
.when(
cdk.aws_stepfunctions.Condition.booleanEquals("$.ready", true),
new cdk.aws_stepfunctions_tasks.LambdaInvoke(
this,
"StartIndexingInsights",
"DeleteSeaDataTriggers",
{
lambdaFunction: createTriggers,
lambdaFunction: deleteTriggers,
outputPath: "$.Payload",
payload: cdk.aws_stepfunctions.TaskInput.fromObject({
"Context.$": "$$",
osDomain: `https://${openSearchDomainEndpoint}`,
brokerString,
securityGroup: lambdaSecurityGroup.securityGroupId,
consumerGroupPrefix,
subnets: privateSubnets.map((subnet) => subnet.subnetId),
triggers: [
{
function: lambdaFunctions.sinkInsights.functionName,
topics: ["aws.seatool.ksql.onemac.agg.State_Plan"],
},
{
function: lambdaFunctions.sinkLegacyInsights.functionName,
topics: [
"aws.onemac.migration.cdc",
`${topicNamespace}aws.onemac.migration.cdc`,
],
},
],
functions: [lambdaFunctions["sinkMain"].functionName],
}),
},
)
.addCatch(notifyOfFailureStep, {
errors: ["States.ALL"],
resultPath: "$.error",
})
.next(notifyState("NotifyOfSuccess", true))
.next(new cdk.aws_stepfunctions.Succeed(this, "SuccessState")),
.next(
new cdk.aws_stepfunctions_tasks.LambdaInvoke(
this,
"StartIndexingData",
{
lambdaFunction: createTriggers,
outputPath: "$.Payload",
payload: cdk.aws_stepfunctions.TaskInput.fromObject({
"Context.$": "$$",
osDomain: `https://${openSearchDomainEndpoint}`,
brokerString,
securityGroup: lambdaSecurityGroup.securityGroupId,
consumerGroupPrefix,
subnets: privateSubnets.map((subnet) => subnet.subnetId),
triggers: [
{
function: lambdaFunctions.sinkMain.functionName,
topics: [
"aws.onemac.migration.cdc",
`${topicNamespace}aws.onemac.migration.cdc`,
"aws.seatool.debezium.changed_date.SEA.dbo.State_Plan",
],
},
{
function: lambdaFunctions.sinkChangelog.functionName,
topics: [
"aws.onemac.migration.cdc",
`${topicNamespace}aws.onemac.migration.cdc`,
],
},
{
function: lambdaFunctions.sinkTypes.functionName,
topics: ["aws.seatool.debezium.cdc.SEA.dbo.SPA_Type"],
batchSize: 10000,
},
{
function: lambdaFunctions.sinkSubtypes.functionName,
topics: ["aws.seatool.debezium.cdc.SEA.dbo.Type"],
batchSize: 10000,
},
{
function: lambdaFunctions.sinkCpocs.functionName,
topics: ["aws.seatool.debezium.cdc.SEA.dbo.Officers"],
},
],
}),
},
).addCatch(notifyOfFailureStep, {
errors: ["States.ALL"],
resultPath: "$.error",
}),
)
.next(checkDataProgressTask)
.next(
new cdk.aws_stepfunctions.Choice(this, "IsDataReady")
.when(
cdk.aws_stepfunctions.Condition.booleanEquals(
"$.ready",
true,
),
// here we conditionally slap seatoolbackon
new cdk.aws_stepfunctions_tasks.LambdaInvoke(
this,
"StartConditionallyIndexingSeaData",
{
lambdaFunction: createTriggers,
outputPath: "$.Payload",
payload: cdk.aws_stepfunctions.TaskInput.fromObject({
"Context.$": "$$",
osDomain: `https://${openSearchDomainEndpoint}`,
brokerString,
securityGroup: lambdaSecurityGroup.securityGroupId,
consumerGroupPrefix,
subnets: privateSubnets.map(
(subnet) => subnet.subnetId,
),
triggers: [
{
function: lambdaFunctions.sinkMain.functionName,
topics: [
"aws.seatool.ksql.onemac.agg.State_Plan",
],
},
],
}),
},
)
.addCatch(notifyOfFailureStep, {
errors: ["States.ALL"],
resultPath: "$.error",
})
.next(notifyState("NotifyOfSuccess", true))
.next(
new cdk.aws_stepfunctions.Succeed(this, "SuccessState"),
),
)
.when(
cdk.aws_stepfunctions.Condition.booleanEquals(
"$.ready",
false,
),
new cdk.aws_stepfunctions.Wait(this, "WaitForData", {
time: cdk.aws_stepfunctions.WaitTime.duration(
cdk.Duration.seconds(3),
),
}).next(checkDataProgressTask),
),
),
)
.when(
cdk.aws_stepfunctions.Condition.booleanEquals("$.ready", false),
new cdk.aws_stepfunctions.Wait(this, "WaitForData", {
new cdk.aws_stepfunctions.Wait(this, "WaitForSeaData", {
time: cdk.aws_stepfunctions.WaitTime.duration(
cdk.Duration.seconds(3),
),
}).next(checkDataProgressTask),
}).next(checkSeaDataProgressTask),
),
);

Expand Down

0 comments on commit 9794a99

Please sign in to comment.