From ddce22b35680eb9796caeaa5276d0c643ae4119f Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 15 Nov 2024 01:14:02 +0530 Subject: [PATCH] feat: maintenance mode enhancements --- flow/cmd/maintenance.go | 35 ++++++++++++++++++++++++++++++++--- flow/main.go | 9 +++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/flow/cmd/maintenance.go b/flow/cmd/maintenance.go index 474a67db3..790ad4aa7 100644 --- a/flow/cmd/maintenance.go +++ b/flow/cmd/maintenance.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "os" + "time" "github.com/aws/smithy-go/ptr" "go.temporal.io/sdk/client" @@ -33,6 +34,7 @@ type MaintenanceCLIParams struct { FlowTlsEnabled bool SkipOnApiVersionMatch bool SkipOnNoMirrors bool + SkipOnNoSnapshotMirrors bool UseMaintenanceTaskQueue bool AssumeSkippedMaintenanceWorkflows bool } @@ -140,7 +142,7 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam }) } } - if args.SkipOnApiVersionMatch || args.SkipOnNoMirrors { + if args.SkipOnApiVersionMatch || args.SkipOnNoMirrors || args.SkipOnNoSnapshotMirrors { if args.FlowGrpcAddress == "" { return false, errors.New("flow address is required when skipping based on API") } @@ -176,20 +178,47 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam }) } } - if args.SkipOnNoMirrors { + if args.SkipOnNoMirrors || args.SkipOnNoSnapshotMirrors { slog.Info("Checking if there are any mirrors") + startTime := time.Now() mirrors, err := peerFlowClient.ListMirrors(ctx, &protos.ListMirrorsRequest{}) if err != nil { return false, err } slog.Info("Got mirrors from flow", "mirrors", mirrors.Mirrors) - if len(mirrors.Mirrors) == 0 { + if args.SkipOnNoMirrors && len(mirrors.Mirrors) == 0 { slog.Info("Skipping maintenance workflow due to no mirrors") return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ Skipped: true, SkippedReason: ptr.String("No mirrors found"), }) } + if 
args.SkipOnNoSnapshotMirrors { + for _, mirror := range mirrors.Mirrors { + if time.Since(startTime) > 30*time.Second { + slog.Warn("Mirrors list was fetched more than 30 seconds ago, assuming mirrors list is stale and not skipping maintenance workflow") + return false, nil + } + mirrorInfo, err := peerFlowClient.MirrorStatus(ctx, &protos.MirrorStatusRequest{ + FlowJobName: mirror.Name, + IncludeFlowInfo: false, + }) + if err != nil { + return false, err + } + if mirrorInfo.CurrentFlowState == protos.FlowStatus_STATUS_SNAPSHOT || + mirrorInfo.CurrentFlowState == protos.FlowStatus_STATUS_SETUP { + slog.Info("Found an active snapshot mirror, not skipping maintenance workflow", "mirror", mirror.Name, + "state", mirrorInfo.CurrentFlowState) + return false, nil + } + } + slog.Info("Skipping maintenance workflow due to no snapshot mirrors") + return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: true, + SkippedReason: ptr.String("No snapshot mirrors found"), + }) + } } } return false, nil diff --git a/flow/main.go b/flow/main.go index 9d499e957..0ee5c4310 100644 --- a/flow/main.go +++ b/flow/main.go @@ -91,6 +91,13 @@ func main() { Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_NO_MIRRORS"), } + maintenanceSkipOnNoSnapshotMirrorsFlag := &cli.BoolFlag{ + Name: "skip-on-no-snapshot-mirrors", + Value: false, + Usage: "Skip maintenance flow if there are no snapshot mirrors", + Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_NO_SNAPSHOT_MIRRORS"), + } + flowGrpcAddressFlag := &cli.StringFlag{ Name: "flow-grpc-address", Value: "", @@ -211,6 +218,7 @@ func main() { temporalNamespaceFlag, maintenanceModeWorkflowFlag, maintenanceSkipOnApiVersionMatchFlag, + maintenanceSkipOnNoSnapshotMirrorsFlag, maintenanceSkipOnNoMirrorsFlag, flowGrpcAddressFlag, flowTlsEnabledFlag, @@ -227,6 +235,7 @@ func main() { Mode: clicmd.String(maintenanceModeWorkflowFlag.Name), SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name), SkipOnNoMirrors: 
clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name), + SkipOnNoSnapshotMirrors: clicmd.Bool(maintenanceSkipOnNoSnapshotMirrorsFlag.Name), FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name), FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name), UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),